Apache Spark workloads running on Amazon EMR on EKS form the foundation of many modern data platforms. EMR on EKS provides a managed Spark runtime that integrates seamlessly with other AWS services and with your organization’s existing Kubernetes-based deployment patterns.
Data platforms processing large-scale data volumes often require multiple EMR on EKS clusters. In the post Use Batch Processing Gateway to automate job management in multi-cluster Amazon EMR on EKS environments, we introduced Batch Processing Gateway (BPG) as a solution for managing Spark workloads across these clusters. Although BPG provides foundational functionality to distribute workloads and support routing for Spark jobs in multi-cluster environments, enterprise data platforms require additional features for a comprehensive data processing pipeline.
This post shows how to enhance the multi-cluster solution by integrating Amazon Managed Workflows for Apache Airflow (Amazon MWAA) with BPG. By using Amazon MWAA, we add job scheduling and orchestration capabilities, enabling you to build a comprehensive end-to-end Spark-based data processing pipeline.
Overview of solution
Consider HealthTech Analytics, a healthcare analytics company managing two distinct data processing workloads. Their Clinical Insights Data Science team processes sensitive patient outcome data requiring HIPAA compliance and dedicated resources, and their Digital Analytics team handles website interaction data with more flexible requirements. As their operation grows, they face increasing challenges in managing these diverse workloads efficiently.
The company needs to maintain strict separation between protected health information (PHI) and non-PHI data processing, while also addressing different cost center requirements. The Clinical Insights Data Science team runs critical end-of-day batch processes that need guaranteed resources, whereas the Digital Analytics team can use cost-optimized Spot Instances for their variable workloads. Additionally, data scientists from both teams require environments for experimentation and prototyping as needed.
This scenario presents an ideal use case for implementing a data pipeline using Amazon MWAA, BPG, and multiple EMR on EKS clusters. The solution needs to route different Spark workloads to appropriate clusters based on security requirements and cost profiles, while maintaining the necessary isolation and compliance controls. To effectively manage such an environment, we need a solution that maintains clean separation between application and infrastructure management concerns while stitching together multiple components into a robust pipeline.
Our solution consists of integrating Amazon MWAA with BPG through an Airflow custom operator for BPG called `BPGOperator`. This operator encapsulates the infrastructure management logic needed to interact with BPG and provides a clean interface for job submission through Amazon MWAA. When executed, the operator communicates with BPG, which then routes the Spark workloads to available EMR on EKS clusters based on predefined routing rules.
The following architecture diagram illustrates the components and their interactions.
The solution works through the following steps:
- Amazon MWAA executes scheduled DAGs using `BPGOperator`. Data engineers create DAGs using this operator, requiring only the Spark application configuration file and basic scheduling parameters.
- `BPGOperator` authenticates and submits jobs to the BPG submit endpoint `POST:/apiv2/spark`. It handles all HTTP communication details, manages authentication tokens, and provides secure transmission of job configurations.
- BPG routes submitted jobs to EMR on EKS clusters based on predefined routing rules. These routing rules are managed centrally through the BPG configuration, allowing rules-based distribution of workloads across multiple clusters.
- `BPGOperator` monitors job status, captures logs, and handles execution retries. It polls the BPG job status endpoint `GET:/apiv2/spark/{subID}/status` and streams logs to Airflow by polling the `GET:/apiv2/log` endpoint every second. The BPG log endpoint retrieves the most current log information directly from the Spark driver pod.
- The DAG execution progresses to subsequent tasks based on job completion status and defined dependencies. `BPGOperator` communicates the job status through Airflow’s built-in task communication system (XCom), enabling complex workflow orchestration.
Refer to the BPG REST API interface documentation for additional details.
This architecture provides several key benefits:
- Separation of responsibilities – Data engineering and platform engineering teams in enterprise organizations typically maintain distinct responsibilities. The modular design in this solution enables platform engineers to configure `BPGOperator` and manage EMR on EKS clusters, while data engineers maintain DAGs.
- Centralized code management – `BPGOperator` encapsulates all core functionalities required for Amazon MWAA DAGs to submit Spark jobs through BPG into a single, reusable Python module. This centralization minimizes code duplication across DAGs and improves maintainability by providing a standardized interface for job submissions.
Airflow custom operator for BPG
An Airflow Operator is a template for a predefined Task that you can define declaratively inside your DAGs. Airflow provides multiple built-in operators, such as `BashOperator`, which executes bash commands; `PythonOperator`, which executes Python functions; and `EmrContainerOperator`, which submits new jobs to an EMR on EKS cluster. However, no built-in operator implements all the steps required for the Amazon MWAA integration with BPG.
Airflow allows you to create new operators to suit your specific requirements. This operator type is known as a custom operator. A custom operator encapsulates the custom infrastructure-related logic in a single, maintainable component. Custom operators are created by extending the `airflow.models.baseoperator.BaseOperator` class. We have developed and open sourced an Airflow custom operator for BPG called `BPGOperator`, which implements the necessary steps to provide a seamless integration of Amazon MWAA with BPG.
The following class diagram provides a detailed view of the `BPGOperator` implementation.
When a DAG includes a `BPGOperator` task, the Amazon MWAA instance triggers the operator to send a job request to BPG. The operator typically performs the following steps:
- Initialize job – `BPGOperator` prepares the job payload, including input parameters, configurations, connection details, and other metadata required by BPG.
- Submit job – `BPGOperator` sends an HTTP POST request to submit the job to the BPG endpoint with the provided configurations.
- Monitor job execution – `BPGOperator` checks the job status, polling BPG until the job completes successfully or fails. The monitoring process includes handling various job states, managing timeout scenarios, and responding to errors that occur during job execution.
- Handle job completion – Upon completion, `BPGOperator` captures the job results, logs relevant details, and can trigger downstream tasks based on the execution outcome.
The following sequence diagram illustrates the interaction flow between the Airflow DAG, `BPGOperator`, and BPG.
Deploying the solution
In the remainder of this post, you will implement the end-to-end pipeline to run Spark jobs on multiple EMR on EKS clusters. You will begin by deploying the common components that serve as the foundation for building the pipelines. Next, you will deploy and configure BPG on an EKS cluster, followed by deploying and configuring `BPGOperator` on Amazon MWAA. Finally, you will execute Spark jobs on multiple EMR on EKS clusters from Amazon MWAA.
To streamline the setup process, we’ve automated the deployment of all infrastructure components required for this post, so you can focus on the essential aspects of job submission to build an end-to-end pipeline. We provide detailed information to help you understand each step, simplifying the setup while preserving the learning experience.
To showcase the solution, you will create three clusters and an Amazon MWAA environment:
- Two EMR on EKS clusters: `analytics-cluster` and `datascience-cluster`
- An EKS cluster: `gateway-cluster`
- An Amazon MWAA environment: `airflow-environment`

`analytics-cluster` and `datascience-cluster` serve as data processing clusters that run Spark workloads, `gateway-cluster` hosts BPG, and `airflow-environment` hosts Airflow for job orchestration and scheduling.
You can find the code base in the GitHub repo.
Prerequisites
Before you deploy this solution, make sure that the following prerequisites are in place:
Set up common infrastructure
This step handles the setup of networking infrastructure, including virtual private cloud (VPC) and subnets, along with the configuration of AWS Identity and Access Management (IAM) roles, Amazon Simple Storage Service (Amazon S3) storage, Amazon Elastic Container Registry (Amazon ECR) repository for BPG images, Amazon Aurora PostgreSQL-Compatible Edition database, Amazon MWAA environment, and both EKS and EMR on EKS clusters with a preconfigured Spark operator. With this infrastructure automatically provisioned, you can concentrate on the subsequent steps without getting caught up in basic setup tasks.
- Clone the repository to your local machine and set the two environment variables. Replace the placeholder value with the AWS Region where you want to deploy these resources.
- Execute the following script to create the common infrastructure:
- To verify successful infrastructure deployment, navigate to the AWS CloudFormation console, open your stack, and check the Events, Resources, and Outputs tabs for completion status, details, and list of resources created.
You have completed the setup of the common components that serve as the foundation for the rest of the implementation.
Set up Batch Processing Gateway
This section builds the Docker image for BPG, deploys the Helm chart on the `gateway-cluster` EKS cluster, and exposes the BPG endpoint using a Kubernetes service of type `LoadBalancer`. Complete the following steps:
- Deploy BPG on the `gateway-cluster` EKS cluster:
- Verify the deployment by listing the pods and viewing the pod logs:
Review the logs and confirm there are no errors or exceptions.
- Exec into the BPG pod and verify the health check:
The `healthcheck` API should return a successful response of `{"status":"OK"}`, confirming successful deployment of BPG on the `gateway-cluster` EKS cluster.
We have successfully configured BPG on `gateway-cluster` and set up EMR on EKS for both `datascience-cluster` and `analytics-cluster`. This is where we left off in the previous blog post. In the next steps, we will configure Amazon MWAA with `BPGOperator`, and then write and submit DAGs to demonstrate an end-to-end Spark-based data pipeline.
Configure the Airflow operator for BPG on Amazon MWAA
This section configures the `BPGOperator` plugin on the Amazon MWAA environment `airflow-environment`. Complete the following steps:
- Configure `BPGOperator` on Amazon MWAA:
- On the Amazon MWAA console, navigate to the `airflow-environment` environment.
- Choose Open Airflow UI, and in the Airflow UI, choose the Admin dropdown menu and choose Plugins.

You will see the `BPGOperator` plugin listed in the Airflow UI.
Configure Airflow connections for BPG integration
This section guides you through setting up the Airflow connections that enable secure communication between your Amazon MWAA environment and BPG. `BPGOperator` uses the configured connection to authenticate and interact with BPG endpoints.
Execute the following script to configure the Airflow connection `bpg_connection`.
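Conceptually, the script performs the equivalent of the following sketch, which creates the connection through the Amazon MWAA CLI endpoint. The connection type, host, and port values here are illustrative assumptions; the actual script in the repository is the authoritative version.

```python
import base64

import boto3
import requests

MWAA_ENV_NAME = "airflow-environment"
BPG_HOST = "<bpg-load-balancer-dns>"  # placeholder; use the gateway-cluster LoadBalancer hostname

# Obtain a short-lived CLI token for the Amazon MWAA environment
mwaa = boto3.client("mwaa")
token = mwaa.create_cli_token(Name=MWAA_ENV_NAME)

# Run "airflow connections add" through the MWAA CLI endpoint
cli_command = (
    f"connections add bpg_connection "
    f"--conn-type http --conn-host {BPG_HOST} --conn-port 8080"  # port is illustrative
)
response = requests.post(
    f"https://{token['WebServerHostname']}/aws_mwaa/cli",
    headers={
        "Authorization": f"Bearer {token['CliToken']}",
        "Content-Type": "text/plain",
    },
    data=cli_command,
)
response.raise_for_status()

# MWAA returns the stdout of the CLI invocation base64-encoded
print(base64.b64decode(response.json()["stdout"]).decode())
```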
In the Airflow UI, choose the Admin dropdown menu and choose Connections. You will see the `bpg_connection` connection listed in the Airflow UI.
Configure the Airflow DAG to execute Spark jobs
This step configures an Airflow DAG to run a sample application. In this case, we submit a DAG containing multiple sample Spark jobs from Amazon MWAA to the EMR on EKS clusters through BPG. Wait a few minutes for the DAG to appear in the Airflow UI. A sketch of what such a DAG can look like follows.
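A DAG built on the custom operator stays short because the operator hides the submission and monitoring logic. The following sketch illustrates the shape of such a pipeline; the import path, operator arguments, second task, and job configuration file names are illustrative assumptions, so refer to the GitHub repository for the exact interface.

```python
from datetime import datetime

from airflow import DAG
from bpg_operator import BPGOperator  # illustrative import path for the custom operator

with DAG(
    dag_id="MWAASparkPipelineDemoJob",
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Each task submits one Spark job through BPG, which routes it to an
    # EMR on EKS cluster based on the centrally managed routing rules
    calculate_pi = BPGOperator(
        task_id="calculate_pi",
        bpg_conn_id="bpg_connection",
        spark_app_config="jobs/calculate_pi.yaml",  # placeholder job configuration
    )

    aggregate_events = BPGOperator(
        task_id="aggregate_events",  # hypothetical second job
        bpg_conn_id="bpg_connection",
        spark_app_config="jobs/aggregate_events.yaml",  # placeholder job configuration
    )

    calculate_pi >> aggregate_events
```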
Trigger the Amazon MWAA DAG
In this step, we trigger the Airflow DAG and observe the job execution behavior, including reviewing the Spark logs in the Airflow UI:
- In the Airflow UI, review the `MWAASparkPipelineDemoJob` DAG and choose the play icon to trigger the DAG.
- Wait for the DAG to complete successfully. Upon successful completion of the DAG, you should see Success:1 under the Runs column.
- In the Airflow UI, locate and choose the `MWAASparkPipelineDemoJob` DAG.
- On the Graph tab, choose any task (in this example, we select the `calculate_pi` task) and then choose Logs.
- View the Spark logs in the Airflow UI.
Migrate existing Airflow DAGs to use BPG
In enterprise data platforms, a typical data pipeline consists of Amazon MWAA submitting Spark jobs to multiple EMR on EKS clusters using `SparkKubernetesOperator` and an Airflow Connection of type Kubernetes. An Airflow Connection is a set of parameters and credentials used to establish communication between Amazon MWAA and external systems or services. A DAG references the connection by name to communicate with the external system.
The following diagram shows the typical architecture.
In this setup, Airflow DAGs typically use `SparkKubernetesOperator` and `SparkKubernetesSensor` to submit Spark jobs to a remote EMR on EKS cluster, referencing the target cluster through the `kubernetes_conn_id` parameter.
The following code snippet shows the relevant details. It is a representative sketch based on the Airflow Kubernetes provider; the namespace, application manifest, and connection name are placeholders:
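```python
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import (
    SparkKubernetesOperator,
)
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import (
    SparkKubernetesSensor,
)

with DAG(
    dag_id="spark_pi_on_emr_on_eks",
    start_date=datetime(2025, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Submit a SparkApplication custom resource directly to the remote cluster
    # through an Airflow Connection of type Kubernetes
    submit = SparkKubernetesOperator(
        task_id="submit_spark_pi",
        namespace="spark-operator",                  # placeholder namespace
        application_file="spark-pi.yaml",            # placeholder manifest
        kubernetes_conn_id="emr_on_eks_connection",  # placeholder connection name
        do_xcom_push=True,
    )

    # Wait for the SparkApplication to reach a terminal state
    monitor = SparkKubernetesSensor(
        task_id="monitor_spark_pi",
        namespace="spark-operator",
        application_name="{{ task_instance.xcom_pull(task_ids='submit_spark_pi')['metadata']['name'] }}",
        kubernetes_conn_id="emr_on_eks_connection",
    )

    submit >> monitor
```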
To migrate to a BPG-based infrastructure without impacting the continuity of the environment, we can deploy a parallel infrastructure using BPG, create a new Airflow Connection for BPG, and incrementally migrate the DAGs to use the new connection. By doing so, we won’t disrupt the existing infrastructure until the BPG-based infrastructure is completely operational, including the migration of all existing DAGs.
The following diagram showcases the interim state where both the Kubernetes connection and BPG connection are operational. Blue arrows indicate the existing workflow paths, and red arrows represent the new BPG-based migration paths.
The modified code snippet for the DAG is as follows. Again, this is a representative sketch; the `BPGOperator` import path and arguments are illustrative, so refer to the GitHub repository for the exact interface:
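```python
from datetime import datetime

from airflow import DAG
from bpg_operator import BPGOperator  # illustrative import path for the custom operator

with DAG(
    dag_id="spark_pi_on_emr_on_eks",
    start_date=datetime(2025, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # One task replaces the submit/monitor pair: BPGOperator submits the job
    # to BPG over bpg_connection and polls its status until completion, while
    # BPG's routing rules decide which EMR on EKS cluster runs the job
    submit_and_monitor = BPGOperator(
        task_id="submit_spark_pi",
        bpg_conn_id="bpg_connection",      # the connection configured earlier
        spark_app_config="spark-pi.yaml",  # placeholder job configuration
    )
```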
Finally, when all the DAGs have been modified to use `BPGOperator` instead of `SparkKubernetesOperator`, you can decommission any remnants of the old workflow. The final state of the infrastructure will look like the following diagram.
Using this approach, we can seamlessly introduce BPG into an environment that currently uses only Amazon MWAA and EMR on EKS clusters.
Clean up
To avoid incurring future charges from the resources created in this tutorial, clean up your environment after you’ve completed the steps. You can do this by running the `cleanup.sh` script, which will safely remove all the resources provisioned during the setup:
Conclusion
In the post Use Batch Processing Gateway to automate job management in multi-cluster Amazon EMR on EKS environments, we introduced Batch Processing Gateway as a solution for routing Spark workloads across multiple EMR on EKS clusters. In this post, we demonstrated how to enhance this foundation by integrating BPG with Amazon MWAA. Through our custom `BPGOperator`, we’ve shown how to build robust end-to-end Spark-based data processing pipelines while maintaining clear separation of responsibilities and centralized code management. Finally, we demonstrated how to seamlessly incorporate the solution into your existing Amazon MWAA and EMR on EKS data platform without impacting operational continuity.
We encourage you to experiment with this architecture in your own environment, adapting it to fit your unique workloads and operational requirements. By implementing this solution, you can build efficient and scalable data processing pipelines that realize the full potential of EMR on EKS and Amazon MWAA. Explore further by deploying the solution in your AWS account while adhering to your organizational security best practices, and share your experiences with the AWS Big Data community.
About the Authors
Suvojit Dasgupta is a Principal Data Architect at AWS. He leads a team of skilled engineers in designing and building scalable data solutions for AWS customers. He specializes in developing and implementing innovative data architectures to address complex business challenges.
Avinash Desireddy is a Cloud Infrastructure Architect at AWS, passionate about building secure applications and data platforms. He has extensive experience in Kubernetes, DevOps, and enterprise architecture, helping customers containerize applications, streamline deployments, and optimize cloud-native environments.