Build a synthetic data pipeline using Gretel and Apache Airflow

In this blog post, we build an ETL pipeline that generates synthetic data from a PostgreSQL database using Gretel’s Synthetic Data APIs and Apache Airflow.



By Drew Newberry, Software Engineer at Gretel.ai


Hey folks, my name is Drew, and I'm a software engineer here at Gretel. I've recently been thinking about patterns for integrating Gretel APIs into existing tools so that it's easy to build data pipelines where security and customer privacy are first-class features, not just an afterthought or box to check.

One data engineering tool that is popular amongst Gretel engineers and customers is Apache Airflow. It also happens to work great with Gretel. In this blog post, we'll show you how to build a synthetic data pipeline using Airflow, Gretel and PostgreSQL. Let's jump in!

 

What is Airflow?

 
 
Airflow is a workflow automation tool commonly used to build data pipelines. It enables data engineers or data scientists to programmatically define and deploy these pipelines using Python and other familiar constructs. At the core of Airflow is the concept of a DAG, or directed acyclic graph. An Airflow DAG provides a model and set of APIs for defining pipeline components, their dependencies and execution order.

You might find Airflow pipelines replicating data from a product database into a data warehouse. Other pipelines might execute queries that join normalized data into a single dataset suitable for analytics or modeling. Yet another pipeline might publish a daily report aggregating key business metrics. A common theme shared amongst these use cases: coordinating the movement of data across systems. This is where Airflow shines.

Leveraging Airflow and its rich ecosystem of integrations, data engineers and scientists can orchestrate any number of disparate tools or services into a single unified pipeline that is easy to maintain and operate. With an understanding of these integration capabilities, we’ll now start talking about how Gretel might be integrated into an Airflow pipeline to improve common data ops workflows.

 

How does Gretel fit in?

 
 
At Gretel, our mission is to make data easier and safer to work with. Talking to customers, one pain point we often hear about is the time and effort required to get data scientists access to sensitive data. Using Gretel Synthetics, we can reduce the risk of working with sensitive data by generating a synthetic copy of the dataset. By integrating Gretel with Airflow, it’s possible to create self-serve pipelines that make it easy for data scientists to quickly get the data they need without requiring a data engineer for every new data request.

To demonstrate these capabilities, we’ll build an ETL pipeline that extracts user activity features from a database, generates a synthetic version of the dataset, and saves the dataset to S3. With the synthetic dataset saved in S3, it can then be used by data scientists for downstream modeling or analysis without compromising customer privacy.

To kick things off, let’s first take a bird’s eye view of the pipeline. Each node in this diagram represents a pipeline step, or “task” in Airflow terms.



Example Gretel synthetics pipeline on Airflow.

 

We can break the pipeline up into three stages, similar to what you might find in an ETL pipeline:

  • Extract - The extract_features task will query a database, and transform the data into a set of features that can be used by data scientists for building models.
  • Synthesize - generate_synthetic_features will take the extracted features as input, train a synthetic model, and then generate a synthetic set of features using Gretel APIs and cloud services.
  • Load - upload_synthetic_features saves the synthetic set of features to S3 where it can be ingested into any downstream model or analysis.
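
To make the ordering of these three stages concrete, here is a small sketch that uses only Python's standard library (graphlib, Python 3.9+). It is not how Airflow schedules work internally; it simply shows that once each task declares what it depends on, a valid execution order falls out of the graph.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "extract_features": set(),
    "generate_synthetic_features": {"extract_features"},
    "upload_synthetic_features": {"generate_synthetic_features"},
}

# static_order() yields every task after all of its dependencies.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because the graph is a simple chain, there is only one valid order: extract, then synthesize, then load.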

In the next few sections we’ll dive into each of these three steps in greater detail. If you wish to follow along with each code sample, you can head over to gretelai/gretel-airflow-pipelines and download all the code used in this blog post. The repo also contains instructions you can follow to start an Airflow instance and run the pipeline end to end.

Before we dissect each component, it may also be helpful to view the Airflow pipeline in its entirety in dags/airbnb_user_bookings.py. The code snippets in the following sections are extracted from the linked user booking pipeline.

 

Extract Features

 
 
The first task, extract_features, is responsible for extracting raw data from the source database and transforming it into a set of features. This is a common feature engineering problem you might find in any machine learning or analytics pipeline.

In our example pipeline we will provision a PostgreSQL database and load it with booking data from an Airbnb Kaggle Competition.

This dataset contains two tables, Users and Sessions. Sessions contains a foreign key reference, user_id. Using this relationship, we’ll create a set of features containing various booking metrics aggregated by user. The following figure represents the SQL query used to build the features.

 

WITH session_features_by_user AS (
  SELECT
    user_id,
    count(*) AS number_of_actions_taken,
    count(DISTINCT action_type) AS number_of_unique_actions,
    round(avg(secs_elapsed)) AS avg_session_time_seconds,
    round(max(secs_elapsed)) AS max_session_time_seconds,
    round(min(secs_elapsed)) AS min_session_time_seconds,
    (
      SELECT
        count(*)
      FROM
        sessions s
      WHERE
        -- Qualify the outer column; a bare user_id would resolve to s.user_id.
        s.user_id = sessions.user_id
        AND s.action_type = 'booking_request') AS total_bookings
  FROM
    sessions
  GROUP BY
    user_id
)
SELECT
  u.id AS user_id,
  u.gender,
  u.age,
  u.language,
  u.signup_method,
  u.date_account_created,
  s.number_of_actions_taken,
  s.number_of_unique_actions,
  s.avg_session_time_seconds,
  s.min_session_time_seconds,
  s.max_session_time_seconds
FROM
  session_features_by_user s
  LEFT JOIN users u ON u.id = s.user_id
LIMIT 5000
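
If you'd like to see what a per-user rollup like this produces without standing up PostgreSQL, here is a rough sketch using Python's built-in sqlite3 module. The rows are made up for illustration, only a subset of the columns is computed, and the booking count uses a conditional sum instead of a subquery, so treat it as an approximation of the query above rather than a copy of it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sessions (user_id TEXT, action_type TEXT, secs_elapsed REAL)"
)
# Hypothetical rows, invented for illustration only.
conn.executemany(
    "INSERT INTO sessions VALUES (?, ?, ?)",
    [
        ("u1", "click", 10.0),
        ("u1", "booking_request", 30.0),
        ("u2", "click", 5.0),
        ("u2", "click", 15.0),
        ("u2", "booking_request", 40.0),
        ("u2", "booking_request", 20.0),
    ],
)

# One output row per user, aggregating that user's session events.
rollup = conn.execute(
    """
    SELECT
      user_id,
      count(*) AS number_of_actions_taken,
      count(DISTINCT action_type) AS number_of_unique_actions,
      round(avg(secs_elapsed)) AS avg_session_time_seconds,
      sum(CASE WHEN action_type = 'booking_request' THEN 1 ELSE 0 END)
        AS total_bookings
    FROM sessions
    GROUP BY user_id
    ORDER BY user_id
    """
).fetchall()

for row in rollup:
    print(row)
```

Each user collapses to a single feature row, which is the shape of data the rest of the pipeline works with.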


The SQL query is then executed from our Airflow pipeline, and its results are written to an intermediate S3 location using the following task definition.

@task()
def extract_features(sql_file: str) -> str:
    context = get_current_context()
    sql_query = Path(sql_file).read_text()
    # Scope the S3 key to this DAG run so separate runs do not overwrite each other.
    key = f"{context['dag_run'].run_id}_booking_features.csv"
    with NamedTemporaryFile(mode="r+", suffix=".csv") as tmp_csv:
        # Stream the query results into the temporary CSV file.
        postgres.copy_expert(
            f"copy ({sql_query}) to stdout with csv header", tmp_csv.name
        )
        # Upload the CSV to the intermediate S3 location.
        s3.load_file(
            filename=tmp_csv.name,
            key=key,
        )
    return key


The input to the task, sql_file, determines what query to run on the database. This query will be read into the task and then executed against the database. The results of the query will then be written to S3 and the remote file key will be returned as an output of the task.
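
If you want to experiment with this read, execute, stage, and return-a-key flow outside of Airflow, the following sketch mimics it with only the standard library. Everything here is a stand-in: SQLite replaces PostgreSQL, a plain dictionary named fake_bucket replaces S3, and extract_to_store is a hypothetical helper, not part of the pipeline's code.

```python
import csv
import sqlite3
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory

# Hypothetical in-memory stand-in for an S3 bucket: key -> file contents.
fake_bucket = {}


def extract_to_store(conn, sql_file, run_id):
    """Run the query in sql_file, stage the rows as CSV, and return the key."""
    sql_query = Path(sql_file).read_text()
    key = f"{run_id}_booking_features.csv"
    cursor = conn.execute(sql_query)
    with NamedTemporaryFile(mode="w+", suffix=".csv", newline="") as tmp_csv:
        writer = csv.writer(tmp_csv)
        writer.writerow([col[0] for col in cursor.description])  # header row
        writer.writerows(cursor)
        tmp_csv.flush()
        tmp_csv.seek(0)
        fake_bucket[key] = tmp_csv.read()  # "upload" the staged file
    return key


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id TEXT, age INTEGER)")
conn.execute("INSERT INTO users VALUES ('u1', 34)")

with TemporaryDirectory() as tmp_dir:
    sql_path = Path(tmp_dir) / "features.sql"
    sql_path.write_text("SELECT id AS user_id, age FROM users")
    key = extract_to_store(conn, sql_path, run_id="manual_run")

print(key)
```

Returning only the key, rather than the data itself, keeps the value passed between tasks small; the next task fetches the file using that key.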

The screenshot below shows a sample result set of the extraction query from above. We will describe how to create a synthetic version of this dataset in the next section.



Query result preview.

 

Synthesize Features using Gretel APIs

 
 
To generate a synthetic version of each feature, we must first train a synthetic model, and then run the model to generate synthetic records. Gretel has a set of Python SDKs that make it easy to integrate into Airflow tasks.

In addition to the Python Client SDKs, we’ve created a Gretel Airflow Hook that manages Gretel API connections and secrets. After setting up a Gretel Airflow Connection, connecting to the Gretel API is as easy as:

from hooks.gretel import GretelHook

gretel = GretelHook()
project = gretel.get_project()


For more information about how to configure Airflow connections, please refer to our GitHub repository README.

The project variable in the example above can be used as the main entrypoint for training and running synthetic models using Gretel’s API. For more details, you can check out our Python API docs.

Referring back to the booking pipeline, we’ll now review the generate_synthetic_features task. This step is responsible for training the synthetic model using the features extracted in the previous task.

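@task()
def generate_synthetic_features(data_source: str) -> str:
    project = gretel.get_project()
    # Download the extracted features from S3 and use them as training data.
    model = project.create_model_obj(
        model_config="synthetics/default",
        data_source=s3.download_file(data_source)
    )
    # Train in Gretel Cloud and block until the job finishes.
    model.submit_cloud()
    poll(model)
    # Return a link to the generated synthetic records.
    return model.get_artifact_link("data_preview")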


Looking at the method signature, you will see it takes a single argument, data_source. This value is the S3 key of the features extracted in the previous step. In a later section we’ll walk through how all these inputs and outputs are wired together.

When creating the model using project.create_model_obj, the model_config param represents the synthetic model configuration used to generate the model. In this pipeline, we’re using our default model config, but many other configuration options are available.

After the model has been configured, we call model.submit_cloud(). This will submit the model for training and record generation using Gretel Cloud. Calling poll(model) will block the task until the model has completed training.
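
To give a feel for what "blocking until the model has completed" means, here is a generic polling loop. This is a sketch of the pattern only, not Gretel's implementation: FakeModel, its status names, and wait_until_done are all invented for illustration, and the real poll helper may behave differently.

```python
import time


class FakeModel:
    """Hypothetical stand-in for a remote training job; not a Gretel class."""

    def __init__(self, polls_until_done):
        self._remaining = polls_until_done
        self.status = "pending"

    def refresh(self):
        # Each refresh simulates asking the service for the latest job status.
        self._remaining -= 1
        self.status = "completed" if self._remaining <= 0 else "active"


def wait_until_done(model, interval_seconds=0.01, max_polls=100):
    """Block until the job reaches a terminal status or the poll budget runs out."""
    for _ in range(max_polls):
        model.refresh()
        if model.status in ("completed", "error"):
            return model.status
        time.sleep(interval_seconds)
    raise TimeoutError("job did not finish within the polling budget")


final_status = wait_until_done(FakeModel(polls_until_done=3))
print(final_status)
```

Capping the number of polls is a design choice worth keeping in a real task too, so a stuck job fails the task instead of holding a worker indefinitely.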

Now that the model has been trained, we’ll use get_artifact_link to return a link to download the generated synthetic features.



Data preview of the synthetic set of features.

 

This artifact link will be used as an input to the final upload_synthetic_features step.

 

Load Synthetic Features

 
 
The original features have been extracted, and a synthetic version has been created. Now it’s time to upload the synthetic features so they can be accessed by downstream consumers. In this example, we’re going to use an S3 bucket as the final destination for the dataset.

@task()
def upload_synthetic_features(data_set: str):
    context = get_current_context()
    with open(data_set, "rb") as synth_features:
        s3.load_file_obj(
            file_obj=synth_features,
            key=f"{..._booking_features_synthetic.csv",
        )


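This task is pretty straightforward. The data_set input value contains a signed HTTP link to download the synthetic dataset from Gretel’s API. The task will read that file into the Airflow worker, and then use the already configured S3 hook to upload the synthetic feature file to an S3 bucket where downstream consumers or models can access it. Note that Python’s built-in open cannot read from an HTTP link, so this snippet assumes open refers to a URL-aware replacement, for example the open function from the smart_open library.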
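
As a minimal sketch of the download half of this task using only the standard library, the helper below streams the contents of a URL into a file object. The function name copy_url_to_fileobj is hypothetical, and a local file:// URL stands in for the signed link so the example runs offline.

```python
import io
import shutil
import tempfile
import urllib.request
from pathlib import Path


def copy_url_to_fileobj(url, destination):
    """Stream the resource at url into a writable binary file object."""
    with urllib.request.urlopen(url) as response:
        shutil.copyfileobj(response, destination)


# A local file:// URL stands in for the signed HTTP link.
with tempfile.TemporaryDirectory() as tmp_dir:
    source = Path(tmp_dir) / "synthetic.csv"
    source.write_bytes(b"user_id,age\nu1,34\n")
    buffer = io.BytesIO()
    copy_url_to_fileobj(source.as_uri(), buffer)

uploaded_bytes = buffer.getvalue()
print(uploaded_bytes.decode())
```

In the real task, the destination would be whatever file object you hand to the S3 hook rather than an in-memory buffer.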

 

Orchestrating the Pipeline

 
 
Over the last three sections we’ve walked through all the code required to extract, synthesize and load a dataset. The last step is to tie each of these tasks together into a single Airflow pipeline.

If you recall from the beginning of this post, we briefly mentioned the concept of a DAG. Using Airflow’s TaskFlow API we can compose these three Python methods into a DAG that defines the inputs, outputs and the order in which each step will run.

feature_path = extract_features(
    "/opt/airflow/dags/sql/session_rollups__by_user.sql"
)
synthetic_data = generate_synthetic_features(feature_path)
upload_synthetic_features(synthetic_data)


If you follow the path of these method calls, you will eventually get a graph that looks like our original feature pipeline.
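
To illustrate how plain method calls can turn into a graph, here is a toy model of the idea. It is not Airflow's implementation: the task decorator, the Output class, and the edges list below are all invented for this sketch. The point is that calling a decorated task returns a placeholder instead of running the function, and passing that placeholder into another task records a dependency.

```python
edges = []  # (upstream, downstream) pairs recorded as tasks are wired together


class Output:
    """Placeholder for a task's future return value."""

    def __init__(self, task_name):
        self.task_name = task_name


def task(func):
    def wire(*args):
        # Any argument that is another task's Output implies a dependency.
        for arg in args:
            if isinstance(arg, Output):
                edges.append((arg.task_name, func.__name__))
        return Output(func.__name__)

    return wire


@task
def extract_features(sql_file): ...


@task
def generate_synthetic_features(data_source): ...


@task
def upload_synthetic_features(data_set): ...


feature_path = extract_features("/opt/airflow/dags/sql/session_rollups__by_user.sql")
synthetic_data = generate_synthetic_features(feature_path)
upload_synthetic_features(synthetic_data)

print(edges)
```

The two recorded edges are exactly the arrows in the pipeline diagram: extract feeds synthesize, and synthesize feeds load.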



Gretel synthetics pipeline on Airflow.

 

If you want to run this pipeline and see it in action, head over to the accompanying GitHub repository. There you will find instructions on how to start an Airflow instance and run the pipeline end to end.

 

Wrapping things up

 
 
If you’ve made it this far, you’ve seen how Gretel can be integrated into a data pipeline built on Airflow. By combining Gretel’s developer-friendly APIs and Airflow’s powerful system of hooks and operators, it’s easy to build ETL pipelines that make data more accessible and safer to use.

We also talked about a common feature engineering use case where sensitive data may not be readily accessible. By generating a synthetic version of the dataset, we reduce the risk of exposing any sensitive data, but still retain the utility of the dataset while making it quickly available to those who need it.

Thinking about the feature pipeline in more abstract terms, we now have a pattern that can be repurposed for any number of new SQL queries. By deploying a new version of the pipeline, and swapping out the initial SQL query, we can front any potentially sensitive query with a synthetic dataset that preserves customer privacy. The only line of code that needs to change is the path to the SQL file. No complex data engineering required.

 

Thanks for reading

 
 
Send us an email at hi@gretel.ai or come join us in Slack if you have any questions or comments. We’d love to hear how you’re using Airflow and how we can best integrate with your existing data pipelines.

 
Bio: Drew Newberry is a Software Engineer at Gretel.ai.

Original. Reposted with permission.
