Custom Transforms in Benchling Connect
Background Knowledge
Custom Transforms are a way to inject arbitrary logic into Benchling Connect’s processing pipeline. The user will upload a file to a Run's Automation Output Processor, invoke the transform by attempting to "process" the uploaded-file, and then Benchling will wait until an integration writes back a blob
.
Once the transformed file has been written to Benchling, the Automation Output Processor will continue to process the file according to the configuration in the Run's outputFileConfigs
.
Custom Transform
Custom transforms use a Benchling Application to do the transformation, and are configured as part of the processingSteps
in the outputFileConfigs
of a Benchling Connect Run schema.
Below is an excerpt of a Run Schema that only executes a Custom Transformation.
"processingSteps": [
{
"inputFileType": "CUSTOM",
"transformationProperties": {
"transformId": "app_<your_id_here>"
}
}
],
User Flow
A user will upload a file to the Automation Output Processor via inserting a Run into a Notebook entry, or via API.
When the user clicks "Process", the "Processing" indicator will spin until an integration writes back via the patchLabAutomationTransform API-endpoint.
If the integration writes back a blob
, then the processor pipeline continues on.
However, if the integration writes back an error, this will cancel the Automation Output Processor.
Integration Walkthrough
Now that we have seen how we would use a Custom Transform, let’s discuss what’s going on under the hood.
A Top-Down View
The core functionality of any Custom Transform integration, built as an Event-driven AWS Lambda, will look like this:
def lambda_handler(event, context):
transform = event["transform"]
input_blob = fetch_blob(transform["blobId"])
try:
processed_data = do_business_logic(input_blob)
output_blob = create_blob(processed_data)
set_transform_output(transform, output_blob)
except Exception as e:
set_transform_error(transform, str(e))
Let’s break that down a little.
fetch_blob
This helper function simply retrieves the original file uploaded to the Automation Output Processor.
def fetch_blob(blobId : str):
return benchling.blobs.download_url(blobId)
do_business_logic
Here, any programatic transformation of the input blob is possible. You can look things up from external sources, collate metadata, report out to other systems; anything you deem necessary.
Please refer to the example below, which calculates the "Percentage of Whole" for each row's Result value.
def do_business_logic(input_data: pd.DataFrame):
input_data["Percentage of Whole"] = round(input_data["result"]/sum(input_data["result"]) * 100)
return input_data
With an updated Result schema (to capture the additional column we just created), we would expect to see:
create_blob
This function takes the processed_data
from do_business_logic
and generates a blob
in Benchling.
Assuming that our processed_data
is a pandas
DataFrame, the function could look like:
def create_blob(data: pd.DataFrame):
return benchling.blobs.create_from_bytes(data.to_csv(index=False))
set_transform_output
This function performs the action of writing the newly-created blob
back to the Automation Output Processor in Benchling.
Notice that the body
of the PATCH
simply includes the blob
and has no definition of errors
.
def set_transform_output(transform, blob_id):
return benchling.api.patch_response(
f'api/v2/automation-file-transforms/{transform_id}',
body={"blobId": blob_id},
)
set_transform_error
If the Transform fails, the integration writes back details of the failure to the Automation Output Processor.
Notice that the body
of the call only includes errors
, and does not provide a blob
to the Automation Output Processor.
def set_transform_error(transform, error_string):
return benchling.api.patch_response(
f'api/v2/automation-file-transforms/{transform_id}',
body={"errors": [{'message': error_string}]},
)
Full Code Example
Here is a lambda handler that responds to a v2-beta.automationFileTransform.updated.status
event and writes a blob
back.
from benchling_sdk.benchling import Benchling
from benchling_sdk.auth.api_key_auth import ApiKeyAuth
import pandas as pd
# Replace [subdomain] and [api_key] with your tenant values (also remove brackets)
benchling = Benchling(url="https://[subdomain].benchling.com", auth_method=ApiKeyAuth("[api_key]"))
def create_blob(data: pd.DataFrame):
data_bytes = data.to_csv(index=False).encode('utf-8')
return benchling.blobs.create_from_bytes(
data_bytes, name="Custom Transform Output.csv"
)
def set_transform_output(transform_id, blob):
return benchling.api.patch_response(
f'api/v2/automation-file-transforms/{transform_id}',
body={"blobId": blob.id},
)
def set_transform_error(transform_id, error_string):
return benchling.api.patch_response(
f'api/v2/automation-file-transforms/{transform_id}',
body={"errors": [{'message': error_string}]},
)
def do_business_logic(input_data: pd.DataFrame):
input_data["Percentage of Whole"] = round(input_data["result"]/sum(input_data["result"]) * 100)
return input_data
def lambda_handler(event, context):
transform = event['detail']["transform"]
input_blob_download_url_object = benchling.blobs.download_url(transform["blobId"])
try:
input_data = pd.read_csv(input_blob_download_url_object.download_url)
processed_data = do_business_logic(input_data)
output_blob = create_blob(processed_data)
set_transform_output(transform["id"], output_blob)
except Exception as e:
set_transform_error(transform["id"], str(e))
Appendix
AWS EventBridge Rule
Here is an example EventBridgeRule
to invoke the above code:
{
"detail-type": [
"v2-beta.automationFileTransform.updated.status"
],
"detail": {
"transform": {
"customTransformId": ["app_FHD6INW2pyvPEXK1"] # this is the ID for your Benchling App
}
}
}
Benchling Connect Run Schema
Here is an example of a Benchling Connect Run's schema, written in JSON
.
{
"processingSteps": [
{
"inputFileType": "CUSTOM",
"outputFileType": "FINAL_DATA_FRAME",
"transformationProperties": {
"transformId": "app_0HLHMi6WnyhQRSYq"
}
}
],
"resultSchemaInfo": {
"fields": {
"schema_field_77": {
"type": "PRIMARY_SAMPLE_ENTITY"
},
"percentage_of_whole": {
"columnName": "Percentage of Whole",
"type": "DIRECT"
},
"result": {
"columnName": "result",
"type": "DIRECT"
}
},
"schema": "assaysch_aGnTdxCz"
},
"tableType": "GRID",
"columnTypes": [
"ENTITY_NAME",
"SCHEMA_FIELD",
"SCHEMA_FIELD"
],
"delimiter": ",",
"fileExtension": "csv",
"name": "Records Results",
"volumeUnits": "uL"
}
Updated about 1 month ago