GCP Dataflow Utility (Full) — Discovery API¶
This notebook implements a comprehensive DataflowUtility class that uses the Google Discovery API to manage Dataflow jobs (classic templates, flex templates, and direct jobs). It also contains extensive mocked unit tests covering create, launch, get, update, cancel, delete (simulated), and list operations, plus a specific Splunk template usage example.
Notes
- This notebook uses the google-api-python-client Discovery API. For real runs, set GOOGLE_APPLICATION_CREDENTIALS to your service account JSON.
- Unit tests are fully mocked and safe to run locally without GCP access.
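For example, one illustrative way to point Application Default Credentials at a service-account key before constructing the utility is to set the environment variable in-process (the path below is a placeholder):

import os

# Placeholder path: substitute the location of your own service-account key file
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service-account.json"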
# Install dependencies (uncomment to run in a real environment)
# !pip install --quiet google-api-python-client google-auth google-auth-httplib2 google-auth-oauthlib nbformat
import json
import time
from googleapiclient import discovery
from google.auth import default
from typing import Dict, Any, Optional
class DataflowUtility:
    """
    Utility class for managing Google Cloud Dataflow via Discovery API.
    Provides methods to launch templates, flex templates, create direct jobs,
    list/get/update/cancel (and simulated delete) jobs.
    """

    def __init__(self, project_id: str, region: str = "us-central1"):
        self.project_id = project_id
        self.region = region
        # Obtain application default credentials
        self.credentials, _ = default()
        self.service = discovery.build("dataflow", "v1b3", credentials=self.credentials)
    # Launch classic template (GCS template)
    def launch_template(self, template_path: str, job_name: str, parameters: Dict[str, str], environment: Optional[Dict[str, Any]] = None):
        body = {
            "jobName": job_name,
            "parameters": parameters or {},
        }
        if environment:
            body["environment"] = environment
        else:
            body["environment"] = {"zone": f"{self.region}-a"}
        request = self.service.projects().locations().templates().launch(
            projectId=self.project_id,
            location=self.region,
            gcsPath=template_path,
            body=body
        )
        return request.execute()
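    # For reference, launching the same classic template from the CLI looks
    # roughly like this (illustrative values; not required by this class):
    #   gcloud dataflow jobs run wordcount-job-001 \
    #       --gcs-location gs://dataflow-templates/latest/Word_Count \
    #       --region us-central1 \
    #       --parameters inputFile=gs://my-bucket/input.txt,output=gs://my-bucket/output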
    # Launch Flex Template
    def launch_flex_template(self, container_spec_gcs_path: str, job_name: str, parameters: Dict[str, str], environment: Optional[Dict[str, Any]] = None):
        launch_param = {
            "jobName": job_name,
            "containerSpecGcsPath": container_spec_gcs_path,
            "parameters": parameters or {}
        }
        if environment:
            launch_param["environment"] = environment
        else:
            launch_param["environment"] = {"zone": f"{self.region}-a"}
        body = {"launchParameter": launch_param}
        request = self.service.projects().locations().flexTemplates().launch(
            projectId=self.project_id,
            location=self.region,
            body=body
        )
        return request.execute()
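    # The container spec referenced by container_spec_gcs_path is a JSON file,
    # typically produced by `gcloud dataflow flex-template build`. A rough
    # sketch of its shape (field values illustrative):
    #   {
    #     "image": "gcr.io/my-gcp-project/my-flex-template:latest",
    #     "sdkInfo": {"language": "PYTHON"},
    #     "metadata": {"name": "my-template", "parameters": [...]}
    #   }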
    # Create direct job (jobs.create)
    def create_job(self, job_name: str, pipeline_options: Dict[str, Any]):
        body = {
            "name": job_name,
            "environment": pipeline_options.get("environment", {}),
            # steps is optional and for demonstration only; real direct job creation typically comes from runners
            "steps": pipeline_options.get("steps", [])
        }
        request = self.service.projects().locations().jobs().create(
            projectId=self.project_id,
            location=self.region,
            body=body
        )
        return request.execute()
    # List jobs
    def list_jobs(self, page_size: int = 50, filter: Optional[str] = None):
        request = self.service.projects().locations().jobs().list(
            projectId=self.project_id,
            location=self.region,
            pageSize=page_size,
            filter=filter
        )
        response = request.execute()
        return response.get("jobs", [])
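    # List all jobs across pages. A minimal sketch: jobs.list returns at most
    # one page per call, so follow nextPageToken until it is absent.
    def list_all_jobs(self, page_size: int = 50, filter: Optional[str] = None):
        jobs, page_token = [], None
        while True:
            request = self.service.projects().locations().jobs().list(
                projectId=self.project_id,
                location=self.region,
                pageSize=page_size,
                filter=filter,
                pageToken=page_token
            )
            response = request.execute()
            jobs.extend(response.get("jobs", []))
            page_token = response.get("nextPageToken")
            if not page_token:
                return jobs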
    # Get job details
    def get_job(self, job_id: str):
        request = self.service.projects().locations().jobs().get(
            projectId=self.project_id,
            location=self.region,
            jobId=job_id
        )
        return request.execute()
    # Update job (jobs.update) - mostly used to change labels or requestedState
    def update_job(self, job_id: str, update_body: Dict[str, Any]):
        request = self.service.projects().locations().jobs().update(
            projectId=self.project_id,
            location=self.region,
            jobId=job_id,
            body=update_body
        )
        return request.execute()
    # Cancel job (update requestedState)
    def cancel_job(self, job_id: str):
        body = {"requestedState": "JOB_STATE_CANCELLED"}
        request = self.service.projects().locations().jobs().update(
            projectId=self.project_id,
            location=self.region,
            jobId=job_id,
            body=body
        )
        return request.execute()
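    # Drain job (streaming jobs only): a sketch using the same update pattern.
    # Draining stops consuming new input but finishes in-flight records,
    # whereas cancel stops the job immediately.
    def drain_job(self, job_id: str):
        body = {"requestedState": "JOB_STATE_DRAINING"}
        request = self.service.projects().locations().jobs().update(
            projectId=self.project_id,
            location=self.region,
            jobId=job_id,
            body=body
        )
        return request.execute()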
    # Delete job (Dataflow API does not provide a direct delete; simulate cleanup by cancel and remove metadata)
    def delete_job(self, job_id: str):
        # Best-effort simulation: cancel then return a "deleted" response structure.
        cancel_resp = self.cancel_job(job_id)
        # Simulate deletion metadata
        result = {
            "jobId": job_id,
            "cancelResponse": cancel_resp,
            "deleted": True,
            "message": "Dataflow jobs cannot be permanently deleted via API; simulated deletion performed (cancel + metadata cleanup)."
        }
        return result
    # Utility: wait for job completion
    def wait_for_completion(self, job_id: str, poll_interval: int = 15, timeout_seconds: int = 3600):
        elapsed = 0
        while elapsed < timeout_seconds:
            job = self.get_job(job_id)
            state = job.get("currentState") or job.get("state") or ""
            print(f"[wait] job {job_id} state: {state}")
            if state in ("JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED", "JOB_STATE_DRAINED", "JOB_STATE_UPDATED"):
                return state
            time.sleep(poll_interval)
            elapsed += poll_interval
        raise TimeoutError(f"Job {job_id} did not reach a terminal state within {timeout_seconds} seconds.")
Examples: Operations (these will be mocked in tests)¶
The following example snippets show how you'd call the utility methods. They are left commented out so the notebook is safe to execute; running them for real requires valid credentials and network access.
# Example (do not run unless you have credentials and real resources)
# from dataflow_utility import DataflowUtility
# df = DataflowUtility(project_id="my-gcp-project", region="us-central1")
# Launch classic template (WordCount example)
# resp = df.launch_template("gs://dataflow-templates/latest/Word_Count", "wordcount-job-001", {"inputFile":"gs://my-bucket/input.txt", "output":"gs://my-bucket/output"})
# print(resp)
# Launch flex template
# resp = df.launch_flex_template("gs://my-bucket/container_spec.json", "flex-job-001", {"param1":"value"})
# print(resp)
# Create direct job (demo)
# resp = df.create_job("direct-job-001", {"environment": {"tempLocation":"gs://my-bucket/tmp"}, "steps": []})
# print(resp)
# Get job
# print(df.get_job("job-id-123"))
# Update job labels
# print(df.update_job("job-id-123", {"labels": {"env":"test"}}))
# Cancel job
# print(df.cancel_job("job-id-123"))
# Delete job (simulated)
# print(df.delete_job("job-id-123"))
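Putting the pieces together, a typical flow (illustrative, and mirroring the response shape used in the mocked tests below) launches a template, extracts the job ID from the response, and polls until a terminal state:

# resp = df.launch_template("gs://dataflow-templates/latest/Word_Count", "wordcount-job-001", {"inputFile": "gs://my-bucket/input.txt", "output": "gs://my-bucket/output"})
# job_id = resp["job"]["id"]  # launch responses wrap the created job in a "job" object
# final_state = df.wait_for_completion(job_id, poll_interval=30, timeout_seconds=1800)
# print(f"Job {job_id} finished in state {final_state}")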
Splunk Dataflow Template Example (Testing Data)¶
This section contains a ready-to-use testing parameter set for the Google-provided Cloud_PubSub_to_Splunk template. It's intended for testing in a sandbox (replace URLs and tokens with real test values if you run for real).
# Splunk template path
SPLUNK_TEMPLATE = "gs://dataflow-templates/latest/Cloud_PubSub_to_Splunk"
# Sample testing parameters (replace for real runs)
splunk_params = {
    "inputSubscription": "projects/my-gcp-project/subscriptions/test-log-subscription",
    "url": "https://splunk-test.example.com:8088",
    "token": "12345678-ABCD-90EF-TEST-TOKEN",
    "batchCount": "50",
    "disableCertificateValidation": "true"
}
# Example usage (mock or real)
# df = DataflowUtility("my-gcp-project", "us-central1")
# resp = df.launch_template(SPLUNK_TEMPLATE, "test-splunk-job", splunk_params)
# print(resp)
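To confirm the job was created, you could then filter the job list; ACTIVE is one of the filter values accepted by jobs.list (illustrative):

# active_jobs = df.list_jobs(filter="ACTIVE")
# print([j.get("name") for j in active_jobs])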
Unit Tests (mocked)¶
The following unittest tests fully mock the googleapiclient.discovery.build output and exercise each method of DataflowUtility. They do not call real GCP APIs.
import unittest
from unittest.mock import patch, MagicMock
# Import DataflowUtility from this notebook's namespace
# When running tests inside this notebook, DataflowUtility is available in globals()
# If running externally, import from the module where it's defined.
class TestDataflowUtilityFull(unittest.TestCase):

    # google.auth.default is patched as well so no real credentials are needed,
    # and discovery.build is patched so no network calls are made.
    @patch("__main__.default", return_value=(MagicMock(), "test-project"))
    @patch("googleapiclient.discovery.build")
    def setUp(self, mock_build, mock_default):
        # Setup a mock service with chained return values for the fluent API
        self.mock_service = MagicMock()
        mock_build.return_value = self.mock_service
        # Instantiate the utility (it will use the mocked discovery.build and default)
        from __main__ import DataflowUtility
        self.df = DataflowUtility("test-project", "us-central1")
        # replace service with the mock (redundant but explicit)
        self.df.service = self.mock_service
    def test_launch_template(self):
        expected = {"job": {"id": "job-123", "name": "tmpl-job"}}
        self.df.service.projects().locations().templates().launch().execute.return_value = expected
        resp = self.df.launch_template("gs://template/path", "tmpl-job", {"p": "v"})
        self.assertEqual(resp["job"]["id"], "job-123")

    def test_launch_flex_template(self):
        expected = {"job": {"id": "flex-1"}}
        self.df.service.projects().locations().flexTemplates().launch().execute.return_value = expected
        resp = self.df.launch_flex_template("gs://spec/path", "flex-job", {"a": "b"})
        self.assertEqual(resp["job"]["id"], "flex-1")

    def test_create_job(self):
        expected = {"id": "create-1", "name": "direct-job"}
        self.df.service.projects().locations().jobs().create().execute.return_value = expected
        resp = self.df.create_job("direct-job", {"environment": {"tempLocation": "gs://tmp"}})
        self.assertEqual(resp["id"], "create-1")

    def test_list_jobs(self):
        expected = {"jobs": [{"id": "j1"}, {"id": "j2"}]}
        self.df.service.projects().locations().jobs().list().execute.return_value = expected
        resp = self.df.list_jobs()
        self.assertEqual(len(resp), 2)

    def test_get_job(self):
        expected = {"id": "j-42", "currentState": "JOB_STATE_RUNNING"}
        self.df.service.projects().locations().jobs().get().execute.return_value = expected
        resp = self.df.get_job("j-42")
        self.assertEqual(resp["id"], "j-42")

    def test_update_job(self):
        expected = {"id": "j-42", "labels": {"env": "prod"}}
        self.df.service.projects().locations().jobs().update().execute.return_value = expected
        resp = self.df.update_job("j-42", {"labels": {"env": "prod"}})
        self.assertEqual(resp["labels"]["env"], "prod")

    def test_cancel_job(self):
        expected = {"id": "j-42", "state": "JOB_STATE_CANCELLED"}
        self.df.service.projects().locations().jobs().update().execute.return_value = expected
        resp = self.df.cancel_job("j-42")
        self.assertEqual(resp["state"], "JOB_STATE_CANCELLED")

    def test_delete_job(self):
        # delete_job calls cancel_job, which goes through the mocked update()
        cancel_result = {"id": "j-99", "state": "JOB_STATE_CANCELLED"}
        self.df.service.projects().locations().jobs().update().execute.return_value = cancel_result
        resp = self.df.delete_job("j-99")
        self.assertTrue(resp["deleted"])
        self.assertEqual(resp["jobId"], "j-99")

    def test_splunk_template_launch(self):
        expected = {"job": {"id": "splunk-1", "name": "test-splunk-dataflow-job"}}
        self.df.service.projects().locations().templates().launch().execute.return_value = expected
        SPLUNK_TEMPLATE = "gs://dataflow-templates/latest/Cloud_PubSub_to_Splunk"
        params = {
            "inputSubscription": "projects/test/subscriptions/logs",
            "url": "https://splunk.local:8088",
            "token": "dummy-token",
            "disableCertificateValidation": "true"
        }
        resp = self.df.launch_template(SPLUNK_TEMPLATE, "test-splunk-dataflow-job", params)
        self.assertEqual(resp["job"]["name"], "test-splunk-dataflow-job")
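    def test_wait_for_completion(self):
        # Illustrative extra test: simulate a RUNNING -> DONE transition by
        # returning a different job payload on each get() call.
        self.df.service.projects().locations().jobs().get().execute.side_effect = [
            {"id": "j-7", "currentState": "JOB_STATE_RUNNING"},
            {"id": "j-7", "currentState": "JOB_STATE_DONE"},
        ]
        # poll_interval=0 keeps the test instant; the second poll returns DONE
        state = self.df.wait_for_completion("j-7", poll_interval=0, timeout_seconds=10)
        self.assertEqual(state, "JOB_STATE_DONE")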
# Run tests when the cell is executed
if __name__ == "__main__":
    unittest.main(argv=['first-arg-is-ignored'], exit=False)
Notebook saved as gcp_dataflow_utility_full.ipynb.¶
- Run the test cell to execute mocked unit tests.
- Replace sample parameters with real values to run actual Dataflow operations (requires network and credentials).