Saturday, 9 August 2025

Dataflow Job Lifecycle and Log Explorer Tracking

🌩️ Understanding Dataflow Job Lifecycle and Log Explorer Entries

When you manage Google Cloud Dataflow jobs through the REST API or the Python client, each state-changing operation (launching, updating, or cancelling a job) automatically writes an Admin Activity audit log entry to Cloud Logging; read-only calls such as jobs.get are recorded only if Data Access audit logs are enabled. These entries are visible in Log Explorer under your project.
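
To find these entries, open Log Explorer and query the Admin Activity audit log for the Dataflow service. For example, a filter like the sketch below (replace YOUR_PROJECT with your project ID) narrows the view to Dataflow audit entries:

logName="projects/YOUR_PROJECT/logs/cloudaudit.googleapis.com%2Factivity"
protoPayload.serviceName="dataflow.googleapis.com"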

⚙️ Job Lifecycle Events and Corresponding Log Names

Launch a new job
    API method called: dataflow.jobs.create
    Log name in Log Explorer: projects/YOUR_PROJECT/logs/cloudaudit.googleapis.com%2Factivity
    Description: Triggered when you submit a Dataflow job using the templates.launch or jobs.create API. Shows job metadata, parameters, and the user identity.

Get job details
    API method called: dataflow.jobs.get
    Log name in Log Explorer: dataflow.googleapis.com%2Frequests (optional)
    Description: Generated when fetching job state using .jobs().get(). Typically informational only.

Update or cancel a job
    API method called: dataflow.jobs.update
    Method shown in the log: dataflow.jobs.cancel
    Description: Triggered when you change a job's state, for example by requesting JOB_STATE_CANCELLED. It shows up as a cancel entry even though the API method called is update.

Delete a job
    API method called: ❌ Not available
    Description: Dataflow does not expose a delete API. Jobs remain in the job history for reference and are cleaned up automatically by Google; cancelling a job is effectively the same as deleting it.
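
You can also confirm these entries programmatically with the Cloud Logging client library. The snippet below is a minimal sketch, assuming google-cloud-logging is installed and YOUR_PROJECT is replaced with your project ID; it lists recent job-launch entries by filtering on the dataflow.jobs.create method name from the table (swap in dataflow.jobs.update to see cancel requests).

# Minimal sketch (assumption): list recent Dataflow job-launch audit entries.
# Requires: pip install google-cloud-logging. Replace YOUR_PROJECT with your project ID.
from google.cloud import logging as cloud_logging

client = cloud_logging.Client(project="YOUR_PROJECT")

# Admin Activity audit log, narrowed to the Dataflow service and the create method.
log_filter = (
    'logName="projects/YOUR_PROJECT/logs/cloudaudit.googleapis.com%2Factivity" '
    'AND protoPayload.serviceName="dataflow.googleapis.com" '
    'AND protoPayload.methodName="dataflow.jobs.create"'
)

for entry in client.list_entries(filter_=log_filter, order_by=cloud_logging.DESCENDING, max_results=10):
    # Audit entries carry the AuditLog proto as their payload (a dict when listed).
    payload = entry.payload if isinstance(entry.payload, dict) else {}
    caller = payload.get("authenticationInfo", {}).get("principalEmail")
    print(entry.timestamp, payload.get("methodName"), caller)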

🐍 Python Code Example — Dataflow Classic GCS Template Lifecycle

Below is a Python script that demonstrates the full lifecycle: launching, getting, and cancelling a Dataflow job. Each operation corresponds to a specific log entry in Cloud Logging as shown above.


"""Launch, get, update, and cancel a Dataflow job using a classic GCS template.

Requires:
    pip install google-api-python-client google-auth google-cloud-storage
"""

import os
import time
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.cloud import storage

# ---------------- CONFIG ----------------
PROJECT_ID = "dark-torch-477001-j4"
REGION = "us-east4"
BUCKET_NAME = "dark-torch-477001-j4-dataflow-bucket"
CREDENTIALS_PATH = "dataflow_cre.json"

# The classic template path for WordCount
TEMPLATE_GCS_PATH = "gs://dataflow-templates/latest/Word_Count"

# Input/output for WordCount
PARAMETERS = {
    "inputFile": "gs://dataflow-samples/shakespeare/kinglear.txt",
    "output": f"gs://{BUCKET_NAME}/wordcount-output/results"
}

TEMP_LOCATION = f"gs://{BUCKET_NAME}/temp"
STAGING_LOCATION = f"gs://{BUCKET_NAME}/staging"
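# Note: STAGING_LOCATION is not referenced below; the classic template launch only passes tempLocation.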

# ----------------------------------------

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = CREDENTIALS_PATH
creds = service_account.Credentials.from_service_account_file(CREDENTIALS_PATH)
dataflow = build("dataflow", "v1b3", credentials=creds, cache_discovery=False)
storage_client = storage.Client(project=PROJECT_ID, credentials=creds)


def ensure_bucket_exists(bucket_name):
    """Ensure GCS bucket exists."""
    bucket = storage_client.bucket(bucket_name)
    if not bucket.exists():
        storage_client.create_bucket(bucket_name, location="US")
        print(f"✅ Created bucket: {bucket_name}")
    else:
        print(f"ℹ️ Bucket already exists: {bucket_name}")


def launch_job(job_name):
    """Launch Dataflow classic template."""
    body = {
        "jobName": job_name,
        "parameters": PARAMETERS,
        "environment": {
            "tempLocation": TEMP_LOCATION,
            "zone": f"{REGION}-a"
        },
    }

    print(f"🚀 Launching Dataflow template '{job_name}' from {TEMPLATE_GCS_PATH} ...")

    try:
        request = dataflow.projects().locations().templates().launch(
            projectId=PROJECT_ID,
            location=REGION,
            gcsPath=TEMPLATE_GCS_PATH,
            body=body,
        )
        response = request.execute()
        job = response.get("job", {})
        job_id = job.get("id")
        print(f"✅ Job launched successfully! Job ID: {job_id}")
        print(f"👉 View in console: https://console.cloud.google.com/dataflow/jobsDetail/locations/{REGION}/jobs/{job_id}?project={PROJECT_ID}")
        return job_id
    except HttpError as e:
        print("❌ Failed to launch job:", e)
        return None


def get_job(job_id):
    """Fetch job details."""
    try:
        request = dataflow.projects().locations().jobs().get(
            projectId=PROJECT_ID,
            location=REGION,
            jobId=job_id,
        )
        response = request.execute()
        print(f"ℹ️ Job '{job_id}' state: {response.get('currentState')}")
        return response
    except HttpError as e:
        print("❌ Failed to get job:", e)
        return None


def update_job(job_id, new_state="JOB_STATE_CANCELLED"):
    """Update job to cancel/drain."""
    try:
        print(f"🔁 Updating job '{job_id}' state to {new_state} ...")
        request = dataflow.projects().locations().jobs().update(
            projectId=PROJECT_ID,
            location=REGION,
            jobId=job_id,
            body={"requestedState": new_state},
        )
        resp = request.execute()
        print(f"✅ Job '{job_id}' new state:", resp.get("currentState"))
        return resp
    except HttpError as e:
        print("❌ Failed to update job:", e)
        return None


def main():
    ensure_bucket_exists(BUCKET_NAME)
    job_name = f"wordcount-job-{int(time.time())}"

    # 1️⃣ Launch job (logs dataflow.jobs.create)
    job_id = launch_job(job_name)
    if not job_id:
        return

    # 2️⃣ Get status
    time.sleep(10)
    get_job(job_id)

    # 3️⃣ Cancel job (logs dataflow.jobs.cancel)
    print("\n⏳ Waiting before cancelling job ...")
    time.sleep(20)
    update_job(job_id, "JOB_STATE_CANCELLED")
    time.sleep(5)
    get_job(job_id)


if __name__ == "__main__":
    main()
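
For streaming pipelines, the same update_job helper can also request a drain rather than an immediate cancel. As a sketch, assuming the job is a streaming job (draining does not apply to batch jobs such as this WordCount example), you would pass JOB_STATE_DRAINED as the requested state:

update_job(job_id, "JOB_STATE_DRAINED")  # drain: streaming jobs only; batch jobs can only be cancelled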
