Monday, 22 December 2025


Simple Way to Test GCP Dataflow Dynamic (Flex) Templates Using Python

When working with GCP Dataflow Flex Templates, many engineers think they must:

  • Build Docker images
  • Use gcloud
  • Create complex CI pipelines

In reality, you can test a dynamic (Flex) template using a very small Python script. This post shows the simplest and cleanest approach.


What Is a Dynamic (Flex) Template?

A Flex Template allows you to:

  • Pass runtime parameters
  • Use Python UDFs
  • Avoid rebuilding templates for every change

Unlike classic templates, Flex Templates are container-first, even when using Google-provided images.


Option 1: Simplest Logic Test (No Template)

This method only validates your pipeline logic. It does not test the Flex Template itself.


python wordcount.py \
  --runner=DataflowRunner \
  --project=my-project \
  --region=us-east4 \
  --temp_location=gs://my-bucket/temp \
  --staging_location=gs://my-bucket/staging \
  --input=gs://my-bucket/input/input.txt \
  --output=gs://my-bucket/output/wordcount-test

✔ Confirms code works
❌ Does not validate Flex Template JSON


Option 2 (Recommended): Test Flex Template Using Python

This is the best and simplest way to test a dynamic template end-to-end.

Minimal Python Test Script


from googleapiclient.discovery import build
from google.oauth2 import service_account

PROJECT_ID = "my-project"
REGION = "us-east4"
SERVICE_ACCOUNT_FILE = "dataflow_sa.json"

credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

dataflow = build(
    "dataflow",
    "v1b3",
    credentials=credentials,
    cache_discovery=False
)

body = {
    "launchParameter": {
        "jobName": "flex-test-wordcount",
        "containerSpecGcsPath": "gs://my-bucket/templates/flex_wordcount.json",
        "parameters": {
            "pythonFile": "gs://my-bucket/pipelines/wordcount.py",
            "requirementsFile": "gs://my-bucket/pipelines/requirements.txt",
            "input": "gs://my-bucket/input/input.txt",
            "output": "gs://my-bucket/output/wordcount-test"
        },
        "environment": {
            "tempLocation": "gs://my-bucket/temp",
            "stagingLocation": "gs://my-bucket/staging"
        }
    }
}

response = (
    dataflow.projects()
    .locations()
    .flexTemplates()
    .launch(
        projectId=PROJECT_ID,
        location=REGION,
        body=body
    )
    .execute()
)

print("Job launched:", response["job"]["id"])

✔ Tests Flex JSON
✔ Tests container
✔ Tests runtime parameters
✔ No Docker
✔ No gcloud
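
If you also want the test to confirm that the job reaches a healthy state, you can poll the Dataflow Jobs API with the same client. A minimal sketch, reusing the dataflow client, PROJECT_ID, and REGION from the script above (the state names are the standard Dataflow job states):

import time

job_id = response["job"]["id"]

# Poll until the job leaves the queued/pending states
while True:
    job = (
        dataflow.projects()
        .locations()
        .jobs()
        .get(projectId=PROJECT_ID, location=REGION, jobId=job_id)
        .execute()
    )
    state = job.get("currentState", "JOB_STATE_UNKNOWN")
    print("Current state:", state)
    if state in ("JOB_STATE_RUNNING", "JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"):
        break
    time.sleep(30)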


Option 3: Local Development (Fastest Feedback)

For local development, use the DirectRunner:


python wordcount.py \
  --runner=DirectRunner \
  --input=local.txt \
  --output=out

✔ Fast logic testing
❌ No GCP validation
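
For even faster feedback, you can wrap the core transforms in a unit test with Beam's testing utilities, which also runs on the DirectRunner and never touches GCP. A minimal sketch (the transform chain mirrors the WordCount pipeline shown later in this post):

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


def test_wordcount_transforms():
    with TestPipeline() as p:
        counts = (
            p
            | beam.Create(["hello world", "hello beam"])
            | beam.FlatMap(lambda x: x.split())
            | beam.Map(lambda x: (x, 1))
            | beam.CombinePerKey(sum)
        )
        assert_that(counts, equal_to([("hello", 2), ("world", 1), ("beam", 1)]))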


Recommended Testing Flow

Stage               | Method
Local development   | DirectRunner
GCP validation      | DataflowRunner
Template validation | Flex Template via Python

Common Issues and Fixes

  • Job stuck in STARTING → Missing compute.networkUser
  • Pipeline fails instantly → Missing requirements.txt
  • Workers fail → Private subnet without NAT
  • Template invalid → Wrong container image

Final Takeaway

The simplest way to test a dynamic Dataflow template is:

  • Use Google’s Flex Template image
  • Launch via flexTemplates().launch()
  • Use a small Python script

No Docker. No gcloud. Fully production-safe.


GCP Dataflow Flex Template – Python WordCount (No Docker)

This is a fully working Apache Beam WordCount Flex Template using:

  • ✅ Google-provided Dataflow container image
  • ✅ No Docker build
  • ✅ No gcloud
  • ✅ Dynamic parameters

📦 ZIP Structure

wordcount-flex-template/
├── wordcount.py
├── requirements.txt
└── flex_template.json

1️⃣ wordcount.py


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class WordCountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            "--input",
            type=str,
            help="GCS input file"
        )
        parser.add_value_provider_argument(
            "--output",
            type=str,
            help="GCS output prefix"
        )


def run():
    options = PipelineOptions(save_main_session=True)
    custom_options = options.view_as(WordCountOptions)

    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read" >> beam.io.ReadFromText(custom_options.input)
            | "Split" >> beam.FlatMap(lambda x: x.split())
            | "PairWithOne" >> beam.Map(lambda x: (x, 1))
            | "Count" >> beam.CombinePerKey(sum)
            | "Format" >> beam.Map(lambda kv: f"{kv[0]}: {kv[1]}")
            | "Write" >> beam.io.WriteToText(custom_options.output)
        )


if __name__ == "__main__":
    run()

2️⃣ requirements.txt


apache-beam[gcp]==2.53.0

Why is this required? The Google launcher image does NOT auto-install the Beam SDK, so it must be listed in requirements.txt.


3️⃣ flex_template.json


{
  "sdkInfo": {
    "language": "PYTHON"
  },
  "containerSpec": {
    "image": "gcr.io/dataflow-templates-base/python3-template-launcher-base",
    "metadata": {
      "name": "Python WordCount Flex Template",
      "description": "Minimal WordCount Flex Template using Google Dataflow image",
      "parameters": [
        {
          "name": "pythonFile",
          "label": "Python Pipeline File",
          "helpText": "GCS path to the Python pipeline"
        },
        {
          "name": "requirementsFile",
          "label": "Requirements File",
          "helpText": "GCS path to requirements.txt"
        },
        {
          "name": "input",
          "label": "Input File",
          "helpText": "GCS input file"
        },
        {
          "name": "output",
          "label": "Output Prefix",
          "helpText": "GCS output prefix"
        }
      ]
    }
  }
}

4️⃣ Upload to GCS

gs://my-bucket/pipelines/wordcount.py
gs://my-bucket/pipelines/requirements.txt
gs://my-bucket/templates/flex_wordcount.json
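
If you want to stay gcloud-free for the upload step as well, the google-cloud-storage client can push these files from Python. A minimal sketch, assuming the bucket and local file names used throughout this post:

from google.cloud import storage

client = storage.Client(project="my-project")
bucket = client.bucket("my-bucket")

# local file -> GCS object path
uploads = {
    "wordcount.py": "pipelines/wordcount.py",
    "requirements.txt": "pipelines/requirements.txt",
    "flex_template.json": "templates/flex_wordcount.json",
}

for local_path, blob_name in uploads.items():
    bucket.blob(blob_name).upload_from_filename(local_path)
    print(f"Uploaded {local_path} -> gs://my-bucket/{blob_name}")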

5️⃣ Launch via Python API


from googleapiclient.discovery import build
from google.oauth2 import service_account
import uuid

PROJECT_ID = "my-project"
REGION = "us-east4"
SERVICE_ACCOUNT_FILE = "dataflow_sa.json"

credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

dataflow = build(
    "dataflow",
    "v1b3",
    credentials=credentials,
    cache_discovery=False
)

job_name = f"wordcount-flex-{uuid.uuid4().hex[:6]}"

body = {
    "launchParameter": {
        "jobName": job_name,
        "containerSpecGcsPath": "gs://my-bucket/templates/flex_wordcount.json",
        "parameters": {
            "pythonFile": "gs://my-bucket/pipelines/wordcount.py",
            "requirementsFile": "gs://my-bucket/pipelines/requirements.txt",
            "input": "gs://my-bucket/input/input.txt",
            "output": "gs://my-bucket/output/wordcount"
        },
        "environment": {
            "serviceAccountEmail": "dataflow-sa@my-project.iam.gserviceaccount.com",
            "tempLocation": "gs://my-bucket/temp",
            "stagingLocation": "gs://my-bucket/staging",
            "maxWorkers": 5
        }
    }
}

response = (
    dataflow.projects()
    .locations()
    .flexTemplates()
    .launch(
        projectId=PROJECT_ID,
        location=REGION,
        body=body
    )
    .execute()
)

print(response)
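
For a throwaway validation job, you may also want to cancel it once the launch is confirmed. A minimal sketch using the same client (the REST API cancels a job by updating its requestedState):

job_id = response["job"]["id"]

# Request cancellation of the test job
cancel_response = (
    dataflow.projects()
    .locations()
    .jobs()
    .update(
        projectId=PROJECT_ID,
        location=REGION,
        jobId=job_id,
        body={"requestedState": "JOB_STATE_CANCELLED"},
    )
    .execute()
)
print("Requested state:", cancel_response.get("requestedState"))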

6️⃣ Required IAM Roles

roles/dataflow.developer
roles/storage.objectAdmin
roles/iam.serviceAccountUser
roles/compute.networkUser

✅ Final Verification

  • ✔ Works with private VPC
  • ✔ No Docker build
  • ✔ No gcloud
  • ✔ Production-safe

🎯 Blog Takeaway

Flex Templates are container-first. Even with Google images, you must supply:

  • Python pipeline
  • requirements.txt
  • flex_template.json

Nothing else is required.

Tuesday, 9 December 2025

Databricks on AWS - Technical Guide

1. What is Databricks?

Databricks is a unified data and AI platform built on Apache Spark. It is designed for:

  • Big Data Processing (ETL, analytics)
  • Machine Learning (ML, AI)
  • Data Science and Analytics Exploration
  • Data Lakehouse architecture combining data lakes and warehouses
---

2. Databricks Architecture on AWS

Databricks separates control plane (managed by Databricks) from data plane (your AWS account).

Control Plane

  • Managed by Databricks in Databricks AWS accounts
  • Manages authentication, notebooks, job scheduling, cluster orchestration
  • Serverless for users, no EC2 instances to manage

Data Plane

  • Runs in your AWS account
  • Compute: EC2 instances in your VPC
  • Storage: S3 (Delta Lake), EBS (temporary storage for nodes)
  • Networking: Private subnets, VPC Peering, PrivateLink
(Diagram: Databricks AWS architecture)
---

3. Key AWS Integrations

AWS Service             | Use Case in Databricks
S3                      | Delta Lake storage, raw data, ML datasets
EC2                     | Compute for clusters
EBS                     | Temporary cluster storage (shuffle, logs)
Kinesis / Kafka         | Real-time streaming ingestion
Redshift / RDS / Aurora | Data warehousing and relational DB queries
Glue Data Catalog       | Metadata management for Delta tables
KMS                     | Encrypt S3 buckets and cluster storage
IAM                     | Secure access to S3, KMS, and other services
---

4. Cluster Types & Runtimes

Cluster Type     | Use Case                         | Notes
Standard         | ETL, analytics                   | Optional auto-scaling
High Concurrency | Multiple SQL users               | Optimized for concurrent queries
Job Cluster      | Batch job execution              | Terminates after job completes
ML / GPU Cluster | Machine Learning / Deep Learning | GPU-enabled EC2 instances

Databricks Runtimes

  • Standard Runtime: Spark workloads
  • ML Runtime: MLlib, TensorFlow, PyTorch, scikit-learn
  • GPU Runtime: Deep learning / model training
  • Photon Engine: Optimized Delta queries
---

5. Storage & Delta Lake

Delta Lake provides:

  • ACID Transactions
  • Time Travel Queries
  • Schema Enforcement
  • Batch + Streaming support
# Mount S3 bucket to DBFS
dbutils.fs.mount(
  source="s3a://my-bucket",
  mount_point="/mnt/mydata",
  extra_configs={"fs.s3a.awsAccessKeyId":"YOUR_KEY","fs.s3a.awsSecretAccessKey":"YOUR_SECRET"}
)
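
The Time Travel feature listed above can be exercised with a couple of read options. A small sketch (the Delta table path is a hypothetical example):

# Read the table as of a specific version
v0_df = spark.read.format("delta").option("versionAsOf", 0).load("/mnt/delta/clean_logs")

# Or as of a timestamp
old_df = (
    spark.read.format("delta")
    .option("timestampAsOf", "2025-12-01")
    .load("/mnt/delta/clean_logs")
)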
---

6. Data Ingestion & Streaming

# Kafka Example
df = spark.readStream.format("kafka") \
  .option("kafka.bootstrap.servers","broker:9092") \
  .option("subscribe","topic_name") \
  .load()
  • Supports Kinesis Data Streams for AWS-native ingestion
  • Checkpointing in S3 or DBFS to track streaming state
  • Watermarking for late-arriving data (see the sketch below)
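
A minimal sketch of checkpointing plus watermarking when writing the stream to Delta, assuming the Kafka payload has already been parsed into a DataFrame with an eventTime timestamp column (paths are hypothetical):

from pyspark.sql.functions import col, window

# parsed_df: streaming DataFrame with an eventTime timestamp column
windowed_counts = (
    parsed_df
    .withWatermark("eventTime", "10 minutes")   # tolerate 10 minutes of late data
    .groupBy(window(col("eventTime"), "5 minutes"))
    .count()
)

query = (
    windowed_counts.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/delta/checkpoints/clickstream")  # streaming state
    .start("/mnt/delta/clickstream_counts")
)
---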

7. ML / AI Workloads

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
import mlflow
import mlflow.spark

# Load data
data = spark.read.format("delta").load("/mnt/delta/customer_data")

# Prepare features
assembler = VectorAssembler(inputCols=["age","account_age","num_transactions"], outputCol="features")
rf = RandomForestClassifier(featuresCol="features", labelCol="churn")
pipeline = Pipeline(stages=[assembler, rf])

# Train model
model = pipeline.fit(data)

# Log model to MLflow
mlflow.start_run()
mlflow.spark.log_model(model, "churn_model")
mlflow.end_run()
---

8. Security

  • IAM Roles & Policies: Assign Databricks role for S3, Kinesis access
  • KMS: Encrypt S3 buckets and cluster storage
  • Private Networking: VPC Peering, PrivateLink
  • Audit Logging: CloudTrail, S3 access logs
---

9. Autoscaling & Performance

  • Auto-scaling: min/max worker nodes per cluster
  • Photon Engine: faster Delta table queries
  • Caching: df.cache() for repeated computations (see the sketch below)
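
A small sketch of df.cache(), assuming a Delta table path from the earlier sections:

hot_df = spark.read.format("delta").load("/mnt/delta/clean_logs")

hot_df.cache()                                # keep the DataFrame in memory across actions
hot_df.count()                                # first action materializes the cache
hot_df.groupBy("eventType").count().show()    # reuses the cached data

hot_df.unpersist()                            # release memory when done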
---

10. Sample Data Flow (AWS + Databricks)

Scenario: Real-time e-commerce analytics

  • Ingest customer clickstreams → Kinesis Data Stream
  • Databricks Streaming Cluster processes & cleans data → Delta Lake (S3)
  • Analyze via Databricks SQL / Redshift Spectrum dashboards
  • Train recommendation ML models on Delta Lake
  • Secure via IAM roles, KMS encryption, private VPC
(Diagram: AWS + Databricks data flow)
---

11. Quick Hands-On Use Cases in Databricks on AWS

ETL Pipeline

df = spark.read.json("s3://my-bucket/raw_logs/")
df_clean = df.select("userId","eventType","timestamp").filter(df.eventType!="ignore")
df_clean.write.format("delta").mode("overwrite").save("/mnt/delta/clean_logs")

Streaming Analytics

df_stream = spark.readStream.format("kinesis")\
  .option("streamName","clickstream")\
  .option("region","us-east-1").load()

# Process streaming data
df_clean = df_stream.filter(df_stream.eventType!="ignore")
df_clean.writeStream.format("delta").option("checkpointLocation","/mnt/delta/checkpoint").start("/mnt/delta/streaming_data")

ML Model Training

assembler = VectorAssembler(inputCols=["age","account_age"], outputCol="features")
rf = RandomForestClassifier(featuresCol="features", labelCol="churn")
pipeline = Pipeline(stages=[assembler, rf])
model = pipeline.fit(data)
mlflow.spark.log_model(model, "churn_model")
---

References

Databricks Beginner Guide

Databricks Beginner Guide & Use Cases

Basics of Databricks

Databricks is a unified data and AI platform built on Apache Spark. It is used for big data processing, analytics, and machine learning. Databricks simplifies distributed computing and provides a collaborative workspace for Data Engineers, Data Scientists, and Analysts.

Key Features of Databricks

  • Delta Lake: ACID-compliant storage for batch & streaming data.
  • Scalable Clusters: Auto-scaling Spark clusters for big data workloads.
  • MLflow Integration: Experiment tracking and model management.
  • Structured Streaming: Real-time data processing.
  • Notebooks: Support for Python, SQL, Scala, and R in a collaborative workspace.
  • Data Lakehouse: Unified architecture for structured and unstructured data.
  • Integrations: Works with AWS S3, Azure Data Lake Storage, Kafka, and more.

5 Use Cases with Technical Implementation

Each use case below lists its cluster type / runtime, a suggested notebook name, and the technical implementation.
1. ETL Pipeline (Cluster: Standard Spark Cluster, Notebook: ETL_Pipeline)
# Load JSON from S3
df = spark.read.json("s3://my-bucket/raw_logs/")
# Clean data
df_clean = df.select("userId","eventType","timestamp").filter(df.eventType!="ignore")
# Save as Delta
df_clean.write.format("delta").mode("overwrite").save("/mnt/delta/clean_logs")
2. Real-Time Streaming (Runtime: Databricks ML / Spark Streaming, Notebook: Streaming_IoT)
# Read from Kafka
streaming_df = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers","kafka:9092")\
    .option("subscribe","iot_sensors").load()

# Parse JSON & filter anomalies
from pyspark.sql.functions import from_json, col
json_schema = "sensorId STRING, temperature DOUBLE"  # DDL schema so temperature parses as a number
parsed_df = streaming_df.select(from_json(col("value").cast("string"), json_schema).alias("data")).select("data.*")
anomalies = parsed_df.filter(col("temperature") > 100)

# Write to console
query = anomalies.writeStream.outputMode("append").format("console").start()
query.awaitTermination()
3. ML Model Training (Runtime: Databricks ML, Notebook: ML_Churn_Model)
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

data = spark.read.format("delta").load("/mnt/delta/customer_data")
assembler = VectorAssembler(inputCols=["age","account_age","num_transactions"], outputCol="features")
rf = RandomForestClassifier(featuresCol="features", labelCol="churn")
pipeline = Pipeline(stages=[assembler, rf])
model = pipeline.fit(data)
model.write().overwrite().save("/mnt/models/churn_model")
4. Data Analytics / Exploration (Cluster: Standard Spark Cluster, Notebook: Sales_Analysis)
import matplotlib.pyplot as plt

sales_df = spark.read.format("delta").load("/mnt/delta/sales_data")
sales_pd = sales_df.groupBy("month").sum("revenue").toPandas()
plt.plot(sales_pd["month"], sales_pd["sum(revenue)"])
plt.title("Monthly Sales Trend")
plt.xlabel("Month")
plt.ylabel("Revenue")
plt.show()
5. Data Lakehouse / Delta Lake (Cluster: Standard Spark Cluster, Notebook: Delta_Lakehouse)
# Load CSV & JSON
csv_df = spark.read.format("csv").option("header","true").load("/mnt/raw/csv_data")
json_df = spark.read.format("json").load("/mnt/raw/json_data")

# Combine & save as Delta
combined_df = csv_df.unionByName(json_df, allowMissingColumns=True)  # tolerate schema differences (Spark 3.1+)
combined_df.write.format("delta").mode("overwrite").save("/mnt/delta/combined_data")

# SQL query
spark.sql("SELECT count(*) FROM delta.`/mnt/delta/combined_data`").show()

Technical Flow in Databricks

  1. Create Cluster → select runtime (Spark / ML / GPU if needed).
  2. Mount storage → S3, ADLS, or DBFS.
  3. Create Notebook → Python / SQL / Scala.
  4. Write code → ETL, ML, Streaming, Analytics.
  5. Attach Notebook to Cluster → Run & Monitor.
  6. Save results → Delta Tables, ML Models, Visualizations.

GCP Dataflow Flex Template with Python UDF

This guide demonstrates how to build and launch a Dataflow Flex Template in Python using a custom UDF. The entire workflow is handled in Python without using any gcloud CLI commands for launching.


1. Folder Structure

Organize your files as follows:

dataflow_flex_udf/
├── main.py
├── my_udf/
│   └── custom_udf.py
├── requirements.txt
├── metadata.json
├── build_flex_template.py
└── launch_flex_template.py

2. Python UDF

Create your UDF function in my_udf/custom_udf.py:


def my_uppercase_udf(text: str) -> str:
    """
    Example UDF to convert text to uppercase
    """
    return text.upper()
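
Because the UDF is plain Python, you can sanity-check it locally before wiring it into the pipeline, for example:

from my_udf.custom_udf import my_uppercase_udf

assert my_uppercase_udf("hello dataflow") == "HELLO DATAFLOW"
print("UDF works as expected")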
---

3. Dataflow Pipeline (main.py)

This pipeline reads input text from GCS, applies the UDF, and writes output back to GCS:


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from my_udf.custom_udf import my_uppercase_udf

def run():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", required=True)
    args, pipeline_args = parser.parse_known_args()

    options = PipelineOptions(pipeline_args, save_main_session=True)

    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read" >> beam.io.ReadFromText(args.input)
            | "ApplyUDF" >> beam.Map(my_uppercase_udf)
            | "Write" >> beam.io.WriteToText(args.output)
        )

if __name__ == "__main__":
    run()
---

4. Requirements File (requirements.txt)

apache-beam[gcp]==2.57.0
---

5. Metadata File (metadata.json)

Define the parameters for the Flex Template:


{
  "name": "Python UDF Flex Template",
  "description": "Run Python Beam pipeline with UDF",
  "parameters": [
    { "name": "input", "label": "Input File", "helpText": "GCS path to input" },
    { "name": "output", "label": "Output File", "helpText": "GCS path to output" }
  ]
}
---

6. Build Flex Template in Python (build_flex_template.py)

This script builds the Flex Template JSON using Google’s Python base image:


import subprocess

# Configuration
BUCKET = "your-bucket"
TEMPLATE_JSON_GCS = f"gs://{BUCKET}/templates/python_udf_template.json"
MAIN_PY = "main.py"
REQUIREMENTS = "requirements.txt"
PYTHON_FILES = "."
METADATA = "metadata.json"
LAUNCHER_IMAGE = "gcr.io/dataflow-templates-base/python3-template-launcher-base"

# Build command
cmd = [
    "gcloud", "dataflow", "flex-template", "build", TEMPLATE_JSON_GCS,
    f"--image={LAUNCHER_IMAGE}",
    "--sdk-language=PYTHON",
    f"--metadata-file={METADATA}",
    f"--python-requirements-file={REQUIREMENTS}",
    f"--python-entry-point={MAIN_PY}",
    f"--python-files={PYTHON_FILES}"
]

# Execute
subprocess.run(cmd, check=True)
print("Flex Template JSON created at:", TEMPLATE_JSON_GCS)
Note: Even though we use Python here, building the template requires calling gcloud to generate the JSON. Launching can be done fully via Python API.
---

7. Launch Flex Template via Python (launch_flex_template.py)


import json
import uuid
from googleapiclient.discovery import build
from google.oauth2 import service_account

# Config
PROJECT_ID = "your-project-id"
REGION = "us-central1"
BUCKET = "your-bucket"
TEMPLATE_JSON_GCS = f"gs://{BUCKET}/templates/python_udf_template.json"
SERVICE_ACCOUNT_FILE = "service-account.json"
INPUT_FILE = f"gs://{BUCKET}/input/input.txt"
OUTPUT_FILE = f"gs://{BUCKET}/output/result"

# Authenticate
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

dataflow = build("dataflow", "v1b3", credentials=credentials)

# Unique job name
job_name = f"flex-udf-job-{uuid.uuid4().hex[:6]}"

# Launch request body
request = dataflow.projects().locations().flexTemplates().launch(
    projectId=PROJECT_ID,
    location=REGION,
    body={
        "launchParameter": {
            "containerSpecGcsPath": TEMPLATE_JSON_GCS,
            "jobName": job_name,
            "parameters": {"input": INPUT_FILE, "output": OUTPUT_FILE},
            "environment": {"tempLocation": f"gs://{BUCKET}/temp"}
        }
    }
)

response = request.execute()
print(json.dumps(response, indent=2))
---

8. IAM Permissions Required

  • For building template / launching: roles/dataflow.admin, roles/storage.admin
  • For Dataflow workers: roles/dataflow.worker, roles/storage.objectViewer, roles/storage.objectCreator, roles/logging.logWriter
---

9. How It Works

  1. Build Flex Template JSON: package pipeline + UDF
  2. Upload to GCS: store template JSON
  3. Launch via Python API: start Dataflow job
  4. Pipeline executes: reads input, applies UDF, writes output
---

10. Benefits

  • No custom Docker needed — uses Google-provided Python base image
  • Full Python automation — build + launch + monitoring
  • Supports complex UDFs and multiple Python modules
  • Integrates easily into CI/CD pipelines

GCP Dataflow Classic Template — Python UDF in GCS (Step-by-Step)

Complete, beginner-friendly guide: which files to upload, exact commands, launcher script, pipeline code, IAM roles, troubleshooting, and a final pre-launch checklist.

1. Quick Overview

In the Classic Dataflow approach you keep two distinct things:

  • main.py — your Beam pipeline (this must be uploaded to GCS; workers download & run it).
  • udf/my_udf.py — optional user-defined function (upload to GCS if main.py loads it from GCS).
  • launch_dataflow.py — local script that calls the Dataflow REST API to create the job (runs on your laptop/CI; do not upload).
Note: The launch script only submits the job. The pipeline code is what runs on Dataflow worker VMs and must be in Cloud Storage (GCS).

2. Replace placeholders first

Before you run any command, decide and/or replace these values:

  • PROJECT_ID → your GCP project id
  • REGION → Dataflow region (e.g. us-east4)
  • BUCKET → a GCS bucket name (unique), e.g. my-dataflow-bucket-12345
  • SERVICE_ACCOUNT_FILE → path to service account JSON (optional if using gcloud auth)
  • SERVICE_ACCOUNT_EMAIL → email of the SA used by Dataflow workers (optional)

3. Local folder structure

my-dataflow-project/
├── main.py                # Beam pipeline (uploaded to GCS)
├── udf/
│   └── my_udf.py          # UDF (uploaded to GCS if loaded from GCS)
└── launch_dataflow.py     # job submitter (run locally)

4. File contents — copy these exact examples

a) udf/my_udf.py

# simple example UDF
def process(line):
    # transform input line to upper-case
    return line.upper()

b) main.py — pipeline that downloads UDF from GCS

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import importlib.util
import os
import tempfile

class LoadUDF(beam.DoFn):
    def __init__(self, udf_gcs_path):
        self.udf_gcs_path = udf_gcs_path

    def setup(self):
        # download udf file from GCS to local path
        local_tmp = tempfile.mkdtemp()
        local_path = os.path.join(local_tmp, "udf.py")

        # Use gsutil to copy the file (workers must have gsutil available)
        os.system(f"gsutil cp {self.udf_gcs_path} {local_path}")

        spec = importlib.util.spec_from_file_location("udf", local_path)
        self.udf = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(self.udf)

    def process(self, element):
        yield self.udf.process(element)

def run():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", required=True)
    parser.add_argument("--udf_file", required=True)
    args, pipeline_args = parser.parse_known_args()

    options = PipelineOptions(pipeline_args, save_main_session=True)

    with beam.Pipeline(options=options) as p:
        (p
         | "Read" >> beam.io.ReadFromText(args.input)
         | "ApplyUDF" >> beam.ParDo(LoadUDF(args.udf_file))
         | "Write" >> beam.io.WriteToText(args.output)
        )

if __name__ == "__main__":
    run()
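
If gsutil turns out not to be available on the worker image (see the troubleshooting section below), one alternative is to download the UDF with Beam's bundled GCS client instead of shelling out. A sketch of an equivalent DoFn, assuming apache-beam[gcp] is installed on the workers:

import importlib.util
import os
import tempfile

import apache_beam as beam
from apache_beam.io.gcp.gcsio import GcsIO

class LoadUDFNoGsutil(beam.DoFn):
    def __init__(self, udf_gcs_path):
        self.udf_gcs_path = udf_gcs_path

    def setup(self):
        local_path = os.path.join(tempfile.mkdtemp(), "udf.py")

        # Copy the UDF file from GCS to the worker's local disk
        with GcsIO().open(self.udf_gcs_path) as src, open(local_path, "wb") as dst:
            dst.write(src.read())

        # Import the downloaded module, same as LoadUDF above
        spec = importlib.util.spec_from_file_location("udf", local_path)
        self.udf = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(self.udf)

    def process(self, element):
        yield self.udf.process(element)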

c) launch_dataflow.py — job submitter using Dataflow REST client

import uuid
import json
from google.oauth2 import service_account
from googleapiclient.discovery import build

PROJECT_ID = "YOUR_PROJECT_ID"
REGION = "us-east4"
BUCKET = "YOUR_BUCKET_NAME"
SERVICE_ACCOUNT_FILE = "service-account.json"  # or leave to use gcloud default auth

MAIN_PY_GCS = f"gs://{BUCKET}/pipeline/main.py"
UDF_GCS = f"gs://{BUCKET}/udf/my_udf.py"
TEMP_LOCATION = f"gs://{BUCKET}/temp"
STAGING_LOCATION = f"gs://{BUCKET}/staging"

JOB_NAME = f"classic-udf-{uuid.uuid4().hex[:6]}"

# If using service account file
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

dataflow = build("dataflow", "v1b3", credentials=credentials)

job_body = {
    "jobName": JOB_NAME,
    "parameters": {
        "input": f"gs://{BUCKET}/input/input.txt",
        "output": f"gs://{BUCKET}/output/result",
        "udf_file": UDF_GCS
    },
    "environment": {
        "tempLocation": TEMP_LOCATION,
        "stagingLocation": STAGING_LOCATION,
        "serviceAccountEmail": f"{PROJECT_ID}@appspot.gserviceaccount.com"
    }
}

request = dataflow.projects().locations().templates().launch(
    projectId=PROJECT_ID,
    location=REGION,
    gcsPath=MAIN_PY_GCS,
    body=job_body
)

response = request.execute()
print(json.dumps(response, indent=2))
Tip: Replace the placeholder values (PROJECT_ID, BUCKET, service account file path) before running the launcher script.

5. Create the GCS bucket (if not exists)

gcloud config set project YOUR_PROJECT_ID
gsutil mb -l us-east4 gs://YOUR_BUCKET_NAME

Use the same region as your Dataflow region for lower latency and fewer surprises.

6. Upload files to GCS (exact commands)

# upload pipeline and udf
gsutil cp main.py gs://YOUR_BUCKET_NAME/pipeline/main.py
gsutil cp udf/my_udf.py gs://YOUR_BUCKET_NAME/udf/my_udf.py

# create an example input file and upload it
echo "hello world" > input.txt
gsutil cp input.txt gs://YOUR_BUCKET_NAME/input/input.txt

7. IAM & Service Account (minimum permissions)

The launcher (the account that creates jobs) and the worker service account require appropriate roles:

  • Launcher (account that calls the Dataflow API): roles/dataflow.admin (or equivalent permissions to create Dataflow jobs)
  • Worker service account (used by Dataflow workers):
      roles/dataflow.worker
      roles/storage.objectViewer (read UDF & inputs)
      roles/storage.objectCreator or roles/storage.objectAdmin (if writing outputs)
# Example: grant worker SA the storage.objectViewer role
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID \
  --member="serviceAccount:YOUR_WORKER_SA_EMAIL" \
  --role="roles/storage.objectViewer"

8. Launch the job (two ways)

A) Using the Python launcher (recommended)

Edit launch_dataflow.py with correct placeholders, then run:

python launch_dataflow.py

B) Using gcloud CLI (alternate)

gcloud dataflow jobs run my-job-name \
  --gcs-location=gs://YOUR_BUCKET_NAME/pipeline/main.py \
  --region=us-east4 \
  --parameters=input=gs://YOUR_BUCKET_NAME/input/input.txt,output=gs://YOUR_BUCKET_NAME/output/result,udf_file=gs://YOUR_BUCKET_NAME/udf/my_udf.py
Note: The Python launcher calls the Dataflow REST API and returns richer JSON job details. Use whichever method fits your workflow.

9. Monitor job and logs

  • GCP Console → Dataflow (select region) — job graph & status.
  • Cloud Logging — filter logs by Dataflow job id or resource.type="dataflow_step".
  • Check worker logs for startup errors (missing dependencies, permission issues reading GCS, or gsutil not found).

10. Common errors & fixes

  • Permission denied reading gs://...
    → Ensure worker SA has storage.objectViewer.
  • Job stuck in STARTING
    → Verify tempLocation & stagingLocation exist and are writable; check project quotas and region availability.
  • ModuleNotFoundError inside UDF
    → Ensure your UDF is a single Python file or that the worker image contains required libs. For more complex deps use Flex Templates (container).
  • gsutil not found on worker
    → Some worker images lack gsutil. If so either vendor the UDF into the pipeline bundle or use a Flex Template with a container that includes gsutil.

11. Optional: test locally (quick sanity)

Run the pipeline locally to validate behavior (you may need to adapt LoadUDF to accept local paths):

python main.py --input input.txt --output local-output --udf_file udf/my_udf.py

This helps verify logic before uploading to GCS and launching on Dataflow.

12. Final pre-launch checklist

  • [ ] main.py uploaded to gs://BUCKET/pipeline/main.py
  • [ ] udf/my_udf.py uploaded to gs://BUCKET/udf/my_udf.py (if referenced)
  • [ ] Input file uploaded to gs://BUCKET/input/input.txt
  • [ ] TEMP and STAGING set to gs://BUCKET/temp and gs://BUCKET/staging
  • [ ] Launcher configured with correct PROJECT_ID and SERVICE_ACCOUNT_FILE
  • [ ] IAM roles granted to launcher SA and worker SA
Written for beginners — adapt paths and service accounts to your project's conventions. Always avoid uploading sensitive keys directly into buckets that are publicly accessible.