Custom Transforms in Benchling Connect

Background Knowledge

Custom Transforms are a way to inject arbitrary logic into Benchling Connect’s processing pipeline. A user uploads a file to a Run's Automation Output Processor and invokes the transform by attempting to "process" the uploaded file. Benchling then waits until an integration writes back a blob.

Once the transformed file has been written to Benchling, the Automation Output Processor will continue to process the file according to the configuration in the Run's outputFileConfigs.

Custom Transform

Custom transforms use a Benchling Application to do the transformation, and are configured as part of the processingSteps in the outputFileConfigs of a Benchling Connect Run schema.

Below is an excerpt of a Run Schema that only executes a Custom Transformation.

"processingSteps": [
    {
        "inputFileType": "CUSTOM",
        "transformationProperties": {
            "transformId": "app_<your_id_here>"
        }
    }
],
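
As a quick sanity check before deploying, the configured app ID can be read back out of a config like the excerpt above. The sketch below is illustrative only: `custom_transform_ids` is a hypothetical helper name, and it assumes the config has been loaded into a Python dict with the same keys shown in the excerpt.

```python
import json


def custom_transform_ids(output_file_config: dict) -> list:
    """Return the transformId of every CUSTOM processing step, in order."""
    ids = []
    for step in output_file_config.get("processingSteps", []):
        if step.get("inputFileType") != "CUSTOM":
            continue  # not a custom transform step
        props = step.get("transformationProperties", {})
        if "transformId" in props:
            ids.append(props["transformId"])
    return ids


# The excerpt above, wrapped in braces so it parses as a JSON object.
excerpt = json.loads("""
{
    "processingSteps": [
        {
            "inputFileType": "CUSTOM",
            "transformationProperties": {"transformId": "app_<your_id_here>"}
        }
    ]
}
""")

print(custom_transform_ids(excerpt))
```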

User Flow

A user will upload a file to the Automation Output Processor by inserting a Run into a Notebook entry, or via the API.

File uploaded to the Automation Output Processor

When the user clicks "Process", the "Processing" indicator will spin until an integration writes back via the patchLabAutomationTransform API endpoint.

Automation Output Processor has been invoked.

If the integration writes back a blob, then the processor pipeline continues on.

Successful transform and processing of file.

However, if the integration writes back an error, this will cancel the Automation Output Processor.

Automation Output Processor has been cancelled due to an error encountered during the transform.

Integration Walkthrough

Now that we have seen how we would use a Custom Transform, let’s discuss what’s going on under the hood.

A Top-Down View

The core functionality of any Custom Transform integration, built here as an event-driven AWS Lambda, will look like this:

def lambda_handler(event, context):
    # EventBridge delivers the Benchling event payload under "detail"
    transform = event["detail"]["transform"]
    input_blob = fetch_blob(transform["blobId"])
    try:
        processed_data = do_business_logic(input_blob)
        output_blob = create_blob(processed_data)
        set_transform_output(transform, output_blob)
    except Exception as e:
        # Report the failure so the processor stops waiting
        set_transform_error(transform, str(e))

Let’s break that down a little.

fetch_blob

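This helper function retrieves the original file uploaded to the Automation Output Processor and, in this example, loads it as a pandas DataFrame.

def fetch_blob(blob_id: str) -> pd.DataFrame:
    # Look up a download URL for the blob, then read the CSV contents from it
    blob_url = benchling.blobs.download_url(blob_id)
    return pd.read_csv(blob_url.download_url)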

do_business_logic

Here, any programmatic transformation of the input blob is possible. You can look things up from external sources, collate metadata, or report out to other systems: anything you deem necessary.

Please refer to the example below, which calculates the "Percentage of Whole" for each row's Result value.

def do_business_logic(input_data: pd.DataFrame):
    input_data["Percentage of Whole"] = (input_data["result"] / input_data["result"].sum() * 100).round()
    return input_data
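
The same calculation can also be sketched with only the standard library, which may be convenient if pandas is not packaged with the function. This is a minimal illustration rather than the guide's implementation: `add_percentage_of_whole` is a hypothetical name, the input is assumed to be CSV text with a numeric "result" column, and the zero-total check is an added assumption about how a bad file should be handled.

```python
import csv
import io


def add_percentage_of_whole(csv_text: str, value_column: str = "result") -> str:
    """Append a "Percentage of Whole" column to CSV text and return the new CSV."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    if not rows:
        raise ValueError("input file has no data rows")
    total = sum(float(row[value_column]) for row in rows)
    if total == 0:
        # Raising lets the calling handler report the failure back to Benchling.
        raise ValueError(f"'{value_column}' values sum to zero")
    for row in rows:
        row["Percentage of Whole"] = round(float(row[value_column]) / total * 100)

    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=list(rows[0].keys()), lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return out.getvalue()


# Worked example: 10 + 30 + 60 = 100, so each value is its own percentage.
sample = "sample,result\nA,10\nB,30\nC,60\n"
print(add_percentage_of_whole(sample))
```

Any exception raised here would surface through the handler's except branch, so the user sees the message instead of an indefinitely spinning indicator.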

With an updated Result schema (to capture the additional column we just created), we would expect to see:

Transformed file includes the "Percentage of Whole" column

create_blob

This function takes the processed_data from do_business_logic and generates a blob in Benchling.

Assuming that our processed_data is a pandas DataFrame, the function could look like:

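def create_blob(data: pd.DataFrame):
    # Encode the CSV text to bytes and name the blob, as in the full example
    data_bytes = data.to_csv(index=False).encode("utf-8")
    return benchling.blobs.create_from_bytes(data_bytes, name="Custom Transform Output.csv")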

set_transform_output

This function writes the newly created blob back to the Automation Output Processor in Benchling.

Notice that the body of the PATCH includes only the blob ID and no errors.

def set_transform_output(transform, blob):
    return benchling.api.patch_response(
        f'api/v2/automation-file-transforms/{transform["id"]}',
        body={"blobId": blob.id},
    )

set_transform_error

If the Transform fails, the integration writes back details of the failure to the Automation Output Processor.

Notice that the body of the call only includes errors, and does not provide a blob to the Automation Output Processor.

def set_transform_error(transform, error_string):
    return benchling.api.patch_response(
        f'api/v2/automation-file-transforms/{transform["id"]}',
        body={"errors": [{"message": error_string}]},
    )
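
Because the two helpers send either a blob or errors, one way to keep that rule in a single place is a small body builder. This is a sketch under assumptions: `build_transform_patch_body` is a hypothetical name, the IDs are placeholders, and the "exactly one" check reflects the pattern used in this guide rather than a documented API constraint.

```python
from typing import Optional


def build_transform_patch_body(blob_id: Optional[str] = None,
                               error_message: Optional[str] = None) -> dict:
    """Build the PATCH body for a transform: a blob on success, errors on failure."""
    if (blob_id is None) == (error_message is None):
        raise ValueError("provide exactly one of blob_id or error_message")
    if blob_id is not None:
        return {"blobId": blob_id}
    return {"errors": [{"message": error_message}]}


print(build_transform_patch_body(blob_id="example-blob-id"))
print(build_transform_patch_body(error_message="column 'result' not found"))
```

The returned dict could be passed as the body argument of the PATCH calls shown above.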

Full Code Example

Here is a lambda handler that responds to a v2-beta.automationFileTransform.updated.status event and writes a blob back.

from benchling_sdk.benchling import Benchling
from benchling_sdk.auth.api_key_auth import ApiKeyAuth

import pandas as pd

# Replace [subdomain] and [api_key] with your tenant values (also remove brackets)
benchling = Benchling(url="https://[subdomain].benchling.com", auth_method=ApiKeyAuth("[api_key]")) 

def create_blob(data: pd.DataFrame):
    data_bytes = data.to_csv(index=False).encode('utf-8')
    return benchling.blobs.create_from_bytes(
        data_bytes, name="Custom Transform Output.csv"
    )

def set_transform_output(transform_id, blob):
    return benchling.api.patch_response(
        f'api/v2/automation-file-transforms/{transform_id}', 
        body={"blobId": blob.id},
    )

def set_transform_error(transform_id, error_string):
    return benchling.api.patch_response(
        f'api/v2/automation-file-transforms/{transform_id}', 
        body={"errors": [{'message': error_string}]},
    )

def do_business_logic(input_data: pd.DataFrame):
    input_data["Percentage of Whole"] = (input_data["result"] / input_data["result"].sum() * 100).round()
    return input_data

def lambda_handler(event, context):
    transform = event['detail']["transform"]
    input_blob_download_url_object = benchling.blobs.download_url(transform["blobId"])
    try:
        input_data = pd.read_csv(input_blob_download_url_object.download_url)
        processed_data = do_business_logic(input_data)
        output_blob = create_blob(processed_data)
        set_transform_output(transform["id"], output_blob)
    except Exception as e:
        set_transform_error(transform["id"], str(e))
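
For local testing it can help to validate the incoming event before touching the Benchling API. The sketch below is an assumption-laden illustration: `parse_transform_event` is a hypothetical helper, the sample event contains only the fields this guide relies on (real events carry more), and the IDs are placeholders.

```python
EXPECTED_DETAIL_TYPE = "v2-beta.automationFileTransform.updated.status"


def parse_transform_event(event: dict) -> tuple:
    """Return (transform_id, blob_id) from an EventBridge event, or raise ValueError."""
    detail_type = event.get("detail-type")
    if detail_type != EXPECTED_DETAIL_TYPE:
        raise ValueError(f"unexpected detail-type: {detail_type!r}")
    transform = event.get("detail", {}).get("transform") or {}
    missing = [key for key in ("id", "blobId") if not transform.get(key)]
    if missing:
        raise ValueError(f"transform is missing fields: {missing}")
    return transform["id"], transform["blobId"]


# Hypothetical event with placeholder IDs.
sample_event = {
    "detail-type": EXPECTED_DETAIL_TYPE,
    "detail": {"transform": {"id": "example-transform-id", "blobId": "example-blob-id"}},
}
print(parse_transform_event(sample_event))
```

Failing fast on a malformed event keeps unrelated invocations from reaching the download and PATCH calls.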

Appendix

AWS EventBridge Rule

Here is an example EventBridge rule to invoke the above code. The customTransformId value is the ID of your Benchling App.

{
  "detail-type": [
    "v2-beta.automationFileTransform.updated.status"
  ],
  "detail": {
    "transform": {
      "customTransformId": ["app_FHD6INW2pyvPEXK1"]
    }
  }
}
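
To reason about which events a pattern like this selects, a deliberately simplified matcher can be run locally. It covers only the exact-value matching used in the rule above (nested objects, with a list of allowed values at each leaf). Real EventBridge patterns support further operators that this sketch ignores, and `matches_pattern` and the sample event are hypothetical.

```python
def matches_pattern(pattern: dict, event: dict) -> bool:
    """Simplified matcher: nested dicts recurse, lists hold allowed exact values."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches_pattern(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


rule = {
    "detail-type": ["v2-beta.automationFileTransform.updated.status"],
    "detail": {"transform": {"customTransformId": ["app_FHD6INW2pyvPEXK1"]}},
}

# Hypothetical event: only the fields the rule inspects, plus a placeholder id.
event = {
    "detail-type": "v2-beta.automationFileTransform.updated.status",
    "detail": {
        "transform": {
            "id": "example-transform-id",
            "customTransformId": "app_FHD6INW2pyvPEXK1",
        }
    },
}
print(matches_pattern(rule, event))
```

Extra fields in the event, such as the transform id here, do not prevent a match. Only the keys named in the pattern are checked.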

Benchling Connect Run Schema

Here is an example of a Benchling Connect Run's schema, written in JSON.

{
    "processingSteps": [
        {
            "inputFileType": "CUSTOM",
            "outputFileType": "FINAL_DATA_FRAME",
            "transformationProperties": {
                "transformId": "app_0HLHMi6WnyhQRSYq"
            }
        }
    ],
    "resultSchemaInfo": {
        "fields": {
            "schema_field_77": {
                "type": "PRIMARY_SAMPLE_ENTITY"
            },
            "percentage_of_whole": {
                "columnName": "Percentage of Whole",
                "type": "DIRECT"
            },
            "result": {
                "columnName": "result",
                "type": "DIRECT"
            }
        },
        "schema": "assaysch_aGnTdxCz"
    },
    "tableType": "GRID",
    "columnTypes": [
        "ENTITY_NAME",
        "SCHEMA_FIELD",
        "SCHEMA_FIELD"
    ],
    "delimiter": ",",
    "fileExtension": "csv",
    "name": "Records Results",
    "volumeUnits": "uL"
}
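
Before writing a transformed file back, it may be worth checking that its header contains every column the result schema expects. The sketch below assumes that DIRECT fields are matched to file columns by their columnName, as the example config suggests. `missing_result_columns` is a hypothetical helper, and the config shown is a trimmed copy of the one above.

```python
import csv
import io


def missing_result_columns(output_file_config: dict, csv_text: str) -> list:
    """Return columnName values from DIRECT result fields absent from the CSV header."""
    fields = output_file_config.get("resultSchemaInfo", {}).get("fields", {})
    required = [
        spec["columnName"]
        for spec in fields.values()
        if spec.get("type") == "DIRECT" and "columnName" in spec
    ]
    header = next(csv.reader(io.StringIO(csv_text)), [])
    return [name for name in required if name not in header]


config = {
    "resultSchemaInfo": {
        "fields": {
            "schema_field_77": {"type": "PRIMARY_SAMPLE_ENTITY"},
            "percentage_of_whole": {"columnName": "Percentage of Whole", "type": "DIRECT"},
            "result": {"columnName": "result", "type": "DIRECT"},
        }
    }
}

# An untransformed file is missing the column the transform is supposed to add.
print(missing_result_columns(config, "sample,result\nA,10\n"))
```

A non-empty result could be turned into an error message and written back instead of a blob.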