Dataflow WordCount Job using jobs.create API
This blog explains how to submit a Google Cloud Dataflow WordCount job using the projects.locations.jobs.create API in Python. This approach avoids templates.launch entirely and works fully programmatically.
Workflow
- Stage a WordCount pipeline using Apache Beam SDK to GCS.
- Download the staged pipeline JSON.
- Submit the job using jobs.create.
Step 0: Configuration
```
# Configuration
PROJECT_ID = "your-gcp-project-id"
REGION = "us-central1"
SERVICE_ACCOUNT_EMAIL = "dataflow@your-project-id.iam.gserviceaccount.com"
SERVICE_ACCOUNT_KEY_FILE = "path/to/service-account-key.json"
TEMP_BUCKET = f"gs://{PROJECT_ID}-temp"
TEMPLATE_JSON_GCS = f"{TEMP_BUCKET}/templates/wordcount_pipeline.json"
INPUT_FILE = "gs://dataflow-samples/shakespeare/kinglear.txt"
OUTPUT_PATH = f"gs://{PROJECT_ID}-output/wordcount-output"
```
Explanation: Set your GCP project, region, service account, and buckets. TEMPLATE_JSON_GCS is the staged pipeline JSON location.
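Before staging anything, it can help to confirm the buckets are actually reachable. The sketch below is an optional, hypothetical check (it assumes gsutil is installed and authenticated against the same project); it is not required by the rest of the workflow.

```
import subprocess

def check_buckets():
    # Optional sanity check: verify the temp and output buckets exist and are accessible.
    # Assumes gsutil is installed and authenticated for PROJECT_ID.
    for bucket in (TEMP_BUCKET, f"gs://{PROJECT_ID}-output"):
        result = subprocess.run(["gsutil", "ls", "-b", bucket],
                                capture_output=True, text=True)
        status = "OK" if result.returncode == 0 else "NOT FOUND / NO ACCESS"
        print(f"{bucket}: {status}")

check_buckets()
```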
Step 1: Stage WordCount Pipeline Using Apache Beam
```
import subprocess

def stage_pipeline():
    print("Staging WordCount pipeline to GCS...")

    # Minimal WordCount pipeline, written to a local file and then staged to GCS
    pipeline_code = """
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class WordCountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--input')
        parser.add_argument('--output')


def run():
    options = WordCountOptions()
    p = beam.Pipeline(options=options)
    (p
     | 'Read' >> beam.io.ReadFromText(options.input)
     | 'Split' >> beam.FlatMap(lambda x: x.split())
     | 'Pair' >> beam.Map(lambda word: (word, 1))
     | 'Group' >> beam.CombinePerKey(sum)
     | 'Write' >> beam.io.WriteToText(options.output)
    )
    p.run().wait_until_finish()


if __name__ == '__main__':
    run()
"""

    with open("wordcount_pipeline.py", "w") as f:
        f.write(pipeline_code)

    # Running with --template_location stages the pipeline JSON to GCS
    # instead of executing the job immediately.
    subprocess.run([
        "python", "wordcount_pipeline.py",
        "--runner=DataflowRunner",
        f"--project={PROJECT_ID}",
        f"--region={REGION}",
        f"--staging_location={TEMP_BUCKET}/staging",
        f"--temp_location={TEMP_BUCKET}/temp",
        f"--template_location={TEMPLATE_JSON_GCS}",
        f"--input={INPUT_FILE}",
        f"--output={OUTPUT_PATH}"
    ], check=True)

    print("Pipeline staged to:", TEMPLATE_JSON_GCS)

stage_pipeline()
```
Explanation: Writes a small Apache Beam WordCount pipeline to a local file and stages it as a template JSON in GCS. This JSON contains all the pipeline steps required by jobs.create.
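If you want to see what the staged file actually contains before submitting it, a quick inspection like the sketch below works. This is only illustrative and assumes gsutil can read TEMPLATE_JSON_GCS; Step 3 reads the same file when building the job.

```
import json
import subprocess

def inspect_template():
    # Illustrative only: download the staged template JSON and print its shape.
    # Assumes gsutil is installed and has read access to TEMPLATE_JSON_GCS.
    raw = subprocess.run(["gsutil", "cat", TEMPLATE_JSON_GCS],
                         capture_output=True, text=True, check=True).stdout
    spec = json.loads(raw)
    print("Top-level keys:", sorted(spec.keys()))
    print("Number of steps:", len(spec.get("steps", [])))

inspect_template()
```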
Step 2: Authenticate Dataflow Service
```
from googleapiclient import discovery
from google.oauth2 import service_account

def get_dataflow_service():
    credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_KEY_FILE,
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    return discovery.build("dataflow", "v1b3", credentials=credentials)

# Authenticate
dataflow = get_dataflow_service()
```
Explanation: Uses your service account JSON key and builds the Dataflow REST API client.
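If you would rather not manage a key file, Application Default Credentials are an alternative. The sketch below is a variant of get_dataflow_service() under that assumption (ADC already configured via gcloud auth application-default login, a GCE metadata server, or Cloud Shell):

```
import google.auth
from googleapiclient import discovery

def get_dataflow_service_adc():
    # Variant using Application Default Credentials instead of a key file.
    # Assumes ADC is configured, e.g. via `gcloud auth application-default login`.
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    return discovery.build("dataflow", "v1b3", credentials=credentials)
```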
Step 3: Create Dataflow Job Using jobs.create
```
import json
import subprocess
import tempfile

def create_job(dataflow):
    print("Submitting Dataflow job using jobs.create...")

    # Download the staged pipeline JSON from GCS
    # (urllib cannot read gs:// URIs, so gsutil is used here).
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".json")
    subprocess.run(["gsutil", "cp", TEMPLATE_JSON_GCS, temp_file.name], check=True)

    with open(temp_file.name) as f:
        pipeline_spec = json.load(f)

    job_body = {
        "jobName": "wordcount-job-create-api",
        "environment": {
            "serviceAccountEmail": SERVICE_ACCOUNT_EMAIL,
            "tempLocation": f"{TEMP_BUCKET}/temp",
            "zone": f"{REGION}-a",
            "machineType": "n1-standard-1",
            "maxWorkers": 2
        },
        "steps": pipeline_spec["steps"],
        "type": "JOB_TYPE_BATCH"
    }

    result = dataflow.projects().locations().jobs().create(
        projectId=PROJECT_ID,
        location=REGION,
        body=job_body
    ).execute()

    print("Job created successfully!")
    print("Job ID:", result["id"])
    print(f"View job: https://console.cloud.google.com/dataflow/jobs/{REGION}/{result['id']}?project={PROJECT_ID}")
    return result

job = create_job(dataflow)
```
Explanation: Downloads the staged pipeline JSON from GCS, submits the job using projects.locations.jobs.create, prints the job ID and a link to view it in the GCP Console, and returns the API response.
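Once the job is created, you will usually want to wait for it to reach a terminal state. The sketch below is a minimal polling loop using projects.locations.jobs.get; it assumes the job ID comes from the create response (returned by create_job() above), and the JOB_STATE_* names are the standard Dataflow job states.

```
import time

def wait_for_job(dataflow, job_id, poll_seconds=30):
    # Poll projects.locations.jobs.get until the job reaches a terminal state.
    terminal_states = {"JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}
    while True:
        current = dataflow.projects().locations().jobs().get(
            projectId=PROJECT_ID,
            location=REGION,
            jobId=job_id
        ).execute()
        state = current.get("currentState", "JOB_STATE_UNKNOWN")
        print("Current state:", state)
        if state in terminal_states:
            return state
        time.sleep(poll_seconds)

final_state = wait_for_job(dataflow, job["id"])
print("Final state:", final_state)
```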
Summary
- Stages a WordCount pipeline using the Beam SDK.
- Submits it entirely via jobs.create.
- No use of templates.launch.
- Supports service account and region configuration.
Notes
- Ensure your service account has roles/dataflow.developer and roles/storage.admin.
- Create buckets for temp and output.
- Install required packages and enable APIs before running:

```
pip install apache-beam[gcp] google-api-python-client google-auth
gcloud services enable dataflow.googleapis.com storage.googleapis.com
gsutil mb -l us-central1 gs://your-gcp-project-id-temp/
gsutil mb -l us-central1 gs://your-gcp-project-id-output/
```
This blog provides a complete, end-to-end working example of using jobs.create for a Dataflow WordCount job.