Flow Run Automation
Programmatically Running Flows
Flows can be run programmatically using the GanymedeFlowRun object in ganymede_sdk.flow_run.
GanymedeFlowRun requires API configuration and a Flow ID to run a Flow. Reach out to Ganymede Support for API configuration assistance if needed.
In the example below, an API key called "ganymede_api_key_demo" is configured in the environment. The string 'EXP001' is attached to the Input_Param node, and a file named input_file.txt is attached to the CSV_Read node in a Flow with an ID of demo-flow.
from ganymede_sdk.flow_run import GanymedeFlowRun
from ganymede_sdk import get_secret
flow_run = GanymedeFlowRun(api_key=get_secret("ganymede_api_key_demo"))
flow_run.attach_param(node='Input_Param', value='EXP001')
flow_run.attach_file(node="CSV_Read", param="csv", files={"input_file.txt": b"Hello World"})
# Trigger the Flow Run using the parameters attached to the GanymedeFlowRun object
flow_run.trigger_flow_run("demo-flow")
To monitor the status of the Flow run, use the get_flow_run_status method.
# retrieve Flow run status for the latest execution of trigger_flow_run
flow_run.get_flow_run_status()
# retrieve Flow run status for a specific Flow run
flow_run.get_flow_run_status(flow_name="demo-flow", run_id="1234567890")
To poll for Flow run completion, use the wait_for_flow_run_completion method.
flow_run.wait_for_flow_run_completion()
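The calls shown above can be combined into a single helper. The following is a minimal sketch, not part of the SDK: run_and_wait, FlowRunLike, and FakeFlowRun are hypothetical names, FakeFlowRun stands in for GanymedeFlowRun so the snippet runs without API credentials, and the shape of the returned status dictionary (a "status" key) is an assumption.

```python
from typing import Any, Protocol


class FlowRunLike(Protocol):
    """Subset of the GanymedeFlowRun methods documented on this page."""

    def attach_param(self, node: str, value: str) -> Any: ...
    def trigger_flow_run(self, flow: str) -> Any: ...
    def wait_for_flow_run_completion(self, **kwargs: Any) -> Any: ...


def run_and_wait(flow_run: FlowRunLike, flow_id: str, params: dict[str, str],
                 timeout_seconds: int = 3600) -> Any:
    """Attach string parameters, trigger the Flow, and block until it finishes."""
    for node, value in params.items():
        flow_run.attach_param(node=node, value=value)
    flow_run.trigger_flow_run(flow_id)
    # With no flow_name/run_id given, the wait applies to the run triggered above.
    return flow_run.wait_for_flow_run_completion(timeout_seconds=timeout_seconds)


class FakeFlowRun:
    """Hypothetical stand-in that records calls instead of contacting Ganymede."""

    def __init__(self) -> None:
        self.calls: list[tuple] = []

    def attach_param(self, node: str, value: str) -> None:
        self.calls.append(("attach_param", node, value))

    def trigger_flow_run(self, flow: str) -> None:
        self.calls.append(("trigger_flow_run", flow))

    def wait_for_flow_run_completion(self, **kwargs: Any) -> dict:
        self.calls.append(("wait", kwargs))
        return {"status": "Success"}  # assumed shape of the status dictionary


fake = FakeFlowRun()
result = run_and_wait(fake, "demo-flow", {"Input_Param": "EXP001"})
```

With a real GanymedeFlowRun instance in place of FakeFlowRun, the same helper would issue the documented calls in the same order.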
method
attach_file
Upload a file to Ganymede and associate it with a node for the Flow run. The uploaded file will be stored in the Ganymede input bucket as _api/{file_name}.
- param node : str - Name of the node
- param param : str - Name of the parameter (typically, file extension associated with the node)
- param files : dict[str, bytes | BytesIO] - Dictionary keyed by the name of the file(s) to upload, with values as the file contents
- param is_multi : bool, optional - Whether input node references single or multiple files. Defaults to False.
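The files argument accepts either bytes or BytesIO values, so file contents can be built in memory. The sketch below shows one way to assemble such a dictionary; rows_to_csv_bytes, the file names, and the data are hypothetical, and passing is_multi=True for a two-file dictionary is an assumption based on the parameter description above.

```python
import csv
import io


def rows_to_csv_bytes(rows: list[list[str]]) -> bytes:
    """Serialise rows to CSV text and return it as UTF-8 bytes."""
    buffer = io.StringIO(newline="")
    csv.writer(buffer).writerows(rows)
    return buffer.getvalue().encode("utf-8")


# Keys are file names; values are the file contents as bytes or BytesIO.
files = {
    "plate_1.csv": rows_to_csv_bytes([["well", "od600"], ["A1", "0.42"]]),
    "plate_2.csv": io.BytesIO(b"well,od600\r\nA1,0.40\r\n"),
}

# With a configured GanymedeFlowRun instance, the dictionary would be passed as
# (is_multi=True is an assumption for a node that takes several files):
# flow_run.attach_file(node="CSV_Read", param="csv", files=files, is_multi=True)
```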
method
attach_param
Attach an input string parameter to a node for the Flow run.
- param node : str - Name of the node
- param value : str - Value of the parameter
method
reference_file
Associate a file that already exists in the Ganymede input bucket with a node for the Flow run.
- param node : str - Name of the node
- param param : str - Name of the parameter (typically, file extension associated with the node)
- param file_name : str | list[str] - Name(s) of the file(s) in the Ganymede input bucket
- param is_multi : bool, optional - Whether input node references single or multiple files. Defaults to False.
method
trigger_flow_run
Trigger a Flow run with the attached parameters. Returns a RunConf object containing the Flow run configuration.
- param flow : str - ID of the Flow to run
- param run_conf : RunConf | None, optional - Run configuration for the Flow run. If not specified, a new run configuration is created from prior calls to attach_param, attach_file, and reference_file.
method
get_flow_run_status
Obtain the Flow run status for the specified Flow and run ID. If neither is specified, retrieve the status of the latest execution of trigger_flow_run.
- param flow_name : str, optional - Name of the Flow to retrieve status for. If not specified, use the Flow from the latest execution of trigger_flow_run.
- param run_id : str, optional - Run ID of the Flow run to retrieve status for. If not specified, use the run ID from the latest execution of trigger_flow_run.
method
wait_for_flow_run_completion_async
Asynchronously polls the get_flow_run_status endpoint every check_interval seconds until the Flow is in a terminal state (Success/Failed). Returns the final status dictionary.
- param flow_name : str, optional - Name of the Flow to retrieve status for. If not specified, retrieve status for latest execution of trigger_flow_run.
- param run_id : str, optional - Run ID of the Flow run to retrieve status for. If not specified, use the run ID from the latest execution of trigger_flow_run.
- param check_interval : int, optional - Interval in seconds to poll for Flow status. Defaults to 5 seconds.
- param timeout_seconds : int, optional - Timeout in seconds to wait for Flow completion. Defaults to 3600 seconds (1 hour).
- param verbose : bool, optional - Whether to print status updates to console at each check interval. Defaults to False.
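The polling behaviour described above can be sketched with asyncio. This is an illustration of the general pattern, not the SDK's implementation: poll_until_terminal and fake_get_status are hypothetical names, the "status" key in the status dictionary is assumed, and raising TimeoutError when the timeout elapses is an assumption about how a timeout might be reported.

```python
import asyncio
import time
from typing import Awaitable, Callable

TERMINAL_STATES = {"Success", "Failed"}


async def poll_until_terminal(
    get_status: Callable[[], Awaitable[dict]],
    check_interval: float = 5,
    timeout_seconds: float = 3600,
    verbose: bool = False,
) -> dict:
    """Poll get_status until a terminal state is reported or the timeout elapses."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = await get_status()
        if verbose:
            print(f"Flow run status: {status}")
        if status.get("status") in TERMINAL_STATES:
            return status
        # Stop if the next check would land after the deadline.
        if time.monotonic() + check_interval > deadline:
            raise TimeoutError(f"Flow run not finished after {timeout_seconds} seconds")
        await asyncio.sleep(check_interval)


# Hypothetical status source: reports "Running" twice, then "Success".
_responses = iter(["Running", "Running", "Success"])


async def fake_get_status() -> dict:
    return {"status": next(_responses)}


final_status = asyncio.run(poll_until_terminal(fake_get_status, check_interval=0.01))
```

Using asyncio.sleep between checks leaves the event loop free for other tasks while waiting, which is the usual reason to offer an async variant alongside a blocking one.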
method
wait_for_flow_run_completion
Synchronously polls for Flow run completion by running the async method (wait_for_flow_run_completion_async) in an event loop. This method can be run in either a Jupyter notebook or a Python script.
- param flow_name : str, optional - Name of the Flow to retrieve status for. If not specified, retrieve status for latest execution of trigger_flow_run.
- param run_id : str, optional - Run ID of the Flow run to retrieve status for. If not specified, use the run ID from the latest execution of trigger_flow_run.
- param check_interval : int, optional - Interval in seconds to poll for Flow status. Defaults to 5 seconds.
- param timeout_seconds : int, optional - Timeout in seconds to wait for Flow completion. Defaults to 3600 seconds (1 hour).
- param verbose : bool, optional - Whether to print status updates to console at each check interval. Defaults to False.
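Running a coroutine from synchronous code needs care in notebooks, where an event loop is typically already running and asyncio.run cannot be called directly. The sketch below shows one common way to handle both cases; it is not necessarily how the SDK does it. run_coroutine_sync and fake_wait are hypothetical names, and fake_wait stands in for an async wait such as wait_for_flow_run_completion_async.

```python
import asyncio
import concurrent.futures
from typing import Any, Coroutine


def run_coroutine_sync(coro: Coroutine[Any, Any, Any]) -> Any:
    """Run a coroutine to completion from synchronous code."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running (plain Python script): start one directly.
        return asyncio.run(coro)
    # A loop is already running (for example in a Jupyter kernel): run the
    # coroutine in a fresh event loop on a worker thread and block until done.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


async def fake_wait() -> dict:
    """Hypothetical stand-in for an async wait on a Flow run."""
    await asyncio.sleep(0.01)
    return {"status": "Success"}  # assumed shape of the status dictionary


# Script-style call: no event loop is running here.
from_script = run_coroutine_sync(fake_wait())


async def notebook_like() -> dict:
    # Inside a coroutine a loop is already running, as in a notebook cell.
    return run_coroutine_sync(fake_wait())


from_running_loop = asyncio.run(notebook_like())
```

The worker-thread branch blocks the calling thread until the coroutine finishes, which matches the blocking semantics of a synchronous wait.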