There are plenty of use cases where AWS Data Pipeline can save a fortune and speed up business decisions. First of all, it is serverless, which means you do not have to run anything on your own servers if you do not want to. Everything runs entirely on the AWS side, and you pay only per execution.
Advantages of Data Pipeline
- Analyse daily user behaviour by extracting data from logs
- Analyse transactions for a payment system
- Analyse stock exchange reports, and many more
In short, Data Pipeline lets you spin up the entire infrastructure needed for a Hadoop cluster, run whatever logic you want to process your data with, and shut everything down afterwards.
Main steps of implementing Data Pipeline
- Bootstrapping the cluster. This is where you define how many core and slave instances will be launched, which memory settings the instances get, and how much memory the JVMs will use while running Hadoop tasks.
[code language="groovy"] //Creating object to define EMR cluster properties
static PipelineObject getEMRCluster() {
Field type = new Field()
.withKey("type")
.withStringValue(PipelineConstants.EMR_CLUSTER)
Field amiVersion = new Field()
.withKey("amiVersion")
.withStringValue("#{${PipelineConstants.MY_AMI_VERSION}}")
Field masterInstanceType = new Field()
.withKey("masterInstanceType")
.withStringValue("#{${PipelineConstants.MY_MASTER_INSTANCE_TYPE}}")
Field coreInstanceType = new Field()
.withKey("coreInstanceType")
.withStringValue("#{${PipelineConstants.MY_CORE_INSTANCE_TYPE}}")
Field coreInstanceCount = new Field()
.withKey("coreInstanceCount")
.withStringValue("#{${PipelineConstants.MY_CORE_INSTANCE_COUNT}}")
Field region = new Field()
.withKey("region")
.withStringValue("#{${PipelineConstants.MY_DDBREGION}}")
Field terminateAfter = new Field()
.withKey("terminateAfter")
.withStringValue("50 Minutes")
Field bootstrapAction = new Field()
.withKey("bootstrapAction")
.withStringValue("s3://elasticmapreduce" +
"/bootstrap-actions/configure-hadoop, " +
"–yarn-key-value,yarn.nodemanager.resource.memory-mb=11520," +
"–yarn-key-value,yarn.scheduler.maximum-allocation-mb=11520," +
"–yarn-key-value,yarn.scheduler.minimum-allocation-mb=1440," +
"–yarn-key-value,yarn.app.mapreduce.am.resource.mb=2880," +
"–mapred-key-value,mapreduce.map.memory.mb=5760," +
"–mapred-key-value,mapreduce.map.java.opts=-Xmx4608M," +
"–mapred-key-value,mapreduce.reduce.memory.mb=2880," +
"–mapred-key-value,mapreduce.reduce.java.opts=-Xmx2304m," +
"–mapred-key-value,mapreduce.map.speculative=false")
List fieldsList = Lists.newArrayList(type,
amiVersion,
masterInstanceType,
coreInstanceCount,
coreInstanceType,
region,
terminateAfter,
bootstrapAction)
return new PipelineObject()
.withName(PipelineConstants.EMR_CLUSTER_NAME)
.withId(PipelineConstants.EMR_CLUSTER_NAME)
.withFields(fieldsList)
}[/code]
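The `#{myAmiVersion}`-style expressions above are pipeline parameters; their concrete values are supplied later, together with the pipeline definition (see "Embedding into your app" below). A minimal sketch of building those values with the SDK's ParameterValue class could look like the following, assuming the PipelineConstants resolve to parameter ids like myAmiVersion; the ids and values here are purely illustrative:
[code language="groovy"]
//Hypothetical sketch: concrete values for the #{my...} parameters referenced above.
//The ids must match the parameter names the PipelineConstants resolve to;
//the instance types, count and region are illustrative.
static List<ParameterValue> getParameterValues() {
    [
        new ParameterValue().withId("myAmiVersion").withStringValue("3.9.0"),
        new ParameterValue().withId("myMasterInstanceType").withStringValue("m3.xlarge"),
        new ParameterValue().withId("myCoreInstanceType").withStringValue("m3.xlarge"),
        new ParameterValue().withId("myCoreInstanceCount").withStringValue("2"),
        new ParameterValue().withId("myDDBRegion").withStringValue("us-east-1")
    ]
}
[/code]
These values are what later goes into withParameterValues(...) when the pipeline definition is put.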
- Running the steps defined in the settings. A step, in Data Pipeline terminology, is a Hadoop JAR application.
[code language="groovy"]
//Example of a step that imports (restores) S3 data into DynamoDB
fieldsList << new Field()
.withKey("inputImport")
.withRefValue(PipelineConstants.INPUT_NODE_NAME)
fieldsList << new Field()
.withKey("outputImport")
.withRefValue(PipelineConstants.OUTPUT_NODE_NAME)
stepValue = "s3://dynamodb-emr-#{${PipelineConstants.MY_DDBREGION}}/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,"
+"org.apache.hadoop.dynamodb.tools.DynamoDbImport,"
+"#{inputImport.${PipelineConstants.INPUT_NODE_DIRECTORY_NAME}},"
+"#{outputImport.tableName},"
+"#{outputImport.${PipelineConstants.WRITE_THROUGHPUT_PERCENT}}"
fieldsList << new Field()
.withKey("step_${step.name}")
.withStringValue(stepValue)
objects << PipelineImportObjectCreator.getS3SourceLocation()
objects << PipelineImportObjectCreator.getDDBDestinationTable()
//Example of a step exporting DynamoDB to S3
stepValue = "#{${PipelineConstants.MY_EMRSTEP + step.id}}"
if (step.operation == PipelineStepOperation.ARCHIVE_OPERATION) {
fieldsList << new Field()
.withKey("input")
.withRefValue(PipelineConstants.DDBSOURCE_TABLE)
fieldsList << new Field()
.withKey("output")
.withRefValue(PipelineConstants.S3_BACKUP_LOCATION)
stepValue = "s3://dynamodb-emr-#{myDDBRegion}/emr-ddb-storage-" +
"handler/2.1.0/emr-ddb-2.1.0.jar," +
"org.apache.hadoop.dynamodb.tools.DynamoDbExport," +
"#{output.${PipelineConstants.INPUT_NODE_DIRECTORY_NAME}}," +
"#{input.tableName}," +
"#{input.readThroughputPercent}"
objects << PipelineExportObjectCreator.getDDBSourceTable()
objects << PipelineExportObjectCreator.getS3BackupLocation()
}
[/code]
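The snippet above only assembles the fields; it does not show how that fieldsList ends up inside an activity object tied to the cluster. A hedged sketch of that wiring, in the same builder style, might look like the following; the EmrActivityObj name is illustrative and not from the original code:
[code language="groovy"]
//Hypothetical sketch: wrapping the assembled fields into an EmrActivity
//that runs on the EMR cluster object defined earlier.
static PipelineObject getEMRActivity(List<Field> fieldsList) {
    fieldsList << new Field()
            .withKey("type")
            .withStringValue("EmrActivity")
    //"runsOn" points the activity at the EmrCluster object by its id
    fieldsList << new Field()
            .withKey("runsOn")
            .withRefValue(PipelineConstants.EMR_CLUSTER_NAME)
    return new PipelineObject()
            .withName("EmrActivityObj")   //illustrative name and id
            .withId("EmrActivityObj")
            .withFields(fieldsList)
}
[/code]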
- The completion event, at which the cluster gets terminated.
Keep in mind that each of these steps can also be wired up to the SNS service. This is very powerful, because it lets you connect your app to the pipeline's progress and results: every step, whether it succeeds or fails, generates an event that sends an HTTP request to your app server with that step's status. The app, in turn, can take the appropriate action, such as informing users that results are ready or starting another Data Pipeline based on the results of the previous one. It is also important to remember that a pipeline can run once or on a schedule, which means your Death Star cluster can start every midnight, do its important work and shut itself down.
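On the Data Pipeline side this usually comes down to two more objects: an SnsAlarm that an activity references through its onSuccess/onFail fields, and a Schedule for the nightly run. The sketch below is only an assumption of how that could look in the same builder style; the topic ARN, object names and period are illustrative:
[code language="groovy"]
//Hypothetical sketch: an SnsAlarm referenced from an activity via "onSuccess"/"onFail",
//plus a Schedule object for a nightly run. The ARN and names are illustrative.
static PipelineObject getFailureAlarm() {
    List<Field> fields = [
            new Field().withKey("type").withStringValue("SnsAlarm"),
            new Field().withKey("topicArn")
                    .withStringValue("arn:aws:sns:us-east-1:123456789012:pipeline-events"),
            new Field().withKey("subject").withStringValue("Pipeline step failed"),
            new Field().withKey("message").withStringValue("Step #{node.name} failed")
    ]
    return new PipelineObject()
            .withName("FailureAlarm")
            .withId("FailureAlarm")
            .withFields(fields)
}

static PipelineObject getNightlySchedule() {
    List<Field> fields = [
            new Field().withKey("type").withStringValue("Schedule"),
            new Field().withKey("period").withStringValue("1 days"),
            new Field().withKey("startAt").withStringValue("FIRST_ACTIVATION_DATE_TIME")
    ]
    return new PipelineObject()
            .withName("NightlySchedule")
            .withId("NightlySchedule")
            .withFields(fields)
}
[/code]
The activity would then carry a field like new Field().withKey("onFail").withRefValue("FailureAlarm"), and SNS takes care of delivering the HTTP notification to your app.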
Steps can be as varied and as complex as you wish. In simple terms, every step is a Hadoop Java application, which brings another advantage: each step can be run separately in a testing environment, making sure it is properly tested and ready to run on live data.
Embedding into your app
This can be fairly simple if you know what to do.
The steps are:
- Create an empty pipeline.
[code language="groovy"]DataPipelineClient client = new DataPipelineClient(credentials)
CreatePipelineRequest request = new CreatePipelineRequest()
request.setName(pipelineName)
request.setUniqueId(pipelineUniqueId)
CreatePipelineResult result = client.createPipeline(request)
String pipelineId = result.pipelineId [/code]
- Put the pipeline definition. This is where you put in all the details and configure what should actually happen. An important thing here: if you connect your pipeline to SNS to track what happens, your app has to be able to confirm the SNS subscription.
[code language="groovy"] PutPipelineDefinitionRequest putPipelineDefinition = new PutPipelineDefinitionRequest()
.withPipelineId(pipelineId)
.withParameterValues(parameterValues)
.withPipelineObjects(pipelineObjectList)
PutPipelineDefinitionResult putPipelineResult = client.putPipelineDefinition(putPipelineDefinition) [/code]
- Activate the pipeline.
[code language="groovy"]
CampaignPipelineActivity activity = CampaignPipelineActivity.get(activityId)
PipelineCreator creator = new PipelineCreator()
ActivatePipelineResult result = creator.activatePipeline(client, activity.pipelineId)[/code]
- Enjoy
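One last, optional check: you may want to verify from your app that the pipeline actually started. A minimal status check, assuming the same client and pipelineId as above, could look like this; the "@pipelineState" field key is where the state usually shows up, but treat it as an assumption:
[code language="groovy"]
//Hypothetical sketch: asking Data Pipeline for the current state of the pipeline
DescribePipelinesRequest describeRequest = new DescribePipelinesRequest()
        .withPipelineIds(pipelineId)
DescribePipelinesResult describeResult = client.describePipelines(describeRequest)
describeResult.pipelineDescriptionList.each { description ->
    //the state is exposed as an ordinary field, typically keyed "@pipelineState"
    String state = description.fields.find { it.key == "@pipelineState" }?.stringValue
    println "${description.name}: ${state}"
}
[/code]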
For further details and consultation, please contact us.