Artificial Intelligence and Machine Learning are at the forefront of a revolution in businesses across almost every industry.

According to a McKinsey global survey (November 2020), 22% of respondents attributed at least 5% of their earnings before interest and taxes (EBIT) to AI adoption. Among respondents considered high performers in AI/ML transformation, the share of EBIT attributed to AI is even higher, at close to 20%.

If we step backstage of AI adoption, it goes without saying that such state-of-the-art systems can only be built on reliable and resilient data plumbing. In other words, well-built data pipelines with smart controls are the backbone of smart analytics.

A data pipeline can be defined as “a system that consolidates data from disparate sources into a warehouse, where it can be further transformed to infer insights”.

The main actions involved in a data pipeline are Extracting the data (E), Transforming the data (T) and Loading the data (L). The order of these actions is determined by business requirements and technical capabilities, so a data pipeline can be either an ETL or an ELT pipeline.

Due to the numerous types of sources, technologies and must-have resilience characteristics, traditional development and implementation of such data pipelines demands an enormous amount of developer time and ingenuity.

In this blog, we will look at one such ELT pipeline requirement, the problems that needed to be solved, and the challenges in the design and implementation. We will then look at a reference architecture that can solve the challenge in under 60 minutes!

Assume you have an OLTP database that records event transactions – it is this data that we want to stream into a data warehouse. Of course, there are numerous ELT tools in the marketplace, most of which entail additional cost or subscriptions, or place inherent restrictions on flexibility and scalability.

Here Meltano comes to the rescue: an open-source ELT pipeline framework. Here is the architecture pattern that we are going to implement.

Architecture

  1. We will build a container image with the Meltano tap (the component that extracts data) and target (the component that loads the data) configuration. The image is maintained in the Container Registry service; it can also be maintained in your own corporate registry.
  2. We use Cloud Composer to orchestrate the execution of the container image.
  3. Cloud SQL for PostgreSQL is used to maintain the replication state of the pipeline.
  4. Cloud Build is used to enable CI/CD of the DAG.
  5. Secret Manager is used to hold the secrets.

Connectivity between Cloud Composer and Cloud SQL

In this demonstration, I have ring-fenced both Cloud Composer and Cloud SQL within the same VPC network and used the Cloud SQL private IP in the Composer DAG to connect to the SQL database. This way, the egress traffic does not leave the network.
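For illustration, the state database connection string that we will later surface to the DAG through the ‘meltano-db’ Airflow variable is simply a standard PostgreSQL URI pointing at the private IP – something along these lines (the user, database name and IP below are placeholders, not values from this setup):

# Cloud SQL private IP, reachable only from inside the VPC
export MELTANO_DATABASE_URI="postgresql://meltano_user:<password>@10.10.0.3:5432/meltano"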

However, if you are already using the Cloud SQL service in your estate, you might have the Cloud SQL Auth Proxy up and running, and it can be used to connect to the SQL service instead. If you have a roadmap to use Cloud SQL for transactional load and are thinking of building services that access this data, I highly recommend setting up the Cloud SQL Auth Proxy. You can find the details here.
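If you go down the proxy route, a minimal sketch of running the v1 Cloud SQL Auth Proxy looks like this (the instance connection name is a placeholder; check the linked documentation for the exact flags of the proxy version you use):

# Expose the Cloud SQL instance on localhost:5432 via the Auth Proxy
./cloud_sql_proxy -instances=<project-id>:<region>:<instance-name>=tcp:5432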

Security

  1. Add ingress rules to the AWS RDS instance to allow the GCP Kubernetes pod address range (you can get this from the Cloud Composer environment details).
  2. The Meltano pipeline needs service account credentials to load the data into BigQuery. I have created a service account and assigned it the ‘BigQuery Job User’ role, with edit access to the target dataset.
  3. We need the AWS RDS user account credentials and the GCP service account (created above) to run the pipeline. There are multiple ways to manage secrets safely instead of baking the credentials into the container image. In this demonstration, I wanted to show two different ways; I prefer the first approach as it comes with all the industry-standard secrets management.

a) I used GCP Secret Manager to store the AWS RDS account details. Cloud Composer natively integrates with Secret Manager and retrieves the details. Look here.

b) The GCP service account credential file is stored on the Cloud Composer cluster as a Kubernetes Secret, and the secret object is passed to the Pod during execution. I recommend an alternative approach: add the credential file to GCP Secret Manager as well, add a predecessor Airflow task in the DAG that extracts the file from Secret Manager and sets up the Kubernetes Secret, and add a successor task after the Pod execution that deletes the Kubernetes Secret object. This way, the secret exists on Kubernetes only for the duration of the Pod execution. A sketch of creating the secret manually is shown below.
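For reference, creating that Kubernetes Secret by hand could look like the following sketch (the local file name is an assumption; the secret name ‘service-account’ and key ‘service-account.json’ must match what the DAG expects):

# Create the secret in the namespace the Pod will run in
kubectl create secret generic service-account \
  --namespace default \
  --from-file=service-account.json=./service-account.json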

Development

meltano.yml – a configuration file that contains the endpoint details of the source and destination. Here is the configuration file.

version: 1
send_anonymous_usage_stats: false
project_id: e96bc4d0-e673-466d-9a8a-80ae5db8fbdd
plugins:
  extractors:
  - name: tap-postgres
    variant: transferwise
    pip_url: pipelinewise-tap-postgres
    config:
      host: <#TODO: add the AWS RDS host name here>
      port: 5432
      user: <#TODO: add the AWS RDS user name here>
      dbname: <#TODO: add the AWS RDS database name here>
      default_replication_method: FULL_TABLE
    metadata:
      <#TODO: fully qualified table name, i.e. schemaname-tablename>:
        replication-method: INCREMENTAL
        replication-key: <#TODO: column which should be used for replication>
  loaders:
  - name: target-bigquery
    variant: adswerve
    pip_url: git+https://github.com/adswerve/target-bigquery.git@v0.10.2
    config:
      project_id: <#TODO: GCP project name>
      dataset_id: <#TODO: BigQuery dataset name>
      location: europe-west2
      add_metadata_columns: true
      table_prefix: meltano_extract_

A few things to note about this configuration file –

  1. The ‘extractors’ section holds the details of the source, i.e. the Postgres database in this case. You can find the details of each parameter here.
  2. Under ‘metadata’, specify the tables that need to be replicated based on a key. All other tables will use the replication setting defined against default_replication_method. More on replication is here. You can repeat the section under metadata for each table that you want to stream.
  3. You can filter out the tables that don’t need to be replicated. You can find more on this here.
  4. The ‘loaders’ section holds the information about the target system, in this case BigQuery. project_id, dataset_id and location are mandatory parameters; add_metadata_columns adds additional metadata columns to the table. You can find more about the parameters here.
  5. Notice that we have not hard-coded the AWS RDS user password or the credentials for the GCP service account. As discussed in the security section, we manage these outside the container image – see the sketch after this list for how they are supplied as environment variables.
  6. See the replication details here. If necessary, you can define a view to avoid duplicates.
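To illustrate point 5, here is a rough sketch of supplying those secrets as environment variables: Meltano maps TAP_POSTGRES_PASSWORD and TARGET_BIGQUERY_CREDENTIALS_PATH onto the corresponding tap and target settings, and the same variables are set by the DAG later on. The values below are placeholders; without MELTANO_DATABASE_URI set, a local test run falls back to Meltano’s default SQLite system database.

# Supply the secrets via environment variables rather than meltano.yml
export TAP_POSTGRES_PASSWORD='<rds-password>'
export TARGET_BIGQUERY_CREDENTIALS_PATH=/path/to/service-account.json

# Validate the configuration with a local run from the Meltano project directory
meltano elt tap-postgres target-bigquery --job_id=pg2bq-test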

A Dockerfile is used to create the container image with Meltano and the respective taps and targets installed.

# Pull base image
FROM ubuntu
RUN apt-get update

# Install python3
RUN apt-get install python3 -y
RUN apt-get install python3-pip -y
RUN apt-get install vim -y

# Install python3 venv and git
RUN apt-get install python3-venv -y
RUN apt-get install git -y

# Install meltano
RUN pip3 install --upgrade meltano

# Install dependencies
COPY ./requirements.txt .
RUN pip3 install -r requirements.txt

# Setup meltano project
WORKDIR /projectdir
RUN meltano init postgres2bq
WORKDIR /projectdir/postgres2bq

# Install required tap and target
RUN meltano add extractor tap-postgres
RUN meltano add loader target-bigquery

# Copy meltano.yml file
COPY meltano.yml .

Things to note –

  1. Start from the Ubuntu base image.
  2. Install python3, pip and the virtual environment package. Note that Meltano creates virtual environments to avoid dependency conflicts between the various taps and targets.
  3. Then I installed Meltano and initialised a project, in this case ‘postgres2bq’.
  4. I then installed tap-postgres and target-bigquery.
  5. Copy the meltano.yml file created in the first step.

NOTE: If you already have golden images for Python 3 in your corporate registry, you can use one of those as the base image.

Run the Docker build command to create the container image and push the image to the container registry, as sketched below.
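Assuming Container Registry and a hypothetical image name of meltano-pg2bq (adjust the project ID, registry host and tag to your own setup), the commands look roughly like this:

# Authenticate Docker against Container Registry (one-off)
gcloud auth configure-docker

# Build the image from the directory containing the Dockerfile and meltano.yml, then push it
docker build -t gcr.io/<your-project-id>/meltano-pg2bq:latest .
docker push gcr.io/<your-project-id>/meltano-pg2bq:latest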

postgres-to-bq.py is the DAG that orchestrates the container image. When Cloud Composer runs this DAG, all it does is connect to the container registry, pull the image onto a Pod and run it. Here is the script.

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators import kubernetes_pod_operator
from airflow.kubernetes.secret import Secret
from airflow.models.variable import Variable


default_args = {
    'owner': 'Appsbroker',
    'description': 'Run Meltano EL pipeline',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 3),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5)
}

# Airflow variables (backed by Secret Manager): the Meltano state database URI
# and the AWS RDS user password.
state_db = Variable.get('meltano-db')
postgres_password = Variable.get('rds-user')

with DAG('docker_dag',
         default_args=default_args,
         schedule_interval=None,
         catchup=False) as dag:

    # Mount the 'service-account' Kubernetes Secret into the Pod as a file
    secret_volume = Secret(
        deploy_type='volume',
        deploy_target='/var/secrets/meltanobqsa',
        secret='service-account',
        key='service-account.json')

    meltano_task = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='meltano-pgs-to-bq-el',
        name='meltano-pgs-to-bq-el',
        namespace='default',
        env_vars={
            'TAP_POSTGRES_PASSWORD': postgres_password,
            # 'MELTANO_LOG_LEVEL': 'debug',
            # 'MELTANO_CLI_LOG_LEVEL': 'debug',
            'MELTANO_DATABASE_URI': state_db,
            'TARGET_BIGQUERY_CREDENTIALS_PATH': '/var/secrets/meltanobqsa/service-account.json'
        },
        cmds=['meltano',
              'elt',
              'tap-postgres',
              'target-bigquery',
              '--job_id=pg2bq'],
        secrets=[secret_volume],
        image='<#TODO: Container Image URI>',
        affinity={
            'nodeAffinity': {
                'requiredDuringSchedulingIgnoredDuringExecution': {
                    'nodeSelectorTerms': [{
                        'matchExpressions': [{
                            'key': 'cloud.google.com/gke-nodepool',
                            'operator': 'In',
                            'values': ['pool-0', 'pool-1']
                        }]
                    }]
                }
            }
        }
    )

meltano_task

Things to note –

  1. The variables are fetched from Secret Manager – make sure these are set up correctly (see the sketch after this list).
  2. In the DAG, before we execute the image, we first define the mount point for the Kubernetes Secret. The Pod gets the service account credentials from this mount point.
  3. Note that the environment variables are set from the secrets. In the above program, state_db is the URI of the Cloud SQL for PostgreSQL instance within Google Cloud Platform; Meltano uses this database to keep track of the replication state of the pipeline.
  4. This DAG can run alongside your existing DAGs. You can additionally define node affinity to avoid resource constraints. Details are here. (Note that Google has since released autoscaling Composer environments.)
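As a sketch of how those variables can be created (assuming your Composer environment is configured to use the Secret Manager backend, which expects the airflow-variables- naming convention; the values are placeholders, and the Composer service account needs the Secret Accessor role):

# Meltano state database URI, read in the DAG as Variable.get('meltano-db')
echo -n "postgresql://meltano_user:<password>@<cloud-sql-private-ip>:5432/meltano" | \
  gcloud secrets create airflow-variables-meltano-db --data-file=-

# AWS RDS password, read in the DAG as Variable.get('rds-user')
echo -n "<rds-password>" | \
  gcloud secrets create airflow-variables-rds-user --data-file=-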

Add this DAG to the Cloud Composer environment and trigger it. You should see data flowing into BigQuery. Meltano automatically maintains the replication state based on the replication key column you specify.
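For reference, uploading the DAG with gcloud could look like this (the environment name and location are placeholders for your own Composer setup):

gcloud composer environments storage dags import \
  --environment <composer-environment-name> \
  --location europe-west2 \
  --source postgres-to-bq.py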

And there you have it: a streaming pipeline to get your OLTP data into BigQuery.

Happy coding 🙂 

PS: I have not covered the CI/CD bit here – look out for it in a future blog!