Ganymede Class

Overview

The Ganymede object is used within editor notebooks to access data from prior runs, enabling users to test the behavior of code changes against that data.

import pandas as pd
from ganymede_sdk import Ganymede

# Retrieves the Ganymede object associated with the most recent run
g = Ganymede()

Display Ganymede SDK version and run context parameters

display(g)

Access run context parameters

print(g.flow_run_id)

# Returns "1693156227387"

Retrieve GanymedeContext associated with a specific run ID

g = Ganymede(flow_run_id='1693156227387')

In the execute function, the Ganymede object can be created from the ganymede_context parameter

def execute(ganymede_context: GanymedeContext):
    g = Ganymede(ganymede_context)

Methods for accessing tables

method retrieve_sql

retrieve_sql allows users to query tabular data from the Ganymede data lake. Input can be either a single SQL query or a semicolon-delimited string containing a set of queries to run.

The results are returned as a single Pandas DataFrame if one query is submitted, or as a list of Pandas DataFrames if multiple queries are submitted.

  • param query_str: str - Semicolon-delimited query string(s) to retrieve.
  • param render_dict: Optional[Dict[str, str]] - Dictionary used for rendering Jinja template variables in query. Not used if context is provided.

from ganymede_sdk import Ganymede

g = Ganymede()

# Two semicolon-delimited queries, so two DataFrames are returned
query_sql = 'SELECT * FROM (SELECT "sample", "query"); SELECT * FROM (SELECT "sample", "query2")'

df_query1, df_query2 = g.retrieve_sql(query_sql)
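The return shape described above (one result for a single query, a list of results for several) can be sketched without the SDK. The snippet below is only an illustration under stated assumptions: run_queries is a hypothetical stand-in, it runs against an in-memory SQLite database rather than the Ganymede data lake, it returns lists of row tuples instead of DataFrames, and its naive split assumes no semicolons appear inside string literals.

```python
import sqlite3
from typing import Any, List, Tuple, Union

Rows = List[Tuple[Any, ...]]


def run_queries(conn: sqlite3.Connection, query_str: str) -> Union[Rows, List[Rows]]:
    """Hypothetical stand-in mirroring the documented return shape of retrieve_sql."""
    # Naive split on ';' (assumes no semicolons inside string literals)
    queries = [q.strip() for q in query_str.split(";") if q.strip()]
    results = [conn.execute(q).fetchall() for q in queries]
    # A single query yields one result; several queries yield a list of results
    return results[0] if len(results) == 1 else results


conn = sqlite3.connect(":memory:")

single = run_queries(conn, "SELECT 1 AS a")
first, second = run_queries(conn, "SELECT 1 AS a; SELECT 2 AS a, 3 AS b")
```

Unpacking into two names, as in the example above, only works when exactly two queries are submitted; for a variable number of queries it is safer to keep the list and iterate over it.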

method retrieve_tables

retrieve_tables allows users to retrieve tabular data from the Ganymede data lake, preserving field names with special characters.

The results are returned as a Dict of DataFrames in Python, keyed by table name. Returned DataFrames contain the entire contents of the specified table(s).

  • param table_names: Union[str, List[str]] - Table or list of tables to retrieve
  • param use_cache_in_notebook: bool, by default False - If set to True, retrieves files from local cache when in editor or analysis notebooks rather than querying Ganymede to expedite development. Note that flow runs will not reference cached files. Cache can be cleared by calling the clear_cache() function.

from ganymede_sdk import Ganymede

g = Ganymede()

# Returns a dict of DataFrames keyed by table name
dict_tables = g.retrieve_tables(['table1', 'table2'])
df_table1 = dict_tables['table1']
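To make the caching behaviour described for use_cache_in_notebook more concrete, here is a small sketch of the general pattern. It is not the SDK's implementation: CachedTableReader, fake_fetch, and the use_cache argument are hypothetical names, and plain lists stand in for DataFrames.

```python
from typing import Callable, Dict, List, Union


class CachedTableReader:
    """Hypothetical illustration of cache-on-read semantics; not the SDK's code."""

    def __init__(self, fetch: Callable[[str], list]):
        self._fetch = fetch
        self._cache: Dict[str, list] = {}

    def retrieve_tables(
        self, table_names: Union[str, List[str]], use_cache: bool = False
    ) -> Dict[str, list]:
        # Accept either a single table name or a list of names
        names = [table_names] if isinstance(table_names, str) else list(table_names)
        out: Dict[str, list] = {}
        for name in names:
            if use_cache and name in self._cache:
                out[name] = self._cache[name]  # served from the local cache
            else:
                out[name] = self._cache[name] = self._fetch(name)  # remote fetch
        return out

    def clear_cache(self) -> None:
        self._cache.clear()


calls: List[str] = []


def fake_fetch(name: str) -> list:
    """Stand-in for a query against the data lake; records each fetch."""
    calls.append(name)
    return [{"table": name}]


reader = CachedTableReader(fake_fetch)
reader.retrieve_tables("table1", use_cache=True)               # miss: fetched
reader.retrieve_tables(["table1", "table2"], use_cache=True)   # table1 hit, table2 fetched
reader.clear_cache()
reader.retrieve_tables("table1", use_cache=True)               # fetched again after clearing
```

The practical takeaway is that cached results can go stale: if the underlying table changes between notebook cells, clearing the cache forces a fresh read.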

method list_tables

list_tables returns a listing of tables, column names, and associated flows as a Pandas DataFrame. By default, the method retrieves tables associated with the current flow.

  • param current_flow_flag: bool, by default True - Whether to filter for tables associated with current flow or not
  • param current_run_id: Optional[int], by default None - If set to specific run ID, filter to include records for specified run ID.

from ganymede_sdk import Ganymede

g = Ganymede()

# Retrieves tables associated with the current flow (returned as a DataFrame)
tables_in_current_flow = g.list_tables()
print(tables_in_current_flow)

# Retrieves all tables in the environment
all_tables = g.list_tables(current_flow_flag=False)
print(all_tables)

method list_tables_current_run

list_tables_current_run returns a listing of all tables associated with the current flow and run ID as a Pandas DataFrame.

method list_tables_current_flow

list_tables_current_flow returns a listing of all tables associated with the current flow as a Pandas DataFrame.

method list_tables_all

list_tables_all returns a listing of all tables associated with the current environment as a Pandas DataFrame.
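The three convenience methods above read like fixed combinations of the list_tables filters. The sketch below illustrates that filtering logic on an in-memory catalog; the catalog records, the flow names, and the filter_tables helper are all hypothetical, and the actual SDK may implement the filters differently.

```python
from typing import Dict, List, Optional

# Hypothetical catalog: one record per (table, flow, run) combination
catalog: List[Dict[str, object]] = [
    {"table": "plate_reads", "flow": "flow_a", "run_id": 1},
    {"table": "plate_reads", "flow": "flow_a", "run_id": 2},
    {"table": "qc_summary", "flow": "flow_b", "run_id": 2},
]


def filter_tables(
    records: List[Dict[str, object]],
    current_flow: str,
    current_flow_flag: bool = True,
    current_run_id: Optional[int] = None,
) -> List[Dict[str, object]]:
    """Apply the two documented filters: current flow and, optionally, run ID."""
    rows = records
    if current_flow_flag:
        rows = [r for r in rows if r["flow"] == current_flow]
    if current_run_id is not None:
        rows = [r for r in rows if r["run_id"] == current_run_id]
    return rows


# Analogous to list_tables_current_flow, list_tables_current_run, list_tables_all
in_flow = filter_tables(catalog, "flow_a")
in_run = filter_tables(catalog, "flow_a", current_run_id=2)
everything = filter_tables(catalog, "flow_a", current_flow_flag=False)
```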

Methods for accessing files

method retrieve_files

retrieve_files allows users to retrieve files from Ganymede cloud storage. By default, the most recent file associated with current run is retrieved.

  • param file_names: Union[str, List[str]] - File or list of files to retrieve
  • param flow_input_or_output: str, by default "input" - Either "input" or "output" - to reference whether to retrieve files that are inputs to flows or outputs from flow nodes
  • param input_or_output_bucket: str, by default "output" - (Pending deprecation) Either "input" or "output" - to reference bucket to retrieve files from.
  • param current_flow_flag: bool, by default True - Whether to filter for files associated with current flow or not
  • param run_id: Optional[int], by default None - If set, filters for results with specified flow_run_id; if not specified, retrieve file(s) associated with most recent run ID
  • param use_cache_in_notebook: bool, by default False - If set to True, retrieves files from local cache when in editor or analysis notebooks rather than querying Ganymede to expedite development. Note that flow runs will not reference cached files. Cache can be cleared by calling the clear_cache() function.

from ganymede_sdk import Ganymede

g = Ganymede()

# Retrieves a file from the input bucket called "sample_test.csv" associated with the current flow
input_file = g.retrieve_files("sample_test.csv", flow_input_or_output="input")

# Retrieves 2 files from the output bucket in the environment associated with most recent run of flow
output_files = g.retrieve_files(
    ['sample_output.xlsx', 'sample_validation.txt'],
    flow_input_or_output="output",
    current_flow_flag=False,
)

method get_last_run_input_files

get_last_run_input_files retrieves files uploaded to Ganymede cloud storage during the most recent flow execution.

from ganymede_sdk import Ganymede

g = Ganymede()

# Returns a dictionary of the files uploaded during the most recent flow execution
most_recent_input_files = g.get_last_run_input_files()
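Once the files are in hand, a common next step is to parse them. The sketch below assumes the returned dictionary maps file names to raw bytes, which this page does not confirm, so a hand-built dictionary stands in for the SDK call. parse_csv_files is a hypothetical helper, and the file contents are made up for illustration.

```python
import csv
import io
from typing import Dict, List


def parse_csv_files(files: Dict[str, bytes]) -> Dict[str, List[dict]]:
    """Parse every .csv entry of a {file name: bytes} dict into a list of row dicts."""
    parsed: Dict[str, List[dict]] = {}
    for name, content in files.items():
        if name.lower().endswith(".csv"):
            text = content.decode("utf-8")
            parsed[name] = list(csv.DictReader(io.StringIO(text)))
    return parsed


# Stand-in for the dictionary returned by g.get_last_run_input_files()
# (assumed shape: file name -> file contents as bytes)
most_recent_input_files = {
    "sample_test.csv": b"well,od600\nA1,0.42\nA2,0.37\n",
    "notes.txt": b"not a CSV",
}

rows_by_file = parse_csv_files(most_recent_input_files)
```

Note that csv.DictReader leaves every value as a string; numeric columns need an explicit conversion before analysis.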

method list_files

list_files returns a listing of files in the environment.

  • param flow_input_or_output: str, by default "input" - Either "input" or "output" - to reference whether to retrieve files that are inputs to flows or outputs from flow nodes
  • param input_or_output_bucket: str, by default "input" - (Pending deprecation) Either "input" or "output" - to reference whether to observe the input or output bucket of the flow
  • param flow_name: str, optional, by default True - Flow name to filter for. If set to specific flow name, filter to include records based on
  • param current_flow_flag: bool, by default True - Whether to filter for files associated with current flow or not
  • param current_run_id: Optional[int] - If set, filters for results with specified flow_run_id

from ganymede_sdk import Ganymede

g = Ganymede()

# Lists all files in the input bucket of the current flow
df_cur_flow_input_files = g.list_files(flow_input_or_output="input")
display(df_cur_flow_input_files)

# Lists all files in the output bucket of the environment
df_output_files = g.list_files(flow_input_or_output="output", current_flow_flag=False)
display(df_output_files)

method list_files_current_run

list_files_current_run returns a listing of available files in the specified bucket associated with the run ID provided by the Ganymede context.

  • param flow_input_or_output: str, by default "input" - Either "input" or "output" - to reference whether to retrieve files that are inputs to flows or outputs from flow nodes
  • param input_or_output_bucket: str, by default "input" - (Pending deprecation) Either "input" or "output" - to reference whether to observe the input or output bucket of the flow

method list_files_current_flow

list_files_current_flow returns a listing of available files in the specified bucket associated with the current flow provided by the Ganymede context.

  • param flow_input_or_output: str, by default "input" - Either "input" or "output" - to reference whether to retrieve files that are inputs to flows or outputs from flow nodes
  • param input_or_output_bucket: str, by default "input" - (Pending deprecation) Either "input" or "output" - to reference whether to observe the input or output bucket of the flow

method list_files_all

list_files_all returns a listing of all files in the specified bucket in the environment.

  • param flow_input_or_output: str, by default "input" - Either "input" or "output" - to reference whether to retrieve files that are inputs to flows or outputs from flow nodes
  • param input_or_output_bucket: str, by default "input" - (Pending deprecation) Either "input" or "output" - to reference whether to observe the input or output bucket of the flow
  • param flow_name: str, by default None - Flow name to filter for, if input

Sending emails from execute function

Templated email notifications can be sent from the execute function by using the GanymedeEmailAlert object in ganymede_sdk.util.

GanymedeEmailAlert takes two parameters:

  • param ganymede_context: GanymedeContext - GanymedeContext object associated with the flow run
  • param html_template: str - HTML Jinja template for the email, rendered using the ganymede_context object. A Ganymede-specific template is used by default.

from ganymede_sdk import Ganymede
from ganymede_sdk.util.email import GanymedeEmailAlert

g = Ganymede()

email_alert = GanymedeEmailAlert(g.ganymede_context)
email_alert.send_email(
    "user@email.com", "My subject", "My message to send",
    cc="user@ganymede.bio",
    bcc="user@email.com",
)

method send_email

send_email sends an email notification to the specified recipient(s). The method returns the HTML object of the email sent.

  • param to : Union[str, Iterable[str]] - The recipient(s) of the email. This can be a single email address (str) or a list of email addresses (Iterable).
  • param subject : str - The subject of the email.
  • param message : str - The plain text message content of the email.
  • param cc : Optional[Union[str, Iterable[str]]] - The recipient(s) to be copied on the email (CC), by default None. This can be a single email address or a list of email addresses.
  • param bcc : Optional[Union[str, Iterable[str]]] - The recipient(s) to be blindly copied on the email (BCC), by default None. This can be a single email address or a list of email addresses.
  • param custom_headers : Optional[Dict[str, Any]] - A dictionary of custom email headers, by default None.

Other methods

method get_table_schema

get_table_schema returns the schema of a table in the Ganymede data lake.

  • param table_name: str - Name of table to retrieve schema for

method get_secret

get_secret returns an environment secret, which can be useful for testing API calls that require authentication.

  • param secret_name: str - Name of secret to retrieve
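As a rough illustration of the authenticated-API use case, the sketch below places a retrieved secret into a request header. Everything specific here is an assumption: the get_secret function is a local stand-in for g.get_secret, the secret name and URL are made up, and the bearer-token scheme is only one of several ways an API might expect credentials. The request is constructed but never sent.

```python
import urllib.request


def get_secret(secret_name: str) -> str:
    """Local stand-in for g.get_secret; returns a dummy value for illustration."""
    return {"my_api_token": "example-token"}[secret_name]


token = get_secret("my_api_token")  # hypothetical secret name

# Build (but do not send) a request carrying the secret as a bearer token
request = urllib.request.Request(
    "https://api.example.com/v1/samples",  # hypothetical endpoint
    headers={"Authorization": f"Bearer {token}"},
)
```

Keeping the token in a variable that is only passed to the request, rather than printing or displaying it, avoids leaking the secret into notebook output.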

method clear_cache

clear_cache clears files and tables stored in local cache for the editor or analysis notebook that this method is called from.

method get_file_url

get_file_url returns an HTTPS URL for referencing files stored in Ganymede cloud storage from external apps. More documentation on usage can be found in the deep links section of the File Browser page.

method get_gcs_uri

get_gcs_uri returns a Google Cloud Storage URI for referencing files. This method is useful for referencing files for tags, which is described in further detail on the Tagging Files page.

class GanymedeContext

GanymedeContext stores flow metadata such as user ID, input filenames, input parameters, and execution timestamp upon flow execution. This object is generated in pipeline runs by the workflow management system. The GanymedeContext object from the most recent run can be mimicked in testing code via the Ganymede object.

from ganymede_sdk import Ganymede
g = Ganymede()

# Access GanymedeContext object with parameters from most recent run
display(g.ganymede_context)

A Ganymede object can be created from the GanymedeContext object within the execute function for pipeline execution.

from ganymede_sdk import Ganymede

# Create Ganymede object from ganymede_context within an execute function
def execute(ganymede_context: GanymedeContext):
    g = Ganymede(ganymede_context)

Attributes

  • flow_run_id: Timestamp in ms when flow was kicked off for manually triggered flows; guaranteed to be unique per flow run
  • created: Timestamp in ms when flow was kicked off for event-triggered flows
  • dag.dag_id: name of flow
  • task.task_id: type of node (e.g. Input_File or Python)
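Since flow_run_id is described as a timestamp in milliseconds, it can be converted to a readable datetime. The sketch below assumes the value is milliseconds since the Unix epoch (the page does not state the epoch explicitly) and uses the sample run ID shown earlier; run_id_to_datetime is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def run_id_to_datetime(flow_run_id: str) -> datetime:
    """Interpret a flow_run_id as milliseconds since the Unix epoch (assumption)."""
    # Integer arithmetic avoids floating-point rounding of the millisecond part
    seconds, millis = divmod(int(flow_run_id), 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)


started = run_id_to_datetime("1693156227387")
print(started.isoformat())  # 2023-08-27T17:10:27.387000+00:00
```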

Methods

  • get_param(self, node_name: str, parameter_type: str) -> str:
    • Obtain input parameter or input filename in flow. Node name can be found in the indigo bar at the top of each node. Parameter type generally references the file extension for nodes that ingest files, and can be found on the Flow Editor as values with green background and white text.
  • set_param(self, parameter_type: str, parameter_value: Any, node_name: str = None) -> None:
    • Set parameter value
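For testing an execute function outside of a flow run, a lightweight stand-in with the same get_param and set_param signatures can be handy. The class below is a hypothetical test double, not the real GanymedeContext: it keeps parameters in a dictionary keyed by node name and parameter type, which is an assumption about storage made purely for illustration.

```python
from typing import Any, Dict, Optional, Tuple


class FakeGanymedeContext:
    """Hypothetical test double mirroring the documented method signatures."""

    def __init__(self, flow_run_id: str):
        self.flow_run_id = flow_run_id
        self._params: Dict[Tuple[Optional[str], str], Any] = {}

    def set_param(self, parameter_type: str, parameter_value: Any, node_name: str = None) -> None:
        # Store the value under (node name, parameter type)
        self._params[(node_name, parameter_type)] = parameter_value

    def get_param(self, node_name: str, parameter_type: str) -> str:
        # Raises KeyError if the parameter was never set
        return self._params[(node_name, parameter_type)]


context = FakeGanymedeContext(flow_run_id="1693156227387")
context.set_param("csv", "sample_test.csv", node_name="CSV_Read")
context.set_param("param", "EXP-001", node_name="Input_Param")

input_file_name = context.get_param("CSV_Read", "csv")
experiment_id = context.get_param("Input_Param", "param")
```

Note the argument order differs between the two methods: get_param takes the node name first, while set_param takes the parameter type first and the node name last.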

Example

In the code below, the flow run timestamp, filename uploaded, and experiment_id flow input parameter are incorporated into the input CSV as new columns and written to the data lake.

import pandas as pd
from typing import Union, Dict, List

def execute(
    df_sql_result: Union[pd.DataFrame, List[pd.DataFrame]], ganymede_context=None
) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]:
    """Add flow_run_id and input parameter as columns to Pandas DataFrame
    """

    df_out = df_sql_result.copy()
    df_out['__run_id'] = ganymede_context.flow_run_id
    df_out['__input_file_name'] = ganymede_context.get_param('CSV_Read', 'csv')
    df_out['experiment_id'] = ganymede_context.get_param('Input_Param', 'param')

    return df_out