Friday, 31 October 2025

Dataflow WordCount Job using jobs.create API

This blog explains how to submit a WordCount job to Google Cloud Dataflow using the projects.locations.jobs.create API in Python. The approach avoids templates.launch and works fully programmatically.

Workflow

  1. Stage a WordCount pipeline to GCS using the Apache Beam SDK.
  2. Download the staged pipeline JSON.
  3. Submit the job using jobs.create.

Step 0: Configuration

# Configuration
PROJECT_ID = "your-gcp-project-id"
REGION = "us-central1"
SERVICE_ACCOUNT_EMAIL = "dataflow@your-gcp-project-id.iam.gserviceaccount.com"
SERVICE_ACCOUNT_KEY_FILE = "path/to/service-account-key.json"

# GCS locations used for staging, the pipeline spec, input, and output
TEMP_BUCKET = f"gs://{PROJECT_ID}-temp"
TEMPLATE_JSON_GCS = f"{TEMP_BUCKET}/templates/wordcount_pipeline.json"

INPUT_FILE = "gs://dataflow-samples/shakespeare/kinglear.txt"
OUTPUT_PATH = f"gs://{PROJECT_ID}-output/wordcount-output"

Explanation: Set your GCP project, region, service account, and buckets. TEMPLATE_JSON_GCS is the staged pipeline JSON location.
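
Before staging anything, it can help to confirm that the configured buckets actually exist. Below is a minimal, optional sketch; it assumes the google-cloud-storage client library is available (apache-beam[gcp] pulls it in) and reuses the configuration above. The check_buckets helper is hypothetical, not part of the main workflow.

from google.cloud import storage

def check_buckets():
    # Hypothetical helper: verify the temp and output buckets exist before staging.
    client = storage.Client.from_service_account_json(
        SERVICE_ACCOUNT_KEY_FILE, project=PROJECT_ID
    )
    for uri in (TEMP_BUCKET, OUTPUT_PATH):
        bucket_name = uri[len("gs://"):].split("/")[0]
        if client.lookup_bucket(bucket_name) is None:
            raise RuntimeError(f"Bucket gs://{bucket_name} does not exist - create it first (see Notes)")
        print("Found bucket:", f"gs://{bucket_name}")

check_buckets()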

Step 1: Stage WordCount Pipeline Using Apache Beam

import subprocess

def stage_pipeline():
    print("Staging WordCount pipeline to GCS...")

    pipeline_code = """
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class WordCountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--input')
        parser.add_argument('--output')

def run():
    options = WordCountOptions()
    p = beam.Pipeline(options=options)

    (p
     | 'Read' >> beam.io.ReadFromText(options.input)
     | 'Split' >> beam.FlatMap(lambda x: x.split())
     | 'Pair' >> beam.Map(lambda word: (word, 1))
     | 'Group' >> beam.CombinePerKey(sum)
     | 'Write' >> beam.io.WriteToText(options.output)
    )

    p.run().wait_until_finish()

if __name__ == '__main__':
    run()
"""

    # Write the pipeline to a local file, then run it with --template_location
    # so the Dataflow runner stages the job spec to GCS instead of executing it.
    with open("wordcount_pipeline.py", "w") as f:
        f.write(pipeline_code)

    subprocess.run([
        "python", "wordcount_pipeline.py",
        "--runner=DataflowRunner",
        f"--project={PROJECT_ID}",
        f"--region={REGION}",
        f"--staging_location={TEMP_BUCKET}/staging",
        f"--temp_location={TEMP_BUCKET}/temp",
        f"--template_location={TEMPLATE_JSON_GCS}",
        f"--input={INPUT_FILE}",
        f"--output={OUTPUT_PATH}"
    ], check=True)

    print("Pipeline staged to:", TEMPLATE_JSON_GCS)

stage_pipeline()

Explanation: Creates a small Apache Beam WordCount pipeline and stages it as JSON in GCS. This JSON contains all steps required for jobs.create.
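
If you want to sanity-check the staged file before moving on to Step 3, you can pull it down and look at its top-level keys; the only part jobs.create needs below is "steps". A minimal sketch, again assuming the google-cloud-storage client is available; inspect_template is a hypothetical helper:

import json
from google.cloud import storage

def inspect_template():
    # Hypothetical helper: download the staged pipeline JSON and show its top-level keys.
    client = storage.Client.from_service_account_json(
        SERVICE_ACCOUNT_KEY_FILE, project=PROJECT_ID
    )
    bucket_name, blob_path = TEMPLATE_JSON_GCS[len("gs://"):].split("/", 1)
    spec = json.loads(client.bucket(bucket_name).blob(blob_path).download_as_text())
    print("Top-level keys:", sorted(spec.keys()))  # "steps" should be among them

inspect_template()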

Step 2: Authenticate Dataflow Service

from googleapiclient import discovery
from google.oauth2 import service_account

def get_dataflow_service():
    credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_KEY_FILE,
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    return discovery.build("dataflow", "v1b3", credentials=credentials)

# Authenticate
dataflow = get_dataflow_service()

Explanation: Uses your service account JSON key and builds the Dataflow REST API client.
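
If you would rather not manage a key file, the same client can be built from Application Default Credentials. A minimal alternative sketch, assuming ADC is already set up in your environment (for example via gcloud auth application-default login, or a VM's attached service account):

import google.auth
from googleapiclient import discovery

def get_dataflow_service_adc():
    # Alternative: build the client from Application Default Credentials
    # instead of an explicit service account key file.
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    return discovery.build("dataflow", "v1b3", credentials=credentials)

Either client object works with the create_job function in Step 3.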

Step 3: Create Dataflow Job Using jobs.create

import json
import subprocess
import tempfile

def create_job(dataflow):
    print("Submitting Dataflow job using jobs.create...")

    # Download the staged pipeline JSON from GCS
    # (urllib cannot fetch gs:// URIs, so use gsutil here).
    temp_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
    temp_file.close()
    subprocess.run(["gsutil", "cp", TEMPLATE_JSON_GCS, temp_file.name], check=True)

    with open(temp_file.name) as f:
        pipeline_spec = json.load(f)

    job_body = {
        "jobName": "wordcount-job-create-api",
        "environment": {
            "serviceAccountEmail": SERVICE_ACCOUNT_EMAIL,
            "tempLocation": f"{TEMP_BUCKET}/temp",
            "zone": f"{REGION}-a",
            "machineType": "n1-standard-1",
            "maxWorkers": 2
        },
        "steps": pipeline_spec["steps"],
        "type": "JOB_TYPE_BATCH"
    }

    result = dataflow.projects().locations().jobs().create(
        projectId=PROJECT_ID,
        location=REGION,
        body=job_body
    ).execute()

    print("Job created successfully!")
    print("Job ID:", result["id"])
    print(f"View job: https://console.cloud.google.com/dataflow/jobs/{REGION}/{result['id']}?project={PROJECT_ID}")

create_job(dataflow)

Explanation: Downloads the staged pipeline JSON from GCS, builds the job body from its steps, and submits it using projects.locations.jobs.create. Prints the job ID and a link to view the job in the GCP Console.
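
jobs.create returns as soon as the job is accepted, so you will usually want to poll until it reaches a terminal state. A minimal sketch using projects.locations.jobs.get; wait_for_job is a hypothetical helper, and the job ID is the one printed by create_job above:

import time

def wait_for_job(dataflow, job_id, poll_seconds=30):
    # Hypothetical helper: poll the job until it reaches a terminal state.
    terminal_states = {"JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}
    while True:
        job = dataflow.projects().locations().jobs().get(
            projectId=PROJECT_ID,
            location=REGION,
            jobId=job_id
        ).execute()
        state = job.get("currentState", "JOB_STATE_UNKNOWN")
        print("Current state:", state)
        if state in terminal_states:
            return state
        time.sleep(poll_seconds)

# Example: wait_for_job(dataflow, "<job id printed by create_job>")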

Summary

  • Stages a WordCount pipeline to GCS using the Beam SDK.
  • Submits it entirely via jobs.create.
  • No use of templates.launch.
  • Supports service account and region configuration.

Notes

  1. Ensure your service account has roles/dataflow.developer and roles/storage.admin.
  2. Create buckets for temp and output.
  3. Install required packages and enable APIs before running:
pip install apache-beam[gcp] google-api-python-client google-auth

gcloud services enable dataflow.googleapis.com storage.googleapis.com

gsutil mb -l us-central1 gs://your-gcp-project-id-temp/
gsutil mb -l us-central1 gs://your-gcp-project-id-output/ 

This blog provides a complete, end-to-end working example of using jobs.create for a Dataflow WordCount job.
