Monday, 22 December 2025

simple Flex template

Simple Way to Test GCP Dataflow Flex Templates Using Python

When working with GCP Dataflow Flex Templates, many engineers think they must:

  • Build Docker images
  • Use gcloud
  • Create complex CI pipelines

In reality, once a template spec exists in Cloud Storage, you can test a dynamic (Flex) template with a very small Python script. This post shows three options, from quickest to most complete.


What Is a Dynamic (Flex) Template?

A Flex Template allows you to:

  • Pass runtime parameters
  • Use Python UDFs
  • Avoid rebuilding templates for every change

Unlike classic templates, Flex Templates are container-first, even when using Google-provided images.
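
To make "container-first" concrete: a Flex Template is described by a small JSON spec file in Cloud Storage that names a container image and declares the parameters the pipeline accepts. The sketch below mirrors the general shape of that file as a Python dict and checks a set of launch parameters against it before anything is submitted. The image path, the parameter names, and the check_parameters helper are illustrative assumptions, not taken from a real template.

```python
# Illustrative spec only: the image path and parameter names are placeholders.
TEMPLATE_SPEC = {
    "image": "us-east4-docker.pkg.dev/my-project/my-repo/wordcount:latest",
    "sdkInfo": {"language": "PYTHON"},
    "metadata": {
        "name": "wordcount",
        "parameters": [
            {"name": "input", "label": "Input file", "helpText": "gs:// path to read"},
            {"name": "output", "label": "Output prefix", "helpText": "gs:// prefix to write"},
            {"name": "minCount", "label": "Minimum count",
             "helpText": "Drop rarer words", "isOptional": True},
        ],
    },
}


def check_parameters(spec, params):
    """Return (missing, unknown) parameter names for a launch request."""
    declared = spec["metadata"]["parameters"]
    required = {p["name"] for p in declared if not p.get("isOptional", False)}
    known = {p["name"] for p in declared}
    missing = sorted(required - set(params))
    unknown = sorted(set(params) - known)
    return missing, unknown
```

Running check_parameters against the "parameters" dict of a launch request surfaces typos and forgotten required values locally, before a job is ever created.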


Option 1: Simplest Logic Test (No Template)

This method only validates your pipeline logic. It does not test the Flex Template itself.


python wordcount.py \
  --runner=DataflowRunner \
  --project=my-project \
  --region=us-east4 \
  --temp_location=gs://my-bucket/temp \
  --staging_location=gs://my-bucket/staging \
  --input=gs://my-bucket/input/input.txt \
  --output=gs://my-bucket/output/wordcount-test

✔ Confirms code works
❌ Does not validate Flex Template JSON


Option 2 (Recommended): Test Flex Template Using Python

This launches the template through the Dataflow API, so the template spec, the container, and the runtime parameters are all exercised end-to-end.

Minimal Python Test Script


from googleapiclient.discovery import build
from google.oauth2 import service_account

PROJECT_ID = "my-project"
REGION = "us-east4"
# Key file for a service account that is allowed to launch Dataflow jobs.
SERVICE_ACCOUNT_FILE = "dataflow_sa.json"

credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Client for the Dataflow REST API (v1b3).
dataflow = build(
    "dataflow",
    "v1b3",
    credentials=credentials,
    cache_discovery=False
)

body = {
    "launchParameter": {
        # Must not clash with the name of a job that is still active.
        "jobName": "flex-test-wordcount",
        # Template spec JSON that points at the container image.
        "containerSpecGcsPath": "gs://my-bucket/templates/flex_wordcount.json",
        # Keys must match the parameters the template declares.
        "parameters": {
            "pythonFile": "gs://my-bucket/pipelines/wordcount.py",
            "requirementsFile": "gs://my-bucket/pipelines/requirements.txt",
            "input": "gs://my-bucket/input/input.txt",
            "output": "gs://my-bucket/output/wordcount-test"
        },
        "environment": {
            "tempLocation": "gs://my-bucket/temp",
            "stagingLocation": "gs://my-bucket/staging"
        }
    }
}

# Submit the launch request; this returns once the job has been created.
response = (
    dataflow.projects()
    .locations()
    .flexTemplates()
    .launch(
        projectId=PROJECT_ID,
        location=REGION,
        body=body
    )
    .execute()
)

print("Job launched:", response["job"]["id"])

✔ Tests Flex JSON
✔ Tests container
✔ Tests runtime parameters
✔ No local Docker build (the template's image must already exist)
✔ No gcloud
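
The launch call returns as soon as the job is created, so a test usually also needs to wait for the outcome. A minimal polling sketch follows. It assumes the dataflow client, PROJECT_ID, and REGION from the script above; wait_for_job and make_state_getter are hypothetical helpers, and the default intervals are arbitrary. The state fetcher is passed in as a function so the loop can be exercised without calling GCP.

```python
import time

# Dataflow job states after which the state no longer changes.
TERMINAL_STATES = {
    "JOB_STATE_DONE",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
    "JOB_STATE_UPDATED",
    "JOB_STATE_DRAINED",
}


def wait_for_job(get_state, poll_seconds=30, timeout_seconds=1800, sleep=time.sleep):
    """Poll get_state() until a terminal state or the timeout; return the last state."""
    waited = 0
    while True:
        state = get_state()
        if state in TERMINAL_STATES or waited >= timeout_seconds:
            return state
        sleep(poll_seconds)
        waited += poll_seconds


def make_state_getter(dataflow, project_id, region, job_id):
    """Build a get_state function backed by the Dataflow jobs.get call."""
    def get_state():
        job = (
            dataflow.projects()
            .locations()
            .jobs()
            .get(projectId=project_id, location=region, jobId=job_id)
            .execute()
        )
        return job.get("currentState", "JOB_STATE_UNKNOWN")
    return get_state
```

With the launch script above, one possible use is wait_for_job(make_state_getter(dataflow, PROJECT_ID, REGION, response["job"]["id"])), treating anything other than "JOB_STATE_DONE" as a test failure.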


Option 3: Local Development (Fastest Feedback)

For local development, use the DirectRunner:


python wordcount.py \
  --runner=DirectRunner \
  --input=local.txt \
  --output=out

✔ Fast logic testing
❌ No GCP validation
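
Local feedback is faster still when the per-element logic lives in plain functions that can be checked without starting any runner. The sketch below is an assumed layout for wordcount.py, not the actual file used in this post: extract_words and format_count are hypothetical helpers, and the Beam imports sit inside run() so the helpers stay importable where Apache Beam is not installed.

```python
import re


def extract_words(line):
    """Split a line into lowercase words (letters and apostrophes only)."""
    return re.findall(r"[a-z']+", line.lower())


def format_count(word_count):
    """Render a (word, count) pair as one output line."""
    word, count = word_count
    return f"{word}: {count}"


def run(argv=None):
    """Build and run the pipeline; the runner is taken from --runner in argv."""
    import argparse
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", required=True)
    # Anything argparse does not recognise (e.g. --runner) is handed to Beam.
    args, beam_args = parser.parse_known_args(argv)

    with beam.Pipeline(options=PipelineOptions(beam_args)) as p:
        (
            p
            | "Read" >> beam.io.ReadFromText(args.input)
            | "Split" >> beam.FlatMap(extract_words)
            | "Count" >> beam.combiners.Count.PerElement()
            | "Format" >> beam.Map(format_count)
            | "Write" >> beam.io.WriteToText(args.output)
        )
```

The two helpers can be covered by ordinary unit tests, while run() is invoked from the script's entry point with the same flags shown in the command above.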


Recommended Testing Flow

Stage                 Method
Local development     DirectRunner
GCP validation        DataflowRunner
Template validation   Flex Template via Python

Common Issues and Fixes

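  • Job stuck in STARTING → Service account is missing the compute.networkUser role on the subnet
  • Pipeline fails instantly → requirementsFile points to a requirements.txt that does not exist
  • Workers fail to start → Private subnet without NAT, so workers cannot download dependencies
  • Template invalid → Template spec references the wrong container image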
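
Several of these failures come down to a path in the launch request that does not exist. A small preflight check can catch that before a job is submitted. This is a sketch under assumptions: collect_gcs_paths, split_gcs_path, and find_missing are hypothetical helpers, and the existence check is injected as a function so the logic runs without credentials.

```python
def collect_gcs_paths(node, prefix=""):
    """Walk a nested dict and return {dotted.key: value} for every gs:// string."""
    found = {}
    for key, value in node.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            found.update(collect_gcs_paths(value, name))
        elif isinstance(value, str) and value.startswith("gs://"):
            found[name] = value
    return found


def split_gcs_path(path):
    """Split 'gs://bucket/some/object' into ('bucket', 'some/object')."""
    bucket, _, obj = path[len("gs://"):].partition("/")
    return bucket, obj


def find_missing(paths, exists, skip=("output", "tempLocation", "stagingLocation")):
    """Return keys whose objects do not exist, ignoring write-only prefixes."""
    return sorted(
        name
        for name, path in paths.items()
        if name.rsplit(".", 1)[-1] not in skip
        and not exists(*split_gcs_path(path))
    )
```

In practice, exists could wrap the google-cloud-storage client, for example lambda bucket, obj: storage.Client().bucket(bucket).blob(obj).exists(). Calling find_missing(collect_gcs_paths(body), exists) on the launch body then lists any spec, pipeline, requirements, or input file that is not where the request says it is.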

Final Takeaway

The simplest way to test a dynamic Dataflow template is:

  • Use Google’s Flex Template image
  • Launch via flexTemplates().launch()
  • Use a small Python script

No local Docker build. No gcloud. The script goes through the same launch API that production jobs use.
