AWS: Orchestrating Dynamic Concurrent Glue Jobs for Partitioned Data Sets with Step Functions and Lambda

[Image: Glue Job orchestration architecture diagram]
In this article, we'll discuss how to use AWS Glue, Step Functions, and Lambda to run ETL workloads at any scale.
I will assume a basic understanding of some common AWS services, like Lambda, Glue, and Step Functions.
This is particularly useful if your data is partitioned by some criteria and you need to perform ETL on it. For example, you might have data in S3 partitioned by organization/provider in CSV/XML/JSON that needs to be converted into a standardized data model for processing. These practices can be applied in other areas as well.

Create the Glue Script

Our Glue script (Python) will read a DynamoDB change log that exists in S3 and then send a message per changed row. The creation of this change log is beyond the scope of this article, though it could be covered in a future one.
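For context, the examples below assume the change log is laid out in S3 with one prefix per provider. The bucket and file names here are made up, but the layout looks something like this:

s3://my-data-lake/change_log/provider=providerA/part-00000.json.gz
s3://my-data-lake/change_log/provider=providerB/part-00000.json.gz

In that case, change_log_dir would be s3://my-data-lake/change_log.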

import sys

from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# change_log_dir and env are set on the job; provider is passed in by the Step Function
args = getResolvedOptions(sys.argv, ["change_log_dir", "provider", "env"])
glueContext = GlueContext(SparkContext.getOrCreate())

change_log_dir = args["change_log_dir"]
provider = args["provider"]

# Read only this provider's partition of the change log; the transformation_ctx
# lets the job bookmark skip files that were already processed on previous runs
change_log_dynf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": [change_log_dir + "/provider=" + provider],
        "useS3ListImplementation": True,
        "recurse": True,
        "groupFiles": "inPartition",
        "compressionType": "gzip"
    },
    format="json",
    transformation_ctx=args['env'] + "-change_log_bookmark_ctx"
)

# you can now convert the DynamicFrame to a DataFrame
# and use native PySpark to do ETL operations
conv_df = change_log_dynf.toDF()

# ETL...
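For a concrete picture of what that ETL could look like, here is a minimal sketch that filters the change log and writes a standardized copy back to S3. The eventName column and the output bucket are hypothetical; swap in whatever your change log and data model actually use.

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F

# Keep only inserted/updated rows and stamp when we processed them (hypothetical columns)
standardized_df = (
    conv_df
    .filter(F.col("eventName").isin("INSERT", "MODIFY"))
    .withColumn("processed_at", F.current_timestamp())
)

# Convert back to a DynamicFrame and write the standardized output per provider
standardized_dynf = DynamicFrame.fromDF(standardized_df, glueContext, "standardized_dynf")
glueContext.write_dynamic_frame.from_options(
    frame=standardized_dynf,
    connection_type="s3",
    connection_options={"path": "s3://my-standardized-bucket/provider=" + provider},  # hypothetical bucket
    format="parquet"
)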

This is just a small snippet to give context; the main focus here is the orchestration pieces. Create your Glue Job however you need it.
change_log_dir and env can be set as default job parameters when you create your Glue Job, so you can create one job per environment if you run multiple environments. provider will be passed in from the Step Function so we can run the job dynamically per partition.
TIP: The great part about AWS Glue job bookmarks is that they keep track of which data has and hasn't been processed for you! The transformation_ctx above is essentially all you need in the script itself; just make sure bookmarks are enabled on the job (the --job-bookmark-option argument) and that the script initializes and commits a Glue Job object (job.init() / job.commit()).
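If you create the job with boto3 rather than the console, those pieces can be supplied as default arguments. This is only a rough sketch; the job name lines up with the Step Function snippet below, but the role, script location, and bucket are hypothetical placeholders:

import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="etl-dev-change_log",
    Role="arn:aws:iam::123456789012:role/etl-glue-role",  # hypothetical role
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://my-scripts-bucket/change_log_etl.py",  # hypothetical script location
        "PythonVersion": "3",
    },
    DefaultArguments={
        "--change_log_dir": "s3://my-data-lake/change_log",
        "--env": "dev",
        "--job-bookmark-option": "job-bookmark-enable",  # required for the bookmark to track state
    },
    GlueVersion="4.0",
    WorkerType="G.1X",
    NumberOfWorkers=2,
)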

Create the Step Function

AWS provides a fantastic interactive GUI for creating state machines, so I highly recommend using it to build a Step Function that best fits your needs. Here is a JSON snippet for the state that invokes the Glue Job:

"Start change_log Glue Job": {
    "Type": "Task",
    "Resource": "arn:aws:states:::glue:startJobRun.sync",
    "Parameters": {
        "JobName": "etl-${dev}-change_log",
        "Arguments": {
            "--provider.$": "$.provider"
        }
    },
    "Next": "Update Status Success",
    "Catch": [
        {
            "ErrorEquals": [
                "States.ALL"
            ],
            "Next": "Update Status Failed",
            "ResultPath": "$.result"
        }
    ],
    "ResultPath": null
}

This piece of the state machine starts the Glue Job we created in the previous step (note that JobName must match the name of your Glue Job exactly). It moves to the next step, Update Status Success, if the Glue Job completes successfully; otherwise it goes to Update Status Failed. You can modify this as necessary.
The provider variable will be passed in from the Lambda in the next step.
${dev} is a substitution variable and can be replaced via DefinitionSubstitutions in a CloudFormation template. You can replace it with your hard-coded env, or drop it entirely if you only have one environment.
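If you are not deploying through CloudFormation, one alternative (shown here only as a sketch, with a hypothetical definition file, name, and role) is to perform the substitution yourself and create the state machine with boto3:

import boto3

sfn = boto3.client("stepfunctions")

with open("state_machine.asl.json") as f:
    # hard-code the env if you are not using DefinitionSubstitutions
    definition = f.read().replace("${dev}", "dev")

sfn.create_state_machine(
    name="etl-dev-change_log-orchestrator",  # hypothetical name
    definition=definition,
    roleArn="arn:aws:iam::123456789012:role/etl-stepfunctions-role",  # hypothetical role
)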

Create the Lambda

Your Lambda will need some way to know which partitions (providers) to run on. We handle this through an external API that keeps a configuration of how often each partition needs to run. Again, building that API is outside the scope of this article.

import json
import os
from datetime import datetime, timedelta

import boto3
import requests  # not bundled in the Lambda runtime; package it or ship it in a layer

url = os.environ["api_url"]
headers = {"x-api-key": os.environ["api_key"]}


def lambda_handler(event, context):
    # last_success timestamps from the API are in GMT, so compare against UTC
    now = datetime.utcnow()
    client = boto3.client("stepfunctions")
    providers = requests.get(url, headers=headers).json()
    for provider, config in providers.items():
        if config["status"] == "running":
            continue  # a run for this partition is already in flight
        interval = int(config["frequency"])
        last_success = datetime.strptime(
            config["last_success"], "%a, %d %b %Y %H:%M:%S %Z"
        )
        if now - last_success > timedelta(minutes=interval):
            # start one concurrent Step Function execution per partition that is due
            client.start_execution(
                stateMachineArn=os.environ["stepfunc_arn"],
                input=json.dumps({"provider": provider})
            )


An example API response, for context:

{
    "providerA": {
         "frequency": "120",
         "last_success": "Tue, 08 Nov 2022 15:47:17 GMT",
         "status": "succeeded"
    },
    "providerB": {
         "frequency": "30",
         "last_success": "Tue, 08 Nov 2022 16:31:09 GMT",
         "status": "succeeded"
    }
}

Our API gives us the list of S3 partitions we need to run our Glue Job on, as well as how often each needs to run and when it last ran successfully. Those pieces are updated via the Step Function from the previous step and can easily be configured using the AWS state machine GUI; architect and configure them however you see fit.
The Lambda iterates through the API response of S3 partitions, figures out whether each partition needs to run now, and then triggers a concurrent Step Function execution per partition.
stepfunc_arn is an environment variable that can be configured when the Lambda is created.
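If you would rather script that configuration than click through the console, here is a minimal boto3 sketch; the function name, state machine ARN, and API values are placeholders:

import boto3

lambda_client = boto3.client("lambda")

lambda_client.update_function_configuration(
    FunctionName="etl-change_log-trigger",  # hypothetical function name
    Environment={
        "Variables": {
            "stepfunc_arn": "arn:aws:states:us-east-1:123456789012:stateMachine:etl-dev-change_log-orchestrator",
            "api_url": "https://example.com/etl-config",
            "api_key": "replace-me",
        }
    },
)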

Create the Event Rule for Triggering the Lambda

This step is pretty simple. Just go to the EventBridge service > Rules > Create rule. Name it whatever you'd like, choose a schedule, and set the cron/rate expression to however often you need it to run (I chose every 15 minutes). Then set the target to the Lambda function we just created.
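If you prefer infrastructure as code, the same rule can be created with boto3. Again, this is just a sketch with hypothetical names and ARNs:

import boto3

events = boto3.client("events")
lambda_client = boto3.client("lambda")

lambda_arn = "arn:aws:lambda:us-east-1:123456789012:function:etl-change_log-trigger"  # hypothetical

# Schedule the rule to fire every 15 minutes
rule = events.put_rule(
    Name="etl-change_log-trigger-schedule",
    ScheduleExpression="rate(15 minutes)",
    State="ENABLED",
)

# Allow EventBridge to invoke the Lambda
lambda_client.add_permission(
    FunctionName="etl-change_log-trigger",
    StatementId="AllowEventBridgeInvoke",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn=rule["RuleArn"],
)

# Point the rule at the Lambda
events.put_targets(
    Rule="etl-change_log-trigger-schedule",
    Targets=[{"Id": "change-log-lambda", "Arn": lambda_arn}],
)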

Conclusion

This method of orchestrating ETL on partitioned data sets is extremely fast, since every partition gets its own concurrent Step Function execution, and cost effective, with the orchestration pieces costing you pennies to run every year! I hope you've learned something and that you'll be able to apply this to create something incredible!
