🚀 Run Google Cloud Dataflow WordCount Using jobs.create (Python Example)
This tutorial shows how to create and run a Google Cloud Dataflow WordCount job
using the projects.locations.jobs.create API directly — without using
the templates.launch method.
This is useful when you want full control over job submission or need to integrate Dataflow into an automation workflow.
📋 Prerequisites
- Google Cloud Project with Dataflow and Storage APIs enabled
- Service account with the following roles (a quick access check follows this list):
  - roles/dataflow.developer
  - roles/storage.objectAdmin
  - roles/iam.serviceAccountUser
- Python packages:
pip install "apache-beam[gcp]" google-cloud-storage google-api-python-client
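Before running the full script, it can help to confirm that the key file loads and the bucket is reachable with the roles above. This is an optional sketch; the key path, project ID, and bucket name are the same placeholders used in the script's configuration section.

# Optional access check: load the service-account key and list one object in the bucket.
# Placeholder values match the CONFIGURATION SECTION of the script below.
from google.oauth2 import service_account
from google.cloud import storage

creds = service_account.Credentials.from_service_account_file(
    "path/to/your-service-account-key.json",
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = storage.Client(project="your-project-id", credentials=creds)
blobs = list(client.list_blobs("your-bucket-name", max_results=1))
print("Bucket reachable; sample object:", blobs[0].name if blobs else "(bucket is empty)")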
🧩 Complete Python Script
"""
GCP Dataflow WordCount using jobs.create API
--------------------------------------------
This example:
1. Builds and saves a Beam WordCount pipeline JSON spec.
2. Uploads it to GCS.
3. Creates and submits a Dataflow job using jobs.create.
"""
import json
import random

from googleapiclient.discovery import build
from google.oauth2 import service_account
from google.protobuf import json_format
from google.cloud import storage
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# ----------------------------
# CONFIGURATION SECTION
# ----------------------------
PROJECT_ID = "your-project-id"
REGION = "us-central1"
SERVICE_ACCOUNT = "your-sa@your-project.iam.gserviceaccount.com"
BUCKET = "your-bucket-name"
INPUT_FILE = "gs://dataflow-samples/shakespeare/kinglear.txt"
OUTPUT_PREFIX = f"gs://{BUCKET}/output/wordcount_result"
SERVICE_ACCOUNT_KEY = "path/to/your-service-account-key.json"
# Template path for saving pipeline spec
TEMPLATE_JSON_LOCAL = "wordcount_spec.json"
TEMPLATE_JSON_GCS = f"gs://{BUCKET}/templates/wordcount_spec.json"
# ----------------------------
# STEP 1: DEFINE WORDCOUNT PIPELINE
# ----------------------------
def build_wordcount_pipeline():
    """Build a Beam WordCount pipeline and save it as a JSON spec."""
    options = PipelineOptions(
        runner="DataflowRunner",
        project=PROJECT_ID,
        region=REGION,
        temp_location=f"gs://{BUCKET}/temp",
        staging_location=f"gs://{BUCKET}/staging",
        service_account_email=SERVICE_ACCOUNT,
    )
    p = beam.Pipeline(options=options)
    (
        p
        | "ReadLines" >> beam.io.ReadFromText(INPUT_FILE)
        | "SplitWords" >> beam.FlatMap(lambda line: line.split())
        | "PairWithOne" >> beam.Map(lambda word: (word, 1))
        | "GroupAndSum" >> beam.CombinePerKey(sum)
        | "FormatResults" >> beam.Map(lambda wc: f"{wc[0]}: {wc[1]}")
        | "WriteResults" >> beam.io.WriteToText(OUTPUT_PREFIX)
    )
    # Serialize the pipeline to its Runner API proto, then to a JSON-friendly dict (template spec)
    pipeline_proto = p.to_runner_api()
    json_spec = json_format.MessageToDict(pipeline_proto)
    with open(TEMPLATE_JSON_LOCAL, "w") as f:
        json.dump(json_spec, f, indent=2)
    print(f"✅ Pipeline spec written to {TEMPLATE_JSON_LOCAL}")
# ----------------------------
# STEP 2: UPLOAD PIPELINE JSON TO GCS
# ----------------------------
def upload_to_gcs(local_path, gcs_uri):
    """Upload the local JSON spec to GCS."""
    client = storage.Client(project=PROJECT_ID)
    bucket_name, blob_path = gcs_uri.replace("gs://", "").split("/", 1)
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_path)
    blob.upload_from_filename(local_path)
    print(f"✅ Uploaded pipeline spec to {gcs_uri}")
# ----------------------------
# STEP 3: CREATE DATAFLOW JOB VIA jobs.create
# ----------------------------
def create_dataflow_job():
    """Submit a Dataflow job using the jobs.create API."""
    credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_KEY,
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )
    dataflow = build("dataflow", "v1b3", credentials=credentials)

    # Download the pipeline spec JSON from GCS (gs:// URIs require the Storage client)
    storage_client = storage.Client(project=PROJECT_ID, credentials=credentials)
    bucket_name, blob_path = TEMPLATE_JSON_GCS.replace("gs://", "").split("/", 1)
    blob = storage_client.bucket(bucket_name).blob(blob_path)
    pipeline_spec = json.loads(blob.download_as_text())

    job_name = f"wordcount-job-{random.randint(1000, 9999)}"
    job_body = {
        "name": job_name,
        "type": "JOB_TYPE_BATCH",
        "environment": {
            "serviceAccountEmail": SERVICE_ACCOUNT,
            "tempLocation": f"gs://{BUCKET}/temp",
            "machineType": "n1-standard-1",
            "maxWorkers": 2,
        },
        # Compiled pipeline steps taken from the uploaded spec
        "steps": pipeline_spec.get("steps", []),
    }

    request = dataflow.projects().locations().jobs().create(
        projectId=PROJECT_ID,
        location=REGION,
        body=job_body,
    )
    response = request.execute()
    print(f"🚀 Job Created: {response.get('id')}")
    print(f"🔗 View in Console: https://console.cloud.google.com/dataflow/jobs/{REGION}/{response.get('id')}?project={PROJECT_ID}")
# ----------------------------
# STEP 4: MAIN EXECUTION
# ----------------------------
if __name__ == "__main__":
    build_wordcount_pipeline()
    upload_to_gcs(TEMPLATE_JSON_LOCAL, TEMPLATE_JSON_GCS)
    create_dataflow_job()
⚙️ How It Works
- Builds the Beam WordCount pipeline and exports it as a JSON spec (wordcount_spec.json).
- Uploads the pipeline spec to Google Cloud Storage.
- Creates a Dataflow job via projects.locations.jobs.create using that JSON spec.
- Prints the Dataflow job ID and console URL to monitor progress (a polling sketch follows this list).
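To monitor the job programmatically rather than through the console URL, you can poll projects.locations.jobs.get until the job reaches a terminal state. This is a minimal sketch that assumes the dataflow client, PROJECT_ID, and REGION from the script; wait_for_job is an illustrative helper, not part of the script above.

import time

def wait_for_job(dataflow, job_id, poll_seconds=30):
    """Poll projects.locations.jobs.get until the job reaches a terminal state."""
    terminal = {"JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}
    while True:
        job = dataflow.projects().locations().jobs().get(
            projectId=PROJECT_ID,
            location=REGION,
            jobId=job_id,
        ).execute()
        state = job.get("currentState", "JOB_STATE_UNKNOWN")
        print(f"{job_id}: {state}")
        if state in terminal:
            return state
        time.sleep(poll_seconds)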
💡 Key Notes
- Do not include parameters or sdkPipelineOptions in jobs.create; they are not valid fields.
- The steps array is required; it represents your compiled pipeline DAG.
- Your GCS bucket and region must match the Dataflow job location.
- Use templates.launch only when running a prebuilt Google template (like Pub/Sub to BigQuery); a contrast sketch follows this list.
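For contrast, here is a minimal sketch of launching Google's prebuilt Word_Count template through templates.launch. It assumes the same dataflow client, PROJECT_ID, REGION, and BUCKET as the script above; the gcsPath and the inputFile/output parameters are the standard ones for that public template.

# Sketch only: templates.launch for a prebuilt Google template, shown for contrast
# with the jobs.create flow above. Reuses the dataflow client and config values.
launch_body = {
    "jobName": "wordcount-from-template",
    "parameters": {
        "inputFile": "gs://dataflow-samples/shakespeare/kinglear.txt",
        "output": f"gs://{BUCKET}/output/template_wordcount",
    },
    "environment": {"tempLocation": f"gs://{BUCKET}/temp"},
}
response = dataflow.projects().locations().templates().launch(
    projectId=PROJECT_ID,
    location=REGION,
    gcsPath="gs://dataflow-templates/latest/Word_Count",
    body=launch_body,
).execute()
print("Template job:", response["job"]["id"])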
✅ Summary
This approach allows you to create a Dataflow job from your own compiled Apache Beam pipeline
without relying on the templates.launch method.
It’s ideal for automation or controlled environments where you need to directly
invoke the projects.locations.jobs.create API.
With this script, you can:
- Build and stage your WordCount pipeline (and read back its output, as sketched below)
- Submit jobs directly via REST API
- Fully control Dataflow environment options and IAM context
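Once the job reaches JOB_STATE_DONE, you can read back the shards that WriteToText wrote under OUTPUT_PREFIX. A short sketch reusing the Storage client; the project, bucket, and prefix are the placeholders from the configuration section.

# Read back the WordCount results after the job finishes.
# Placeholder project/bucket values match the script's CONFIGURATION SECTION.
from google.cloud import storage

client = storage.Client(project="your-project-id")
prefix = "output/wordcount_result"  # OUTPUT_PREFIX without the gs://<bucket>/ part
for blob in client.list_blobs("your-bucket-name", prefix=prefix):
    print(f"--- {blob.name} ---")
    print(blob.download_as_text()[:500])  # first few hundred characters of each shard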