GCP Dataflow Flex Template – Python WordCount (Complete ZIP)
This is a fully working Apache Beam WordCount Flex Template using:
- ✅ Google-provided Dataflow container image
- ✅ No Docker build
- ✅ No gcloud
- ✅ Dynamic parameters
📦 ZIP Structure
wordcount-flex-template/
├── wordcount.py
├── requirements.txt
└── flex_template.json
1️⃣ wordcount.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class WordCountOptions(PipelineOptions):
    """Custom pipeline options: the input file and the output prefix."""

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            "--input",
            type=str,
            help="GCS input file"
        )
        parser.add_value_provider_argument(
            "--output",
            type=str,
            help="GCS output prefix"
        )


def run():
    # With no explicit flags, PipelineOptions parses the command line.
    options = PipelineOptions(save_main_session=True)
    custom_options = options.view_as(WordCountOptions)

    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read" >> beam.io.ReadFromText(custom_options.input)
            | "Split" >> beam.FlatMap(lambda x: x.split())
            | "PairWithOne" >> beam.Map(lambda x: (x, 1))
            | "Count" >> beam.CombinePerKey(sum)
            | "Format" >> beam.Map(lambda kv: f"{kv[0]}: {kv[1]}")
            | "Write" >> beam.io.WriteToText(custom_options.output)
        )


if __name__ == "__main__":
    run()
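To sanity-check the transform chain before submitting a job, the same logic can be mirrored in plain Python. This is only a sketch: `count_words` is a hypothetical helper, not part of the template, and it imitates the Split, PairWithOne, Count, and Format steps without Beam. Beam does not guarantee output order, so the sketch sorts its lines purely to make results easy to compare.

```python
from collections import Counter


def count_words(lines):
    """Mirror the Split -> PairWithOne -> Count -> Format steps without Beam."""
    counts = Counter()
    for line in lines:
        # str.split() with no arguments splits on any run of whitespace,
        # which is what the lambda in the "Split" step does.
        counts.update(line.split())
    # Same output shape as the "Format" step: "word: count".
    return sorted(f"{word}: {n}" for word, n in counts.items())


if __name__ == "__main__":
    for row in count_words(["to be or not", "to be"]):
        print(row)
```

Comparing this output against a small Dataflow run on the same input is a cheap way to confirm the deployed pipeline behaves as expected.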
2️⃣ requirements.txt
apache-beam[gcp]==2.53.0
Why is this required?
The Google launcher image does NOT install the Beam SDK automatically, so it has to be listed here.
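Because the SDK is installed from this file, an unpinned entry can pull a different Beam version than the one you tested with. As a hedged sketch, the hypothetical helper `unpinned_requirements` below flags any line that lacks an exact `==` pin. It is a simple string check, not a full requirements parser.

```python
def unpinned_requirements(text):
    """Return requirement lines that are not pinned with '=='."""
    unpinned = []
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comments and whitespace
        if not line or line.startswith("-"):  # skip blanks and pip options
            continue
        if "==" not in line:
            unpinned.append(line)
    return unpinned


# The file from this post is fully pinned, so nothing is reported.
print(unpinned_requirements("apache-beam[gcp]==2.53.0\n"))
```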
3️⃣ flex_template.json
{
  "image": "gcr.io/dataflow-templates-base/python3-template-launcher-base",
  "sdkInfo": {
    "language": "PYTHON"
  },
  "metadata": {
    "name": "Python WordCount Flex Template",
    "description": "Minimal WordCount Flex Template using Google Dataflow image",
    "parameters": [
      {
        "name": "pythonFile",
        "label": "Python Pipeline File",
        "helpText": "GCS path to the Python pipeline"
      },
      {
        "name": "requirementsFile",
        "label": "Requirements File",
        "helpText": "GCS path to requirements.txt"
      },
      {
        "name": "input",
        "label": "Input File",
        "helpText": "GCS input file"
      },
      {
        "name": "output",
        "label": "Output Prefix",
        "helpText": "GCS output prefix"
      }
    ]
  }
}
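A misspelled parameter name is an easy way to get a failed launch, so it can help to compare the names declared in the template against the ones you plan to send. The sketch below is illustrative only: `check_parameters` is a hypothetical helper, and it assumes every declared parameter is required unless it carries `"isOptional": true`.

```python
def check_parameters(declared, supplied):
    """Compare launch parameters against the template's declared ones.

    declared: list of parameter dicts from the template metadata.
    supplied: the "parameters" dict intended for the launch request.
    Returns (missing, unexpected) as sorted lists of names.
    """
    declared_names = {p["name"] for p in declared}
    # Assumption: a parameter is required unless marked "isOptional".
    required = {p["name"] for p in declared if not p.get("isOptional", False)}
    supplied_names = set(supplied)
    missing = sorted(required - supplied_names)
    unexpected = sorted(supplied_names - declared_names)
    return missing, unexpected


# The four parameters declared in flex_template.json.
DECLARED = [
    {"name": "pythonFile"},
    {"name": "requirementsFile"},
    {"name": "input"},
    {"name": "output"},
]

print(check_parameters(DECLARED, {"input": "gs://my-bucket/input/input.txt"}))
```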
4️⃣ Upload to GCS
gs://my-bucket/pipelines/wordcount.py
gs://my-bucket/pipelines/requirements.txt
gs://my-bucket/templates/flex_wordcount.json
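In keeping with the no-gcloud approach, the upload itself can be scripted. The following is a minimal sketch, assuming the `google-cloud-storage` package is installed and that the service account key can write to the bucket. `plan_uploads`, `upload_all`, and `LAYOUT` are hypothetical names introduced for illustration.

```python
from pathlib import Path

# Local file name -> object name inside the bucket (paths from this post).
LAYOUT = {
    "wordcount.py": "pipelines/wordcount.py",
    "requirements.txt": "pipelines/requirements.txt",
    "flex_template.json": "templates/flex_wordcount.json",
}


def plan_uploads(local_dir, layout):
    """Map each local file path to its destination object name."""
    root = Path(local_dir)
    return {str(root / name): obj for name, obj in layout.items()}


def upload_all(client, bucket_name, plan):
    """Upload every planned file using a google.cloud.storage.Client."""
    bucket = client.bucket(bucket_name)
    for local_path, object_name in plan.items():
        bucket.blob(object_name).upload_from_filename(local_path)


# Usage (needs network access and credentials, so it is left commented out):
# from google.cloud import storage
# client = storage.Client.from_service_account_json("dataflow_sa.json")
# upload_all(client, "my-bucket", plan_uploads("wordcount-flex-template", LAYOUT))
```

Passing the client in as an argument keeps the planning step independent of the client library and makes the upload function easy to exercise with a stand-in object.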
5️⃣ Launch via Python API
import uuid

from google.oauth2 import service_account
from googleapiclient.discovery import build

PROJECT_ID = "my-project"
REGION = "us-east4"
SERVICE_ACCOUNT_FILE = "dataflow_sa.json"

# Authenticate with a service account key file.
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Build a client for the Dataflow REST API.
dataflow = build(
    "dataflow",
    "v1b3",
    credentials=credentials,
    cache_discovery=False
)

# A short random suffix keeps job names unique across launches.
job_name = f"wordcount-flex-{uuid.uuid4().hex[:6]}"

body = {
    "launchParameter": {
        "jobName": job_name,
        "containerSpecGcsPath": "gs://my-bucket/templates/flex_wordcount.json",
        "parameters": {
            "pythonFile": "gs://my-bucket/pipelines/wordcount.py",
            "requirementsFile": "gs://my-bucket/pipelines/requirements.txt",
            "input": "gs://my-bucket/input/input.txt",
            "output": "gs://my-bucket/output/wordcount"
        },
        "environment": {
            "serviceAccountEmail": "dataflow-sa@my-project.iam.gserviceaccount.com",
            "tempLocation": "gs://my-bucket/temp",
            "stagingLocation": "gs://my-bucket/staging",
            "maxWorkers": 5
        }
    }
}

# Submit the launch request and print the API response.
response = (
    dataflow.projects()
    .locations()
    .flexTemplates()
    .launch(
        projectId=PROJECT_ID,
        location=REGION,
        body=body
    )
    .execute()
)

print(response)
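The launch call returns as soon as the job is accepted, not when it finishes. A possible follow-up is to poll the job until it reaches a terminal state. This is a sketch under stated assumptions: `wait_for_job`, its polling interval, and its poll limit are hypothetical choices, and the job ID is assumed to be available as `response["job"]["id"]`.

```python
import time

# Terminal values of the Dataflow job's currentState field.
TERMINAL_STATES = {
    "JOB_STATE_DONE",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
    "JOB_STATE_UPDATED",
    "JOB_STATE_DRAINED",
}


def wait_for_job(dataflow, project_id, region, job_id,
                 poll_seconds=30, max_polls=120, sleep=time.sleep):
    """Poll jobs.get until the job is in a terminal state; return that state."""
    state = "JOB_STATE_UNKNOWN"
    for _ in range(max_polls):
        job = (
            dataflow.projects()
            .locations()
            .jobs()
            .get(projectId=project_id, location=region, jobId=job_id)
            .execute()
        )
        state = job.get("currentState", "JOB_STATE_UNKNOWN")
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)
    raise TimeoutError(f"Job {job_id} still in {state} after {max_polls} polls")


# Usage with the objects from the launch script:
# final_state = wait_for_job(dataflow, PROJECT_ID, REGION, response["job"]["id"])
```

The `sleep` argument exists only so the loop can be tested without real delays. In normal use the default is fine.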
6️⃣ Required IAM Roles
- roles/dataflow.developer
- roles/storage.objectAdmin
- roles/iam.serviceAccountUser
- roles/compute.networkUser
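Missing roles tend to surface only as a failed launch, so a quick pre-flight check can save a round trip. The sketch below assumes you already have an IAM policy as a dict in the standard `bindings` shape (for example, fetched from the project). `missing_roles` is a hypothetical helper, and it will not see roles granted through groups or on individual buckets and subnets.

```python
REQUIRED_ROLES = {
    "roles/dataflow.developer",
    "roles/storage.objectAdmin",
    "roles/iam.serviceAccountUser",
    "roles/compute.networkUser",
}


def missing_roles(policy, member, required=REQUIRED_ROLES):
    """Return the required roles that `member` lacks in an IAM policy dict."""
    granted = {
        binding["role"]
        for binding in policy.get("bindings", [])
        if member in binding.get("members", [])
    }
    return sorted(set(required) - granted)


# Example policy with a single binding for the launching service account.
policy = {
    "bindings": [
        {
            "role": "roles/dataflow.developer",
            "members": ["serviceAccount:dataflow-sa@my-project.iam.gserviceaccount.com"],
        }
    ]
}
print(missing_roles(policy, "serviceAccount:dataflow-sa@my-project.iam.gserviceaccount.com"))
```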
✅ Final Verification
- ✔ Works with private VPC
- ✔ No Docker build
- ✔ No gcloud
- ✔ Production-safe
🎯 Blog Takeaway
Flex Templates are container-first. Even with Google images, you must supply:
- Python pipeline
- requirements.txt
- flex_template.json
Nothing else is required.