Data Migration With AWS Glue
— AWS, AWS Glue, tech — 12 min read
Data migration is a complex task that requires careful planning and execution. There are many ways and tools to migrate data, and I believe there is no one-size-fits-all solution: it depends on your data, the infrastructure involved, and your requirements.
In this post, we will look at how to use AWS Glue to migrate data from one MySQL database to another. We will extract data from the source database, transform it, and load it into the target database. And, to preserve the sequence of steps involved, we will use AWS Step Functions to orchestrate the process. We will use AWS CloudFormation to create the resources needed, but you can also create them manually via the AWS Console if you prefer. However, I recommend CloudFormation, as it allows you to version-control your infrastructure and easily replicate it across different environments. I'm using AWS SAM to quickly scaffold the CloudFormation template, and we will add the necessary resources to it.
Permissions and connections:
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: ETL Workflow using AWS Glue and AWS Step Functions
Parameters:
  Stack:
    Type: String
    Default: etl-stack-using-aws-glue
Resources:
  S3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref Stack
  SecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupName: !Sub ${Stack}
      GroupDescription: !Sub ${Stack} Security Group to permit VPC connection
      VpcId: # specify your VPC ID, or import it from another stack if available
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 3306 # default MySQL port
          ToPort: 3306
          CidrIp: # specify your VPC CIDR
      SecurityGroupEgress:
        - IpProtocol: -1
          CidrIp: 0.0.0.0/0
  SecurityGroupIngressSelfReference:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref SecurityGroup
      IpProtocol: tcp
      FromPort: 0
      ToPort: 65535
      SourceSecurityGroupId: !Ref SecurityGroup
      Description: Allow TCP connection from the same security group
  ServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: glue.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: GlueJobPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              # as S3 is used to store exported and transformed data we need to allow Glue to access it
              - Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:GetObject
                  - s3:ListBucket
                Resource: !Sub arn:aws:s3:::${Stack}
              - Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:GetObject
                  - s3:ListBucket
                Resource: !Sub arn:aws:s3:::${Stack}/*
              # Glue jobs need to be able to create logs
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:log-group:/aws-glue/*
              # Glue needs to manage network interfaces to reach the databases inside the VPC
              - Effect: Allow
                Action:
                  - ec2:CreateNetworkInterface
                  - ec2:DeleteNetworkInterface
                  - ec2:DescribeNetworkInterfaces
                  - ec2:CreateTags
                  - ec2:DescribeVpcs
                  - ec2:DescribeVpcEndpoints
                  - ec2:DescribeRouteTables
                  - ec2:DescribeSubnets
                  - ec2:DescribeSecurityGroups
                Resource: "*"
              # Glue jobs will be using the connections to the databases that are defined below
              - Effect: Allow
                Action:
                  - glue:GetConnection
                Resource:
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:connection/${Stack}-source-connection
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:connection/${Stack}-target-connection
  SourceConnection:
    Type: AWS::Glue::Connection
    Properties:
      CatalogId: !Ref AWS::AccountId
      ConnectionInput:
        Name: !Sub ${Stack}-source-connection
        ConnectionType: JDBC
        ConnectionProperties:
          JDBC_CONNECTION_URL: !Sub
            - "jdbc:mysql://${host}:${port}/${dbname}"
            - host: # specify your source database host, or import it, or use AWS SSM/Secrets Manager if possible
              port: 3306
              dbname: # specify your source database name
          USERNAME: # resolve your source database username from AWS SSM/Secrets Manager
          PASSWORD: # resolve your source database password from AWS SSM/Secrets Manager
        PhysicalConnectionRequirements:
          AvailabilityZone: !Sub ${AWS::Region}a # specify your availability zone, I'm using the first one in the region
          SecurityGroupIdList:
            - !Ref SecurityGroup
          SubnetId: # same as above, specify your subnet or import it
  TargetConnection:
    Type: AWS::Glue::Connection
    # same as above, but for the target database
  TargetDatabaseAccess:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: # specify the security group ID of the target database
      IpProtocol: tcp
      FromPort: 3306
      ToPort: 3306
      SourceSecurityGroupId: !Ref SecurityGroup
  SourceDatabaseAccess:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: # specify the security group ID of the source database
      IpProtocol: tcp
      FromPort: 3306
      ToPort: 3306
      SourceSecurityGroupId: !Ref SecurityGroup
First, we create an S3 bucket that will be used to store the extracted and transformed data. To connect to the databases we create source and target AWS Glue connections, specifying all the required properties; we will later pass these connections to the Glue jobs. The databases themselves must allow ingress traffic on the specified port. We also create a role that AWS Glue will use to access the resources it needs. One thing to note: the security group used by the jobs must allow ingress traffic from itself. With the permissions part of the template covered, we can move on to the Glue jobs.
AWS Glue Job definitions:
  # log group used by the jobs for continuous logging, referenced in the job arguments below
  LogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws-glue/${Stack}
      RetentionInDays: 30

  ExtractionJob:
    Type: AWS::Glue::Job
    Properties:
      Name: !Sub ${Stack}-extraction-job
      Role: !GetAtt ServiceRole.Arn
      Command:
        Name: glueetl
        ScriptLocation: !Sub s3://${Stack}/src/extraction.py
      ExecutionProperty:
        MaxConcurrentRuns: 1
      GlueVersion: "4.0"
      WorkerType: G.1X # smallest worker configuration supported by Glue 4.0
      NumberOfWorkers: 2
      Timeout: 60
      DefaultArguments:
        --connection-name: !Sub ${Stack}-source-connection
        --continuous-log-logGroup: !Ref LogGroup
        --continuous-log-logStreamPrefix: !Sub /${Stack}
        --extra-py-files: !Sub "s3://${Stack}/src/libs.zip,s3://${Stack}/src/connector-0.1-py3-none-any.whl"
      Connections:
        Connections:
          - !Ref SourceConnection
  TransformationJob:
    Type: AWS::Glue::Job
    Properties:
      Name: !Sub ${Stack}-transformation-job
      Role: !GetAtt ServiceRole.Arn
      Command:
        Name: glueetl
        ScriptLocation: !Sub s3://${Stack}/src/transformation.py
      ExecutionProperty:
        MaxConcurrentRuns: 1
      GlueVersion: "4.0"
      WorkerType: G.1X
      NumberOfWorkers: 2
      Timeout: 60
      DefaultArguments:
        --continuous-log-logGroup: !Ref LogGroup
        --continuous-log-logStreamPrefix: !Sub /${Stack}
        --extra-py-files: !Sub "s3://${Stack}/src/libs.zip"
  LoadJob:
    Type: AWS::Glue::Job
    Properties:
      Name: !Sub ${Stack}-load-job
      Role: !GetAtt ServiceRole.Arn
      Command:
        Name: glueetl
        ScriptLocation: !Sub s3://${Stack}/src/load.py
      ExecutionProperty:
        MaxConcurrentRuns: 1
      GlueVersion: "4.0"
      WorkerType: G.1X
      NumberOfWorkers: 2
      Timeout: 60
      DefaultArguments:
        --connection-name: !Sub ${Stack}-target-connection
        --continuous-log-logGroup: !Ref LogGroup
        --continuous-log-logStreamPrefix: !Sub /${Stack}
        --extra-py-files: !Sub "s3://${Stack}/src/libs.zip,s3://${Stack}/src/connector-0.1-py3-none-any.whl"
      Connections:
        Connections:
          - !Ref TargetConnection
Here we create three Glue jobs: extraction, transformation, and load. Each job has various properties, but the most important ones are the role, the command, and the connections. You can see the complete list of properties and their meaning here. The scripts for the jobs are stored in the S3 bucket, and we pass their locations to the jobs, along with the location of the libraries these scripts use. The jobs run in a serverless environment, so there is no need to worry about scaling them up or down; once an AWS Glue job finishes, the resources it used are released automatically. The connections specify the source and target databases each job will talk to.
AWS State Machine:
  StepFunctionsServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: states.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: GlueJobExecutionPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  # actions required by the synchronous (.sync) Glue integration
                  - glue:StartJobRun
                  - glue:GetJobRun
                  - glue:GetJobRuns
                  - glue:BatchStopJobRun
                Resource:
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:job/${ExtractionJob}
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:job/${TransformationJob}
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:job/${LoadJob}
  StateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: !Sub ${Stack}
      RoleArn: !GetAtt StepFunctionsServiceRole.Arn
      DefinitionString: !Sub |
        {
          "StartAt": "ExtractionJob",
          "States": {
            "ExtractionJob": {
              "Type": "Task",
              "Resource": "arn:aws:states:::glue:startJobRun.sync",
              "Parameters": {
                "JobName": "${Stack}-extraction-job"
              },
              "Next": "TransformationJob"
            },
            "TransformationJob": {
              "Type": "Task",
              "Resource": "arn:aws:states:::glue:startJobRun.sync",
              "Parameters": {
                "JobName": "${Stack}-transformation-job"
              },
              "Next": "LoadJob"
            },
            "LoadJob": {
              "Type": "Task",
              "Resource": "arn:aws:states:::glue:startJobRun.sync",
              "Parameters": {
                "JobName": "${Stack}-load-job"
              },
              "End": true
            }
          }
        }
Here we create a state machine that will orchestrate the Glue jobs. A state machine contains a set of steps that can be executed sequentially, in parallel, or both; in our case the extraction, transformation, and load jobs run one after another. We need to pass it a role that it will use to start the jobs. Enough with the CloudFormation, let's move on to the Glue job code.
AWS Glue Jobs walkthrough:
Now that you have an idea of how to create the resources, we can proceed with the job code. As of now, you can write Glue job scripts in Python or Scala; maybe other languages will be added later. We will be using Python, and we will start with the extraction job.
import json

import boto3

import connector

s3 = boto3.client("s3")
s3_bucket = "bucket_name"
s3_prefix = "extracted-data/"
table_name = "table_name"


def extract_data():
    connection = connector.get_connection()
    cursor = connection.cursor()
    last_id = 0
    page_count = 0
    page_size = 1000

    while True:
        query = f"SELECT * FROM {table_name} WHERE id > {last_id} ORDER BY id ASC LIMIT {page_size}"
        cursor.execute(query)
        rows = cursor.fetchall()

        # stop once the last page has been read
        if not rows:
            break

        last_id = rows[-1]["id"]
        file_name = f"{s3_prefix}page_{page_count}.json"
        # default=str keeps dates and other non-JSON types serializable
        body = json.dumps(rows, default=str).encode()

        # upload the extracted page to S3 as JSON
        s3.put_object(Bucket=s3_bucket, Key=file_name, Body=body)
        page_count += 1

    cursor.close()
    connection.close()


extract_data()
As you can see, we simply extract data from the MySQL database and upload it to an S3 bucket in JSON format. We fetch the data in batches and iterate over a potentially large table, which is why I'm using the WHERE id > {last_id} condition. However, your table may not have a similar column to iterate over, so you can use LIMIT and OFFSET for the pagination instead (see the sketch below). Play around with your extraction query and make sure that it utilizes indexes to avoid full table scans and performs as efficiently as possible. This post is not about optimizing SQL queries, so I will leave that to you.
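For illustration, here is roughly what the extraction loop could look like with LIMIT/OFFSET pagination. It is only a sketch that reuses the imports and module-level names of the extraction script above, and keep in mind that large offsets get slower on big tables:

def extract_data_with_offset():
    # same idea as extract_data(), but paginated with LIMIT/OFFSET
    # instead of relying on an incrementing id column
    connection = connector.get_connection()
    cursor = connection.cursor()
    page_count = 0
    page_size = 1000

    while True:
        offset = page_count * page_size
        cursor.execute(f"SELECT * FROM {table_name} LIMIT {page_size} OFFSET {offset}")
        rows = cursor.fetchall()
        if not rows:
            break

        s3.put_object(
            Bucket=s3_bucket,
            Key=f"{s3_prefix}page_{page_count}.json",
            Body=json.dumps(rows, default=str).encode(),
        )
        page_count += 1

    cursor.close()
    connection.close()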
I haven't explained the connector.get_connection() call yet (we will get there), but for now, let's move on to the transformation job.
import json

import boto3

s3 = boto3.client("s3")
s3_bucket = "bucket_name"
s3_extracted_prefix = "extracted-data/"  # here we store extracted, original data
s3_transformed_prefix = "transformed-data/"  # here we store transformed data


def transform_data():
    continuation_token = None

    while True:
        # list_objects_v2 returns at most 1000 keys per call,
        # so we follow the continuation token to cover the whole prefix
        list_kwargs = {"Bucket": s3_bucket, "Prefix": s3_extracted_prefix}
        if continuation_token:
            list_kwargs["ContinuationToken"] = continuation_token
        response = s3.list_objects_v2(**list_kwargs)

        for obj in response.get("Contents", []):
            key = obj["Key"]
            data = s3.get_object(Bucket=s3_bucket, Key=key)["Body"].read().decode("utf-8")
            # Perform transformation on data
            transformed_data = [transform_row(row) for row in json.loads(data)]
            transformed_key = key.replace(s3_extracted_prefix, s3_transformed_prefix)
            s3.put_object(
                Bucket=s3_bucket,
                Key=transformed_key,
                Body=json.dumps(transformed_data).encode("utf-8"),
            )

        if not response.get("IsTruncated"):
            break
        continuation_token = response.get("NextContinuationToken")


def transform_row(row):
    # just converting id that is a number in the original data to a string,
    # so it's compatible with the target schema,
    # but as you can imagine anything can be done here
    transformed_row = dict(row)
    transformed_row["id"] = str(row["id"])
    return transformed_row


transform_data()
In the transformation job, we transform the data from the source format to the target format: we fetch the extracted objects from the S3 bucket, transform them, and upload the results to another folder within the same bucket. The transformation is just an example in this case, but you can make it as complex as needed. A few things to note: when you work with large buckets containing thousands of objects, a single list_objects_v2 call won't return all of them, only a page of up to 1,000 keys, which is why the loop follows the NextContinuationToken from each response until the listing is no longer truncated. Also, it's always nice to have a safety net and validate the transformation results before uploading them to S3, for example by checking each row against a JSON schema (see the sketch below).
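As a rough illustration of that safety net, and assuming a made-up schema that matches the transformed rows, transform_data() could call a small helper like this on each row before uploading; it relies on fastjsonschema, which we add to requirements.txt later:

import fastjsonschema

# hypothetical schema for a transformed row, adjust it to match your target table
transformed_row_schema = fastjsonschema.compile({
    "type": "object",
    "properties": {"id": {"type": "string"}},
    "required": ["id"],
})


def validate_row(row):
    # raises fastjsonschema.JsonSchemaException if the row does not match the schema,
    # failing the job before invalid data lands in S3
    transformed_row_schema(row)

Now let's move on to the load job.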
import json

import boto3

import connector

s3 = boto3.client("s3")
s3_bucket = "bucket_name"
s3_prefix = "transformed-data/"
table_name = "transformed_table_name"


def load_data():
    connection = connector.get_connection()
    cursor = connection.cursor()

    continuation_token = None
    while True:
        # same pagination approach as in the transformation job
        list_kwargs = {"Bucket": s3_bucket, "Prefix": s3_prefix}
        if continuation_token:
            list_kwargs["ContinuationToken"] = continuation_token
        response = s3.list_objects_v2(**list_kwargs)

        for obj in response.get("Contents", []):
            key = obj["Key"]
            data = s3.get_object(Bucket=s3_bucket, Key=key)["Body"].read().decode("utf-8")
            for row in json.loads(data):
                query = (
                    f"INSERT INTO {table_name} ({', '.join(row.keys())}) "
                    f"VALUES ({', '.join(['%s'] * len(row.values()))})"
                )
                cursor.execute(query, list(row.values()))
            connection.commit()

        if not response.get("IsTruncated"):
            break
        continuation_token = response.get("NextContinuationToken")

    cursor.close()
    connection.close()


load_data()
In the load job, we load the transformed data into the target database. It looks very similar to the extraction job, so there is not much to go through. One detail: since we need to specify column names inside the insert query, I take them from row.keys(), which is nice because we rely on the transformed data format instead of hardcoding column names somewhere; a batched variant is sketched below.
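If row-by-row inserts turn out to be too slow, one option is to batch them with cursor.executemany. The sketch below is only an idea: it assumes all rows in a file share the same set of keys (which is the case for the transformation above) and reuses the load script's table_name:

def insert_rows(cursor, rows):
    # batch-insert rows that all share the same set of columns
    if not rows:
        return
    columns = list(rows[0].keys())
    query = (
        f"INSERT INTO {table_name} ({', '.join(columns)}) "
        f"VALUES ({', '.join(['%s'] * len(columns))})"
    )
    cursor.executemany(query, [[row[column] for column in columns] for row in rows])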
But that's just a personal preference. We are also using the same connector module to connect to the database, so now it's time to review it.
import sys

import pymysql
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext


def get_connection():
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    args = getResolvedOptions(sys.argv, ["JOB_NAME", "connection-name"])
    connection_options = glueContext.extract_jdbc_conf(args["connection_name"])

    # the DB's host, port and name are extracted from the full URL defined in the connection,
    # which follows the "jdbc:mysql://host:port/name" format
    url = connection_options["fullUrl"]
    host_and_port, db_name = url.replace("jdbc:mysql://", "").split("/", 1)
    db_host, db_port = host_and_port.split(":")

    # user and password are extracted from the connection options
    db_user = connection_options["user"]
    db_password = connection_options["password"]

    # now we can return the connection to the jobs that use this module
    return pymysql.connect(
        host=db_host,
        user=db_user,
        password=db_password,
        database=db_name,
        port=int(db_port),
        cursorclass=pymysql.cursors.DictCursor,
    )
Packaging & Deployment:
I thought it would be a nice bonus to cover how to include custom modules in the scripts used by AWS Glue. As both the extraction and load jobs work with the database, we may want a reusable way of doing it. So, instead of repeating the same code in both jobs, we create a connector module, upload it to the S3 bucket, and pass its location to the jobs. The module itself is a simple script that uses PyMySQL to interact with the databases, and it uses GlueContext to extract the connection options from the connection passed to the job. To package the connector module we need to install the Python setup tools and run a few commands to create a wheel file. Create a setup.py file with the following content:
from setuptools import setup

setup(name="connector", version="0.1", py_modules=["connector"])
Then run the following commands to create a wheel file:
pip install setuptools wheel
python setup.py bdist_wheel
It will create a dist folder with a wheel file inside; upload it to the S3 bucket, taking into account the path used in the jobs.
As we are using PyMySQL, and potentially other libraries that are not available in the Glue environment by default, we need to package them and upload them to the S3 bucket as well. The process is similar to the one we used for the connector module. Create a requirements.txt file with the following content:
pymysql
fastjsonschema
Then run the following commands to create the zip file:
pip install -r requirements.txt --target package
cd package && zip -r ../libs.zip .
To deploy the CloudFormation template, you first need to upload the scripts and libraries to the S3 bucket; in this case, the scripts and dependencies go into the "src" folder within the bucket. Once the CloudFormation stack is successfully created, you can start the state machine to run the jobs and monitor their progress in the AWS Glue console. The details of the job runs can also be viewed in CloudWatch Logs. It is recommended to use AWS Step Functions to orchestrate the jobs and ensure they are executed in the correct order, although individual AWS Glue jobs can also be run manually. Starting and monitoring the state machine can be done from the console or with a few lines of boto3, as sketched below.
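Here is a minimal sketch of kicking off the migration from a script; the state machine ARN below is a placeholder, take the real one from the CloudFormation stack outputs or the Step Functions console:

import time

import boto3

stepfunctions = boto3.client("stepfunctions")

# placeholder ARN, replace it with the ARN of the deployed state machine
state_machine_arn = "arn:aws:states:eu-west-1:123456789012:stateMachine:etl-stack-using-aws-glue"

execution = stepfunctions.start_execution(stateMachineArn=state_machine_arn)

# poll until the ETL workflow finishes
while True:
    status = stepfunctions.describe_execution(executionArn=execution["executionArn"])["status"]
    if status != "RUNNING":
        break
    time.sleep(30)

print(f"State machine execution finished with status: {status}")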
Conclusion:
And there you have it: a simple yet fairly complete walkthrough of migrating data of different formats between different storage systems. I described a few nuances along the way that show how flexible and powerful AWS Glue is. It's a great tool for ETL tasks and can be used for a wide range of use cases. I didn't cover all of its features, though; that could be the subject of another post... Happy coding!