GCP Dataflow Flex Template with Python UDF
This guide demonstrates how to build and launch a Dataflow Flex Template in Python using a custom UDF. The build step shells out to gcloud from a Python script, while launching is done entirely through the Dataflow REST API, with no gcloud CLI commands involved.
1. Folder Structure
Organize your files as follows:
dataflow_flex_udf/
├── main.py
├── my_udf/
│   └── custom_udf.py
├── requirements.txt
├── metadata.json
├── build_flex_template.py
└── launch_flex_template.py
2. Python UDF
Create your UDF function in my_udf/custom_udf.py:
def my_uppercase_udf(text: str) -> str:
    """
    Example UDF to convert text to uppercase.
    """
    return text.upper()
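Because the UDF is plain Python, you can sanity-check it locally before wiring it into a pipeline. A minimal check (run from the project root so my_udf is importable):

from my_udf.custom_udf import my_uppercase_udf

# The UDF is an ordinary function, so a plain assertion is enough
assert my_uppercase_udf("hello dataflow") == "HELLO DATAFLOW"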
---
3. Dataflow Pipeline (main.py)
This pipeline reads input text from GCS, applies the UDF, and writes output back to GCS:
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from my_udf.custom_udf import my_uppercase_udf


def run():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", required=True)
    args, pipeline_args = parser.parse_known_args()

    options = PipelineOptions(pipeline_args, save_main_session=True)

    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read" >> beam.io.ReadFromText(args.input)
            | "ApplyUDF" >> beam.Map(my_uppercase_udf)
            | "Write" >> beam.io.WriteToText(args.output)
        )


if __name__ == "__main__":
    run()
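You can also exercise the same transform chain in-process before packaging anything, using Beam's testing utilities (a minimal sketch; TestPipeline runs on the DirectRunner by default):

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

from my_udf.custom_udf import my_uppercase_udf

# Feed an in-memory PCollection through the UDF and assert on the result
with TestPipeline() as p:
    result = (
        p
        | beam.Create(["hello", "world"])
        | "ApplyUDF" >> beam.Map(my_uppercase_udf)
    )
    assert_that(result, equal_to(["HELLO", "WORLD"]))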
---
4. Requirements File (requirements.txt)
apache-beam[gcp]==2.57.0
---
5. Metadata File (metadata.json)
Define the parameters for the Flex Template:
{
  "name": "Python UDF Flex Template",
  "description": "Runs a Python Beam pipeline with a custom UDF",
  "parameters": [
    { "name": "input", "label": "Input File", "helpText": "GCS path to the input file" },
    { "name": "output", "label": "Output File", "helpText": "GCS path prefix for the output files" }
  ]
}
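The metadata schema also supports validation fields such as isOptional and regexes. For example, the input parameter could be constrained to look like a GCS path (an optional refinement, not required for this tutorial):

{ "name": "input", "label": "Input File", "helpText": "GCS path to the input file", "regexes": ["^gs:\\/\\/[^\\n\\r]+$"] }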
---
6. Build Flex Template in Python (build_flex_template.py)
This script shells out to gcloud to build the template: gcloud packages the pipeline on top of Google’s Python base image, pushes the resulting launcher image, and writes the template JSON to GCS:
import subprocess

# Configuration
PROJECT_ID = "your-project-id"
BUCKET = "your-bucket"
TEMPLATE_JSON_GCS = f"gs://{BUCKET}/templates/python_udf_template.json"
# Registry path where gcloud will push the launcher image it builds
IMAGE_GCR_PATH = f"gcr.io/{PROJECT_ID}/python-udf-template:latest"
MAIN_PY = "main.py"
REQUIREMENTS = "requirements.txt"
PYTHON_FILES = "."
METADATA = "metadata.json"

# Build command: gcloud builds an image on top of Google's Python base
# image, pushes it to IMAGE_GCR_PATH, and writes the template spec to GCS
cmd = [
    "gcloud", "dataflow", "flex-template", "build", TEMPLATE_JSON_GCS,
    f"--image-gcr-path={IMAGE_GCR_PATH}",
    "--sdk-language=PYTHON",
    "--flex-template-base-image=PYTHON3",
    f"--metadata-file={METADATA}",
    f"--py-path={PYTHON_FILES}",
    f"--env=FLEX_TEMPLATE_PYTHON_PY_FILE={MAIN_PY}",
    f"--env=FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE={REQUIREMENTS}",
]

# Execute
subprocess.run(cmd, check=True)
print("Flex Template JSON created at:", TEMPLATE_JSON_GCS)
Note: Even though we use Python here, building the template still requires calling gcloud (via subprocess) to generate the JSON. Launching can be done fully via the Python API.
---
7. Launch Flex Template via Python (launch_flex_template.py)
import json
import uuid

from google.oauth2 import service_account
from googleapiclient.discovery import build

# Config
PROJECT_ID = "your-project-id"
REGION = "us-central1"
BUCKET = "your-bucket"
TEMPLATE_JSON_GCS = f"gs://{BUCKET}/templates/python_udf_template.json"
SERVICE_ACCOUNT_FILE = "service-account.json"
INPUT_FILE = f"gs://{BUCKET}/input/input.txt"
OUTPUT_FILE = f"gs://{BUCKET}/output/result"

# Authenticate
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
dataflow = build("dataflow", "v1b3", credentials=credentials)

# Unique job name
job_name = f"flex-udf-job-{uuid.uuid4().hex[:6]}"

# Launch request body
request = dataflow.projects().locations().flexTemplates().launch(
    projectId=PROJECT_ID,
    location=REGION,
    body={
        "launchParameter": {
            "containerSpecGcsPath": TEMPLATE_JSON_GCS,
            "jobName": job_name,
            "parameters": {"input": INPUT_FILE, "output": OUTPUT_FILE},
            "environment": {"tempLocation": f"gs://{BUCKET}/temp"},
        }
    },
)

response = request.execute()
print(json.dumps(response, indent=2))
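For basic monitoring, one approach is to poll the job state with the same client via the documented projects.locations.jobs.get method (a sketch that reuses dataflow, PROJECT_ID, REGION, and response from the script above):

import time

# The launch response nests the created job under "job"
job_id = response["job"]["id"]

while True:
    job = dataflow.projects().locations().jobs().get(
        projectId=PROJECT_ID, location=REGION, jobId=job_id
    ).execute()
    state = job.get("currentState", "JOB_STATE_UNKNOWN")
    print("Job state:", state)
    # Stop once the job leaves the pending/queued/running states
    if state not in ("JOB_STATE_PENDING", "JOB_STATE_QUEUED", "JOB_STATE_RUNNING"):
        break
    time.sleep(30)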
---
8. IAM Permissions Required
- For building templates and launching jobs: roles/dataflow.admin, roles/storage.admin
- For Dataflow workers: roles/dataflow.worker, roles/storage.objectViewer, roles/storage.objectCreator, roles/logging.logWriter
9. How It Works
- Build Flex Template JSON: package pipeline + UDF
- Upload to GCS: store template JSON
- Launch via Python API: start Dataflow job
- Pipeline executes: reads input, applies UDF, writes output
10. Benefits
- No custom Docker needed — uses Google-provided Python base image
- Full Python automation — build + launch + monitoring
- Supports complex UDFs and multiple Python modules
- Integrates easily into CI/CD pipelines