Tuesday, 9 December 2025

GCP Dataflow — Classic Python Template with UDF in GCS (Step-by-Step)

A complete, beginner-friendly guide: which files to upload, the exact commands, the launcher script, the pipeline code, IAM roles, troubleshooting, and a final pre-launch checklist.

1. Quick Overview

In the Classic Dataflow approach you work with three distinct files:

  • main.py — your Beam pipeline (this must be uploaded to GCS; workers download & run it).
  • udf/my_udf.py — optional user-defined function (upload to GCS if main.py loads it from GCS).
  • launch_dataflow.py — local script that calls the Dataflow REST API to create the job (runs on your laptop/CI; do not upload).
Note: The launch script only submits the job. The pipeline code is what runs on Dataflow worker VMs and must be in Cloud Storage (GCS).

2. Replace placeholders first

Before you run any command, decide on these values and replace them wherever they appear (a small sketch for reading them from environment variables follows this list):

  • PROJECT_ID → your GCP project id
  • REGION → Dataflow region (e.g. us-east4)
  • BUCKET → a GCS bucket name (unique), e.g. my-dataflow-bucket-12345
  • SERVICE_ACCOUNT_FILE → path to service account JSON (optional if using gcloud auth)
  • SERVICE_ACCOUNT_EMAIL → email of the SA used by Dataflow workers (optional)
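
For example, one minimal way to avoid hard-coding these values (the variable names here are just a suggestion) is to read them from environment variables in the launcher:

import os

# read the placeholder values from the environment instead of editing the scripts
PROJECT_ID = os.environ["PROJECT_ID"]
REGION = os.environ.get("REGION", "us-east4")
BUCKET = os.environ["BUCKET"]
SERVICE_ACCOUNT_FILE = os.environ.get("SERVICE_ACCOUNT_FILE")    # None if you rely on gcloud auth
SERVICE_ACCOUNT_EMAIL = os.environ.get("SERVICE_ACCOUNT_EMAIL")  # worker SA (optional)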

3. Local folder structure

my-dataflow-project/
├── main.py                # Beam pipeline (uploaded to GCS)
├── udf/
│   └── my_udf.py          # UDF (uploaded to GCS if loaded from GCS)
└── launch_dataflow.py     # job submitter (run locally)

4. File contents — copy these exact examples

a) udf/my_udf.py

# simple example UDF
def process(line):
    # transform input line to upper-case
    return line.upper()
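
You can sanity-check the UDF on its own before Beam is involved at all, for example from the project root:

# quick local check of the UDF (run from my-dataflow-project/)
from udf.my_udf import process

print(process("hello world"))   # -> HELLO WORLD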

b) main.py — pipeline that downloads UDF from GCS

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import importlib.util
import os
import subprocess
import tempfile

class LoadUDF(beam.DoFn):
    def __init__(self, udf_gcs_path):
        self.udf_gcs_path = udf_gcs_path

    def setup(self):
        # download udf file from GCS to local path
        local_tmp = tempfile.mkdtemp()
        local_path = os.path.join(local_tmp, "udf.py")

        # Copy the UDF from GCS with gsutil (the worker image must include gsutil);
        # check=True makes a failed copy raise immediately instead of failing later
        subprocess.run(["gsutil", "cp", self.udf_gcs_path, local_path], check=True)

        spec = importlib.util.spec_from_file_location("udf", local_path)
        self.udf = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(self.udf)

    def process(self, element):
        yield self.udf.process(element)

def run():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", required=True)
    parser.add_argument("--udf_file", required=True)
    args, pipeline_args = parser.parse_known_args()

    options = PipelineOptions(pipeline_args, save_main_session=True)

    with beam.Pipeline(options=options) as p:
        (p
         | "Read" >> beam.io.ReadFromText(args.input)
         | "ApplyUDF" >> beam.ParDo(LoadUDF(args.udf_file))
         | "Write" >> beam.io.WriteToText(args.output)
        )

if __name__ == "__main__":
    run()
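
The setup() above shells out to gsutil, which assumes gsutil exists on the worker image. A more self-contained variant (a sketch, not the only option) reads the file with Beam's own FileSystems API, which ships with apache-beam[gcp] and needs no external binary:

import types

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems

class LoadUDF(beam.DoFn):
    def __init__(self, udf_gcs_path):
        self.udf_gcs_path = udf_gcs_path

    def setup(self):
        # read the UDF source straight from GCS; FileSystems understands gs:// paths
        with FileSystems.open(self.udf_gcs_path) as f:
            source = f.read().decode("utf-8")

        # build a module object in memory and execute the UDF source inside it
        self.udf = types.ModuleType("udf")
        exec(source, self.udf.__dict__)

    def process(self, element):
        yield self.udf.process(element)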

c) launch_dataflow.py — job submitter using Dataflow REST client

import uuid
import json
from google.oauth2 import service_account
from googleapiclient.discovery import build

PROJECT_ID = "YOUR_PROJECT_ID"
REGION = "us-east4"
BUCKET = "YOUR_BUCKET_NAME"
SERVICE_ACCOUNT_FILE = "service-account.json"  # launcher key file (see the default-credentials variant below)

MAIN_PY_GCS = f"gs://{BUCKET}/pipeline/main.py"
UDF_GCS = f"gs://{BUCKET}/udf/my_udf.py"
TEMP_LOCATION = f"gs://{BUCKET}/temp"
STAGING_LOCATION = f"gs://{BUCKET}/staging"

JOB_NAME = f"classic-udf-{uuid.uuid4().hex[:6]}"

# If using service account file
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

dataflow = build("dataflow", "v1b3", credentials=credentials)

job_body = {
    "jobName": JOB_NAME,
    "parameters": {
        "input": f"gs://{BUCKET}/input/input.txt",
        "output": f"gs://{BUCKET}/output/result",
        "udf_file": UDF_GCS
    },
    "environment": {
        "tempLocation": TEMP_LOCATION,
        "stagingLocation": STAGING_LOCATION,
        "serviceAccountEmail": f"{PROJECT_ID}@appspot.gserviceaccount.com"
    }
}

request = dataflow.projects().locations().templates().launch(
    projectId=PROJECT_ID,
    location=REGION,
    gcsPath=MAIN_PY_GCS,
    body=job_body
)

response = request.execute()
print(json.dumps(response, indent=2))
Tip: Replace the placeholder values (PROJECT_ID, BUCKET, service account file path) before running the launcher script.
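
If you would rather rely on your local gcloud credentials than a key file, a minimal variation of the credential setup looks like this; it assumes you have already run gcloud auth application-default login:

import google.auth
from googleapiclient.discovery import build

# use Application Default Credentials instead of a service-account key file
credentials, default_project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

dataflow = build("dataflow", "v1b3", credentials=credentials)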

5. Create the GCS bucket (if not exists)

gcloud config set project YOUR_PROJECT_ID
gsutil mb -l us-east4 gs://YOUR_BUCKET_NAME

Use the same region as your Dataflow region for lower latency and fewer surprises.

6. Upload files to GCS (exact commands)

# upload pipeline and udf
gsutil cp main.py gs://YOUR_BUCKET_NAME/pipeline/main.py
gsutil cp udf/my_udf.py gs://YOUR_BUCKET_NAME/udf/my_udf.py

# create an example input file and upload it
echo "hello world" > input.txt
gsutil cp input.txt gs://YOUR_BUCKET_NAME/input/input.txt
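
If you prefer to script the upload in Python (for example, from CI), a small sketch using the google-cloud-storage client does the same thing; it assumes the package is installed, you are authenticated, and the bucket already exists:

from google.cloud import storage

BUCKET = "YOUR_BUCKET_NAME"

client = storage.Client()
bucket = client.bucket(BUCKET)

# upload pipeline, UDF, and sample input to the same paths used by the launcher
for local_path, gcs_path in [
    ("main.py", "pipeline/main.py"),
    ("udf/my_udf.py", "udf/my_udf.py"),
    ("input.txt", "input/input.txt"),
]:
    bucket.blob(gcs_path).upload_from_filename(local_path)
    print(f"uploaded {local_path} -> gs://{BUCKET}/{gcs_path}")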

7. IAM & Service Account (minimum permissions)

The launcher (the account that creates jobs) and the worker service account require appropriate roles:

  • Launcher (the account that calls the Dataflow API): roles/dataflow.admin, or equivalent permissions to create Dataflow jobs.
  • Worker service account (used by Dataflow workers):
      • roles/dataflow.worker
      • roles/storage.objectViewer (to read the UDF & inputs)
      • roles/storage.objectCreator or roles/storage.objectAdmin (to write outputs)

# Example: grant worker SA the storage.objectViewer role
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID \
  --member="serviceAccount:YOUR_WORKER_SA_EMAIL" \
  --role="roles/storage.objectViewer"

8. Launch the job (two ways)

A) Using the Python launcher (recommended)

Edit launch_dataflow.py with correct placeholders, then run:

python launch_dataflow.py

B) Using gcloud CLI (alternate)

gcloud dataflow jobs run my-job-name \
  --gcs-location=gs://YOUR_BUCKET_NAME/pipeline/main.py \
  --region=us-east4 \
  --parameters=input=gs://YOUR_BUCKET_NAME/input/input.txt,output=gs://YOUR_BUCKET_NAME/output/result,udf_file=gs://YOUR_BUCKET_NAME/udf/my_udf.py
Note: The Python launcher calls the Dataflow REST API and returns richer JSON job details. Use whichever method fits your workflow.

9. Monitor job and logs

  • GCP Console → Dataflow (select region) — job graph & status.
  • Cloud Logging — filter logs by Dataflow job id or resource.type="dataflow_step".
  • Check worker logs for startup errors (missing dependencies, permission issues reading GCS, or gsutil not found).
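
You can also poll the job state from the same googleapiclient setup used by the launcher. A minimal sketch, assuming the dataflow client, PROJECT_ID, and REGION from launch_dataflow.py are in scope and JOB_ID is the value of response["job"]["id"] returned by the launch call:

# poll the state of a Dataflow job by id (reuses the launcher's credentials and client)
job = dataflow.projects().locations().jobs().get(
    projectId=PROJECT_ID,
    location=REGION,
    jobId=JOB_ID,
).execute()

print(job["currentState"])   # e.g. JOB_STATE_RUNNING, JOB_STATE_DONE, JOB_STATE_FAILED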

10. Common errors & fixes

  • Permission denied reading gs://...
    → Ensure worker SA has storage.objectViewer.
  • Job stuck in STARTING
    → Verify tempLocation & stagingLocation exist and are writable; check project quotas and region availability.
  • ModuleNotFoundError inside UDF
    → Ensure your UDF is a single Python file or that the worker image contains required libs. For more complex deps use Flex Templates (container).
  • gsutil not found on worker
    → Some worker images lack gsutil. Read the UDF with Beam's FileSystems API instead of gsutil (see the variant after main.py above), vendor the UDF into the pipeline bundle, or use a Flex Template whose container includes gsutil.

11. Optional: test locally (quick sanity)

Run the pipeline locally to validate behavior (you may need to adapt LoadUDF to accept local paths):

python main.py --input input.txt --output local-output --udf_file udf/my_udf.py

This helps verify logic before uploading to GCS and launching on Dataflow.
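
One way to make LoadUDF accept a local path as well as a gs:// path is to branch on the prefix; this is a sketch, and only the setup() method of the class in main.py changes:

    def setup(self):
        # drop-in replacement for LoadUDF.setup in main.py: handle both gs:// and local paths
        if self.udf_gcs_path.startswith("gs://"):
            local_tmp = tempfile.mkdtemp()
            local_path = os.path.join(local_tmp, "udf.py")
            subprocess.run(["gsutil", "cp", self.udf_gcs_path, local_path], check=True)
        else:
            # local file (handy for DirectRunner tests)
            local_path = self.udf_gcs_path

        spec = importlib.util.spec_from_file_location("udf", local_path)
        self.udf = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(self.udf)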

12. Final pre-launch checklist

  • [ ] main.py uploaded to gs://BUCKET/pipeline/main.py
  • [ ] udf/my_udf.py uploaded to gs://BUCKET/udf/my_udf.py (if referenced)
  • [ ] Input file uploaded to gs://BUCKET/input/input.txt
  • [ ] TEMP and STAGING set to gs://BUCKET/temp and gs://BUCKET/staging
  • [ ] Launcher configured with correct PROJECT_ID and SERVICE_ACCOUNT_FILE
  • [ ] IAM roles granted to launcher SA and worker SA
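
A short script can verify the GCS side of this checklist before you launch; this is a sketch using google-cloud-storage, so adjust the object names if you used different paths:

from google.cloud import storage

BUCKET = "YOUR_BUCKET_NAME"
REQUIRED_OBJECTS = ["pipeline/main.py", "udf/my_udf.py", "input/input.txt"]

# report which required objects are present in the bucket
bucket = storage.Client().bucket(BUCKET)
for name in REQUIRED_OBJECTS:
    status = "OK" if bucket.blob(name).exists() else "MISSING"
    print(f"gs://{BUCKET}/{name}: {status}")
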
Written for beginners — adapt paths and service accounts to your project's conventions. Always avoid uploading sensitive keys directly into buckets that are publicly accessible.
