Tuesday, 9 December 2025

GCP Dataflow Flex Template with Python UDF

This guide demonstrates how to build and launch a Dataflow Flex Template in Python with a custom UDF. The build step wraps a single gcloud call in Python; the launch step uses the Dataflow REST API directly, so no gcloud commands are needed to start the job.


1. Folder Structure

Organize your files as follows:

dataflow_flex_udf/
├── main.py
├── my_udf/
│   └── custom_udf.py
├── requirements.txt
├── metadata.json
├── build_flex_template.py
└── launch_flex_template.py

2. Python UDF

Create your UDF function in my_udf/custom_udf.py:


def my_uppercase_udf(text: str) -> str:
    """
    Example UDF to convert text to uppercase
    """
    return text.upper()
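
Since the UDF is plain Python, it can be unit-tested on its own before it is wired into the pipeline. A minimal sketch (the test file name is hypothetical):

# test_custom_udf.py — quick local check of the UDF, no Beam required
from my_udf.custom_udf import my_uppercase_udf

def test_uppercase():
    assert my_uppercase_udf("hello dataflow") == "HELLO DATAFLOW"

if __name__ == "__main__":
    test_uppercase()
    print("UDF test passed")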
---

3. Dataflow Pipeline (main.py)

This pipeline reads input text from GCS, applies the UDF, and writes output back to GCS:


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from my_udf.custom_udf import my_uppercase_udf

def run():
    import argparse
    parser = argparse.ArgumentParser()
    # Template parameters declared in metadata.json arrive as ordinary CLI arguments
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", required=True)
    args, pipeline_args = parser.parse_known_args()

    # Remaining arguments (runner, project, temp_location, ...) go to Beam;
    # save_main_session pickles the main session so names imported here are
    # visible on the workers
    options = PipelineOptions(pipeline_args, save_main_session=True)

    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read" >> beam.io.ReadFromText(args.input)
            | "ApplyUDF" >> beam.Map(my_uppercase_udf)
            | "Write" >> beam.io.WriteToText(args.output)
        )

if __name__ == "__main__":
    run()
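
Before building the template, the pipeline can be smoke-tested locally with the DirectRunner. A minimal sketch (the input path and output prefix are placeholders):

# run_local.py — run the pipeline with the DirectRunner for a quick local check
import subprocess

subprocess.run(
    [
        "python", "main.py",
        "--input", "sample_input.txt",   # placeholder local input file
        "--output", "local_output",      # placeholder local output prefix
        "--runner", "DirectRunner",
    ],
    check=True,
)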
---

4. Requirements File (requirements.txt)

apache-beam[gcp]==2.57.0
---

5. Metadata File (metadata.json)

Define the parameters for the Flex Template:


{
  "name": "Python UDF Flex Template",
  "description": "Run Python Beam pipeline with UDF",
  "parameters": [
    { "name": "input", "label": "Input File", "helpText": "GCS path to input" },
    { "name": "output", "label": "Output File", "helpText": "GCS path to output" }
  ]
}
---

6. Build Flex Template in Python (build_flex_template.py)

This script builds the template spec JSON and a launcher container image derived from Google’s Python base image (gcloud handles the image build, so no Dockerfile is needed):


import subprocess

# Configuration
PROJECT_ID = "your-project-id"
BUCKET = "your-bucket"
TEMPLATE_JSON_GCS = f"gs://{BUCKET}/templates/python_udf_template.json"
TEMPLATE_IMAGE = f"gcr.io/{PROJECT_ID}/python-udf-flex-template:latest"
MAIN_PY = "main.py"
REQUIREMENTS = "requirements.txt"
PYTHON_FILES = "."
METADATA = "metadata.json"
LAUNCHER_BASE_IMAGE = "gcr.io/dataflow-templates-base/python3-template-launcher-base"

# Build command: gcloud builds the launcher image on top of the Google-provided
# base image, copies the pipeline code into it, and writes the template spec to GCS
cmd = [
    "gcloud", "dataflow", "flex-template", "build", TEMPLATE_JSON_GCS,
    f"--image-gcr-path={TEMPLATE_IMAGE}",
    "--sdk-language=PYTHON",
    f"--flex-template-base-image={LAUNCHER_BASE_IMAGE}",
    f"--metadata-file={METADATA}",
    f"--py-path={PYTHON_FILES}",
    f"--env=FLEX_TEMPLATE_PYTHON_PY_FILE={MAIN_PY}",
    f"--env=FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE={REQUIREMENTS}",
]

# Execute
subprocess.run(cmd, check=True)
print("Flex Template JSON created at:", TEMPLATE_JSON_GCS)
Note: Even though the rest of the workflow is Python, the build step still shells out to gcloud, which assembles the launcher image and generates the template spec JSON. Launching can then be done entirely through the Dataflow REST API from Python.
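
After the build completes, you can read the generated template spec back from GCS to confirm it is in place. A minimal sketch, assuming the google-cloud-storage client library is installed locally (it is not part of requirements.txt, which only covers the pipeline itself):

# check_template.py — read back the generated template spec from GCS
from google.cloud import storage

BUCKET = "your-bucket"
BLOB_PATH = "templates/python_udf_template.json"

client = storage.Client()
blob = client.bucket(BUCKET).blob(BLOB_PATH)
print(blob.download_as_text())  # the template spec references the launcher image and metadata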
---

7. Launch Flex Template via Python (launch_flex_template.py)


import json
import uuid
from googleapiclient.discovery import build
from google.oauth2 import service_account

# Config
PROJECT_ID = "your-project-id"
REGION = "us-central1"
BUCKET = "your-bucket"
TEMPLATE_JSON_GCS = f"gs://{BUCKET}/templates/python_udf_template.json"
SERVICE_ACCOUNT_FILE = "service-account.json"
INPUT_FILE = f"gs://{BUCKET}/input/input.txt"
OUTPUT_FILE = f"gs://{BUCKET}/output/result"

# Authenticate
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

dataflow = build("dataflow", "v1b3", credentials=credentials)

# Unique job name
job_name = f"flex-udf-job-{uuid.uuid4().hex[:6]}"

# Prepare the launch request (containerSpecGcsPath points at the template spec JSON)
request = dataflow.projects().locations().flexTemplates().launch(
    projectId=PROJECT_ID,
    location=REGION,
    body={
        "launchParameter": {
            "containerSpecGcsPath": TEMPLATE_JSON_GCS,
            "jobName": job_name,
            "parameters": {"input": INPUT_FILE, "output": OUTPUT_FILE},
            "environment": {"tempLocation": f"gs://{BUCKET}/temp"}
        }
    }
)

response = request.execute()
print(json.dumps(response, indent=2))
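
The launch response contains the new job’s ID, so the same Dataflow client can poll the job until it reaches a terminal state. A minimal sketch that continues the script above (the poll interval and the set of terminal states are an example):

import time

job_id = response["job"]["id"]

# Poll the job state every 30 seconds until the job finishes
while True:
    job = dataflow.projects().locations().jobs().get(
        projectId=PROJECT_ID, location=REGION, jobId=job_id
    ).execute()
    state = job.get("currentState", "JOB_STATE_UNKNOWN")
    print(job_name, state)
    if state in ("JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"):
        break
    time.sleep(30)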
---

8. IAM Permissions Required

  • For the account that builds and launches the template: roles/dataflow.admin, roles/storage.admin
  • For the Dataflow worker service account: roles/dataflow.worker, roles/storage.objectViewer, roles/storage.objectCreator, roles/logging.logWriter (example grant commands are sketched below)
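
The roles above can be granted with gcloud, wrapped here in Python to stay consistent with the rest of the workflow. A minimal sketch, assuming a single service account (the email is a placeholder) is used both to launch the job and to run the workers:

import subprocess

PROJECT_ID = "your-project-id"
SA_EMAIL = "dataflow-sa@your-project-id.iam.gserviceaccount.com"  # placeholder service account

ROLES = [
    "roles/dataflow.admin",
    "roles/storage.admin",
    "roles/dataflow.worker",
    "roles/storage.objectViewer",
    "roles/storage.objectCreator",
    "roles/logging.logWriter",
]

# Grant each role to the service account at the project level
for role in ROLES:
    subprocess.run(
        [
            "gcloud", "projects", "add-iam-policy-binding", PROJECT_ID,
            f"--member=serviceAccount:{SA_EMAIL}",
            f"--role={role}",
        ],
        check=True,
    )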
---

9. How It Works

  1. Build the Flex Template: gcloud packages the pipeline and the UDF into a launcher image and a template spec JSON
  2. Upload to GCS: the template spec JSON is stored in your bucket
  3. Launch via Python API: the Dataflow REST client starts the job from the spec
  4. Pipeline executes: reads input, applies the UDF, writes output (the build and launch steps can also be chained, as sketched below)
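
For a one-shot run, the build and launch steps can be chained from a single driver (a minimal sketch, reusing the script names from the folder structure above):

# run_all.py — build the template spec once, then launch a job from it
import subprocess

subprocess.run(["python", "build_flex_template.py"], check=True)
subprocess.run(["python", "launch_flex_template.py"], check=True)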
---

10. Benefits

  • No custom Docker needed — uses Google-provided Python base image
  • Full Python automation — build + launch + monitoring
  • Supports complex UDFs and multiple Python modules
  • Integrates easily into CI/CD pipelines
