Skip to content

Commit

Permalink
Make flow version available in runtime (#14897)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Aug 12, 2024
1 parent 9d39d68 commit aad6693
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
20 changes: 19 additions & 1 deletion src/prefect/runtime/flow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- `scheduled_start_time`: the flow run's expected scheduled start time; defaults to now if not present
- `name`: the name of the flow run
- `flow_name`: the name of the flow
- `flow_version`: the version of the flow
- `parameters`: the parameters that were passed to this run; note that these do not necessarily
include default values set on the flow function, only the parameter values explicitly passed for the run
- `parent_flow_run_id`: the ID of the flow run that triggered this run, if any
Expand All @@ -35,6 +36,7 @@
"scheduled_start_time",
"name",
"flow_name",
"flow_version",
"parameters",
"parent_flow_run_id",
"parent_deployment_id",
Expand Down Expand Up @@ -119,7 +121,7 @@ async def _get_flow_from_run(flow_run_id):
return await client.read_flow(flow_run.flow_id)


def get_id() -> str:
def get_id() -> Optional[str]:
flow_run_ctx = FlowRunContext.get()
task_run_ctx = TaskRunContext.get()
if flow_run_ctx is not None:
Expand Down Expand Up @@ -190,6 +192,21 @@ def get_flow_name() -> Optional[str]:
return flow_run_ctx.flow.name


def get_flow_version() -> Optional[str]:
flow_run_ctx = FlowRunContext.get()
run_id = get_id()
if flow_run_ctx is None and run_id is None:
return None
elif flow_run_ctx is None:
flow = from_sync.call_soon_in_loop_thread(
create_call(_get_flow_from_run, run_id)
).result()

return flow.version
else:
return flow_run_ctx.flow.version


def get_scheduled_start_time() -> pendulum.DateTime:
flow_run_ctx = FlowRunContext.get()
run_id = get_id()
Expand Down Expand Up @@ -313,4 +330,5 @@ def get_flow_run_ui_url() -> Optional[str]:
"run_count": get_run_count,
"api_url": get_flow_run_api_url,
"ui_url": get_flow_run_ui_url,
"flow_version": get_flow_version,
}
29 changes: 28 additions & 1 deletion tests/results/test_flow_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
import pytest

from prefect import flow, task
from prefect.blocks.core import Block
from prefect.context import get_run_context
from prefect.exceptions import MissingResult
from prefect.filesystems import LocalFileSystem
from prefect.results import PersistedResultBlob, UnpersistedResult
from prefect.results import (
PersistedResultBlob,
UnpersistedResult,
)
from prefect.serializers import (
CompressedSerializer,
JSONSerializer,
Expand Down Expand Up @@ -440,3 +444,26 @@ async def foo():
result = await foo()

assert_blocks_equal(result, await LocalFileSystem.load("explicit-storage"))


def test_flow_version_result_storage_key():
@task(result_storage_key="{prefect.runtime.flow_run.flow_version}")
def some_task():
return "hello"

@flow(version="somespecialflowversion")
def some_flow() -> Block:
some_task()
return get_run_context().result_factory.storage_block

storage_block = some_flow()

assert isinstance(storage_block, LocalFileSystem)
result = pickle.loads(
base64.b64decode(
PersistedResultBlob.model_validate_json(
storage_block.read_path("somespecialflowversion")
).data
)
)
assert result == "hello"

0 comments on commit aad6693

Please sign in to comment.