Building Apache Beam Data Pipeline (Source: Pixabay) |
In introduction article of this series Streaming Analytics in Google Cloud Platform (GCP) - Introduction, we have seen the basics of streaming analytics, its importance and example uses cases, and short introduction about the Google Cloud Services, we will be using to build Streaming Analytics system in Google Cloud Platform.
The second article Streaming Analytics in Google Cloud Platform (GCP) - Setting Up The Environment, covers the instructions for setting up development and deployment environment in Google Cloud Platform. In this current article, we will be building a data pipeline using Apache Beam Python SDK, we will cover the theoretical aspects of Apache Beam pipeline components first and then discuss the relevant code.
What is a Pipeline in data processing?
In simple terms, a data pipeline is a series of connected steps or stages through which raw data is transformed or processed in order to extract insights or perform specific tasks. Data pipelines can be used to perform a wide range of tasks such as data ingestion, cleaning and transformation, analysis and visualisation. The data can be either bounded (batch) or unbounded (stream) and typically involves extracting data from one or more sources, cleaning and processing, and then storing the results in a target location.
Pipeline in Apache Beam
In Apache Beam, a pipeline is created by building a Directed Acyclic Graph (DAG) of data transformation, where the input is passed through a series of steps or transforms, before being output to the desired location.
Directed Acyclic Graph (DAG) is a graph which consists of nodes (also known as vertices) and edges, where the edges have directions and do not form any cycles or loops and the edges always point in the same direction.
In a data processing pipeline, nodes represent the different transformations or operations that are applied to the input data and the edges represent the flow of data from one transformation to another.
As we have seen in the first part - Streaming Analytics in Google Cloud Platform (GCP) - Introduction, Beam provides many SDKs to build a pipeline and it supports various runners to execute the pipeline. The following pipeline reads data from a text file, does some transformation and writes it back to a text file.
# Import the Apache Beam
import apache_beam as beam
# Create a Pipeline object
pipeline = beam.Pipeline()
# Input: Read raw input data from source
input_data = pipeline | "read input" >> beam.io.ReadFromText('./input.txt')
# Transformation: Perform a simple transformation
transformed_data = input_data | 'transform data' >> beam.Map(lambda x: int(x) * 2)
# Output: write the transformed data to an output file
transformed_data | 'write output' >> beam.io.WriteToText('output.txt')
# Run the pipeline
pipeline.run()
This pipeline reads data from the input.txt file and performs a transformation of converting to integer and multiplying by 2 and finally stores the result in another text file output.txt
Create text file input.txt and insert a number and run the pipeline by executing this code as follows:
touch input.txt
echo 2 >> input.txt
Python first_pipeline.py
This will create an output text file output.txt, lets's read the output file - it should contain 4, lets verify:
cat output.txt
Now, you have successfully created your first pipeline in Apache Beam!!!
Let's go back to our code and review it again, you may have noticed, we have used some variables to store intermediate results. For example, input_data holds the data from the ReadFromtext() class and transformed_data holds Map() class. In Apache Beam, these variables are called pcollections and operations are called ptransforms. Having a good understanding of pcollection and transforms are very important for building a streaming pipeline, let's cover those topics, before moving to the next items.
PCollection in Apache Beam
PCollection (Parallel Collection) is an abstract representation of a distributed dataset, that can be processed in parallel. In Apache beam, it is the main dataset for representing a set of data that can be processed in parallel.
Reading from files, and reading from PubSub subscriptions are examples of creating a pcollection, also, transforming an existing pcollection also will create a new pcollection. Transformations such as filtering, mapping, grouping and aggregation are applied to the pcollection.
Example pcollection:
input_data = pipeline | "read input" >> beam.io.ReadFromText('./input.txt')
transformed_data = input_data | 'transform data' >> beam.Map(lambda x: int(x) * 2)
Some of the properties of pcollection are given below for your reference:
Immutable - once created, cannot be modified, to modify, apply a transformation on the original pcollection
pcollection may be distributed across multiple machines in a distributed processing environment - allowing parallel processing.
pcollection is not processed until is executed, such as writing to an output file - lazily evaluated.
pcollection can be bounded (batch - ex: file) or unbounded (stream - ex:pubsub )
pcollection can be partitioned into logical chunks called Windows based on event timestamp.
PTransforms in Apache Beam
PTransform (Parallel Transform) in Apache Beam holds the processing logic, it takes a pcollection as input, applies the transformation logic and gives a transformed pcollection as output.
pcollection1 -> ptransform1 -> pcollection2 -> ptransform2 -> pcollection3 -> ptransform3
In a data pipeline, input raw data as pcollection goes through multiple ptransform and the final ptransform produce a final pcollection which will be the output for the sink.
Example:
# Transformation: Perform a simple transformation
transformed_data = input_data | 'transform data' >> beam.Map(lambda x: int(x) * 2)
In this example, input pcollection input_data will be processed using Map transform and output will be stored in a pcollection called transformed_data.
Note in Apache Beam reading from source and writing to sink operations are ptransforms. See the below example:
# Input: Read raw input data from source
input_data = pipeline | "read input" >> beam.io.ReadFromText('./input.txt')
transformed_data | 'write output' >> beam.io.WriteToText(‘output.txt')
Apache Beam provides many pre-built ptransform with some of the commonly used ptransforms including:
Map: Applies a function to each element of a PCollection and returns a new PCollection containing the transformed elements.
Filter: Keeps only elements from a PCollection that satisfy a certain predicate and returns a new PCollection containing the filtered elements.
FlatMap: Applies a function to each element of a PCollection and returns a new PCollection containing the transformed elements, which may be of a different size than the input PCollection.
Combine: Aggregates elements of a PCollection using a specific combiner function and returns a single output element.
GroupByKey: Groups the elements of a PCollection by key and returns a PCollection of key-value pairs, where the values are grouped into an iterable object.
We have a good understanding of the building blocks of the Apache Beam data pipeline, let's put together all our learning till this point, and design a streaming pipeline in Apache Beam.
Streaming Pipeline in Apache Beam
A streaming pipeline is a pipeline that processes data in real-time as it is received at the system. Typically used to process data from streaming sources, such as sensors, logs, social media feeds, financial ticker logs, etc.
In the streaming pipeline, the input data is typically from an unbounded data source which has an unlimited number of elements and a series of PTransform are applied to an unbounded PCollection.
Let's build a streaming pipeline in Apache Beam, using the pcollection, and ptransform as discussed above:
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
# define the Pub/Sub subscription idSUBSCRIPTION = "projects/projectname/subscriptions/subscriptionname"
#Set the BigQuery schema.SCHEMA = 'name:STRING,age:INTEGER'
# Set the table name.TABLE_NAME = 'project:dataset.table'
def parse_json_message(message):"""Parse the input json message """
row = json.loads(message)
return {
"name": row["name"],
"age": row["age"]
}
# Define pipeline options - to enable streaming modeoptions = PipelineOptions()
options.view_as(StandardOptions).streaming = True
# Create a pipelinepipeline = beam.Pipeline()
pipeline = beam.Pipeline(options=options)
# Read from a Cloud Pub/Sub topic and create a PCollectioninput_pcoll = pipeline | 'Read message from PubSub' >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION)
message_string = input_pcoll | "UTF-8 bytes to string" >> beam.Map(lambda msg: msg.decode("utf-8"))
json_message = message_string | "Parse JSON messages" >> beam.Map(parse_json_message)
output_pcoll = json_message | 'Write to BigQuery table' >> WriteToBigQuery(table=TABLE_NAME, schema=SCHEMA)
#Run the pipelinepipeline.run().wait_until_finish()
The above code reads messages from Pub/Sub subscription (unbounded data source) and writes to BigQuery table (sink). The code defines pcollections, ptransforms and additionally pipeline options, BigQuery streaming options.
Pipeline options used to configure different aspects of pipeline such as runner and runner specific configurations, GCP project, region, job name, etc. In our code, we have specified options.view_as(StandardOptions).streaming = True to enable streaming.
We use BigQueryIO to write messages to BigQuery table, WriteToBigQuery transform requires table name, schema, table’s create disposition and tables write disposition.
Table name - name of the destination table
Schema - destination table schema
Create disposition - specifies whether the destination table must exist or can be created by the write operation
Write disposition - specifies whether write will be truncate and write or append rows to an existing table or write only to an empty table
output_pcoll = json_message | 'Write to BigQuery table' >> WriteToBigQuery( table=TABLE_NAME schema=SCHEMA,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
Here, table name is a fully-qualified BigQuery table name and it consist of three parts:
Project ID: Google Cloud Project ID
Dataset ID: BigQuery dataset ID
Table ID: BigQuery table ID
And it is specified as follows:
[project_id]:[dataset_id].[table_id]
Deploying Apache Beam Pipeline on GCP Cloud Dataflow
You have successfully build the data pipeline using Apache Beam Python SDK, it is time to deploy the Pipeline on GCP Cloud Dataflow. In the first introductory article we have discussed about Cloud Dataflow, if you have not read that article, it is time to review quickly. To deploy a pipeline as dataflow job, we will be using command line arguments and we will pass following arguments to our pipeline as pipeline options.
Jobname - Name of dataflow job
Runner - Name of runner (dataflow runner)
Project - GCP project ID
Region - GCP Compute Engine region
Staging Location - Cloud Storage bucket name for staging binary and temporary files
Temporary Location - Path for temporary files
Network - The Compute Engine network for launching Compute Engine instances to run your pipeline
Subnetwork - The Compute Engine subnetwork for launching Compute Engine instances to run your pipeline
Lets update the Pipeline options in our code:
Import the Pipeline option related libraries:
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import WorkerOptions
Import Argument Parser to get command line input to our pipeline code:
import argparse
Update our pipeline option to get Pub/Sub subscription for input, output_table to specify the BigQuery table name, and other GCP options as listed above.
parser = argparse.ArgumentParser()
parser.add_argument("--subscription",help="Input PubSub subscription of the form " '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."')
parser.add_argument("--output_table", help="BigQuery table of the form " '"project:dataset:table"')
parser.add_argument('--project',required=True, help='Specify Google Cloud project')
parser.add_argument('--region', required=True, help='Specify Google Cloud region')
parser.add_argument('--staging_location', required=True, help='Specify Cloud Storage bucket for staging')
parser.add_argument('--temp_location', required=True, help='Specify Cloud Storage bucket for temp')
parser.add_argument('--runner', required=True, help='Specify Apache Beam Runner')
parser.add_argument('--network', required=True, help='Specify network')
parser.add_argument('--subnetwork', required=True, help='Specify subnetwork')
opts, pipeline_args = parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
options.view_as(SetupOptions).save_main_session = True
options.view_as(StandardOptions).streaming = True
options.view_as(GoogleCloudOptions).project = opts.project
options.view_as(GoogleCloudOptions).region = opts.region
options.view_as(GoogleCloudOptions).staging_location = opts.staging_location
options.view_as(GoogleCloudOptions).temp_location = opts.temp_location
options.view_as(GoogleCloudOptions).job_name = '{0}{1}'.format('stream-analytics-pipeline',time.time_ns())
options.view_as(StandardOptions).runner = opts.runner
options.view_as(WorkerOptions).network = opts.network
options.view_as(WorkerOptions).subnetwork = opts.subnetwork
options.view_as(WorkerOptions).use_public_ips = False
p = beam.Pipeline(options=options)
Change the pipeline transformation code to get subscription name from command line argument:
input_pcoll = pipeline | 'Read message from PubSub' >> beam.io.ReadFromPubSub(subscription=opts.subscription)
output_pcoll = json_message | 'Write to BigQuery table' >> WriteToBigQuery(
table=opts.output_table,
schema=SCHEMA,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
method='STREAMING_INSERTS')
output_pcoll = json_message | 'Write to BigQuery table' >> WriteToBigQuery(table=opts.output_table, schema=SCHEMA)
import jsonimport apache_beam as beamfrom apache_beam.options.pipeline_options import PipelineOptionsfrom apache_beam.options.pipeline_options import StandardOptionsfrom apache_beam.options.pipeline_options import SetupOptionsfrom apache_beam.options.pipeline_options import GoogleCloudOptionsfrom apache_beam.options.pipeline_options import WorkerOptionsfrom apache_beam.io.gcp.bigquery import WriteToBigQueryimport argparseimport loggingimport time# Set the BigQuery schema.SCHEMA = 'name:STRING,age:INTEGER'def parse_json_message(message):"""Parse the input json message """row = json.loads(message)return {"name": row["name"],"age": row["age"]}def run(argv):parser = argparse.ArgumentParser()parser.add_argument("--subscription",help="Input PubSub subscription of the form " '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."')parser.add_argument("--output_table", help="BigQuery table of the form " '"project:dataset:table"')parser.add_argument('--project',required=True, help='Specify Google Cloud project')parser.add_argument('--region', required=True, help='Specify Google Cloud region')parser.add_argument('--staging_location', required=True, help='Specify Cloud Storage bucket for staging')parser.add_argument('--temp_location', required=True, help='Specify Cloud Storage bucket for temp')parser.add_argument('--runner', required=True, help='Specify Apache Beam Runner')parser.add_argument('--network', required=True, help='Specify network')parser.add_argument('--subnetwork', required=True, help='Specify subnetwork')opts, pipeline_args = parser.parse_known_args(argv)options = PipelineOptions(pipeline_args)options.view_as(SetupOptions).save_main_session = Trueoptions.view_as(StandardOptions).streaming = Trueoptions.view_as(GoogleCloudOptions).project = opts.projectoptions.view_as(GoogleCloudOptions).region = opts.regionoptions.view_as(GoogleCloudOptions).staging_location = opts.staging_locationoptions.view_as(GoogleCloudOptions).temp_location = opts.temp_locationoptions.view_as(GoogleCloudOptions).job_name = '{0}{1}'.format('stream-analytics-pipeline',time.time_ns())options.view_as(StandardOptions).runner = opts.runneroptions.view_as(WorkerOptions).network = opts.networkoptions.view_as(WorkerOptions).subnetwork = opts.subnetworkoptions.view_as(WorkerOptions).use_public_ips = Falsep = beam.Pipeline(options=options)# Create a pipelinepipeline = beam.Pipeline()pipeline = beam.Pipeline(options=options)# Read from a Cloud Pub/Sub topic and create a PCollectioninput_pcoll = pipeline | 'Read message from PubSub' >> beam.io.ReadFromPubSub(subscription=opts.subscription)string_message = input_pcoll | "UTF-8 bytes to string" >> beam.Map(lambda msg: msg.decode("utf-8"))json_message = string_message | "Parse JSON messages" >> beam.Map(parse_json_message)output_pcoll = json_message | 'Write to BigQuery table' >> WriteToBigQuery(table=opts.output_table, schema=SCHEMA)# Run the pipelinepipeline.run().wait_until_finish()if __name__=="__main__":logging.getLogger().setLevel(logging.INFO)run()
Deploy our code to Google Cloud Dataflow:
export BQTABLE="your_big_query_table_name_in_fully_qualified_name"export SUBSCRIPTION="your_pubsub_subscription"export PROJECT="$(gcloud config get-value project)"export BUCKET="gcs_bucket_name"export REGION="asia-southeast1"export NETWORK="your_network"export SUBNETWORK="https://www.googleapis.com/compute/v1/projects/your_project_id/regions/asia-southeast1/subnetworks/your_subnet"export TEMPLOCATION="gs://$BUCKET/tmp"export STAGELOCATION="gs://$BUCKET/dev"python -m streaming \--output_table "$BQTABLE" \--subscription "$SUBSCRIPTION" \--runner DataflowRunner \--project "$PROJECT" \--region "$REGION" \--subnetwork="$SUBNETWORK" \--network="$NETWORK" \--staging_location="$TEMPLOCATION" \--temp_location="$STAGELOCATION"
After few minutes, your can check your job named stream_analytics in Google Cloud Console under Dataflow jobs section.
Summary
We have covered what is data pipelines in the context of data processing, building data pipeline in Apache Beam and various components of Apache Beam data pipeline such as pcollection, ptransform and pre-built core transforms. Also, we have covered streaming pipeline and defined a streaming pipeline using Apache Beam Python SDK and gone through the pipeline options, PubSub IO and BigQuery IO features. And finally, we have deployed our data pipeline on Google Cloud Dataflow runner.
That’s all for this part, we will introduce new concept in next article of this series. Until then, have a wonderful day!”
References
Directed Acyclic Graph (DAG): https://en.wikipedia.org/wiki/Directed_acyclic_graph
Beam Concepts: https://beam.apache.org/documentation/
Stream Analytics in GCP - introduction: https://www.rathishkumar.com/2023/01/streaming-analytics-in-google-cloud.html
Stream Analytics in GCP - setting up the environment: https://www.rathishkumar.com/2023/01/streaming-analytics-in-gcp-setting-up-environment.html