domingo, janeiro 19, 2025
HomeBig DataBuilding end-to-end data lineage for one-time and complex queries using Amazon Athena,...

Building end-to-end data lineage for one-time and complex queries using Amazon Athena, Amazon Redshift, Amazon Neptune and dbt


One-time and complex queries are two common scenarios in enterprise data analytics. One-time queries are flexible and suitable for instant analysis and exploratory research. Complex queries, on the other hand, refer to large-scale data processing and in-depth analysis based on petabyte-level data warehouses in massive data scenarios. These complex queries typically involve data sources from multiple business systems, requiring multilevel nested SQL or associations with numerous tables for highly sophisticated analytical tasks.

However, combining the data lineage of these two query types presents several challenges:

  1. Diversity of data sources
  2. Varying query complexity
  3. Inconsistent granularity in lineage tracking
  4. Different real-time requirements
  5. Difficulties in cross-system integration

Moreover, maintaining the accuracy and completeness of lineage information while providing system performance and scalability are crucial considerations. Addressing these challenges requires a carefully designed architecture and advanced technical solutions.

Amazon Athena offers serverless, flexible SQL analytics for one-time queries, enabling direct querying of Amazon Simple Storage Service (Amazon S3) data for rapid, cost-effective instant analysis. Amazon Redshift, optimized for complex queries, provides high-performance columnar storage and massively parallel processing (MPP) architecture, supporting large-scale data processing and advanced SQL capabilities. Amazon Neptune, as a graph database, is ideal for data lineage analysis, offering efficient relationship traversal and complex graph algorithms to handle large-scale, intricate data lineage relationships. The combination of these three services provides a powerful, comprehensive solution for end-to-end data lineage analysis.

In the context of comprehensive data governance, Amazon DataZone offers organization-wide data lineage visualization using Amazon Web Services (AWS) services, while dbt provides project-level lineage through model analysis and supports cross-project integration between data lakes and warehouses.

In this post, we use dbt for data modeling on both Amazon Athena and Amazon Redshift. dbt on Athena supports real-time queries, while dbt on Amazon Redshift handles complex queries, unifying the development language and significantly reducing the technical learning curve. Using a single dbt modeling language not only simplifies the development process but also automatically generates consistent data lineage information. This approach offers robust adaptability, easily accommodating changes in data structures.

By integrating Amazon Neptune graph database to store and analyze complex lineage relationships, combined with AWS Step Functions and AWS Lambda functions, we achieve a fully automated data lineage generation process. This combination promotes consistency and completeness of lineage data while enhancing the efficiency and scalability of the entire process. The result is a powerful and flexible solution for end-to-end data lineage analysis.

Architecture overview

The experiment’s context involves a customer already using Amazon Athena for one-time queries. To better accommodate massive data processing and complex query scenarios, they aim to adopt a unified data modeling language across different data platforms. This led to the implementation of both Athena on dbt and Amazon Redshift on dbt architectures.

AWS Glue crawler crawls data lake information from Amazon S3, generating a Data Catalog to support dbt on Amazon Athena data modeling. For complex query scenarios, AWS Glue performs extract, transform, and load (ETL) processing, loading data into the petabyte-scale data warehouse, Amazon Redshift. Here, data modeling uses dbt on Amazon Redshift.

Lineage data original files from both parts are loaded into an S3 bucket, providing data support for end-to-end data lineage analysis.

The following image is the architecture diagram for the solution.

Figure 1-Architecture diagram of DBT modeling based on Athena and Redshift

Some important considerations:

This experiment uses the following data dictionary:

Source table Tool Target table
imdb.name_basics DBT/Athena stg_imdb__name_basics
imdb.title_akas DBT/Athena stg_imdb__title_akas
imdb.title_basics DBT/Athena stg_imdb__title_basics
imdb.title_crew DBT/Athena stg_imdb__title_crews
imdb.title_episode DBT/Athena stg_imdb__title_episodes
imdb.title_principals DBT/Athena stg_imdb__title_principals
imdb.title_ratings DBT/Athena stg_imdb__title_ratings
stg_imdb__name_basics DBT/Redshift new_stg_imdb__name_basics
stg_imdb__title_akas DBT/Redshift new_stg_imdb__title_akas
stg_imdb__title_basics DBT/Redshift new_stg_imdb__title_basics
stg_imdb__title_crews DBT/Redshift new_stg_imdb__title_crews
stg_imdb__title_episodes DBT/Redshift new_stg_imdb__title_episodes
stg_imdb__title_principals DBT/Redshift new_stg_imdb__title_principals
stg_imdb__title_ratings DBT/Redshift new_stg_imdb__title_ratings
new_stg_imdb__name_basics DBT/Redshift int_primary_profession_flattened_from_name_basics
new_stg_imdb__name_basics DBT/Redshift int_known_for_titles_flattened_from_name_basics
new_stg_imdb__name_basics DBT/Redshift names
new_stg_imdb__title_akas DBT/Redshift titles
new_stg_imdb__title_basics DBT/Redshift int_genres_flattened_from_title_basics
new_stg_imdb__title_basics DBT/Redshift titles
new_stg_imdb__title_crews DBT/Redshift int_directors_flattened_from_title_crews
new_stg_imdb__title_crews DBT/Redshift int_writers_flattened_from_title_crews
new_stg_imdb__title_episodes DBT/Redshift titles
new_stg_imdb__title_principals DBT/Redshift titles
new_stg_imdb__title_ratings DBT/Redshift titles
int_known_for_titles_flattened_from_name_basics DBT/Redshift titles
int_primary_profession_flattened_from_name_basics DBT/Redshift
int_directors_flattened_from_title_crews DBT/Redshift names
int_genres_flattened_from_title_basics DBT/Redshift genre_titles
int_writers_flattened_from_title_crews DBT/Redshift names
genre_titles DBT/Redshift
names DBT/Redshift
titles DBT/Redshift

The lineage data generated by dbt on Athena includes partial lineage diagrams, as exemplified in the following images. The first image shows the lineage of name_basics in dbt on Athena. The second image shows the lineage of title_crew in dbt on Athena.

Figure 3-Lineage of name_basics in DBT on Athena

Figure 4-Lineage of title_crew in DBT on Athena

The lineage data generated by dbt on Amazon Redshift includes partial lineage diagrams, as illustrated in the following image.

Figure 5-Lineage of name_basics and title_crew in DBT on Redshift

Referring to the data dictionary and screenshots, it’s evident that the complete data lineage information is highly dispersed, spread across 29 lineage diagrams. Understanding the end-to-end comprehensive view requires significant time. In real-world environments, the situation is often more complex, with complete data lineage potentially distributed across hundreds of files. Consequently, integrating a complete end-to-end data lineage diagram becomes crucial and challenging.

This experiment will provide a detailed introduction to processing and merging data lineage files stored in Amazon S3, as illustrated in the following diagram.

Figure 6-Merging data lineage from Athena and Redshift into Neptune

Prerequisites

To perform the solution, you need to have the following prerequisites in place:

  • The Lambda function for preprocessing lineage files must have permissions to access Amazon S3 and Amazon Redshift.
  • The Lambda function for constructing the directed acyclic graph (DAG) must have permissions to access Amazon S3 and Amazon Neptune.

Solution walkthrough

To perform the solution, follow the steps in the next sections.

Preprocess raw lineage data for DAG generation using Lambda functions

Use Lambda to preprocess the raw lineage data generated by dbt, converting it into key-value pair JSON files that are easily understood by Neptune: athena_dbt_lineage_map.json and redshift_dbt_lineage_map.json.

  1. To create a new Lambda function in the Lambda console, enter a Function name, select the Runtime (Python in this example), configure the Architecture and Execution role, then click the “Create function” button.

Figure 7-Basic configuration of athena-data-lineage-process Lambda

  1. Open the created Lambda function and on the Configuration tab, in the navigation pane, select Environment variables and choose your configurations. Using Athena on dbt processing as an example, configure the environment variables as follows (the process for Amazon Redshift on dbt is similar):
    • INPUT_BUCKET: data-lineage-analysis-24-09-22 (replace with the S3 bucket path storing the original Athena on dbt lineage files)
    • INPUT_KEY: athena_manifest.json (the original Athena on dbt lineage file)
    • OUTPUT_BUCKET: data-lineage-analysis-24-09-22 (replace with the S3 bucket path for storing the preprocessed output of Athena on dbt lineage files)
    • OUTPUT_KEY: athena_dbt_lineage_map.json (the output file after preprocessing the original Athena on dbt lineage file)

Figure 8-Environment variable configuration for athena-data-lineage-process-Lambda

  1. On the Code tab, in the lambda_function.py file, enter the preprocessing code for the raw lineage data. Here’s a code reference using Athena on dbt processing as an example (the process for Amazon Redshift on dbt is similar). The preprocessing code for Athena on dbt’s original lineage file is as follows:

The athena_manifest.json, redshift_manifest.json, and other files used in this experiment can be obtained from the Data Lineage Graph Construction GitHub repository.

import json
import boto3
import os

def lambda_handler(event, context):
    # Set up S3 client
    s3 = boto3.client('s3')

    # Get input and output paths from environment variables
    input_bucket = os.environ['INPUT_BUCKET']
    input_key = os.environ['INPUT_KEY']
    output_bucket = os.environ['OUTPUT_BUCKET']
    output_key = os.environ['OUTPUT_KEY']

    # Define helper function
    def dbt_nodename_format(node_name):
        return node_name.split(".")[-1]

    # Read input JSON file from S3
    response = s3.get_object(Bucket=input_bucket, Key=input_key)
    file_content = response['Body'].read().decode('utf-8')
    data = json.loads(file_content)
    lineage_map = data["child_map"]
    node_dict = {}
    dbt_lineage_map = {}

    # Process data
    for item in lineage_map:
        lineage_map[item] = [dbt_nodename_format(child) for child in lineage_map[item]]
        node_dict[item] = dbt_nodename_format(item)

    # Update key names
    lineage_map = {node_dict[old]: value for old, value in lineage_map.items()}
    dbt_lineage_map["lineage_map"] = lineage_map

    # Convert result to JSON string
    result_json = json.dumps(dbt_lineage_map)

    # Write JSON string to S3
    s3.put_object(Body=result_json, Bucket=output_bucket, Key=output_key)
    print(f"Data written to s3://{output_bucket}/{output_key}")

    return {
        'statusCode': 200,
        'body': json.dumps('Athena data lineage processing completed successfully')
    }

Merge preprocessed lineage data and write to Neptune using Lambda functions

  1. Before processing data with the Lambda function, create a Lambda layer by uploading the required Gremlin plugin. For detailed steps on creating and configuring Lambda Layers, see the AWS Lambda Layers documentation.

Because connecting Lambda to Neptune for constructing a DAG requires the Gremlin plugin, it needs to be uploaded before using Lambda. The Gremlin package can be obtained from the Data Lineage Graph Construction GitHub repository.

Figure 9-Lambda layers

  1. Create a new Lambda function. Choose the function to configure. To the recently created layer, at the bottom of the page, choose Add a layer.

Figure 10_Add a layer

Create another Lambda layer for the requests library, similar to how you created the layer for the Gremlin plugin. This library will be used for HTTP client functionality in the Lambda function.

  1. Choose the recently created Lambda function to configure. Connect to Neptune through Lambda to merge the two datasets and construct a DAG. On the Code tab, the reference code to execute is as follows:
import json
import boto3
import os
import requests
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import get_credentials
from botocore.session import Session
from concurrent.futures import ThreadPoolExecutor, as_completed

def read_s3_file(s3_client, bucket, key):
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
        data = json.loads(response['Body'].read().decode('utf-8'))
        return data.get("lineage_map", {})
    except Exception as e:
        print(f"Error reading S3 file {bucket}/{key}: {str(e)}")
        raise

def merge_data(athena_data, redshift_data):
    return {**athena_data, **redshift_data}

def sign_request(request):
    credentials = get_credentials(Session())
    auth = SigV4Auth(credentials, 'neptune-db', os.environ['AWS_REGION'])
    auth.add_auth(request)
    return dict(request.headers)

def send_request(url, headers, data):
    try:
        response = requests.post(url, headers=headers, data=data, timeout=30)
        response.raise_for_status()
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"Request Error: {str(e)}")
        if hasattr(e.response, 'text'):
            print(f"Response content: {e.response.text}")
        raise

def write_to_neptune(data):
    endpoint="https://your neptune endpoint name:8182/gremlin"
    # replace with your neptune endpoint name

    # Clear Neptune database
    clear_query = "g.V().drop()"
    request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': clear_query}))
    signed_headers = sign_request(request)
    response = send_request(endpoint, signed_headers, json.dumps({'gremlin': clear_query}))
    print(f"Clear database response: {response}")

    # Verify if the database is empty
    verify_query = "g.V().count()"
    request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': verify_query}))
    signed_headers = sign_request(request)
    response = send_request(endpoint, signed_headers, json.dumps({'gremlin': verify_query}))
    print(f"Vertex count after clearing: {response}")
    
    def process_node(node, children):
        # Add node
        query = f"g.V().has('lineage_node', 'node_name', '{node}').fold().coalesce(unfold(), addV('lineage_node').property('node_name', '{node}'))"
        request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': query}))
        signed_headers = sign_request(request)
        response = send_request(endpoint, signed_headers, json.dumps({'gremlin': query}))
        print(f"Add node response for {node}: {response}")

        for child_node in children:
            # Add child node
            query = f"g.V().has('lineage_node', 'node_name', '{child_node}').fold().coalesce(unfold(), addV('lineage_node').property('node_name', '{child_node}'))"
            request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': query}))
            signed_headers = sign_request(request)
            response = send_request(endpoint, signed_headers, json.dumps({'gremlin': query}))
            print(f"Add child node response for {child_node}: {response}")

            # Add edge
            query = f"g.V().has('lineage_node', 'node_name', '{node}').as('a').V().has('lineage_node', 'node_name', '{child_node}').coalesce(inE('lineage_edge').where(outV().as('a')), addE('lineage_edge').from('a').property('edge_name', ' '))"
            request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': query}))
            signed_headers = sign_request(request)
            response = send_request(endpoint, signed_headers, json.dumps({'gremlin': query}))
            print(f"Add edge response for {node} -> {child_node}: {response}")

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(process_node, node, children) for node, children in data.items()]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Error in processing node: {str(e)}")

def lambda_handler(event, context):
    # Initialize S3 client
    s3_client = boto3.client('s3')

    # S3 bucket and file paths
    bucket_name="data-lineage-analysis" # Replace with your S3 bucket name
    athena_key = 'athena_dbt_lineage_map.json' # Replace with your athena lineage key value output json name
    redshift_key = 'redshift_dbt_lineage_map.json' # Replace with your redshift lineage key value output json name

    try:
        # Read Athena lineage data
        athena_data = read_s3_file(s3_client, bucket_name, athena_key)
        print(f"Athena data size: {len(athena_data)}")

        # Read Redshift lineage data
        redshift_data = read_s3_file(s3_client, bucket_name, redshift_key)
        print(f"Redshift data size: {len(redshift_data)}")

        # Merge data
        combined_data = merge_data(athena_data, redshift_data)
        print(f"Combined data size: {len(combined_data)}")

        # Write to Neptune (including clearing the database)
        write_to_neptune(combined_data)

        return {
            'statusCode': 200,
            'body': json.dumps('Data successfully written to Neptune')
        }
    except Exception as e:
        print(f"Error in lambda_handler: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error: {str(e)}')
        }

Create Step Functions workflow

  1. On the Step Functions console, choose State machines, and then choose Create state machine. On the Choose a template page, select Blank template.

Figure 11-Step Functions blank template

  1. In the Blank template, choose Code to define your state machine. Use the following example code:
{
  "Comment": "Daily Data Lineage Processing Workflow",
  "StartAt": "Parallel Processing",
  "States": {
    "Parallel Processing": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Process Athena Data",
          "States": {
            "Process Athena Data": {
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "Parameters": {
                "FunctionName": "athena-data-lineange-process-Lambda", ##Replace with your Athena data lineage process Lambda function name
                "Payload": {
                  "input.$": "$"
                }
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "Process Redshift Data",
          "States": {
            "Process Redshift Data": {
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "Parameters": {
                "FunctionName": "redshift-data-lineange-process-Lambda", ##Replace with your Redshift data lineage process Lambda function name
                "Payload": {
                  "input.$": "$"
                }
              },
              "End": true
            }
          }
        }
      ],
      "Next": "Load Data to Neptune"
    },
    "Load Data to Neptune": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "data-lineage-analysis-lambda" ##Replace with your Lambda function Name
      },
      "End": true
    }
  }
}

  1. After completing the configuration, choose the Design tab to view the workflow shown in the following diagram.

Figure 12-Step Functions design view

Create scheduling rules with Amazon EventBridge

Configure Amazon EventBridge to generate lineage data daily during off-peak business hours. To do this:

  1. Create a new rule in the EventBridge console with a descriptive name.
  2. Set the rule type to “Schedule” and configure it to run once daily (using either a fixed rate or the Cron expression “0 0 * * ? *”).
  3. Select the AWS Step Functions state machine as the target and specify the state machine you created earlier.

Query results in Neptune

  1. On the Neptune console, select Notebooks. Open an existing notebook or create a new one.

Figure 13-Neptune notebook

  1. In the notebook, create a new code cell to perform a query. The following code example shows the query statement and its results:
%%gremlin -d node_name -de edge_name
g.V().hasLabel('lineage_node').outE('lineage_edge').inV().hasLabel('lineage_node').path().by(elementMap())

You can now see the end-to-end data lineage graph information for both dbt on Athena and dbt on Amazon Redshift. The following image shows the merged DAG data lineage graph in Neptune.

Figure 14-Merged DAG data lineage graph in Neptune

You can query the generated data lineage graph for data related to a specific table, such as title_crew.

The sample query statement and its results are shown in the following code example:

%%gremlin -d node_name -de edge_name
g.V().has('lineage_node', 'node_name', 'title_crew')
  .repeat(
    union(
      __.inE('lineage_edge').outV(),
      __.outE('lineage_edge').inV()
    )
  )
  .until(
    __.has('node_name', within('names', 'genre_titles', 'titles'))
    .or()
    .loops().is(gt(10))
  )
  .path()
  .by(elementMap())

The following image shows the filtered results based on title_crew table in Neptune.

Figure 15-Filtered results based on title_crew table in Neptune

Clean up

To clean up your resources, complete the following steps:

  1. Delete EventBridge rules
# Stop new events from triggering while removing dependencies
aws events disable-rule --name 
# Break connections between rule and targets (like Lambda functions)
aws events remove-targets --rule  --ids 
# Remove the rule completely from EventBridge
aws events delete-rule --name 

  1. Delete Step Functions state machine
# Stop all running executions
aws stepfunctions stop-execution --execution-arn 
# Delete the state machine
aws stepfunctions delete-state-machine --state-machine-arn 

  1. Delete Lambda functions
# Delete Lambda function
aws lambda delete-function --function-name 
# Delete Lambda layers (if used)
aws lambda delete-layer-version --layer-name  --version-number 

  1. Clean up the Neptune database
# Delete all snapshots
aws neptune delete-db-cluster-snapshot --db-cluster-snapshot-identifier 
# Delete database instance
aws neptune delete-db-instance --db-instance-identifier  --skip-final-snapshot
# Delete database cluster
aws neptune delete-db-cluster --db-cluster-identifier  --skip-final-snapshot

  1. Follow the instructions at Deleting a single object to clean up the S3 buckets

Conclusion

In this post, we demonstrated how dbt enables unified data modeling across Amazon Athena and Amazon Redshift, integrating data lineage from both one-time and complex queries. By using Amazon Neptune, this solution provides comprehensive end-to-end lineage analysis. The architecture uses AWS serverless computing and managed services, including Step Functions, Lambda, and EventBridge, providing a highly flexible and scalable design.

This approach significantly lowers the learning curve through a unified data modeling method while enhancing development efficiency. The end-to-end data lineage graph visualization and analysis not only strengthen data governance capabilities but also offer deep insights for decision-making.

The solution’s flexible and scalable architecture effectively optimizes operational costs and improves business responsiveness. This comprehensive approach balances technical innovation, data governance, operational efficiency, and cost-effectiveness, thus supporting long-term business growth with the adaptability to meet evolving enterprise needs.

With OpenLineage-compatible data lineage now generally available in Amazon DataZone, we plan to explore integration possibilities to further enhance the system’s capability to handle complex data lineage analysis scenarios.

If you have any questions, please feel free to leave a comment in the comments section.


About the authors

nancynwu+photo

Nancy Wu is a Solutions Architect at AWS, responsible for cloud computing architecture consulting and design for multinational enterprise customers. Has many years of experience in big data, enterprise digital transformation research and development, consulting, and project management across telecommunications, entertainment, and financial industries.

Xu+Feng+PhotoXu Feng is a Senior Industry Solution Architect at AWS, responsible for designing, building, and promoting industry solutions for the Media & Entertainment and Advertising sectors, such as intelligent customer service and business intelligence. With 20 years of software industry experience, currently focused on researching and implementing generative AI and AI-powered data solutions.

Xu+Da+PhotoXu Da is a Amazon Web Services (AWS) Partner Solutions Architect based out of Shanghai, China. He has more than 25 years of experience in IT industry, software development and solution architecture. He is passionate about collaborative learning, knowledge sharing, and guiding community in their cloud technologies journey.

RELATED ARTICLES
- Advertisment -
Google search engine

Most Popular

Recent Comments