Saturday, 9 August 2025

Running GCP Dataflow WordCount Job Using jobs.create (Python)

🚀 Run Google Cloud Dataflow WordCount Using jobs.create (Python Example)

This tutorial shows how to create and run a Google Cloud Dataflow WordCount job by calling the projects.locations.jobs.create API directly, without going through the templates.launch method. This is useful when you want full control over job submission or need to fold it into an automation workflow.
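
For orientation, the sketch below contrasts the two submission paths as they look through google-api-python-client. It is only a rough illustration and assumes a dataflow discovery client has already been built; the template path and parameter names shown are placeholders, not part of the script that follows.

# Assumes: dataflow = build("dataflow", "v1b3", credentials=credentials)

# templates.launch: run a prebuilt template already staged in GCS
# (each template defines its own parameters).
dataflow.projects().locations().templates().launch(
    projectId="your-project-id",
    location="us-central1",
    gcsPath="gs://dataflow-templates/latest/Word_Count",  # illustrative template path
    body={
        "jobName": "wordcount-from-template",
        "parameters": {
            "inputFile": "gs://dataflow-samples/shakespeare/kinglear.txt",
            "output": "gs://your-bucket-name/output/result",
        },
    },
).execute()

# jobs.create: submit a Job resource you assemble yourself (the approach in this post).
dataflow.projects().locations().jobs().create(
    projectId="your-project-id",
    location="us-central1",
    body=job_body,  # a full Job resource, built as in the script below
).execute()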


📋 Prerequisites

  • Google Cloud Project with Dataflow and Storage APIs enabled
  • Service account with roles (a quick access check is sketched just after these prerequisites):
    • roles/dataflow.developer
    • roles/storage.objectAdmin
    • roles/serviceAccountUser
  • Python packages:
    pip install apache-beam[gcp] google-cloud-storage google-api-python-client
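
Before writing any pipeline code, it helps to confirm that the service account key and roles actually work. Below is a minimal sanity check, assuming the same key file, project, and region placeholders used in the full script; it simply lists existing Dataflow jobs in the region.

# Minimal access check (uses the placeholder values from the script below).
from googleapiclient.discovery import build
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    "path/to/your-service-account-key.json",
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
dataflow = build("dataflow", "v1b3", credentials=credentials)

# Listing jobs exercises both the credentials and the Dataflow API enablement.
resp = dataflow.projects().locations().jobs().list(
    projectId="your-project-id", location="us-central1"
).execute()
print(f"Found {len(resp.get('jobs', []))} existing Dataflow jobs in us-central1")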

🧩 Complete Python Script

"""
GCP Dataflow WordCount using jobs.create API
--------------------------------------------
This example:
1. Builds and saves a Beam WordCount pipeline JSON spec.
2. Uploads it to GCS.
3. Creates and submits a Dataflow job using jobs.create.
"""

import json
import random

from googleapiclient.discovery import build
from google.oauth2 import service_account
from google.cloud import storage
from google.protobuf import json_format

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# ----------------------------
# CONFIGURATION SECTION
# ----------------------------
PROJECT_ID = "your-project-id"
REGION = "us-central1"
SERVICE_ACCOUNT = "your-sa@your-project.iam.gserviceaccount.com"
BUCKET = "your-bucket-name"
INPUT_FILE = "gs://dataflow-samples/shakespeare/kinglear.txt"
OUTPUT_PREFIX = f"gs://{BUCKET}/output/wordcount_result"
SERVICE_ACCOUNT_KEY = "path/to/your-service-account-key.json"

# Template path for saving pipeline spec
TEMPLATE_JSON_LOCAL = "wordcount_spec.json"
TEMPLATE_JSON_GCS = f"gs://{BUCKET}/templates/wordcount_spec.json"

# ----------------------------
# STEP 1: DEFINE WORDCOUNT PIPELINE
# ----------------------------
def build_wordcount_pipeline():
    """Build a Beam WordCount pipeline and save it as JSON spec."""
    options = PipelineOptions(
        runner="DataflowRunner",
        project=PROJECT_ID,
        region=REGION,
        temp_location=f"gs://{BUCKET}/temp",
        staging_location=f"gs://{BUCKET}/staging",
        service_account_email=SERVICE_ACCOUNT,
    )

    p = beam.Pipeline(options=options)

    (
        p
        | "ReadLines" >> beam.io.ReadFromText(INPUT_FILE)
        | "SplitWords" >> beam.FlatMap(lambda line: line.split())
        | "PairWithOne" >> beam.Map(lambda word: (word, 1))
        | "GroupAndSum" >> beam.CombinePerKey(sum)
        | "FormatResults" >> beam.Map(lambda wc: f"{wc[0]}: {wc[1]}")
        | "WriteResults" >> beam.io.WriteToText(OUTPUT_PREFIX)
    )

    # Serialize the pipeline's Runner API proto to a JSON-compatible dict
    # (protobuf messages have no to_json() method; json_format does the conversion)
    pipeline_proto = p.to_runner_api()
    json_spec = json_format.MessageToDict(pipeline_proto)

    with open(TEMPLATE_JSON_LOCAL, "w") as f:
        json.dump(json_spec, f, indent=2)

    print(f"✅ Pipeline spec written to {TEMPLATE_JSON_LOCAL}")

# ----------------------------
# STEP 2: UPLOAD PIPELINE JSON TO GCS
# ----------------------------
def upload_to_gcs(local_path, gcs_uri):
    """Upload the local JSON spec to GCS."""
    client = storage.Client(project=PROJECT_ID)
    bucket_name, blob_path = gcs_uri.replace("gs://", "").split("/", 1)
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_path)
    blob.upload_from_filename(local_path)
    print(f"✅ Uploaded pipeline spec to {gcs_uri}")

# ----------------------------
# STEP 3: CREATE DATAFLOW JOB VIA jobs.create
# ----------------------------
def create_dataflow_job():
    """Submit a Dataflow job using jobs.create API."""
    credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_KEY,
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    dataflow = build("dataflow", "v1b3", credentials=credentials)

    # Download the pipeline spec JSON from GCS (urllib cannot fetch gs:// URIs,
    # so use the Cloud Storage client instead)
    storage_client = storage.Client(project=PROJECT_ID, credentials=credentials)
    bucket_name, blob_path = TEMPLATE_JSON_GCS.replace("gs://", "").split("/", 1)
    blob = storage_client.bucket(bucket_name).blob(blob_path)
    pipeline_spec = json.loads(blob.download_as_text())

    job_name = f"wordcount-job-{random.randint(1000,9999)}"

    job_body = {
        "name": job_name,
        "type": "JOB_TYPE_BATCH",
        "environment": {
            "serviceAccountEmail": SERVICE_ACCOUNT,
            "tempLocation": f"gs://{BUCKET}/temp",
            "machineType": "n1-standard-1",
            "maxWorkers": 2
        },
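        # The compiled pipeline graph. Note that the portable Beam proto keeps
        # its graph under "components", so this key must match however your
        # spec was serialized.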
        "steps": pipeline_spec.get("steps", [])
    }

    request = dataflow.projects().locations().jobs().create(
        projectId=PROJECT_ID,
        location=REGION,
        body=job_body
    )
    response = request.execute()

    print(f"🚀 Job Created: {response.get('id')}")
    print(f"🔗 View in Console: https://console.cloud.google.com/dataflow/jobs/{REGION}/{response.get('id')}?project={PROJECT_ID}")

# ----------------------------
# STEP 4: MAIN EXECUTION
# ----------------------------
if __name__ == "__main__":
    build_wordcount_pipeline()
    upload_to_gcs(TEMPLATE_JSON_LOCAL, TEMPLATE_JSON_GCS)
    create_dataflow_job()
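
After the job finishes, the WordCount results land under OUTPUT_PREFIX as sharded text files. A quick way to confirm they are there, sketched with the same placeholder bucket and prefix as above:

# List the sharded WordCount output files (assumes the BUCKET and
# OUTPUT_PREFIX placeholders defined in the script).
from google.cloud import storage

client = storage.Client(project="your-project-id")
for blob in client.list_blobs("your-bucket-name", prefix="output/wordcount_result"):
    print(blob.name, blob.size, "bytes")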

⚙️ How It Works

  1. Builds the Beam WordCount pipeline and exports it as a JSON spec (wordcount_spec.json).
  2. Uploads the pipeline spec to Google Cloud Storage.
  3. Creates a Dataflow job via projects.locations.jobs.create using that JSON spec.
  4. Prints the Dataflow job ID and console URL so you can monitor progress (a polling sketch follows this list).
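
The script only prints the job ID; if you also want to wait for completion programmatically, a small polling loop against projects.locations.jobs.get is enough. A minimal sketch, assuming it reuses the dataflow client, PROJECT_ID, and REGION from the script plus the job ID it printed:

import time

TERMINAL_STATES = {"JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}

def wait_for_job(dataflow, job_id, poll_seconds=30):
    """Poll the Dataflow job until it reaches a terminal state."""
    while True:
        job = dataflow.projects().locations().jobs().get(
            projectId=PROJECT_ID, location=REGION, jobId=job_id
        ).execute()
        state = job.get("currentState", "JOB_STATE_UNKNOWN")
        print(f"Job {job_id}: {state}")
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)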

💡 Key Notes

  • Do not include parameters or sdkPipelineOptions at the top level of the jobs.create body; parameters belongs to templates.launch, and SDK options are nested under environment rather than the Job root.
  • The steps array is required; it represents your compiled pipeline DAG (a minimal step shape is sketched after this list).
  • Your GCS bucket and region must match the Dataflow job location.
  • Use templates.launch only when running a prebuilt Google template (like Pub/Sub to BigQuery).
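
For reference, each entry in steps follows the Dataflow v1b3 Step schema, which is just a kind, a name, and an opaque properties map. The entry below is purely hypothetical and only illustrates the shape; real values come from the compiled pipeline, not from hand-written JSON.

# Hypothetical illustration of the Step shape expected by jobs.create.
example_step = {
    "kind": "ParallelRead",   # the operation this step performs
    "name": "s1",             # unique step name within the job
    "properties": {           # step-specific settings, normally generated by the SDK
        "user_name": "ReadLines",
    },
}
job_body["steps"] = [example_step]  # the compiled pipeline normally supplies these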

✅ Summary

This approach allows you to create a Dataflow job from your own compiled Apache Beam pipeline without relying on the templates.launch method. It’s ideal for automation or controlled environments where you need to directly invoke the projects.locations.jobs.create API.

With this script, you can:

  • Build and stage your WordCount pipeline
  • Submit jobs directly via REST API
  • Fully control Dataflow environment options and IAM context

