Agent Models and Methods

User-defined code examples by Agent type

This section contains a cookbook of user-defined code examples, which can be helpful for expediting Agent definition.

Watch for files locally and run Flow

In the user-defined code for Agents that execute Flows, the get_param_mapping function runs whenever a file is added or modified in the directory that the Agent is watching, and the execute function runs once all files specified by get_param_mapping have been observed. get_param_mapping maps Flow parameters to pattern-matching functions that identify the files to capture, and execute assembles the input files and parameters for the Flow that is triggered.

Below is a skeleton example for the user-defined code with associated types:

from typing import Callable

from ganymede_sdk.agent.models import FileWatcherResult, TriggerFlowParams


def fp(watch_dir: str, parent_dir: str, pattern: str) -> Callable[[str], bool]:
    """
    This function returns a function that performs pattern matching against a file path.
    Use this function as a template for creating your own pattern-matching functions, which
    you can then use as the values of the return object in the get_param_mapping function.

    Returns
    -------
    Callable[[str], bool]
        Function that takes a file path as input and returns True if the file matches the pattern.
    """
    pass


def get_param_mapping(
    watch_dir: str,
    parent_dir: str = "",
    file_name: str = "",
    modified_time: str = "",
    body: bytes = bytes(),
) -> dict[str, Callable[[str], bool] | list[Callable[[str], bool]]]:
    """
    This function is called when a file is added or modified in the watch directory.
    Modify this function to capture the files you want to trigger the Flow;
    the function should return a dictionary where the keys are <node name>.<param name>
    and the values are functions that perform pattern matching against the target file.

    For Nodes that accept multiple inputs, specify a list of functions to match against;
    each specified function should uniquely match one file.

    This function returns a dictionary of fp objects indexed by <node name>.<param name>.
    """
    pass


def execute(flow_params_fw: FileWatcherResult) -> TriggerFlowParams:
    """
    Called when all glob patterns specified by get_param_mapping have been matched.

    Parameters
    ----------
    flow_params_fw : FileWatcherResult
        Dict of FileParam objects indexed by <node name>.<param name>
    """
    pass

Delivering files to a Flow with single file input Nodes

This example shows an Agent that delivers a CSV file to the Bioreactor_File Node, an Excel file whose name contains the word 'medium' to the Medium_Composition Node, and an Excel file whose name contains the word 'eventlog' to the Event_Log Node.

import glob
import os
import re
from typing import Callable
from urllib import parse

from ganymede_sdk.agent.models import (
    FileParam,
    FileWatcherResult,
    MultiFileParam,
    TriggerFlowParams,
)


def fp(watch_dir: str, parent_dir: str, pattern: str) -> Callable[[str], bool]:
    def fp_res(x: str):
        x = parse.unquote(x)
        return x in glob.glob(os.path.join(watch_dir, pattern), recursive=True)

    return fp_res


def get_param_mapping(
    watch_dir: str,
    parent_dir: str = "",
    file_name: str = "",
    modified_time: str = "",
    body: bytes = bytes(),
) -> dict[str, Callable[[str], bool] | list[Callable[[str], bool]]]:
    # Extract a leading identifier from the file name; files whose names do not
    # start with a word character are ignored. The identifier is not used in the
    # patterns below, but it is available for building run-specific patterns.
    id_group = re.search(r"^(\w+)", file_name)
    if id_group is None:
        return {}
    id = id_group.group()

    return {
        "Bioreactor_File.csv": fp(watch_dir, parent_dir, "*.csv"),
        "Medium_Composition.excel": fp(watch_dir, parent_dir, "*medium*.xlsx"),
        "Event_Log.excel": fp(watch_dir, parent_dir, "*eventlog*.xlsx"),
    }


def execute(flow_params_fw: FileWatcherResult, **kwargs) -> TriggerFlowParams:
    return TriggerFlowParams(
        single_file_params=flow_params_fw.files,
        multi_file_params=None,
        benchling_tag=None,
        additional_params={},
    )

Deliver files to a Flow with a multi-input Node

This example shows an Agent configured to work with a Flow that has a Node taking multiple inputs. The Agent picks up files whose names start with 'Yeast_B1', 'Yeast_B2', 'Yeast_C1', or 'Yeast_C2' and delivers them to the Read_FCS_Files Node. It also delivers the input parameter "exp234" to the Experiment_ID Node, which is an Input_Param Node.

import glob
import os
from typing import Callable

from agent_sdk import info

from ganymede_sdk.agent.models import (
    FileWatcherResult,
    MultiFileParam,
    TriggerFlowParams,
)


def fp(watch_dir: str, parent_dir: str, pattern: str) -> Callable[[str], bool]:
    def fp_res(x: str):
        return x in glob.glob(os.path.join(watch_dir, pattern), recursive=True)

    return fp_res


def get_param_mapping(
    watch_dir: str,
    parent_dir: str = "",
    file_name: str = "",
    modified_time: str = "",
    body: bytes = bytes(),
) -> dict[str, Callable[[str], bool]]:
    match_dict = dict()

    # Match files associated with wells B1, B2, C1, and C2 from a flow cytometry run
    for well_row in range(1, 3):
        for well_col in ["B", "C"]:
            match_key = f"Yeast_{well_col}{well_row}*"
            match_dict[f"Yeast_{well_col}{well_row}.fcs"] = fp(
                watch_dir, parent_dir, f"*{match_key}"
            )

    return match_dict


# Required Function
def execute(flow_params_fw: FileWatcherResult, **kwargs) -> TriggerFlowParams:
    fcs_param = "Read_FCS_Files.fcs"

    # Point each observed file at the multi-input Node parameter
    file_param_list = []
    for file_param in flow_params_fw.files.values():
        file_param.content_type = "application/octet-stream"
        file_param.param = fcs_param
        file_param_list.append(file_param)

    m = MultiFileParam.from_file_param(file_param_list)
    m.param = fcs_param

    return TriggerFlowParams(
        single_file_params=None,
        multi_file_params={fcs_param: m},
        benchling_tag=None,
        additional_params={"Experiment_ID": "exp234"},
    )

Watch for Flow outputs, then save locally

Deliver a worklist to a liquid handler PC

This example shows an Agent that receives a file output by a Flow and writes it to a local directory on a liquid handler PC, creating a subdirectory named after the experiment ID parsed from the filename.

from agent_sdk import info, error
from ganymede_sdk.agent.models import FileParam
from pathlib import Path
import os


def execute(new_file: FileParam, **kwargs) -> None:
    filename = new_file.filename.split("/")[-1]
    filename_split = filename.split("_")
    exp_id = f"{filename_split[0]}_{filename_split[1]}"

    default_path = "C:/Users/dev/liquid_handler/worklist/"

    # The vars parameter can be updated by adding variables on the associated Connection
    # page, or upon Connection installation using a -v flag for each variable.
    path = Path(kwargs.get("vars", {}).get("output_path", default_path))

    # Add experiment ID to the path
    path = path / exp_id

    # Create directory for writing file if necessary
    try:
        if not os.path.exists(path):
            os.makedirs(path)
    except PermissionError:
        error(f"Permission denied: Cannot create directory at {path}.")
        return None
    except FileExistsError:
        error(f"File exists: A file with the name '{path}' already exists and is not a directory.")
        return None
    except OSError as e:
        error(f"OS error occurred: {e}")
        return None

    # full_path is C:/Users/dev/liquid_handler/worklist/<exp_id>/<filename>
    full_path = path / filename

    # Write file to full_path
    try:
        with open(full_path, "wb") as f:
            f.write(new_file.body)
    except FileNotFoundError:
        error(f"Error: The directory '{path}' does not exist.")
        return None
    except PermissionError:
        error(f"Error: Permission denied. Cannot write to '{full_path}'.")
        return None
    except IsADirectoryError:
        error(f"Error: '{full_path}' is a directory, not a file.")
        return None
    except IOError as e:
        error(f"IO error occurred: {e}")
        return None
    except AttributeError:
        error("Error: 'new_file.body' is not a bytes-like object.")
        return None
    except Exception as e:
        error(f"An unexpected error occurred while writing to {full_path}: {e}")
        return None

    if not os.access(full_path, os.R_OK):
        info(f"Unable to read {full_path} after file write.")
    else:
        info(f"File write to {full_path} successful.")

    return None

Cron Agent

Upload file if modified in the last day

This Agent requires the parent_dir variable (the directory to watch) to be configured. The recency window for the last modified time can also be configured; if it is not, files modified within the last day are uploaded.

import glob
import os
import time

from agent_sdk import info, error
from ganymede_sdk.agent.models import FileParam, UploadFileParams


# Required Function
def execute(**kwargs) -> UploadFileParams | None:
    """
    Executes on the specified cadence.

    Returns
    -------
    UploadFileParams | None
        Files to upload; if set to None, then no files will be uploaded.
    """

    agent_variables = kwargs.get('vars', {})
    if not agent_variables:
        info('Connection requires parent_dir to be specified.')
        return None

    # grab parent_dir, recency_min, and recency_max from variables configurable in the UI
    watch_dir = agent_variables.get('parent_dir', None)
    recency_min = float(agent_variables.get('recency_min', 0))
    recency_max = float(agent_variables.get('recency_max', 86400))  # last day

    if not watch_dir:
        error('Connection requires parent_dir to be specified.')
        return None

    list_of_files = glob.glob(f'{watch_dir}/*.csv')

    file_params = []
    for file_path in list_of_files:
        file_elapsed_time = time.time() - os.stat(file_path).st_mtime
        if recency_min <= file_elapsed_time < recency_max:
            if not os.access(file_path, os.R_OK):
                error(f'Do not have read permissions for file {file_path}')
                return None
            with open(file_path, 'rb') as f:
                body = f.read()
            file_params.append(FileParam(filename=file_path, body=body))

    if file_params:
        # assumes UploadFileParams accepts the list of FileParam objects via `files`
        return UploadFileParams(files=file_params)

    return None

Variables can be configured at installation time by passing additional variables, with a -v flag for each variable:

# example variable configuration at installation
-v parent_dir=C:\Users\dev\experiment_results\output\ -v file_to_watch="output.log" -v recency_min=60 -v recency_max=1800

Post-installation, for Agents v4.8+, the parameters for Windows Connections can be updated in the Connection UI.

Classes for Agent-triggered Flows

Objects for triggering a Flow from an Agent can generally be found in ganymede_sdk.agent.models.

FileWatcherResult Class

FileWatcherResult wraps a dictionary of FileParam objects indexed by <node name>.<param name>. It includes the following parameters:

  • param files: dict[str, FileParam] - Dictionary of FileParam objects indexed by <node name>.<param name>
  • param tags: Optional[List[FileTag]] - List of tags to be applied to all files
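
As a minimal sketch (assuming the attribute names listed above), the execute function can iterate over the captured files before building the Flow trigger; the info logger from agent_sdk is used for illustration:

from agent_sdk import info
from ganymede_sdk.agent.models import FileWatcherResult, TriggerFlowParams


def execute(flow_params_fw: FileWatcherResult, **kwargs) -> TriggerFlowParams:
    # Log each captured file, keyed by <node name>.<param name>
    for key, file_param in flow_params_fw.files.items():
        info(f"{key}: {file_param.filename} ({len(file_param.body)} bytes)")

    # Pass the captured files through to the Flow unchanged
    return TriggerFlowParams(
        single_file_params=flow_params_fw.files,
        multi_file_params=None,
        benchling_tag=None,
        additional_params={},
    )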

TriggerFlowParams Class

TriggerFlowParams specifies the inputs for the Flow executed when all files are observed. It includes the following parameters:

  • param single_file_params: Optional[dict[str, FileParam]] - Dict of FileParam objects indexed by node name.param name. These parameters are used for Nodes that accept a single file as input.
  • param multi_file_params: Optional[dict[str, MultiFileParam]] - Dict of MultiFileParam objects indexed by node name.param name. These parameters are used for Nodes that accept multiple files as input.
  • param benchling_tag: Optional[Tag] - Benchling tag to be passed to the Flow. This parameter is used for inputs to the Input_Benchling Node.
  • param additional_params: Optional[dict[str, str]] - Additional parameters to be passed to the Flow. This parameter is used for inputs to the Input_Param Node; the key is the Node name for the input parameter, and the value is the string to pass into the Node.

FileParam Class

FileParam specifies a file to be uploaded to Ganymede Cloud and its corresponding Flow parameter. FileParam objects are provided to the execute function once all files are detected.

  • param filename: str - Name of the file, e.g. "my_file.txt"
  • param content_type: str - Content type of the file, e.g. "text/plain". If not specified, the content type of the first file in the files dict will be used.
  • param body: bytes - File contents in bytes
  • param param: str - Name of parameter to be used in Flow, e.g. node_name.parameter_field_name
  • param parent_dir: str - Path within the Agent watch directory containing the file. For example, if C:/Users/username/watch_dir/ is being watched and C:/Users/username/watch_dir/abc/def/my_file.txt is found, then parent_dir would be "abc/def"
  • param upload_ts: str - Timestamp string in ISO format of when file was uploaded to the Agent watch directory, e.g. "2021-01-01T00:00:00Z"
  • param upload_path: Optional[str] - Path in Ganymede storage where file will be uploaded
  • param tags: Optional[List[FileTag]] - List of tags to be applied to the file
  • param bucket_name: str - Bucket associated with file
  • param files: dict[str, bytes] - Alternative method for specifying file contents, where the key is the filename and the value is the file body.
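
As an illustration, here is a minimal sketch of constructing a FileParam by hand for a single-file Node; the Node and parameter names are hypothetical:

from ganymede_sdk.agent.models import FileParam, TriggerFlowParams

# Hypothetical Node and parameter names, for illustration only
param_name = "Instrument_Data.csv"

file_param = FileParam(
    filename="run_001_results.csv",
    content_type="text/csv",
    body=b"sample_id,od600\nA1,0.42\n",
    param=param_name,
)

trigger = TriggerFlowParams(
    single_file_params={param_name: file_param},
    multi_file_params=None,
    benchling_tag=None,
    additional_params={},
)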

MultiFileParam Class

MultiFileParam is used for submitting multiple files to a single node. It includes the following parameters:

  • param files: dict[str, bytes] - Alternative method for specifying file contents, where the key is the filename and the value is the file body.
  • param content_type: str - Content type of file, e.g. "text/plain". If not specified, the content type of the first file in the files dict will be used.
  • param param: str - Name of parameter to be used in flow, e.g. node_name.parameter_field_name
  • param parent_dir: str - Path within the Agent watch directory that contains the file. For example, if C:/Users/username/watch_dir/ is being watched and C:/Users/username/watch_dir/abc/def/my_file.txt is found, then parent_dir would be "abc/def"
  • param upload_ts: str - Timestamp string in ISO format of when file was uploaded to Agent watch directory, e.g. "2021-01-01T00:00:00Z"
  • param upload_paths: Optional[List[str]] - Paths in Ganymede storage where the files will be uploaded
  • param tags: Optional[List[FileTag]] - List of tags to be applied to file
  • param bucket_name: str - Bucket associated with file

The MultiFileParam class provides a method for instantiation from a list of FileParam objects, as shown below. The resulting object takes on the content type of the first item in the list.

# assume fp1 and fp2 are FileParam objects
m = MultiFileParam.from_file_param([fp1, fp2])

Utility functions

Agent utility functions are provided in ganymede_sdk.agent.utils for validating data integrity and interacting with file systems.

Computing file checksums

Ganymede provides functions to validate file integrity, accessible via ganymede_sdk.agent.utils. These values can be used to verify the integrity of a file uploaded to cloud storage:

from ganymede_sdk.agent.utils import calculate_md5, calculate_crc32c

file_path = "path/to/local/file"

# either md5 or crc32c can be used to validate the integrity of a file
md5 = calculate_md5(file_path)
crc32c = calculate_crc32c(file_path)

You can also calculate the checksum of a file uploaded to Ganymede Cloud by creating a tempfile.NamedTemporaryFile, writing the file contents to it, and then calculating the checksum:

from ganymede_sdk.agent.utils import calculate_md5, calculate_crc32c
import os
import tempfile

data = b"Example data to calculate checksum"

with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
    tmp_file.write(data)
    tmp_file_name = tmp_file.name

md5 = calculate_md5(tmp_file_name)
crc32c = calculate_crc32c(tmp_file_name)

os.remove(tmp_file_name)

File system utilities

ganymede_sdk.agent.utils provides a number of file-system convenience functions, which are helpful for cron Agents that need more complex logic before invoking a Flow. Examples include a file that is written to multiple times before it is ready for processing, or a variable number of files, where the trigger for invoking the Flow requires more than the mere presence of a file.

ScanResult Dataclass

ScanResult stores file paths for files of interest. It includes:

  • param file_path: str - Path to file
  • param modified_time: datetime - Datetime of when file was last modified

Functions

list_files_recursive returns a list of all files in a directory and its subdirectories.

  • param file_path: str - Path to directory to list files from

matches_pattern returns True if a file path matches at least one of the specified regex patterns, and False otherwise.

  • param filename: str - Name of file
  • param pattern: re.Pattern | list[re.Pattern] - Regex pattern or list of regex patterns to match against

is_file_ready returns True if the file's modified time falls within the checking interval, or if the size of the file has changed within that same timespan.

  • param file_path: str - Path to file to watch
  • param threshold_seconds: float - Number of seconds to wait between checks, by default 0.1

get_most_recent_access_result returns a ScanResult object referencing the most recently accessed file in a directory. Access time is updated when a file is read from or written to.

  • param directory: str - Path to directory to watch

filter_by_age returns a list of files that have not been modified within the last age_in_minutes minutes.

  • param scan_results: List[ScanResult] - List of ScanResult objects
  • param age_in_minutes: int - Minimum age in minutes

zip_directory creates a zip file of a directory and its contents.

  • param directory: str - Path to directory to zip
  • param zip_file: str - Path to zip file to create

scan_for_finished_files scans a directory, returning paths to files with a modified date older than the specified number of minutes.

  • param directory: str - Path to directory to scan
  • param age_in_minutes: int - Minimum age in minutes for files to be included in the results
  • param pattern: re.Pattern | list[re.Pattern] - Regex pattern to match files against; only files that match against at least one of the specified patterns will be included in results

Example Use Case

You can use scan_for_finished_files to continuously scan a directory for files, uploading them to Ganymede Cloud for processing when they are older than a specified number of minutes. The Flow could query previously uploaded files using the list_files method to avoid uploading the same file multiple times.
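
Below is a minimal sketch of a cron Agent execute function along these lines. It assumes the utility signatures listed above and that UploadFileParams accepts a list of FileParam objects through a files field; adjust to match your installed SDK version.

import re

from agent_sdk import info
from ganymede_sdk.agent.models import FileParam, UploadFileParams
from ganymede_sdk.agent.utils import scan_for_finished_files


def execute(**kwargs) -> UploadFileParams | None:
    watch_dir = kwargs.get("vars", {}).get("parent_dir", "")
    if not watch_dir:
        return None

    # Collect CSV files that have not been modified in the last 10 minutes
    finished = scan_for_finished_files(
        directory=watch_dir,
        age_in_minutes=10,
        pattern=re.compile(r".*\.csv$"),
    )

    file_params = []
    for file_path in finished:
        with open(file_path, "rb") as f:
            file_params.append(FileParam(filename=file_path, body=f.read()))

    if not file_params:
        return None

    info(f"Uploading {len(file_params)} finished file(s) from {watch_dir}")
    # De-duplication against previously uploaded files can be handled in the
    # Flow itself, e.g. by querying list_files before processing.
    return UploadFileParams(files=file_params)  # assumed field name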

Agent SDK

The Agent SDK provides the ability to query the Ganymede database and add logging messages viewable in the web app (for Agents v4.8+).

Querying Ganymede from Agent Code

from agent_sdk.query import read_sql_query

df = read_sql_query('SELECT * FROM instrument_methods')

Logging Methods

Ganymede Agents (v4.9+) support user-defined logging messages through agent_sdk, aligned with the logging levels for Agent messages. Each level corresponds to a separate method in agent_sdk.

from agent_sdk import internal, debug, info, activity, error

# log internal
internal('Display internal message')

# log debug
debug('Display debug message')

# log info
info('Display info message')

# log activity
activity('Display activity message')

# log error
error('Display error message')

In the UI, these log messages are viewable and filterable on the corresponding Connections page.