
Data Migration With AWS Glue

AWS, AWS Glue, tech · 12 min read

I find this image both beautiful and ugly at the same time, thank you AI

Data migration is a complex task that requires careful planning and execution. There are many ways and tools to migrate data, and I believe there is no one-size-fits-all solution. It depends on the data you have, the infrastructure involved, and your requirements.

In this post, we will look at how to use AWS Glue to migrate data from one MySQL database to another. We will extract data from the source database, transform it, and load it into the target database. And, in order to preserve the sequence of steps involved, we will use AWS Step Functions to orchestrate the process. We will be using AWS CloudFormation to create the resources needed, but you can also create them manually via the AWS Console if you prefer. However, I recommend using CloudFormation as it allows you to version control your infrastructure and easily replicate it across different environments. I'm using AWS SAM to quickly create the CloudFormation template, and we will add the necessary resources to it.
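For reference, the SAM CLI flow I follow is roughly this; a minimal sketch, assuming the template below ends up in template.yaml:

sam init            # scaffold a project with a starter template.yaml
sam validate        # sanity-check the template
sam deploy --guided # package and deploy the stack, prompting for parameters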

Permissions and connections:
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: ETL Workflow using AWS Glue and AWS Step Functions
Parameters:
  Stack:
    Type: String
    Default: etl-stack-using-aws-glue
Resources:
  S3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref Stack
  SecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupName: !Sub ${Stack}
      GroupDescription: !Sub ${Stack} Security Group to permit VPC connection
      VpcId: # specify your VPC ID, or import it from another stack if available
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 3306 # default MySQL port
          ToPort: 3306
          CidrIp: # specify your VPC CIDR
      SecurityGroupEgress:
        - IpProtocol: -1
          CidrIp: 0.0.0.0/0
  SecurityGroupIngressSelfReference:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !Ref SecurityGroup
      IpProtocol: tcp
      FromPort: 0
      ToPort: 65535
      SourceSecurityGroupId: !Ref SecurityGroup
      Description: Allow TCP connection from the same security group
  ServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: glue.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: GlueJobPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              # as S3 is used to store exported and transformed data we need to allow Glue to access it
              - Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:GetObject
                  - s3:ListBucket
                Resource: !Sub arn:aws:s3:::${Stack}
              - Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:GetObject
                  - s3:ListBucket
                Resource: !Sub arn:aws:s3:::${Stack}/*
              # Glue jobs need to be able to create logs
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:log-group:/aws-glue/*
              # Glue needs to manage network interfaces to reach databases inside the VPC
              - Effect: Allow
                Action:
                  - ec2:CreateNetworkInterface
                  - ec2:DeleteNetworkInterface
                  - ec2:DescribeNetworkInterfaces
                  - ec2:CreateTags
                  - ec2:DescribeVpcs
                  - ec2:DescribeVpcEndpoints
                  - ec2:DescribeRouteTables
                  - ec2:DescribeSubnets
                  - ec2:DescribeSecurityGroups
                Resource: "*"
              # Glue jobs will be using connections to the databases that will be defined in the next steps
              - Effect: Allow
                Action:
                  - glue:GetConnection
                Resource:
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:connection/${Stack}-source-connection
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:connection/${Stack}-target-connection
  SourceConnection:
    Type: AWS::Glue::Connection
    Properties:
      CatalogId: !Ref AWS::AccountId
      ConnectionInput:
        Name: !Sub ${Stack}-source-connection
        ConnectionType: JDBC
        ConnectionProperties:
          JDBC_CONNECTION_URL: !Sub
            - "jdbc:mysql://${host}:${port}/${dbname}"
            - host: # specify your source database host or import it or use AWS SSM/Secrets Manager if possible
              port: 3306
              dbname: # specify your source database name
          USERNAME: # resolve your source database username from AWS SSM/Secrets Manager
          PASSWORD: # resolve your source database password from AWS SSM/Secrets Manager
        PhysicalConnectionRequirements:
          AvailabilityZone: !Sub ${AWS::Region}a # specify your availability zone, I'm using the first one in the region
          SecurityGroupIdList:
            - !Ref SecurityGroup
          SubnetId: # same as above, specify your subnet or import it
  TargetConnection:
    Type: AWS::Glue::Connection
    # same as above, but for the target database
  TargetDatabaseAccess:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: # specify the security group ID of the target database
      IpProtocol: tcp
      FromPort: 3306
      ToPort: 3306
      SourceSecurityGroupId: !Ref SecurityGroup
  SourceDatabaseAccess:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: # specify the security group ID of the source database
      IpProtocol: tcp
      FromPort: 3306
      ToPort: 3306
      SourceSecurityGroupId: !Ref SecurityGroup

First, we are creating an S3 bucket that will be used to store extracted and transformed data. In order to connect to the databases, we need to create source and target AWS Glue connections, specifying all the required properties; we will later pass these connections to the AWS Glue jobs. The databases themselves should allow ingress traffic on the specified port. We are also creating a role that AWS Glue will use to access the resources it needs. One thing to note: the security group used by the jobs must allow ingress traffic from itself. With the permissions part of the template covered, we can move on to the Glue jobs.
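By the way, if the database credentials live in AWS Secrets Manager, the connection properties don't have to contain plain-text values: CloudFormation dynamic references can resolve them at deploy time. A minimal sketch, where the secret name source-db-credentials, its username/password keys, and the host are assumptions:

ConnectionProperties:
  JDBC_CONNECTION_URL: jdbc:mysql://source-db-host:3306/source_db # placeholder host and database
  USERNAME: '{{resolve:secretsmanager:source-db-credentials:SecretString:username}}'
  PASSWORD: '{{resolve:secretsmanager:source-db-credentials:SecretString:password}}'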

AWS Glue Job definitions:
ExtractionJob:
  Type: AWS::Glue::Job
  Properties:
    Name: !Sub ${Stack}-extraction-job
    Role: !GetAtt ServiceRole.Arn
    Command:
      Name: glueetl
      ScriptLocation: !Sub s3://${Stack}/src/extraction.py
    ExecutionProperty:
      MaxConcurrentRuns: 1
    GlueVersion: "4.0"
    WorkerType: Standard
    NumberOfWorkers: 1
    Timeout: 60
    DefaultArguments:
      --connection-name: !Sub ${Stack}-source-connection
      --continuous-log-logGroup: !Ref LogGroup
      --continuous-log-logStreamPrefix: !Sub /${Stack}
      --extra-py-files: !Sub "s3://${Stack}/src/libs.zip,s3://${Stack}/src/connector-0.1-py3-none-any.whl"
    Connections:
      Connections:
        - !Ref SourceConnection
TransformationJob:
  Type: AWS::Glue::Job
  Properties:
    Name: !Sub ${Stack}-transformation-job
    Role: !GetAtt ServiceRole.Arn
    Command:
      Name: glueetl
      ScriptLocation: !Sub s3://${Stack}/src/transformation.py
    ExecutionProperty:
      MaxConcurrentRuns: 1
    GlueVersion: "4.0"
    WorkerType: Standard
    NumberOfWorkers: 1
    Timeout: 60
    DefaultArguments:
      --continuous-log-logGroup: !Ref LogGroup
      --continuous-log-logStreamPrefix: !Sub /${Stack}
      --extra-py-files: !Sub "s3://${Stack}/src/libs.zip"
LoadJob:
  Type: AWS::Glue::Job
  Properties:
    Name: !Sub ${Stack}-load-job
    Role: !GetAtt ServiceRole.Arn
    Command:
      Name: glueetl
      ScriptLocation: !Sub s3://${Stack}/src/load.py
    ExecutionProperty:
      MaxConcurrentRuns: 1
    GlueVersion: "4.0"
    WorkerType: Standard
    NumberOfWorkers: 1
    Timeout: 60
    DefaultArguments:
      --connection-name: !Sub ${Stack}-target-connection
      --continuous-log-logGroup: !Ref LogGroup
      --continuous-log-logStreamPrefix: !Sub /${Stack}
      --extra-py-files: !Sub "s3://${Stack}/src/libs.zip,s3://${Stack}/src/connector-0.1-py3-none-any.whl"
    Connections:
      Connections:
        - !Ref TargetConnection

Here we are creating three Glue jobs: extraction, transformation, and load. Each job has various properties, but the most important ones are the role, command, and connections. You can see the complete list of properties and their meaning in the AWS::Glue::Job documentation. The scripts for the jobs are stored in an S3 bucket, and we pass the location of the scripts to the jobs. We also pass the location of the libraries that will be used by these scripts. The jobs run in a serverless environment, so there is no need to worry about scaling them up or down. Once an AWS Glue job is finished, the resources it used are released automatically. The connections specify the source and target resources that will be used by the job.
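One detail worth calling out: the --continuous-log-logGroup argument references a LogGroup resource that isn't shown in the snippets above. A minimal sketch of what it could look like (the retention period is an arbitrary choice):

LogGroup:
  Type: AWS::Logs::LogGroup
  Properties:
    LogGroupName: !Sub /aws-glue/${Stack}
    RetentionInDays: 14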

AWS State Machine:
StepFunctionsServiceRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: Allow
          Principal:
            Service: states.amazonaws.com
          Action: sts:AssumeRole
    Policies:
      - PolicyName: GlueJobExecutionPolicy
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Action:
                - glue:StartJobRun
                - glue:GetJobRun
              Resource:
                - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:job/${ExtractionJob}
                - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:job/${TransformationJob}
                - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:job/${LoadJob}
StateMachine:
  Type: AWS::StepFunctions::StateMachine
  Properties:
    StateMachineName: !Sub ${Stack}
    RoleArn: !GetAtt StepFunctionsServiceRole.Arn
    DefinitionString:
      !Sub |
        {
          "StartAt": "ExtractionJob",
          "States": {
            "ExtractionJob": {
              "Type": "Task",
              "Resource": "arn:aws:states:::glue:startJobRun.sync",
              "Parameters": {
                "JobName": "${Stack}-extraction-job"
              },
              "Next": "TransformationJob"
            },
            "TransformationJob": {
              "Type": "Task",
              "Resource": "arn:aws:states:::glue:startJobRun.sync",
              "Parameters": {
                "JobName": "${Stack}-transformation-job"
              },
              "Next": "LoadJob"
            },
            "LoadJob": {
              "Type": "Task",
              "Resource": "arn:aws:states:::glue:startJobRun.sync",
              "Parameters": {
                "JobName": "${Stack}-load-job"
              },
              "End": true
            }
          }
        }

Here we are creating a state machine that will orchestrate the Glue jobs. The state machine contains a set of steps that can be executed sequentially, in parallel, or both. We need to pass it a role that will be used to execute the jobs. Enough with the CloudFormation, let's move on to the Glue jobs code.
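Since Glue job runs can fail transiently, it may also be worth adding retries to the task states. A minimal sketch of what the ExtractionJob state could look like with a Retry block (the retry settings are arbitrary):

"ExtractionJob": {
  "Type": "Task",
  "Resource": "arn:aws:states:::glue:startJobRun.sync",
  "Parameters": {
    "JobName": "${Stack}-extraction-job"
  },
  "Retry": [
    {
      "ErrorEquals": ["States.ALL"],
      "IntervalSeconds": 30,
      "MaxAttempts": 2,
      "BackoffRate": 2.0
    }
  ],
  "Next": "TransformationJob"
}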

AWS Glue Jobs walkthrough:

Now that you have an idea of how to create the resources, we can proceed with the jobs code. As of now, you can write Glue job scripts in Python or Scala. Maybe some other programming languages will be added later. But, for now, we will be using Python to create the scripts for the jobs, and we will start with the extraction job.

import json
import boto3
import connector

s3 = boto3.client("s3")
s3_bucket = "bucket_name"
s3_prefix = "extracted-data/"
table_name = "table_name"

def extract_data():
    connection = connector.get_connection()
    cursor = connection.cursor()
    last_id = 0
    page_count = 0
    page_size = 1000
    while True:
        query = f"SELECT * FROM {table_name} WHERE id > {last_id} ORDER BY id ASC LIMIT {page_size}"
        cursor.execute(query)
        rows = cursor.fetchall()
        if not rows:
            # no more rows to extract, we are done
            break
        last_id = rows[-1]["id"]
        file_name = f"{s3_prefix}page_{page_count}.json"
        body = json.dumps(rows).encode()
        # Upload the extracted page to S3
        s3.put_object(Bucket=s3_bucket, Key=file_name, Body=body)
        page_count += 1
    cursor.close()
    connection.close()

extract_data()

As you can see, we simply extract data from a MySQL database and upload it to an S3 bucket in JSON format. We are fetching data in batches and iterating over a potentially large table, which is why I'm using the WHERE id > {last_id} condition. However, your table may not have a similar kind of column to iterate over, in which case you can use LIMIT and OFFSET for pagination. Play around with your extraction query and make sure it utilizes indexes to avoid full table scans and performs optimally. This post is not about optimizing SQL queries, so I will leave that to you. I haven't explained the connector.get_connection() call yet (we will get there), but for now, let's move on to the transformation job.
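For completeness, here is a sketch of what the LIMIT/OFFSET variant of the loop could look like, reusing the same table_name, s3_bucket, and page size as above (keep in mind that large offsets tend to get slower on big tables):

def extract_data_with_offset():
    connection = connector.get_connection()
    cursor = connection.cursor()
    page_count = 0
    page_size = 1000
    while True:
        # paginate with LIMIT/OFFSET instead of an indexed id column
        query = f"SELECT * FROM {table_name} LIMIT {page_size} OFFSET {page_count * page_size}"
        cursor.execute(query)
        rows = cursor.fetchall()
        if not rows:
            break
        s3.put_object(
            Bucket=s3_bucket,
            Key=f"{s3_prefix}page_{page_count}.json",
            Body=json.dumps(rows).encode(),
        )
        page_count += 1
    cursor.close()
    connection.close()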

import json
import boto3

s3 = boto3.client("s3")
s3_bucket = "bucket_name"
s3_extracted_prefix = "extracted-data/"  # here we store extracted, original data
s3_transformed_prefix = "transformed-data/"  # here we store transformed data

def transform_data():
    response = s3.list_objects_v2(Bucket=s3_bucket, Prefix=s3_extracted_prefix)
    while True:
        for obj in response.get("Contents", []):
            key = obj["Key"]
            data = s3.get_object(Bucket=s3_bucket, Key=key)["Body"].read().decode("utf-8")
            # Perform transformation on every row of the extracted page
            transformed_data = [transform_row(row) for row in json.loads(data)]
            transformed_key = key.replace(s3_extracted_prefix, s3_transformed_prefix)
            s3.put_object(Bucket=s3_bucket, Key=transformed_key, Body=json.dumps(transformed_data).encode("utf-8"))
        # list_objects_v2 returns at most 1000 objects per call, so follow the continuation token
        token = response.get("NextContinuationToken")
        if not token:
            break
        response = s3.list_objects_v2(Bucket=s3_bucket, Prefix=s3_extracted_prefix, ContinuationToken=token)

def transform_row(row):
    # just converting id that is a number in the original data to a string,
    # so it's compatible with the target schema,
    # but as you can imagine anything can be done here
    transformed_row = dict(row)
    transformed_row["id"] = str(row["id"])
    return transformed_row

transform_data()

In the transformation job, we are transforming the data from the source format to the target format. We fetch the data from the S3 bucket, transform it, and upload it back to another folder within the same bucket. The transformation here is just an example, but you can make it as complex as needed. A few things to note: when you work with large S3 buckets containing thousands of objects, a single list_objects_v2 call won't return all of them, only a page of up to 1,000 objects, which is why the loop above follows the NextContinuationToken of each response. Also, it's always nice to have a safety net and validate transformation results before uploading them to S3, for example by checking them against a JSON schema. Now let's move on to the load job.
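As an illustration of that safety net, here is a minimal sketch of validating transformed rows with fastjsonschema (it is already in the requirements.txt shown later); the schema itself is a made-up example for a table where id must be a string:

import fastjsonschema

# hypothetical schema describing what the target table expects
validate_row = fastjsonschema.compile({
    "type": "object",
    "properties": {
        "id": {"type": "string"},
    },
    "required": ["id"],
})

def validate_rows(rows):
    # raises fastjsonschema.JsonSchemaException on the first invalid row
    for row in rows:
        validate_row(row)

# e.g. call validate_rows(transformed_data) before uploading it to S3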

import json
import boto3
import connector

s3 = boto3.client("s3")
s3_bucket = "bucket_name"
s3_prefix = "transformed-data/"
table_name = "transformed_table_name"

def load_data():
    response = s3.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)
    connection = connector.get_connection()
    cursor = connection.cursor()
    while True:
        for obj in response.get("Contents", []):
            key = obj["Key"]
            data = s3.get_object(Bucket=s3_bucket, Key=key)["Body"].read().decode("utf-8")
            for row in json.loads(data):
                query = f"INSERT INTO {table_name} ({', '.join(row.keys())}) VALUES ({', '.join(['%s'] * len(row.values()))})"
                cursor.execute(query, list(row.values()))
            # commit once per file to avoid one huge transaction
            connection.commit()
        # follow the continuation token, same as in the transformation job
        token = response.get("NextContinuationToken")
        if not token:
            break
        response = s3.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix, ContinuationToken=token)
    cursor.close()
    connection.close()

load_data()

In the load job, we are loading the transformed data into the target database. It looks very similar to the extraction job, so there is not much to go through. One exception: as we need to specify column names inside the insert query, I take them from row.keys(), which is nice because we rely on the transformed data format instead of hardcoding column names somewhere. But that's just a personal preference. Also, we are using the same connector to connect to the database, so now it's time to review it.
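If row-by-row inserts turn out to be too slow, PyMySQL's executemany is an easy optimization, assuming every row in a file shares the same set of keys; a rough sketch of how the inner loop could be batched:

def insert_rows(cursor, rows):
    # assumes all rows in the page have the same columns
    columns = list(rows[0].keys())
    placeholders = ", ".join(["%s"] * len(columns))
    query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"
    cursor.executemany(query, [[row[column] for column in columns] for row in rows])

# e.g. replace the inner row loop with: insert_rows(cursor, json.loads(data))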

import sys

import pymysql
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

def get_connection():
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    args = getResolvedOptions(sys.argv, ["JOB_NAME", "connection-name"])
    connection_options = glueContext.extract_jdbc_conf(args["connection_name"])
    # DB's host, port and name are extracted from the full URL defined in the connection,
    # which follows the format "jdbc:mysql://host:port/name"
    url = connection_options["fullUrl"]
    host_and_port, db_name = url.replace("jdbc:mysql://", "").split("/", 1)
    db_host, db_port = host_and_port.split(":")
    # user and password are extracted from the connection options
    db_user = connection_options["user"]
    db_password = connection_options["password"]
    # now we can return the connection to the jobs that use this module
    return pymysql.connect(
        host=db_host,
        user=db_user,
        password=db_password,
        database=db_name,
        port=int(db_port),
        cursorclass=pymysql.cursors.DictCursor,
    )

Packaging & Deployment:

I thought it would be a nice bonus to cover how to include custom modules in the scripts used by AWS Glue. As both the extraction and load jobs work with a database, we may want a reusable way of doing it. So instead of repeating the same code in both jobs, we create a connector module. The module is uploaded to the S3 bucket, and we pass its location to the jobs. The module itself is a simple script that uses PyMySQL to interact with the databases. We use GlueContext to extract the connection options from the connection passed to the job. In order to package the connector module, we need to install the Python setup tools and run a few commands to create a wheel file. Create a setup.py file with the following content:

from setuptools import setup
setup(name="connector", version="0.1", py_modules=["connector"])

Then run the following commands to create a wheel file:

pip install setuptools wheel
python setup.py bdist_wheel

It will create a dist folder with a wheel file inside; upload it to the S3 bucket, taking into account the path used in the jobs.
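For example, with the default bucket name from the template above (adjust the bucket, path, and version to whatever you actually use):

aws s3 cp dist/connector-0.1-py3-none-any.whl s3://etl-stack-using-aws-glue/src/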

As we are using PyMySQL, or potentially any other library that is not available in the Glue environment by default, we need to package it and upload it to the S3 bucket. The process is similar to the one we used for the connector module. Create a requirements.txt file with the following content:

pymysql
fastjsonschema

Then run the following commands to create a zip file (the libraries need to sit at the root of the archive so Python can import them from the zip):

pip install -r requirements.txt --target package
cd package && zip -r ../libs.zip .

To deploy the CloudFormation template, you first need to upload the scripts and libraries to the S3 bucket. In this case, the scripts and dependencies are uploaded to the "src" folder within the bucket. Once the CloudFormation stack is successfully created, you can start the state machine to run the jobs and monitor their progress in the AWS Glue console. The details of the jobs can also be viewed in CloudWatch Logs. It is recommended to use AWS Step Functions to orchestrate the jobs and ensure they are executed in the correct order, although individual AWS Glue jobs can also be run manually.
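Putting it together, the upload and kick-off could look roughly like this, assuming the job scripts live in a local src folder and using the default bucket name from the template (the account ID and region in the ARN are placeholders):

aws s3 cp src/ s3://etl-stack-using-aws-glue/src/ --recursive
aws s3 cp libs.zip s3://etl-stack-using-aws-glue/src/
aws stepfunctions start-execution --state-machine-arn arn:aws:states:eu-west-1:123456789012:stateMachine:etl-stack-using-aws-glue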

Conclusion:

And there you have it, a simple yet comprehensive story of how to migrate data of different formats between different storage systems. I described some niche nuances that emphasize how flexible and powerful AWS Glue is. It's a great tool for ETL tasks, and it can be used for a wide range of use cases. However, I didn't cover all the features of this AWS service; that could be the subject of another post... Happy coding!
