Ganymede Class
Overview
The Ganymede object is used within editor notebooks to access data from previous runs, so that code changes can be tested against a prior execution. It exposes the following attributes related to run context:
- flow_run_id: str - Epoch time associated with run, which corresponds to runs shown in Flow View
- initiator: str - Flow run initiator; agent name + MAC address for agent-initiated Flows, user email for user-initiated flows, event name for event-triggered Flows, or Flow name if triggered from another Flow
- initiator_type: str - Type of initiator of the flow run (AGENT, USER, EVENT, FLOW)
- ganymede_context: GanymedeContext - Run context information, detailed in the GanymedeContext section of this page
As an example, you can create a Ganymede object associated with the most recent run to mirror the prior execution in user-defined code:
import pandas as pd
from ganymede_sdk import Ganymede
# Retrieves the Ganymede object associated with the most recent run
g = Ganymede()
Displaying SDK Version and Run Context Parameters
You can easily display the current Ganymede SDK version and run context parameters:
display(g)
Accessing Run Context Parameters
To access specific run context parameters, such as the flow run ID, use the following command:
print(g.flow_run_id)
# Returns "1693156227387"
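Because flow_run_id is described as an epoch time (and, in the GanymedeContext section, as a timestamp in ms), it can be converted to a readable timestamp. The following is a minimal sketch that assumes the ID is milliseconds since the Unix epoch in UTC; the helper name flow_run_id_to_datetime is illustrative and not part of ganymede_sdk.

```python
from datetime import datetime, timedelta, timezone


def flow_run_id_to_datetime(flow_run_id: str) -> datetime:
    """Interpret a flow run ID as milliseconds since the Unix epoch (assumed UTC)."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Integer milliseconds avoid floating-point rounding of the sub-second part
    return epoch + timedelta(milliseconds=int(flow_run_id))


# The ID below is the example value shown above
run_started = flow_run_id_to_datetime("1693156227387")
print(run_started.isoformat())
```

Keeping the value timezone-aware makes it harder to compare it accidentally against local notebook times.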
Retrieving a GanymedeContext for a Specific Run ID
If you need to retrieve the GanymedeContext associated with a specific run ID, you can do so by passing the ID to the Ganymede constructor:
g = Ganymede(flow_run_id='1693156227387')
Creating a Ganymede Object in the Execute Function
Within an execute function, you can create a Ganymede object using the ganymede_context parameter:
def execute(ganymede_context: GanymedeContext):
    g = Ganymede(ganymede_context)
Methods for accessing tables
method
retrieve_sql
retrieve_sql allows you to query tabular data from the Ganymede data lake. You can provide a single SQL query or a semicolon-delimited string of queries.
The results are returned as a DataFrame if a single query is used or as a list of DataFrames if multiple queries are provided.
- param query_str: str - Semicolon-delimited query string(s).
- param render_dict: dict[str, str] | None - Dictionary used for rendering Jinja template variables in query. Not used if context is provided.
from ganymede_sdk import Ganymede
g = Ganymede()
# Two semicolon-delimited queries return a list of two DataFrames
query_sql = 'SELECT * FROM (SELECT "sample", "query"); SELECT * FROM (SELECT "sample", "query2")'
df_query1, df_query2 = g.retrieve_sql(query_sql)
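Since the return type depends on how many queries the string contains, calling code may want to handle both shapes uniformly. The sketch below is one way to do that; count_queries and as_result_list are hypothetical helpers, not SDK functions, and the split on semicolons is naive (it assumes no semicolon appears inside a string literal).

```python
from typing import Any, List


def count_queries(query_str: str) -> int:
    """Count non-empty statements in a semicolon-delimited query string."""
    return sum(1 for part in query_str.split(";") if part.strip())


def as_result_list(result: Any) -> List[Any]:
    """Wrap a single result in a list so callers can always iterate."""
    return result if isinstance(result, list) else [result]


query_sql = 'SELECT * FROM (SELECT "sample", "query"); SELECT * FROM (SELECT "sample", "query2")'
print(count_queries(query_sql))

# Stand-in values; in a notebook these would be the return value of g.retrieve_sql(...)
single = as_result_list("one_dataframe")
multiple = as_result_list(["df_a", "df_b"])
```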
method
retrieve_tables
The retrieve_tables method retrieves tabular data from the Ganymede data lake while preserving field names with special characters. The result is returned as a dictionary of DataFrames, keyed by table name.
- param table_names: str | list[str] - The name of the table(s) to retrieve.
- param run_id_filter_field: str | None - If provided, filter for records with run ID associated with Ganymede Context
- param get_measurement_unit_flag: bool, by default False - Whether to return measurement unit information
- param use_cache_in_notebook: bool, by default False - If set to True, data is retrieved from the local cache in notebooks, speeding up development. Note that flow runs will not use cached data. Cache can be cleared with the clear_cache function in ganymede_sdk.
from ganymede_sdk import Ganymede
g = Ganymede()
# example with no measurement units; the result is a dictionary keyed by table name
tables = g.retrieve_tables(['table1', 'table2'])
# example if measurement units are captured
df_measurement_units, df_query1 = g.retrieve_tables(['table_with_measurement_units1'])
method
list_tables
The list_tables method returns a Pandas DataFrame listing tables, column names, and associated flows. By default, it retrieves tables associated with the current flow.
- param current_flow_flag: bool, by default True - Whether to filter for tables associated with current flow or not
- param current_run_id: int | None, by default None - If set to specific run ID, filter to include records for specified run ID.
from ganymede_sdk import Ganymede
g = Ganymede()
# retrieves tables associated with flow
tables_in_current_flow = g.list_tables()
print(tables_in_current_flow)
# retrieves all tables in environment
tables_in_environment = g.list_tables(current_flow_flag=False)
print(tables_in_environment)
method
list_tables_current_run
list_tables_current_run returns a listing of all tables associated with the current Flow and run ID as a Pandas DataFrame.
method
list_tables_current_flow
list_tables_current_flow returns a listing of all tables associated with the current Flow as a Pandas DataFrame.
method
list_tables_all
list_tables_all returns a listing of all tables associated with the current environment as a Pandas DataFrame.
Methods for accessing files
method
retrieve_files
The retrieve_files method allows you to retrieve files from Ganymede cloud storage. By default, it retrieves the most recent file associated with the current run.
- param file_names: str | list[str] - The file(s) to retrieve.
- param flow_input_or_output: str, by default "input" - Either "input" or "output" - to reference whether to retrieve files that are inputs to Flows or outputs from Flow Nodes
- param current_flow_flag: bool, by default True - Filters for files associated with the current Flow
- param run_id: Optional[int], by default None - If set, filters for results with specified flow_run_id; if not specified, retrieve file(s) associated with most recent run ID
- param use_cache_in_notebook: bool, by default False - If set to True, retrieves files from local cache when in editor or analysis notebooks rather than querying Ganymede to expedite development. Note that flow runs will not reference cached files. Cache can be cleared by calling the clear_cache() function.
- param use_full_path: bool, by default True - Whether to return files keyed by full path or file name
from ganymede_sdk import Ganymede
g = Ganymede()
# Retrieves a file from the input bucket called "sample_test.csv" associated with the current flow
input_file = g.retrieve_files("sample_test.csv", flow_input_or_output="input")
# Retrieves 2 files from the output bucket in the environment associated with most recent run of flow
output_files = g.retrieve_files(
    ['sample_output.xlsx', 'sample_validation.txt'],
    flow_input_or_output="output",
    current_flow_flag=False,
)
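The use_full_path parameter controls whether results are keyed by full path or by file name. If results were retrieved keyed by full path, they can be re-keyed afterwards. This is a sketch under the assumption that the result is a dictionary mapping path to file contents as bytes; the helper key_by_file_name and the sample path are hypothetical.

```python
from pathlib import PurePosixPath
from typing import Dict


def key_by_file_name(files: Dict[str, bytes]) -> Dict[str, bytes]:
    """Re-key a {full_path: contents} mapping by bare file name.

    Raises ValueError if two paths share a file name, since one entry
    would otherwise silently overwrite the other.
    """
    by_name: Dict[str, bytes] = {}
    for full_path, contents in files.items():
        name = PurePosixPath(full_path).name
        if name in by_name:
            raise ValueError(f"duplicate file name: {name}")
        by_name[name] = contents
    return by_name


# Hypothetical result shape; real keys and values come from retrieve_files
files_by_path = {"plate_reader/sample_test.csv": b"well,value\nA1,0.5\n"}
files_by_name = key_by_file_name(files_by_path)
print(sorted(files_by_name))
```

Failing on duplicate names is a deliberate choice here: two runs can upload files with the same name under different paths.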
method
retrieve_files_current_run
retrieve_files_current_run retrieves the files associated with the current run
- param file_names: str | list[str] - The file(s) to retrieve.
- param flow_input_or_output: str, by default "input" - Either "input" or "output" - to reference whether to retrieve files that are inputs to Flows or outputs from Flow Nodes
- param use_cache_in_notebook: bool, by default False - If set to True, retrieves files from local cache when in editor or analysis notebooks rather than querying Ganymede to expedite development. Note that flow runs will not reference cached files. Cache can be cleared by calling the clear_cache() function.
- param use_full_path: bool, by default True - Whether to return files keyed by full path or file name
method
get_last_run_input_files
get_last_run_input_files retrieves files uploaded to Ganymede cloud storage during the most recent flow execution.
from ganymede_sdk import Ganymede
g = Ganymede()
# Returns a dictionary of the files uploaded during the most recent flow execution
most_recent_input_files = g.get_last_run_input_files()
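Once the dictionary of files is available, its contents can be parsed in the notebook. The sketch below assumes each value holds the raw bytes of a UTF-8 CSV file; the dictionary shown is a stand-in for the real return value, and parse_csv_bytes is an illustrative helper rather than an SDK function.

```python
import csv
import io
from typing import Dict, List


def parse_csv_bytes(contents: bytes, encoding: str = "utf-8") -> List[Dict[str, str]]:
    """Decode CSV bytes and return one dictionary per data row."""
    text = contents.decode(encoding)
    return list(csv.DictReader(io.StringIO(text)))


# Hypothetical stand-in for the dictionary returned by get_last_run_input_files
most_recent_input_files = {"sample_test.csv": b"well,value\nA1,0.5\nA2,0.7\n"}
rows = parse_csv_bytes(most_recent_input_files["sample_test.csv"])
print(rows[0]["well"], rows[0]["value"])
```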
method
list_files
list_files lists files in the environment
- param flow_input_or_output: str, by default "input" - Either "input" or "output" - to reference whether to retrieve files that are inputs to flows or outputs from flow nodes
- param flow_name: str, optional, by default True - Flow name to filter for. If set to specific flow name, filter to include records based on
- param current_flow_flag: bool, by default True - Whether to filter for files associated with current flow or not
- param current_run_id: int | None - If set, filters for results with specified flow_run_id
from ganymede_sdk import Ganymede
g = Ganymede()
# Lists all files in the input bucket of the current flow
df_input_files_cur_flow = g.list_files(flow_input_or_output="input")
display(df_input_files_cur_flow)
# Lists all files in the output bucket of the environment
df_output_files = g.list_files(flow_input_or_output="output", current_flow_flag=False)
display(df_output_files)
method
list_files_current_run
list_files_current_run returns a listing of available files in the specified bucket associated with the run ID provided by Ganymede context
- param flow_input_or_output: str, by default "input" - Either "input" or "output" - to reference whether to retrieve files that are inputs to flows or outputs from flow nodes
method
list_files_current_flow
list_files_current_flow returns a listing of available files in the specified bucket associated with the current Flow provided by Ganymede context
- param flow_input_or_output: str, by default "input" - Either "input" or "output" - to reference whether to retrieve files that are inputs to flows or outputs from flow nodes
method
list_files_all
list_files_all returns a listing of all files in the specified bucket in the environment
- param flow_input_or_output: str, by default "input" - Either "input" or "output" - to reference whether to retrieve files that are inputs to flows or outputs from flow nodes
- param flow_name: str, by default None - Flow name to filter for, if input
Sending emails from execute function
Templated email notifications can be sent from the execute function by using the GanymedeEmailAlert object in ganymede_sdk.util.
GanymedeEmailAlert takes two parameters:
- param ganymede_context: GanymedeContext - GanymedeContext object associated with the flow run
- param html_template: str - HTML Jinja template for the email, rendered using the ganymede_context object. A Ganymede-specific template is used by default.
from ganymede_sdk import Ganymede
from ganymede_sdk.util.email import GanymedeEmailAlert
g = Ganymede()
email_alert = GanymedeEmailAlert(g.ganymede_context)
email_alert.send_email(
    "user@email.com", "My subject", "My message to send",
    cc="user@ganymede.bio",
    bcc="user@email.com"
)
method
send_email
send_email sends an email notification to the specified recipient(s). The method returns the HTML object of the email sent.
- param to : Union[str, Iterable[str]] - The recipient(s) of the email. This can be a single email address (str) or a list of email addresses (Iterable).
- param subject : str - The subject of the email.
- param message : str - The plain text message content.
- param cc : Optional[Union[str, Iterable[str]]] - The recipient(s) to be copied on the email (CC), by default None. This can be a single email address or a list of email addresses.
- param bcc : Optional[Union[str, Iterable[str]]] - The recipient(s) to be blindly copied on the email (BCC), by default None. This can be a single email address or a list of email addresses.
- param custom_headers : Optional[Dict[str, Any]] - A dictionary of custom email headers, by default None.
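Because to, cc, and bcc each accept either a single address or an iterable of addresses, code that assembles recipients from several sources may find it convenient to normalize them first. The following is a minimal sketch; normalize_recipients is a hypothetical helper and not part of GanymedeEmailAlert.

```python
from typing import Iterable, List, Optional, Union

Recipients = Optional[Union[str, Iterable[str]]]


def normalize_recipients(value: Recipients) -> List[str]:
    """Return recipients as a list without blanks or case-insensitive duplicates."""
    if value is None:
        return []
    candidates = [value] if isinstance(value, str) else list(value)
    seen = set()
    result: List[str] = []
    for address in candidates:
        cleaned = address.strip()
        if cleaned and cleaned.lower() not in seen:
            seen.add(cleaned.lower())
            result.append(cleaned)
    return result


to_list = normalize_recipients(["user@email.com", " User@Email.com ", "user@ganymede.bio"])
print(to_list)
```

The resulting list can be passed as the to, cc, or bcc argument, since each accepts an iterable of addresses.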
Other methods
method
get_table_schema
get_table_schema returns the schema of a table in the Ganymede data lake
- param table_name: str - Name of table to retrieve schema for
method
get_secret
get_secret returns an environment secret, which can be useful for testing API calls that require authentication.
- param secret_name: str - Name of secret to retrieve
method
clear_cache
clear_cache clears files and tables stored in local cache for the editor or analysis notebook that this method is called from.
method
get_file_url
get_file_url returns an HTTPS URL for referencing files stored in Ganymede cloud storage from external apps. More documentation on usage can be found on the deep links section of the File Browser page.
- param filename: str - Path to the file within the bucket, without a leading slash
- param bucket: str - Bucket that the file is in; either 'input' or 'output'
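The two constraints above (no leading slash in filename, and bucket limited to 'input' or 'output') can be checked before calling get_file_url. This is a sketch of such a check; prepare_file_url_args is a hypothetical helper, and whether the SDK performs similar validation itself is not stated here.

```python
from typing import Tuple

VALID_BUCKETS = ("input", "output")


def prepare_file_url_args(filename: str, bucket: str) -> Tuple[str, str]:
    """Strip leading slashes from the path and validate the bucket name."""
    if bucket not in VALID_BUCKETS:
        raise ValueError(f"bucket must be one of {VALID_BUCKETS}, got {bucket!r}")
    cleaned = filename.lstrip("/")
    if not cleaned:
        raise ValueError("filename must not be empty")
    return cleaned, bucket


filename, bucket = prepare_file_url_args("/results/sample_output.xlsx", "output")
print(filename, bucket)
```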
method
get_gcs_uri
get_gcs_uri returns a Google Cloud Storage URI for referencing files. This method is useful for referencing files for tags, which is described in further detail on the Tagging Files page.
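Google Cloud Storage URIs take the form gs://bucket/object-path. If the bucket and object path are needed separately, the URI can be split with the standard library, as in this sketch; the bucket name in the example is made up, and split_gcs_uri is an illustrative helper rather than an SDK function.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_gcs_uri(uri: str) -> Tuple[str, str]:
    """Split a gs:// URI into (bucket, object_path)."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"not a GCS URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


# Hypothetical URI; real values come from get_gcs_uri
gcs_bucket, object_path = split_gcs_uri("gs://example-bucket/folder/sample_output.xlsx")
print(gcs_bucket, object_path)
```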
class
GanymedeContext
GanymedeContext stores Flow metadata such as user ID, input filenames, input parameters, and execution timestamp upon Flow execution. The GanymedeContext object from the most recent run can be mimicked in testing code via the Ganymede object.
from ganymede_sdk import Ganymede
g = Ganymede()
# Access GanymedeContext object with parameters from most recent run
display(g.ganymede_context)
A Ganymede object can be created from the GanymedeContext object within the execute function for pipeline execution.
from ganymede_sdk import Ganymede
# Create Ganymede object from ganymede_context within an execute function
def execute(ganymede_context: GanymedeContext):
    g = Ganymede(ganymede_context)
Attributes
- dag.dag_id: str - The name of the Flow
- flow_run_id: str - Timestamp in ms when Flow was kicked off for manually triggered Flows; guaranteed to be unique per Flow run
- created: str - Timestamp in ms when Flow was kicked off for event-triggered Flows
- task.task_id: str - Type of Node
Methods
- get_param(self, node_name: str, parameter_type: str) -> str: Obtain input parameter or input filename in Flow. Node name can be found in the indigo bar at the top of each node. Parameter type generally references the file extension for nodes that ingest files, and can be found on the Flow Editor as values with green background and white text.
- set_param(self, parameter_type: str, parameter_value: Any, node_name: str = None) -> None: Set parameter value
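When unit-testing an execute function outside of Ganymede, a small test double with the same two method signatures can stand in for the context. The class below is a sketch for local testing only: FakeGanymedeContext is not the SDK class, and storing values keyed by (node_name, parameter_type) is an assumption made for illustration, not a description of how GanymedeContext works internally.

```python
from typing import Any, Dict, Optional, Tuple


class FakeGanymedeContext:
    """Test double mirroring the get_param / set_param signatures listed above."""

    def __init__(self, flow_run_id: str) -> None:
        self.flow_run_id = flow_run_id
        # Assumed storage layout: one value per (node_name, parameter_type) pair
        self._params: Dict[Tuple[Optional[str], str], Any] = {}

    def set_param(
        self, parameter_type: str, parameter_value: Any, node_name: Optional[str] = None
    ) -> None:
        self._params[(node_name, parameter_type)] = parameter_value

    def get_param(self, node_name: str, parameter_type: str) -> str:
        return self._params[(node_name, parameter_type)]


# Node names and values below are made up for the example
context = FakeGanymedeContext(flow_run_id="1693156227387")
context.set_param("csv", "sample_test.csv", node_name="CSV_Read")
context.set_param("param", "EXP-001", node_name="Input_Param")
print(context.get_param("CSV_Read", "csv"))
```

An object like this can be passed as ganymede_context to an execute function that only reads flow_run_id and calls get_param.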
Example
In the code below, the Flow run timestamp, filename uploaded, and experiment_id are incorporated into the input CSV as new columns in a table written to the data lake.
import pandas as pd
from typing import Union, Dict, List
def execute(
    df_sql_result: Union[pd.DataFrame, List[pd.DataFrame]], ganymede_context=None
) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]:
    """Add flow_run_id and input parameter as columns to Pandas DataFrame
    """
    df_out = df_sql_result.copy()
    df_out['__run_id'] = ganymede_context.flow_run_id
    df_out['__input_file_name'] = ganymede_context.get_param('CSV_Read', 'csv')
    df_out['experiment_id'] = ganymede_context.get_param('Input_Param', 'param')
    return df_out