Skip to main content

Agent Models and Methods

Objects associated with triggering a flow from an Agent can be found in ganymede_sdk.agent.models.

Within the user-defined function for Agents that execute flows, the get_param_mapping function is run whenever a file is added or modified in the directory that the Agent is watching, and the execute function is run whenever all files are observed. The get_param_mapping function is used to specify the parameters that will be passed to the flow, and the execute function is used to specify the input files and parameters to the flow that is executed.

A skeleton for the user-defined function with associated types is shown below:

def fp(watch_dir: str, parent_dir: str, pattern: str) -> Callable[[str], bool]:
pass

def get_param_mapping(
watch_dir: str,
parent_dir: str = "",
file_name: str = "",
modified_time: str = "",
body: bytes = bytes(),
) -> Dict[str, Callable[[str], bool]]:
# returns a dictionary of fp objects indexed by `node name`.`param name`
pass

def execute(flow_params_fw: FileWatcherResult) -> TriggerFlowParams:
# returns a TriggerFlowParams object
pass

Classes associated with Agent-triggered flows

The FileWatcherResult consists of a dictionary of FileParam objects indexed by node name.param name.

  • param files: dict[str, fileParam] - Dictionary of FileParam objects indexed by node name.param name
  • param tags: Optional[List[FileTag]] - List of tags to be applied to all files

TriggerFlowParams is used to specify the inputs to the flow that is executed when all files are observed. It contains the following parameters:

  • param single_file_params: Optional[Dict[str, FileParam]] - Dict of FileParam objects indexed by node name.param name. These parameters are used for nodes that accept a single file as input.
  • param multi_file_params: Optional[Dict[str, MultiFileParam]] - Dict of MultiFileParam objects indexed by node name.param name. These parameters are used for nodes that accept multiple files as input.
  • param benchling_tag: Optional[Tag] - Additional parameters to be passed to flow. This parameter is used for inputs to the Input_Benchling node.
  • param additional_params: Optional[Dict[str, str]] - Additional parameters to be passed to flow. This parameter is used for inputs to the Input_Param node.

The FileParam and MultiFileParam objects are used to specify the files to be uploaded to Ganymede Cloud and the corresponding flow parameters to be used in the flow. A dictionary of FileParam objects (FileWatcherResult) are provided to the execute function once all files to observe are detected.

The FileParam object contains the following parameters:

  • param filename: str - Name of file, e.g. "my_file.txt"
  • param content_type: str - Content type of file, e.g. "text/plain". If not specified, the content type of the first file in the files dict will be used.
  • param body: bytes - File contents in bytes
  • param param: str - Name of parameter to be used in flow, e.g. node_name.parameter_field_name
  • param parent_dir: str - Path within Agent watch directory that contains file. For example. if C:/Users/username/watch_dir/ is being watched and C:/Users/username/watch_dir/abc/def/my_file.txt is found, then parent_dir would be "abc/def"
  • param upload_ts: str - Timestamp string in ISO format of when file was uploaded to Agent watch directory, e.g. "2021-01-01T00:00:00Z"
  • param upload_path: Optional[str] - Path in Ganymede storage where file will be uploaded
  • param *tags: Optional[List[FileTag]] - List of tags to be applied to file
  • param bucket_name: str - Bucket associated with file
  • param files: str - Alternative method for specifying file contents. If specified, then the key of this dict is used as thefilename and the value is used as the body.

The MultiFileParam object is used for submitting multiple files to a single node. It contains the following parameters:

  • param files: str - Alternative method for specifying file contents. If specified, then the key of this dict is used as thefilename and the value is used as the body.
  • param content_type: str - Content type of file, e.g. "text/plain". If not specified, the content type of the first file in the files dict will be used.
  • param param: str - Name of parameter to be used in flow, e.g. node_name.parameter_field_name
  • param parent_dir: str - Path within Agent watch directory that contains file. For example. if C:/Users/username/watch_dir/ is being watched and C:/Users/username/watch_dir/abc/def/my_file.txt is found, then parent_dir would be "abc/def"
  • param upload_ts: str - Timestamp string in ISO format of when file was uploaded to Agent watch directory, e.g. "2021-01-01T00:00:00Z"
  • param upload_paths: Optional[List[str]] - Path in Ganymede storage where file will be uploaded
  • param tags: Optional[List[FileTag]] - List of tags to be applied to file
  • param bucket_name: str - Bucket associated with file

The MultiFileParam object contains a method for initiation from a list of FileParam objects as shown below. The content type of the object is assumed to take on the content type of the first item in the list.

# assume fp1 and fp2 are FileParam objects
m = MultiFileParam.from_file_param([fp1, fp2])

The TriggerFlowParams object is used to specify the inputs to the flow that is executed when all files are observed. It contains the following parameters:

  • param single_file_params: Optional[Dict[str, FileParam]] - Dict of FileParam objects indexed by node name.param name. These parameters are used for nodes that accept a single file as input.
  • param multi_file_params: Optional[Dict[str, MultiFileParam]] - Dict of MultiFileParam objects indexed by node name.param name. These parameters are used for nodes that accept multiple files as input.
  • param benchling_tag: Optional[Tag] - Additional parameters to be passed to flow. This parameter is used for inputs to the Input_Benchling node.
  • param additional_params: Optional[Dict[str, str]] - Additional parameters to be passed to flow. This parameter is used for inputs to the Input_Param node.

Utility functions

Agent utility functions are provided in ganymede_sdk.agent.utils, which can be useful for validating data integrity and interacting with file systems.

Computing file checksums

Ganymede provides two functions for validating the integrity of files, which can be accessed from ganymede_sdk.agent.utils. The code below shows how these values can be accessed; these values can be used to:

  • verify the integrity of a file uploaded to cloud storage
from ganymede_sdk.agent.utils import calculate_md5, calculate_crc32c

file_path = "path/to/local/file"

# either md5 or crc32c can be used to validate the integrity of a file
md5 = calculate_md5(file_path)
crc32c = calculate_crc32c(file_path)

These methods can also be used to calculate the checksum of a file that has been uploaded to Ganymede Cloud. To do so, create a tempfile.TemporaryFile object and write the file contents to it before calculating the checksum.

from ganymede_sdk.agent.utils import calculate_md5, calculate_crc32c
import os
import tempfile

data = b"Example data to calculate checksum"

with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
tmp_file.write(data)
tmp_file_name = tmp_file.name

md5 = calculate_md5(tmp_file_name)
crc32c = calculate_crc32c(tmp_file_name)

os.remove(tmp_file_name)

File system utilities

ganymede_sdk.agent.utils provides a number of convenience functions, which can be helpful to use with cron Agents that involve more complex logic prior to invoking a flow. Some examples of this are when a file is written to multiple times before being processed, or if there is a variable number of files being processed, such that the trigger for invoking a flow requires more than just the presence of a file.

Dataclasses

The ScanResult object is a dataclass used to store the file paths for files of interest. It contains the following parameters:

  • param file_path: str - Path to file
  • param modified_time: datetime - Datetime of when file was last modified

Functions

list_files_recursive returns a list of all files in a directory and its subdirectories.

  • param file_path: str - Path to directory to list files from

matches_pattern returns True if a file path matches at least one of the specified regex patterns specified and False otherwise.

  • param filename: str - Name of file
  • param pattern: re.Pattern | list[re.Pattern] - Regex pattern or list of regex patterns to match against

is_file_ready returns True if a file has the modified time is within the last interval_in_seconds seconds, or if the size of the file has changed in that same timespan.

  • param file_path: str - Path to file to watch
  • param threshold_seconds: int - Number of seconds to wait between checks, by default 0.1

get_most_recent_access_result returns a ScanResult object referencing the most recently accessed file in a directory. Access time is updated when a file is read from or written to.

  • param directory: str - Path to directory to watch

filter_by_age returns a list of files that have not been modified within the last age_in_minutes minutes.

  • param scan_results: List[ScanResult] - List of ScanResult objects
  • param age_in_minutes: int - Minimum age in minutes

zip_directory creates a zip file of a directory and its contents.

  • param directory: str - Path to directory to zip
  • param zip_file: str - Path to zip file to create

scan_for_finished_files scans a directory, returning paths to files with a modified date older than the specified number of minutes

  • param directory: str - Path to directory to scan
  • param age_in_minutes: int - Minimum age in minutes for files to be included in the results
  • param pattern: re.Pattern | list[re.Pattern] - Regex pattern to match files against; only files that match against at least one of the specified patterns will be included in results

One example of how these functions can be used is to use scan_for_finished_files to scan a directory for files continuously, uploading them to Ganymede Cloud for processing when they are older than a specified number of minutes. Such a flow would query previously uploaded files to Ganymede using the list_files method to ensure that the same file is not uploaded multiple times.