GCP Dataflow — Classic Python Template with UDF in GCS (Step-by-Step)
A complete, beginner-friendly guide: which files to upload, exact commands, the launcher script, the pipeline code, IAM roles, troubleshooting, and a final pre-launch checklist.
1. Quick Overview
In the Classic Dataflow approach you keep three distinct files:
- main.py — your Beam pipeline (this must be uploaded to GCS; workers download & run it).
- udf/my_udf.py — optional user-defined function (upload to GCS if main.py loads it from GCS).
- launch_dataflow.py — local script that calls the Dataflow REST API to create the job (runs on your laptop/CI; do not upload).
2. Replace placeholders first
Before you run any command, decide and/or replace these values:
- PROJECT_ID → your GCP project id
- REGION → Dataflow region (e.g. us-east4)
- BUCKET → a GCS bucket name (globally unique), e.g. my-dataflow-bucket-12345
- SERVICE_ACCOUNT_FILE → path to a service account JSON key (optional if using gcloud auth)
- SERVICE_ACCOUNT_EMAIL → email of the SA used by Dataflow workers (optional)
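To make the later commands concrete, here is what a filled-in set of values might look like (illustrative values only; substitute your own):

PROJECT_ID=my-gcp-project
REGION=us-east4
BUCKET=my-dataflow-bucket-12345
SERVICE_ACCOUNT_FILE=./service-account.json
SERVICE_ACCOUNT_EMAIL=dataflow-worker@my-gcp-project.iam.gserviceaccount.com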
3. Local folder structure
my-dataflow-project/
├── main.py # Beam pipeline (uploaded to GCS)
├── udf/
│ └── my_udf.py # UDF (uploaded to GCS if loaded from GCS)
└── launch_dataflow.py # job submitter (run locally)
4. File contents — copy these exact examples
a) udf/my_udf.py
# simple example UDF
def process(line):
    # transform the input line to upper-case
    return line.upper()
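A quick way to sanity-check the UDF contract (one text line in, one transformed line out) before touching GCS; run this from the project root:

# quick local check of the UDF (run from my-dataflow-project/)
from udf.my_udf import process

assert process("hello world") == "HELLO WORLD"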
b) main.py — pipeline that downloads UDF from GCS
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import importlib.util
import os
import tempfile


class LoadUDF(beam.DoFn):
    def __init__(self, udf_gcs_path):
        self.udf_gcs_path = udf_gcs_path

    def setup(self):
        # download the UDF file from GCS to a local path
        local_tmp = tempfile.mkdtemp()
        local_path = os.path.join(local_tmp, "udf.py")
        # Use gsutil to copy the file (workers must have gsutil available)
        os.system(f"gsutil cp {self.udf_gcs_path} {local_path}")
        # import the downloaded file as a module
        spec = importlib.util.spec_from_file_location("udf", local_path)
        self.udf = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(self.udf)

    def process(self, element):
        yield self.udf.process(element)


def run():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", required=True)
    parser.add_argument("--udf_file", required=True)
    args, pipeline_args = parser.parse_known_args()

    options = PipelineOptions(pipeline_args, save_main_session=True)
    with beam.Pipeline(options=options) as p:
        (p
         | "Read" >> beam.io.ReadFromText(args.input)
         | "ApplyUDF" >> beam.ParDo(LoadUDF(args.udf_file))
         | "Write" >> beam.io.WriteToText(args.output)
        )


if __name__ == "__main__":
    run()
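If your worker image lacks gsutil (see the troubleshooting section below), one portable alternative is to read the UDF through Beam's own filesystem API. A minimal sketch of the same LoadUDF class with setup() swapped to use apache_beam.io.filesystems.FileSystems:

from apache_beam.io.filesystems import FileSystems  # add near the other imports

class LoadUDF(beam.DoFn):  # same class as above, only setup() changes
    def __init__(self, udf_gcs_path):
        self.udf_gcs_path = udf_gcs_path

    def setup(self):
        local_tmp = tempfile.mkdtemp()
        local_path = os.path.join(local_tmp, "udf.py")
        # FileSystems understands gs:// paths and reads with the worker's own credentials
        with FileSystems.open(self.udf_gcs_path) as src, open(local_path, "wb") as dst:
            dst.write(src.read())
        spec = importlib.util.spec_from_file_location("udf", local_path)
        self.udf = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(self.udf)

    def process(self, element):
        yield self.udf.process(element)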
c) launch_dataflow.py — job submitter using Dataflow REST client
import uuid
import json

from google.oauth2 import service_account
from googleapiclient.discovery import build

PROJECT_ID = "YOUR_PROJECT_ID"
REGION = "us-east4"
BUCKET = "YOUR_BUCKET_NAME"
SERVICE_ACCOUNT_FILE = "service-account.json"  # or use gcloud default auth (see the alternative below)
SERVICE_ACCOUNT_EMAIL = "YOUR_WORKER_SA_EMAIL"  # SA the Dataflow workers run as

MAIN_PY_GCS = f"gs://{BUCKET}/pipeline/main.py"
UDF_GCS = f"gs://{BUCKET}/udf/my_udf.py"
TEMP_LOCATION = f"gs://{BUCKET}/temp"
STAGING_LOCATION = f"gs://{BUCKET}/staging"
JOB_NAME = f"classic-udf-{uuid.uuid4().hex[:6]}"

# Authenticate with the service account key file
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

dataflow = build("dataflow", "v1b3", credentials=credentials)

job_body = {
    "jobName": JOB_NAME,
    "parameters": {
        "input": f"gs://{BUCKET}/input/input.txt",
        "output": f"gs://{BUCKET}/output/result",
        "udf_file": UDF_GCS,
    },
    "environment": {
        "tempLocation": TEMP_LOCATION,
        "stagingLocation": STAGING_LOCATION,
        "serviceAccountEmail": SERVICE_ACCOUNT_EMAIL,
    },
}

request = dataflow.projects().locations().templates().launch(
    projectId=PROJECT_ID,
    location=REGION,
    gcsPath=MAIN_PY_GCS,
    body=job_body,
)
response = request.execute()
print(json.dumps(response, indent=2))
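The SERVICE_ACCOUNT_FILE comment above mentions gcloud default auth; a sketch of that variant using Application Default Credentials (google.auth.default), assuming you have already run gcloud auth application-default login:

# Alternative: use Application Default Credentials instead of a key file
import google.auth

credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
dataflow = build("dataflow", "v1b3", credentials=credentials)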
5. Create the GCS bucket (if it does not already exist)
gcloud config set project YOUR_PROJECT_ID
gsutil mb -l us-east4 gs://YOUR_BUCKET_NAME
Use the same region as your Dataflow region for lower latency and fewer surprises.
6. Upload files to GCS (exact commands)
# upload pipeline and udf
gsutil cp main.py gs://YOUR_BUCKET_NAME/pipeline/main.py
gsutil cp udf/my_udf.py gs://YOUR_BUCKET_NAME/udf/my_udf.py
# create an example input file and upload it
echo "hello world" > input.txt
gsutil cp input.txt gs://YOUR_BUCKET_NAME/input/input.txt
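Optionally, list the bucket afterwards to confirm the objects landed where the launcher expects them:

# sanity check: list everything under the bucket
gsutil ls -r gs://YOUR_BUCKET_NAME/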
7. IAM & Service Account (minimum permissions)
The launcher (the account that creates jobs) and the worker service account require appropriate roles:
| Entity | Minimum Roles |
|---|---|
| Launcher (account that calls the Dataflow API) | roles/dataflow.admin (or equivalent permissions to create Dataflow jobs) |
| Worker service account (used by Dataflow workers) | roles/dataflow.worker, plus GCS access to the job bucket: roles/storage.objectViewer to read the UDF and input, and write access (e.g. roles/storage.objectAdmin on the bucket) for temp, staging and output |
# Example: grant worker SA the storage.objectViewer role
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID \
--member="serviceAccount:YOUR_WORKER_SA_EMAIL" \
--role="roles/storage.objectViewer"
8. Launch the job (two ways)
A) Using the Python launcher (recommended)
Edit launch_dataflow.py with correct placeholders, then run:
python launch_dataflow.py
B) Using gcloud CLI (alternate)
gcloud dataflow jobs run my-job-name \
--gcs-location=gs://YOUR_BUCKET_NAME/pipeline/main.py \
--region=us-east4 \
--parameters=input=gs://YOUR_BUCKET_NAME/input/input.txt,output=gs://YOUR_BUCKET_NAME/output/result,udf_file=gs://YOUR_BUCKET_NAME/udf/my_udf.py
9. Monitor job and logs
- GCP Console → Dataflow (select region) — job graph & status.
- Cloud Logging — filter logs by Dataflow job id or resource.type="dataflow_step".
- Check worker logs for startup errors (missing dependencies, permission issues reading GCS, or gsutil not found).
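You can also check job status from the terminal with standard gcloud commands (JOB_ID is a placeholder for the id printed by the launcher):

# list recent jobs in the region
gcloud dataflow jobs list --region=us-east4
# show details for one job
gcloud dataflow jobs show JOB_ID --region=us-east4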
10. Common errors & fixes
- Permission denied reading gs://...
  → Ensure the worker SA has storage.objectViewer on the bucket.
- Job stuck in STARTING
  → Verify that tempLocation & stagingLocation exist and are writable; check project quotas and region availability.
- ModuleNotFoundError inside the UDF
  → Ensure your UDF is a single Python file or that the worker image contains the required libraries. For more complex dependencies use Flex Templates (container).
- gsutil not found on worker
  → Some worker images lack gsutil. If so, either vendor the UDF into the pipeline bundle or use a Flex Template with a container that includes gsutil.
11. Optional: test locally (quick sanity)
Run the pipeline locally to validate behavior (you may need to adapt LoadUDF to accept local paths):
python main.py --input input.txt --output local-output --udf_file udf/my_udf.py
This helps verify logic before uploading to GCS and launching on Dataflow.
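WriteToText writes sharded files under the given prefix (for a tiny input, typically a single local-output-00000-of-00001 file); inspect them with:

cat local-output*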
12. Final pre-launch checklist
- [ ] main.py uploaded to gs://BUCKET/pipeline/main.py
- [ ] udf/my_udf.py uploaded to gs://BUCKET/udf/my_udf.py (if referenced)
- [ ] Input file uploaded to gs://BUCKET/input/input.txt
- [ ] TEMP and STAGING set to gs://BUCKET/temp and gs://BUCKET/staging
- [ ] Launcher configured with the correct PROJECT_ID and SERVICE_ACCOUNT_FILE
- [ ] IAM roles granted to the launcher SA and the worker SA