Simple Way to Test GCP Dataflow Dynamic (Flex) Templates Using Python
When working with GCP Dataflow Flex Templates, many engineers think they must:
- Build Docker images
- Use gcloud
- Create complex CI pipelines
In reality, you can test a dynamic (Flex) template using a very small Python script. This post shows the simplest and cleanest approach.
What Is a Dynamic (Flex) Template?
A Flex Template allows you to:
- Pass runtime parameters
- Use Python UDFs
- Avoid rebuilding templates for every change
Unlike classic templates, Flex Templates are container-first, even when using Google-provided images.
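For context, the `containerSpecGcsPath` you launch against points at a small JSON spec file in GCS that names the launcher container image and declares the template's parameters. A minimal sketch of what such a spec might look like (the image path, bucket, and parameter names here are placeholders, not a spec generated for any real template):

```json
{
  "image": "gcr.io/my-project/wordcount-launcher:latest",
  "metadata": {
    "name": "Python Word Count",
    "parameters": [
      {
        "name": "input",
        "label": "Input file",
        "helpText": "GCS path to the input text file."
      },
      {
        "name": "output",
        "label": "Output prefix",
        "helpText": "GCS prefix for output files."
      }
    ]
  },
  "sdkInfo": {
    "language": "PYTHON"
  }
}
```

In practice this file is usually generated for you by `gcloud dataflow flex-template build`, but knowing its shape helps when a launch fails with a template-validation error.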
Option 1: Simplest Logic Test (No Template)
This method only validates your pipeline logic. It does not test the Flex Template itself.
python wordcount.py \
--runner=DataflowRunner \
--project=my-project \
--region=us-east4 \
--temp_location=gs://my-bucket/temp \
--staging_location=gs://my-bucket/staging \
--input=gs://my-bucket/input/input.txt \
--output=gs://my-bucket/output/wordcount-test
✔ Confirms code works
❌ Does not validate Flex Template JSON
Option 2 (Recommended): Test Flex Template Using Python
This is the best and simplest way to test a dynamic template end-to-end.
Minimal Python Test Script
from googleapiclient.discovery import build
from google.oauth2 import service_account

PROJECT_ID = "my-project"
REGION = "us-east4"
SERVICE_ACCOUNT_FILE = "dataflow_sa.json"

credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

dataflow = build(
    "dataflow",
    "v1b3",
    credentials=credentials,
    cache_discovery=False,
)

body = {
    "launchParameter": {
        "jobName": "flex-test-wordcount",
        "containerSpecGcsPath": "gs://my-bucket/templates/flex_wordcount.json",
        "parameters": {
            "pythonFile": "gs://my-bucket/pipelines/wordcount.py",
            "requirementsFile": "gs://my-bucket/pipelines/requirements.txt",
            "input": "gs://my-bucket/input/input.txt",
            "output": "gs://my-bucket/output/wordcount-test",
        },
        "environment": {
            "tempLocation": "gs://my-bucket/temp",
            "stagingLocation": "gs://my-bucket/staging",
        },
    }
}

response = (
    dataflow.projects()
    .locations()
    .flexTemplates()
    .launch(
        projectId=PROJECT_ID,
        location=REGION,
        body=body,
    )
    .execute()
)

print("Job launched:", response["job"]["id"])
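If you launch several pipelines this way, it helps to assemble the `launchParameter` body with a small helper so a required field can never be silently empty. A minimal sketch (the helper name and validation rules are mine, not part of any Google SDK):

```python
def build_launch_body(job_name, spec_path, parameters,
                      temp_location, staging_location):
    """Assemble a Flex Template launch request body.

    Raises ValueError if any required field is empty.
    """
    required = {
        "job_name": job_name,
        "spec_path": spec_path,
        "temp_location": temp_location,
        "staging_location": staging_location,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        raise ValueError(f"Missing required launch fields: {missing}")
    return {
        "launchParameter": {
            "jobName": job_name,
            "containerSpecGcsPath": spec_path,
            "parameters": dict(parameters),
            "environment": {
                "tempLocation": temp_location,
                "stagingLocation": staging_location,
            },
        }
    }

# Usage mirrors the hard-coded body above.
body = build_launch_body(
    "flex-test-wordcount",
    "gs://my-bucket/templates/flex_wordcount.json",
    {
        "pythonFile": "gs://my-bucket/pipelines/wordcount.py",
        "requirementsFile": "gs://my-bucket/pipelines/requirements.txt",
        "input": "gs://my-bucket/input/input.txt",
        "output": "gs://my-bucket/output/wordcount-test",
    },
    "gs://my-bucket/temp",
    "gs://my-bucket/staging",
)
```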
✔ Tests Flex JSON
✔ Tests container
✔ Tests runtime parameters
✔ No Docker
✔ No gcloud
Option 3: Local Development (Fastest Feedback)
For local development, use the DirectRunner:
python wordcount.py \
--runner=DirectRunner \
--input=local.txt \
--output=out
✔ Fast logic testing
❌ No GCP validation
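All three options assume a `wordcount.py` pipeline. Its core transform — split lines into words, then tally occurrences — can be unit-tested in plain Python before Beam is involved at all. A sketch of that logic (the function name and tokenizer are illustrative, not taken from the pipeline file):

```python
import re
from collections import Counter


def count_words(lines):
    """Tokenize each line into lowercase words and tally occurrences."""
    counts = Counter()
    for line in lines:
        counts.update(re.findall(r"[a-z']+", line.lower()))
    return dict(counts)


print(count_words(["To be, or not to be"]))
# → {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```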
Recommended Testing Flow
| Stage | Method |
|---|---|
| Local development | DirectRunner |
| GCP validation | DataflowRunner |
| Template validation | Flex Template via Python |
Common Issues and Fixes
- Job stuck in STARTING → Missing compute.networkUser
- Pipeline fails instantly → Missing requirements.txt
- Workers fail → Private subnet without NAT
- Template invalid → Wrong container image
Final Takeaway
The simplest way to test a dynamic Dataflow template is:
- Use Google’s Flex Template image
- Launch via flexTemplates().launch()
- Use a small Python script
No Docker. No gcloud. Fully production-safe.