Skip to main content

Agent Templates

This section contains a cookbook of user-defined code examples, which can be helpful for expediting Agent definition.

Watch for files locally and run Flow

In the user-defined function for Agents executing Flows, the get_param_mapping function is run whenever a file is added or modified in the directory that the Agent is watching. The execute function is run whenever all files are observed. The get_param_mapping function is used to specify the parameters to be passed to the Flow, and the execute function specifies the input files and parameters to the Flow that is executed.

Below is a skeleton example for the user-defined code with associated types:

from typing import Callable

def fp(watch_dir: str, parent_dir: str, pattern: str) -> Callable[[str], bool]:
"""
This function returns a function that performs pattern matching against a file path.
Use this function as a template for creating your own pattern matching functions, which
you can then use in the values of the return object in the get_param_mapping function.

Returns
-------
Callable[[str], bool]
Function that takes a file as input and returns True if the file matches the pattern.
"""
pass

def get_param_mapping(
watch_dir: str,
parent_dir: str = "",
file_name: str = "",
modified_time: str = "",
body: bytes = bytes(),
) -> dict[str, Callable[[str], bool]]:
"""
This function is called when a file is added or modified in the watch directory.
Modify this function to capture the files you want to trigger the flow;
the function should return a dictionary where the keys are <node name>.<param name>
and values are functions for performing pattern matching against the target file.

For nodes that accept multiple inputs, specify a list of functions to match against;
each specified function should uniquely match 1 file.

This function returns a dictionary of fp objects indexed by `node name`.`param name`
"""
pass

def execute(flow_params_fw: FileWatcherResult) -> TriggerFlowParams:
"""
Called when all glob patterns specified by get_param_mapping have been matched.

Parameters
----------
flow_params_fw : FileWatcherResult
Dict of FileParam objects indexed by <node name>.<param name>
"""
pass

Delivering files to a Flow with single file input Nodes

This example shows an Agent that delivers a csv file to the Bioreactor_File Node, an excel file containing the word 'medium' to the Medium_Composition Node, and an excel file containing the word 'eventlog' to the Event_Log Node.

import glob
import os
import re
from typing import Callable
from urllib import parse

from agent_sdk import (
FileParam,
FileWatcherResult,
MultiFileParam,
TriggerFlowParams,
)


def fp(watch_dir: str, parent_dir: str, pattern: str) -> Callable[[str], bool]:
def fp_res(x: str):
x = parse.unquote(x)
return x in glob.glob(os.path.join(watch_dir, pattern), recursive=True)

return fp_res


def get_param_mapping(
watch_dir: str,
parent_dir: str = "",
file_name: str = "",
modified_time: str = "",
body: bytes = bytes(),
) -> dict[str, Callable[[str], bool] | list[Callable[[str], bool]]]:
id_group = re.search(r"^(\w+)", file_name)
if id_group is None:
return {}
id = id_group.group()
return {
"Bioreactor_File.csv": fp(watch_dir, parent_dir, f"*.csv"),
"Medium_Composition.excel": fp(watch_dir, parent_dir, f"*medium*.xlsx"),
"Event_Log.excel": fp(watch_dir, parent_dir, f"*eventlog*.xlsx"),
}


def execute(flow_params_fw: FileWatcherResult, **kwargs) -> TriggerFlowParams:
return TriggerFlowParams(
single_file_params=flow_params_fw.files,
multi_file_params=None,
benchling_tag=None,
additional_params={},
)

Deliver files to a Flow with a multi-input Node

This example shows an Agent configured to work with an flow with a Node taking multiple inputs, picking up filenames starting with 'Yeast_B1', 'Yeast_B2', 'Yeast_C1', 'Yeast_C2' and delivering the observed files to the Read_FCS_Files node. The Agent also delivers an input parameter of "exp234" to the Experiment_ID Node, which is an Input_Param node.

import glob
import os
from typing import Callable

from agent_sdk import (
FileWatcherResult,
MultiFileParam,
TriggerFlowParams,
info,
file_params_list_to_multi,
)


def fp(watch_dir: str, parent_dir: str, pattern: str) -> Callable[[str], bool]:
def fp_res(x: str):
return x in glob.glob(os.path.join(watch_dir, pattern), recursive=True)

return fp_res

def get_param_mapping(
watch_dir: str,
parent_dir: str = "",
file_name: str = "",
modified_time: str = "",
body: bytes = bytes(),
) -> dict[str, Callable[[str], bool]]:
match_dict = dict()

# Match files associated with wells B1, B2, C1, and C2 from a flow cytometry run
for well_row in range(1, 3):
for well_col in ["B", "C"]:
well_row_zfill = str(well_row).zfill(2)
match_key = f"Yeast_{well_col}{well_row}*"
match_dict[f"Yeast_{well_col}{well_row}.fcs"] = fp(
watch_dir, parent_dir, f"*{match_key}"
)

return match_dict


# Required Function
def execute(flow_params_fw: FileWatcherResult, **kwargs) -> TriggerFlowParams:
fcs_param = "Read_FCS_Files.fcs"

file_param_list = []
for file_param in list(flow_params_fw.files.values()):
file_param.content_type = 'application/octet-stream'
file_param.param = fcs_param
file_param_list.append(file_param)

m = file_params_list_to_multi(file_param_list)
m.param = fcs_param


return TriggerFlowParams(
single_file_params=None,
multi_file_params={fcs_param: m},
benchling_tag=None,
additional_params={"Experiment_ID": "exp234"},
)

Watch for flow outputs then save locally

Deliver worklist to a liquid handler PC

from agent_sdk import FileParam, info, error
from pathlib import Path
import os


def execute(new_file: FileParam, **kwargs) -> None:
filename = new_file.filename.split("/")[-1]
filename_split = filename.split("_")
exp_id = f"{filename_split[0]}_{filename_split[1]}"

default_path = "C:/Users/dev/liquid_handler/worklist/"

# The vars parameter can be updated by adding variables on the associated Connection page or upon Connection installation using a -v flag for each variable.
path = Path(kwargs.get("vars", {}).get("output_path", default_path))

# add experiment ID to the path variable
path = path / exp_id

# Create directory for writing file if necessary
try:
if not os.path.exists(path):
os.makedirs(path)
except PermissionError:
error(f"Permission denied: Cannot create directory at {path}.")
return None
except FileExistsError:
error(f"File exists: A file with the name '{path}' already exists and is not a directory.")
return None
except OSError as e:
error(f"OS error occurred: {e}")
return None

# full_path is C:/Users/dev/liquid_handler/worklist/<exp_id>/<filename>
full_path = path / filename

# Write file to full_path
try:
with open(full_path, "wb") as fp:
fp.write(new_file.body)
except FileNotFoundError:
error(f"Error: The directory '{path}' does not exist.")
return None
except PermissionError:
print(f"Error: Permission denied. Cannot write to '{full_path}'.")
return None
except IsADirectoryError:
print(f"Error: '{full_path}' is a directory, not a file.")
return None
except IOError as e:
print(f"IO error occurred: {e}")
return None
except AttributeError:
print("Error: 'new_file.body' is not a bytes-like object.")
return None
except Exception as e:
print(f"An unexpected error occurred while writing to {full_path}: {e}")
return None

if not os.access(full_path, os.R_OK):
info(f"Unable to read {full_path} after file write.")
else:
info(f"File write to {full_path} successful.")

return None

Cron Agent

Upload file if modified in the last day

This Agent would require watch_dir to be configured. Recency of last modified date can be configured if desired; if it isn't configured, then the watched file would be uploaded if it was modified in the last hour.

from pathlib import Path
import os
import time
from agent_sdk import info, error, FileParam, UploadFileParams

# Required Function
def execute(**kwargs) -> UploadFileParams | None:
"""
Executes on specified cadence.

Returns
-------
UploadFileParams | None
Files to upload; if set to None, then no files will be uploaded.
"""

agent_variables = kwargs.get('vars', {})
if not agent_variables:
info('Connection requires parent_dir and file_to_watch to be specified.')
return None

# grab watch_dir, recency_min, and recency_max from variables configurable in UI
watch_dir = agent_variables.get('parent_dir', None)
recency_min = float(agent_variables.get('recency_min', 0))
recency_max = float(agent_variables.get('recency_max', 86400)) # last day

if not watch_dir:
error('Connection requires watch_dir to be specified.')
return None

list_of_files = glob.glob(f'{watch_dir}/*.csv')

file_params = []
for file_path in list_of_files:
file_elapsed_time = time.time() - os.stat(file_path).st_mtime
if file_elapsed_time >= recency_min and file_elapsed_time < recency_max:
if not os.access(file_path, os.R_OK):
error(f'Do not have read permissions for file {file_path}')
return None
with open(file_path, 'rb') as f:
body = f.read()
file_params.append(FileParam(filename=file_path, body=body))
if (len(file_params) > 0):
return TriggerFlowParams(single_file_params=file_params)

return None

Variables (e.g. - parent_dir and recency_min) can be configured during the installation by passing additional variables during the installation:

# example variable configuration at installation
-v parent_dir=C:\Users\dev\experiment_results\output\ -v file_to_watch="output.log" -v recency_min=60 -v recency_max=1800

Post-installation, for Agents v4.8+, the parameters for Windows Connections can be updated in the Connection UI.

Querying OPC server

The following example demonstrates how to query an OPC server for bioreactor data on a regular cadence, and places the results in Ganymede for querying and visualizing.

from agent_sdk import FileParam, TriggerFlowParams
from agent_sdk import info
import pandas as pd

import asyncio
from asyncua import Client, Node

import time
from functools import reduce

OPC_SERVER_URL = "opc.tcp://localhost:4840/ganymede/server/"
OPC_SERVER_URI = "http://examples.ganymede.github.io"

EXECUTION_TIME = 10 # seconds
PUBLISHING_INTERVAL = 500 # milliseconds
SAMPLING_INTERVAL = 2000 # milliseconds

BIOREACTORS = [
"Cytiva Wave",
"Cytiva XDR",
]




async def opc_client():
client = Client(url=OPC_SERVER_URL)
async with client:
idx = await client.get_namespace_index(uri=OPC_SERVER_URI)
info(f"Namespace index for '{OPC_SERVER_URI}': '{idx}'")

node_display_names = {}
nodes_all = []
for bioreactor in BIOREACTORS:
object_node = await client.nodes.objects.get_child(f"{idx}:{bioreactor}")
nodes = await object_node.get_children()

for node in nodes:
display_name = await node.read_display_name()
node_display_names[node] = f"Bioreactor {bioreactor} - {display_name.Text}"

nodes_all.append(nodes)
nodes_all_flattened = reduce(lambda x, y: x + y, nodes_all)

# manual
start_time = time.time()
collected_data = []

while (time.time() - start_time) < EXECUTION_TIME:
snapshot = []

tasks = [node.read_data_value() for node in nodes_all_flattened]
data_values = await asyncio.gather(*tasks)

for node, data_value in zip(nodes_all_flattened, data_values):
value = data_value.Value.Value
timestamp = data_value.SourceTimestamp

instrument, sensor_category, sensor_name = [
n.strip() for n in node_display_names[node].split("-")
]

snapshot.append(
{
"instrument": instrument,
"sensor_category": sensor_category,
"sensor_name": sensor_name,
"value": value,
"timestamp": timestamp,
}
)

collected_data.extend(snapshot)
await asyncio.sleep(SAMPLING_INTERVAL / 1000)

return collected_data


# Required Function
def execute(**kwargs) -> TriggerFlowParams | None:
"""
Function to execute on specified cadence

Returns
-------
TriggerFlowParams | None
Parameters to use in triggered Flow. If None is specified, then Flow will not be triggered.
"""

collected_data = asyncio.run(opc_client())
collected_data_str = pd.DataFrame(collected_data).to_csv(header=True, index=False)
info(f"Collected data: {collected_data_str}")

current_time = int(time.time() * 1000)
filename = f"opc_cron{current_time}.txt"

param = "Ingest_Bioreactor_Snapshot.file_pattern"
new_file_param = FileParam(filename=filename, body=collected_data_str, param=param)

return TriggerFlowParams(
single_file_params={param: new_file_param},
multi_file_params=None,
benchling_tag=None,
additional_params=None,
)