What is Dataproc?
Dataproc is Google Cloud’s fully managed service for running Apache Spark, Hadoop, and other open-source data processing tools. It excels at handling large-scale data processing and pipeline operations.
Ephemeral Dataproc clusters are temporary clusters that:
- Are created on-demand when processing is needed
- Automatically terminate once their tasks are complete
- Help optimize costs by only running when necessary (an idle-timeout safeguard is sketched after this list)
- Can be managed through automation tools like Airflow or Terraform
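Beyond deleting clusters from an orchestrator, Dataproc's cluster configuration also supports an idle-delete TTL, so a forgotten cluster removes itself after a period of inactivity. This is not part of the tutorial's DAG, just an optional safety net; the sketch below assumes the v1 ClusterConfig lifecycle_config field, and the machine types and 30-minute value are placeholders:

# Optional variant of the cluster_config used later in this post:
# the cluster deletes itself after 30 minutes with no running jobs.
cluster_config_with_ttl = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
    # Assumption: duration expressed as a protobuf Duration-style dict
    "lifecycle_config": {"idle_delete_ttl": {"seconds": 1800}},
}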
What is Cloud Composer?
Cloud Composer is Google Cloud’s managed Apache Airflow service that helps you:
- Create, schedule, and monitor complex workflows
- Define workflows as Directed Acyclic Graphs (DAGs); a minimal example follows this list
- Seamlessly integrate with other Google Cloud services
- Scale your workflow orchestration without managing infrastructure
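To make the DAG idea concrete before the full pipeline later in this post, here is a minimal sketch of an Airflow 2.x DAG with two dependent tasks; the DAG name, task IDs, and bash commands are placeholders:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

with DAG("hello_dag", start_date=days_ago(1), schedule_interval=None) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extracting")
    load = BashOperator(task_id="load", bash_command="echo loading")

    extract >> load  # run "extract" before "load"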
Practical Implementation
Let’s walk through an example of transferring data between BigQuery tables using an ephemeral Dataproc cluster. While Dataproc is typically used for heavy transformations, this simple example focuses on demonstrating the Airflow/Composer setup.
1. Create the Python Script to Transfer Data
First, create a file named bigquery_transfer.py:
from google.cloud import bigquery


def transfer_data():
    client = bigquery.Client()

    # Set the source and destination tables
    source_table = "your_project_id.your_dataset.source_table"
    destination_table = "your_project_id.your_dataset.destination_table"

    # Query to copy data
    query = f"""
        CREATE OR REPLACE TABLE `{destination_table}` AS
        SELECT * FROM `{source_table}`
    """

    # Run the query to copy data
    client.query(query).result()  # Waits for the job to complete


if __name__ == "__main__":
    transfer_data()
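The table names are hard-coded above to keep the example short. If you want the script to be reusable across datasets, one possible variation (not part of the original example) reads them from command-line arguments, which the Dataproc job can supply through its args field, as shown after the DAG in step 4:

import sys

from google.cloud import bigquery


def transfer_data(source_table: str, destination_table: str) -> None:
    client = bigquery.Client()
    query = f"""
        CREATE OR REPLACE TABLE `{destination_table}` AS
        SELECT * FROM `{source_table}`
    """
    client.query(query).result()  # Waits for the job to complete


if __name__ == "__main__":
    # Expected usage: python bigquery_transfer.py <source_table> <destination_table>
    transfer_data(sys.argv[1], sys.argv[2])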
2. Set Up Google Cloud Storage
- Create a GCS bucket for your processing script
- Upload bigquery_transfer.py to the bucket (or script the upload, as sketched after this list)
- Note the GCS path: gs://your-bucket-name/bigquery_transfer.py
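If you prefer to script the upload rather than use the console, a minimal sketch using the google-cloud-storage client (the bucket name and local path are placeholders):

from google.cloud import storage

client = storage.Client()
bucket = client.bucket("your-bucket-name")

# Upload the local script so it is available at
# gs://your-bucket-name/bigquery_transfer.py
blob = bucket.blob("bigquery_transfer.py")
blob.upload_from_filename("bigquery_transfer.py")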
3. Configure Cloud Composer
- Navigate to the GCP Console > Composer section
- Create a new Composer environment
- Make note of:
- The selected region
- The service account (it needs the Storage Admin role, granted through IAM, to read the script from GCS, plus permission to manage Dataproc clusters, e.g. the Dataproc Editor role)
- The DAGs folder location in GCS
4. Create the Airflow DAG
The DAG will:
- Create an ephemeral Dataproc cluster.
- Submit the Python job stored in the GCS bucket to the cluster.
- Delete the cluster after job completion.
Create a file named bq_transfer_dag.py:
from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "<your-gcp-project-id>"
REGION = "us-central1"
CLUSTER_NAME = "bq-transfer-cluster"
BUCKET_NAME = "your-bucket-name"
PYSPARK_JOB_URI = f"gs://{BUCKET_NAME}/bigquery_transfer.py"

default_args = {
    "project_id": PROJECT_ID,
    "start_date": days_ago(1),
}

with DAG(
    "bq_to_bq_transfer",
    default_args=default_args,
    schedule_interval=None,  # Manual trigger
) as dag:

    # Step 1: Create the Dataproc cluster
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        cluster_config={
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
        },
    )

    # Step 2: Submit the PySpark job
    pyspark_job = {
        "reference": {"project_id": PROJECT_ID},
        "placement": {"cluster_name": CLUSTER_NAME},
        "pyspark_job": {"main_python_file_uri": PYSPARK_JOB_URI},
    }

    submit_job = DataprocSubmitJobOperator(
        task_id="submit_pyspark_job",
        job=pyspark_job,
        region=REGION,
        project_id=PROJECT_ID,
    )

    # Step 3: Delete the Dataproc cluster
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        trigger_rule="all_done",  # Ensures the cluster is deleted even if the job fails
    )

    create_cluster >> submit_job >> delete_cluster
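If you went with the parameterized script variant from step 1, the job definition can pass the table names through the Dataproc job's args field. A sketch of the pyspark_job dict in that case (the table names are placeholders):

pyspark_job = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_JOB_URI,
        # Delivered to the script as sys.argv[1] and sys.argv[2]
        "args": [
            "your_project_id.your_dataset.source_table",
            "your_project_id.your_dataset.destination_table",
        ],
    },
}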
5. Deploy and Run
- Upload bq_transfer_dag.py to your Composer environment’s DAGs folder
- Open the Airflow UI through the Composer interface
- Locate the bq_to_bq_transfer DAG
- Trigger the DAG manually to start the process
Note: the Python ETL script can be uploaded to the bucket that Composer creates for the environment instead of creating a separate bucket.
Monitoring and Verification
You can monitor the progress through:
- The Airflow UI in Cloud Composer
- Dataproc’s cluster list (to see cluster creation/deletion)
- BigQuery’s job history (to verify data transfer; a quick row-count check is sketched below)
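Beyond the consoles, a quick row-count comparison from any machine with BigQuery access can confirm the copy. A minimal sketch (table names are placeholders):

from google.cloud import bigquery

client = bigquery.Client()

for table_id in [
    "your_project_id.your_dataset.source_table",
    "your_project_id.your_dataset.destination_table",
]:
    table = client.get_table(table_id)  # fetches table metadata
    print(f"{table_id}: {table.num_rows} rows")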
Conclusion
Using ephemeral Dataproc clusters with Cloud Composer provides a powerful, cost-effective way to process data at scale. The automation ensures that clusters are created when needed and cleaned up afterward, preventing unnecessary resource usage while maintaining processing capability.