Monday, 22 December 2025

dataflow

GCP Dataflow Flex Template – Python WordCount (No Docker)

GCP Dataflow Flex Template – Python WordCount (Complete ZIP)

This is a fully working Apache Beam WordCount Flex Template using:

  • ✅ Google-provided Dataflow container image
  • ✅ No Docker build
  • ✅ No gcloud
  • ✅ Dynamic parameters

📦 ZIP Structure

wordcount-flex-template/
├── wordcount.py
├── requirements.txt
└── flex_template.json

1️⃣ wordcount.py


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class WordCountOptions(PipelineOptions):
    """Pipeline options specific to the WordCount template.

    Adds two string options, ``--input`` (GCS input file) and ``--output``
    (GCS output prefix). Both are registered via
    ``add_value_provider_argument``.
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        # (flag, help text) pairs, registered in this order.
        custom_flags = (
            ("--input", "GCS input file"),
            ("--output", "GCS output prefix"),
        )
        for flag_name, help_text in custom_flags:
            parser.add_value_provider_argument(
                flag_name,
                type=str,
                help=help_text,
            )


def run():
    """Construct and execute the WordCount pipeline.

    Reads text from ``--input``, splits each line on whitespace, counts how
    often each word occurs, and writes ``"<word>: <count>"`` lines using the
    ``--output`` prefix.
    """
    # save_main_session=True asks Beam to pickle this module's globals so the
    # lambdas below can be executed on remote workers.
    pipeline_options = PipelineOptions(save_main_session=True)
    wc_opts = pipeline_options.view_as(WordCountOptions)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        lines = pipeline | "Read" >> beam.io.ReadFromText(wc_opts.input)
        words = lines | "Split" >> beam.FlatMap(lambda line: line.split())
        counts = (
            words
            | "PairWithOne" >> beam.Map(lambda word: (word, 1))
            | "Count" >> beam.CombinePerKey(sum)
        )
        formatted = counts | "Format" >> beam.Map(
            lambda word_count: f"{word_count[0]}: {word_count[1]}"
        )
        formatted | "Write" >> beam.io.WriteToText(wc_opts.output)


if __name__ == "__main__":
    # Run the pipeline when this file is executed as a script.
    run()

2️⃣ requirements.txt


apache-beam[gcp]==2.53.0

Why is this required?
The Google launcher image does not install the Apache Beam SDK automatically, so it must be listed here.


3️⃣ flex_template.json


{
  "image": "gcr.io/dataflow-templates-base/python3-template-launcher-base",
  "sdkInfo": {
    "language": "PYTHON"
  },
  "metadata": {
    "name": "Python WordCount Flex Template",
    "description": "Minimal WordCount Flex Template using Google Dataflow image",
    "parameters": [
      {
        "name": "pythonFile",
        "label": "Python Pipeline File",
        "helpText": "GCS path to the Python pipeline"
      },
      {
        "name": "requirementsFile",
        "label": "Requirements File",
        "helpText": "GCS path to requirements.txt"
      },
      {
        "name": "input",
        "label": "Input File",
        "helpText": "GCS input file"
      },
      {
        "name": "output",
        "label": "Output Prefix",
        "helpText": "GCS output prefix"
      }
    ]
  }
}

4️⃣ Upload to GCS

gs://my-bucket/pipelines/wordcount.py
gs://my-bucket/pipelines/requirements.txt
gs://my-bucket/templates/flex_wordcount.json

5️⃣ Launch via Python API


from googleapiclient.discovery import build
from google.oauth2 import service_account
import uuid

PROJECT_ID = "my-project"
REGION = "us-east4"
SERVICE_ACCOUNT_FILE = "dataflow_sa.json"
SERVICE_ACCOUNT_EMAIL = "dataflow-sa@my-project.iam.gserviceaccount.com"
BUCKET = "gs://my-bucket"
MAX_WORKERS = 5


def build_launch_body(
    job_name,
    *,
    bucket=BUCKET,
    service_account_email=SERVICE_ACCOUNT_EMAIL,
    max_workers=MAX_WORKERS,
):
    """Return the request body for ``projects.locations.flexTemplates.launch``.

    Args:
        job_name: Name of the Dataflow job to launch.
        bucket: ``gs://`` prefix holding the template spec, pipeline files,
            input, output, temp and staging locations. A trailing ``/`` is
            tolerated.
        service_account_email: Service account the job runs as.
        max_workers: Value sent as ``environment.maxWorkers``.

    Returns:
        A dict with a single ``launchParameter`` key, suitable as ``body``.
    """
    # Strip a trailing slash so f"{bucket}/..." never yields "//".
    bucket = bucket.rstrip("/")
    return {
        "launchParameter": {
            "jobName": job_name,
            "containerSpecGcsPath": f"{bucket}/templates/flex_wordcount.json",
            "parameters": {
                "pythonFile": f"{bucket}/pipelines/wordcount.py",
                "requirementsFile": f"{bucket}/pipelines/requirements.txt",
                "input": f"{bucket}/input/input.txt",
                "output": f"{bucket}/output/wordcount",
            },
            "environment": {
                "serviceAccountEmail": service_account_email,
                "tempLocation": f"{bucket}/temp",
                "stagingLocation": f"{bucket}/staging",
                "maxWorkers": max_workers,
            },
        }
    }


def launch_wordcount(job_name=None):
    """Launch the WordCount Flex Template and return the API response.

    Authenticates with the key file at ``SERVICE_ACCOUNT_FILE`` and calls the
    Dataflow v1b3 ``flexTemplates.launch`` method in ``PROJECT_ID``/``REGION``.

    Args:
        job_name: Optional job name. Defaults to ``wordcount-flex-`` followed
            by six random hex characters.

    Returns:
        The response returned by ``.execute()`` on the launch request.
    """
    credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE,
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )
    dataflow = build(
        "dataflow",
        "v1b3",
        credentials=credentials,
        cache_discovery=False,
    )
    if job_name is None:
        # Random suffix, presumably so repeated launches don't reuse a name.
        job_name = f"wordcount-flex-{uuid.uuid4().hex[:6]}"
    request = (
        dataflow.projects()
        .locations()
        .flexTemplates()
        .launch(
            projectId=PROJECT_ID,
            location=REGION,
            body=build_launch_body(job_name),
        )
    )
    return request.execute()


if __name__ == "__main__":
    print(launch_wordcount())

6️⃣ Required IAM Roles

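roles/dataflow.developer
roles/dataflow.worker (grant to the worker service account)
roles/storage.objectAdmin
roles/iam.serviceAccountUser
roles/compute.networkUser (needed only when workers use a Shared VPC subnetwork)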

✅ Final Verification

  • ✔ Works with private VPC (add "subnetwork" and "ipConfiguration": "WORKER_IP_PRIVATE" to the launch "environment")
  • ✔ No Docker build
  • ✔ No gcloud
  • ✔ Production-safe

🎯 Blog Takeaway

Flex Templates are container-first. Even with Google images, you must supply:

  • Python pipeline
  • requirements.txt
  • flex_template.json

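No other files are needed, with one caveat. The Python launcher image selects the pipeline file to run from the FLEX_TEMPLATE_PYTHON_PY_FILE environment variable, and reads dependencies from FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE. These are normally set in a Dockerfile built on that image. Confirm that your setup provides them before relying on the pythonFile/requirementsFile template parameters.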

No comments:

Post a Comment