Monday, 22 December 2025

Simple Way to Test GCP Dataflow Dynamic (Flex) Templates Using Python

When working with GCP Dataflow Flex Templates, many engineers think they must:

  • Build Docker images
  • Use gcloud
  • Create complex CI pipelines

In reality, you can test a dynamic (Flex) template using a very small Python script. This post shows the simplest and cleanest approach.


What Is a Dynamic (Flex) Template?

A Flex Template allows you to:

  • Pass runtime parameters
  • Use Python UDFs
  • Avoid rebuilding templates for every change

Unlike classic templates, Flex Templates are container-first, even when using Google-provided images.


Option 1: Simplest Logic Test (No Template)

This method only validates your pipeline logic. It does not test the Flex Template itself.


python wordcount.py \
  --runner=DataflowRunner \
  --project=my-project \
  --region=us-east4 \
  --temp_location=gs://my-bucket/temp \
  --staging_location=gs://my-bucket/staging \
  --input=gs://my-bucket/input/input.txt \
  --output=gs://my-bucket/output/wordcount-test

✔ Confirms code works
❌ Does not validate Flex Template JSON


Option 2 (Recommended): Test Flex Template Using Python

This is the best and simplest way to test a dynamic template end-to-end.

Minimal Python Test Script


from googleapiclient.discovery import build
from google.oauth2 import service_account

PROJECT_ID = "my-project"
REGION = "us-east4"
SERVICE_ACCOUNT_FILE = "dataflow_sa.json"

credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

dataflow = build(
    "dataflow",
    "v1b3",
    credentials=credentials,
    cache_discovery=False
)

body = {
    "launchParameter": {
        "jobName": "flex-test-wordcount",
        "containerSpecGcsPath": "gs://my-bucket/templates/flex_wordcount.json",
        "parameters": {
            "pythonFile": "gs://my-bucket/pipelines/wordcount.py",
            "requirementsFile": "gs://my-bucket/pipelines/requirements.txt",
            "input": "gs://my-bucket/input/input.txt",
            "output": "gs://my-bucket/output/wordcount-test"
        },
        "environment": {
            "tempLocation": "gs://my-bucket/temp",
            "stagingLocation": "gs://my-bucket/staging"
        }
    }
}

response = (
    dataflow.projects()
    .locations()
    .flexTemplates()
    .launch(
        projectId=PROJECT_ID,
        location=REGION,
        body=body
    )
    .execute()
)

print("Job launched:", response["job"]["id"])

✔ Tests Flex JSON
✔ Tests container
✔ Tests runtime parameters
✔ No Docker
✔ No gcloud


Option 3: Local Development (Fastest Feedback)

For local development, use the DirectRunner:


python wordcount.py \
  --runner=DirectRunner \
  --input=local.txt \
  --output=out

✔ Fast logic testing
❌ No GCP validation


Recommended Testing Flow

Stage | Method
Local development | DirectRunner
GCP validation | DataflowRunner
Template validation | Flex Template via Python

Common Issues and Fixes

  • Job stuck in STARTING → Missing compute.networkUser
  • Pipeline fails instantly → Missing requirements.txt
  • Workers fail → Private subnet without NAT
  • Template invalid → Wrong container image
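
If a job does seem stuck, you can watch its state from the same Python client used for launching. A minimal sketch, assuming PROJECT_ID, REGION, and the dataflow client from the script above, with the job ID taken from the launch response:

import time

def wait_for_state(job_id, timeout_sec=600, poll_sec=30):
    """Poll the Dataflow job until it leaves the STARTING/PENDING phase."""
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        job = (
            dataflow.projects()
            .locations()
            .jobs()
            .get(projectId=PROJECT_ID, location=REGION, jobId=job_id)
            .execute()
        )
        state = job.get("currentState", "UNKNOWN")
        print("Job state:", state)
        if state in ("JOB_STATE_RUNNING", "JOB_STATE_DONE",
                     "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"):
            return state
        time.sleep(poll_sec)
    return "TIMED_OUT"

# Example: wait_for_state(response["job"]["id"])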

Final Takeaway

The simplest way to test a dynamic Dataflow template is:

  • Use Google’s Flex Template image
  • Launch via flexTemplates().launch()
  • Use a small Python script

No Docker. No gcloud. Fully production-safe.


GCP Dataflow Flex Template – Python WordCount (No Docker, Complete ZIP)

This is a fully working Apache Beam WordCount Flex Template using:

  • ✅ Google-provided Dataflow container image
  • ✅ No Docker build
  • ✅ No gcloud
  • ✅ Dynamic parameters

📦 ZIP Structure

wordcount-flex-template/
├── wordcount.py
├── requirements.txt
└── flex_template.json

1️⃣ wordcount.py


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class WordCountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            "--input",
            type=str,
            help="GCS input file"
        )
        parser.add_value_provider_argument(
            "--output",
            type=str,
            help="GCS output prefix"
        )


def run():
    options = PipelineOptions(save_main_session=True)
    custom_options = options.view_as(WordCountOptions)

    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read" >> beam.io.ReadFromText(custom_options.input)
            | "Split" >> beam.FlatMap(lambda x: x.split())
            | "PairWithOne" >> beam.Map(lambda x: (x, 1))
            | "Count" >> beam.CombinePerKey(sum)
            | "Format" >> beam.Map(lambda kv: f"{kv[0]}: {kv[1]}")
            | "Write" >> beam.io.WriteToText(custom_options.output)
        )


if __name__ == "__main__":
    run()

2️⃣ requirements.txt


apache-beam[gcp]==2.53.0

Why is this required? The Google launcher image does NOT auto-install the Beam SDK, so requirements.txt must include it.


3️⃣ flex_template.json


{
  "sdkInfo": {
    "language": "PYTHON"
  },
  "containerSpec": {
    "image": "gcr.io/dataflow-templates-base/python3-template-launcher-base",
    "metadata": {
      "name": "Python WordCount Flex Template",
      "description": "Minimal WordCount Flex Template using Google Dataflow image",
      "parameters": [
        {
          "name": "pythonFile",
          "label": "Python Pipeline File",
          "helpText": "GCS path to the Python pipeline"
        },
        {
          "name": "requirementsFile",
          "label": "Requirements File",
          "helpText": "GCS path to requirements.txt"
        },
        {
          "name": "input",
          "label": "Input File",
          "helpText": "GCS input file"
        },
        {
          "name": "output",
          "label": "Output Prefix",
          "helpText": "GCS output prefix"
        }
      ]
    }
  }
}

4️⃣ Upload to GCS

gs://my-bucket/pipelines/wordcount.py
gs://my-bucket/pipelines/requirements.txt
gs://my-bucket/templates/flex_wordcount.json
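
If you want the upload step itself to stay in Python as well, a minimal sketch using the google-cloud-storage client; the local file names and bucket are assumptions matching the layout above:

from google.cloud import storage

# Local file -> GCS object name (placeholders matching the paths above)
UPLOADS = {
    "wordcount.py": "pipelines/wordcount.py",
    "requirements.txt": "pipelines/requirements.txt",
    "flex_template.json": "templates/flex_wordcount.json",
}

client = storage.Client(project="my-project")
bucket = client.bucket("my-bucket")

for local_path, blob_name in UPLOADS.items():
    bucket.blob(blob_name).upload_from_filename(local_path)
    print(f"Uploaded {local_path} -> gs://my-bucket/{blob_name}")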

5️⃣ Launch via Python API


from googleapiclient.discovery import build
from google.oauth2 import service_account
import uuid

PROJECT_ID = "my-project"
REGION = "us-east4"
SERVICE_ACCOUNT_FILE = "dataflow_sa.json"

credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

dataflow = build(
    "dataflow",
    "v1b3",
    credentials=credentials,
    cache_discovery=False
)

job_name = f"wordcount-flex-{uuid.uuid4().hex[:6]}"

body = {
    "launchParameter": {
        "jobName": job_name,
        "containerSpecGcsPath": "gs://my-bucket/templates/flex_wordcount.json",
        "parameters": {
            "pythonFile": "gs://my-bucket/pipelines/wordcount.py",
            "requirementsFile": "gs://my-bucket/pipelines/requirements.txt",
            "input": "gs://my-bucket/input/input.txt",
            "output": "gs://my-bucket/output/wordcount"
        },
        "environment": {
            "serviceAccountEmail": "dataflow-sa@my-project.iam.gserviceaccount.com",
            "tempLocation": "gs://my-bucket/temp",
            "stagingLocation": "gs://my-bucket/staging",
            "maxWorkers": 5
        }
    }
}

response = (
    dataflow.projects()
    .locations()
    .flexTemplates()
    .launch(
        projectId=PROJECT_ID,
        location=REGION,
        body=body
    )
    .execute()
)

print(response)

6️⃣ Required IAM Roles

roles/dataflow.developer
roles/storage.objectAdmin
roles/iam.serviceAccountUser
roles/compute.networkUser

✅ Final Verification

  • ✔ Works with private VPC
  • ✔ No Docker build
  • ✔ No gcloud
  • ✔ Production-safe

🎯 Blog Takeaway

Flex Templates are container-first. Even with Google images, you must supply:

  • Python pipeline
  • requirements.txt
  • flex_template.json

Nothing else is required.

Tuesday, 9 December 2025

Databricks on AWS - Technical Guide

1. What is Databricks?

Databricks is a unified data and AI platform built on Apache Spark. It is designed for:

  • Big Data Processing (ETL, analytics)
  • Machine Learning (ML, AI)
  • Data Science and Analytics Exploration
  • Data Lakehouse architecture combining data lakes and warehouses
---

2. Databricks Architecture on AWS

Databricks separates control plane (managed by Databricks) from data plane (your AWS account).

Control Plane

  • Managed by Databricks in Databricks AWS accounts
  • Manages authentication, notebooks, job scheduling, cluster orchestration
  • Serverless for users, no EC2 instances to manage

Data Plane

  • Runs in your AWS account
  • Compute: EC2 instances in your VPC
  • Storage: S3 (Delta Lake), EBS (temporary storage for nodes)
  • Networking: Private subnets, VPC Peering, PrivateLink
[Diagram: Databricks AWS Architecture]

3. Key AWS Integrations

AWS Service | Use Case in Databricks
S3 | Delta Lake storage, raw data, ML datasets
EC2 | Compute for clusters
EBS | Temporary cluster storage (shuffle, logs)
Kinesis / Kafka | Real-time streaming ingestion
Redshift / RDS / Aurora | Data warehousing and relational DB queries
Glue Data Catalog | Metadata management for Delta tables
KMS | Encrypt S3 buckets and cluster storage
IAM | Secure access to S3, KMS, and other services
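
If the Glue Data Catalog is wired up as the metastore for your workspace (an assumption; this needs the Glue catalog integration enabled on the cluster), cataloged Delta tables can be queried with plain Spark SQL. The database and table names below are placeholders:

# Query a Glue-cataloged table from a Databricks notebook (placeholder names).
top_events = spark.sql("""
    SELECT eventType, COUNT(*) AS events
    FROM analytics_db.ticket_events
    GROUP BY eventType
    ORDER BY events DESC
""")
top_events.show()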
---

4. Cluster Types & Runtimes

Cluster Type | Use Case | Notes
Standard | ETL, analytics | Optional auto-scaling
High Concurrency | Multiple SQL users | Optimized for concurrent queries
Job Cluster | Batch job execution | Terminates after job completes
ML / GPU Cluster | Machine Learning / Deep Learning | GPU-enabled EC2 instances

Databricks Runtimes

  • Standard Runtime: Spark workloads
  • ML Runtime: MLlib, TensorFlow, PyTorch, scikit-learn
  • GPU Runtime: Deep learning / model training
  • Photon Engine: Optimized Delta queries
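
As a rough illustration of how cluster type, runtime, and auto-scaling come together, a hedged sketch that creates an auto-scaling cluster through the Clusters REST API; the workspace URL, token, Spark version, and node type are placeholders you must replace with values valid in your workspace:

import requests

DATABRICKS_HOST = "https://my-workspace.cloud.databricks.com"  # placeholder workspace URL
TOKEN = "dapi-REPLACE-ME"                                      # placeholder personal access token

cluster_spec = {
    "cluster_name": "etl-autoscaling",
    "spark_version": "14.3.x-scala2.12",   # pick a runtime listed in your workspace
    "node_type_id": "i3.xlarge",           # pick an instance type available to you
    "autoscale": {"min_workers": 2, "max_workers": 8},
}

resp = requests.post(
    f"{DATABRICKS_HOST}/api/2.0/clusters/create",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json=cluster_spec,
)
print(resp.json())  # {"cluster_id": "..."} on success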
---

5. Storage & Delta Lake

Delta Lake provides:

  • ACID Transactions
  • Time Travel Queries
  • Schema Enforcement
  • Batch + Streaming support

# Mount an S3 bucket to DBFS (prefer secrets or instance profiles over plaintext keys)
dbutils.fs.mount(
  source="s3a://my-bucket",
  mount_point="/mnt/mydata",
  extra_configs={"fs.s3a.awsAccessKeyId":"YOUR_KEY","fs.s3a.awsSecretAccessKey":"YOUR_SECRET"}
)
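
To illustrate time travel, a small sketch that reads an older version of a Delta table; the table path is a placeholder and version 0 is assumed to exist:

# Read an earlier version of a Delta table (path and version are placeholders).
hist_df = (
    spark.read.format("delta")
    .option("versionAsOf", 0)                 # or .option("timestampAsOf", "2025-01-01")
    .load("/mnt/mydata/delta/events")
)
hist_df.show()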
---

6. Data Ingestion & Streaming

# Kafka example
df = spark.readStream.format("kafka") \
  .option("kafka.bootstrap.servers","broker:9092") \
  .option("subscribe","topic_name") \
  .load()

  • Supports Kinesis Data Streams for AWS-native ingestion
  • Checkpointing in S3 or DBFS to track streaming state
  • Watermarking for late-arriving data
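
Continuing from the Kafka stream above, a minimal sketch of watermarking plus checkpointing; the bucket paths are placeholders, and the timestamp column is the one the Kafka source provides:

from pyspark.sql.functions import window, col

windowed = (
    df.withWatermark("timestamp", "10 minutes")        # tolerate 10 minutes of late data
      .groupBy(window(col("timestamp"), "5 minutes"))
      .count()
)

query = (
    windowed.writeStream
    .outputMode("append")                               # emit only finalized windows
    .format("delta")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/clicks")
    .start("s3a://my-bucket/delta/click_counts")
)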

7. ML / AI Workloads

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
import mlflow
import mlflow.spark

# Load data
data = spark.read.format("delta").load("/mnt/delta/customer_data")

# Prepare features
assembler = VectorAssembler(inputCols=["age","account_age","num_transactions"], outputCol="features")
rf = RandomForestClassifier(featuresCol="features", labelCol="churn")
pipeline = Pipeline(stages=[assembler, rf])

# Train model
model = pipeline.fit(data)

# Log model to MLflow
mlflow.start_run()
mlflow.spark.log_model(model, "churn_model")
mlflow.end_run()
---

8. Security

  • IAM Roles & Policies: Assign Databricks role for S3, Kinesis access
  • KMS: Encrypt S3 buckets and cluster storage
  • Private Networking: VPC Peering, PrivateLink
  • Audit Logging: CloudTrail, S3 access logs
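
Rather than hard-coding access keys as in the mount example earlier, credentials can be pulled from a Databricks secret scope; the scope and key names below are placeholders, and on AWS an instance profile is usually the better option:

# Fetch S3 credentials from a secret scope instead of hard-coding them (placeholder names).
access_key = dbutils.secrets.get(scope="aws-creds", key="access-key")
secret_key = dbutils.secrets.get(scope="aws-creds", key="secret-key")

spark.conf.set("fs.s3a.access.key", access_key)
spark.conf.set("fs.s3a.secret.key", secret_key)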
---

9. Autoscaling & Performance

  • Auto-scaling: min/max worker nodes per cluster
  • Photon Engine: faster Delta table queries
  • Caching: df.cache() for repeated computations
---

10. Sample Data Flow (AWS + Databricks)

Scenario: Real-time e-commerce analytics

  • Ingest customer clickstreams → Kinesis Data Stream
  • Databricks Streaming Cluster processes & cleans data → Delta Lake (S3)
  • Analyze via Databricks SQL / Redshift Spectrum dashboards
  • Train recommendation ML models on Delta Lake
  • Secure via IAM roles, KMS encryption, private VPC
[Diagram: AWS Databricks Data Flow]

11. Quick Hands-On Use Cases in Databricks on AWS

ETL Pipeline

df = spark.read.json("s3://my-bucket/raw_logs/")
df_clean = df.select("userId","eventType","timestamp").filter(df.eventType!="ignore")
df_clean.write.format("delta").mode("overwrite").save("/mnt/delta/clean_logs")

Streaming Analytics

df_stream = spark.readStream.format("kinesis")\
  .option("streamName","clickstream")\
  .option("region","us-east-1").load()

# Process streaming data
df_clean = df_stream.filter(df_stream.eventType!="ignore")
df_clean.writeStream.format("delta").option("checkpointLocation","/mnt/delta/checkpoint").start("/mnt/delta/streaming_data")

ML Model Training

assembler = VectorAssembler(inputCols=["age","account_age"], outputCol="features")
rf = RandomForestClassifier(featuresCol="features", labelCol="churn")
pipeline = Pipeline(stages=[assembler, rf])
model = pipeline.fit(data)
mlflow.spark.log_model(model, "churn_model")
---

References

Databricks Beginner Guide

Databricks Beginner Guide & Use Cases

Databricks Beginner Guide & Use Cases

Basics of Databricks

Databricks is a unified data and AI platform built on Apache Spark. It is used for big data processing, analytics, and machine learning. Databricks simplifies distributed computing and provides a collaborative workspace for Data Engineers, Data Scientists, and Analysts.

Key Features of Databricks

  • Delta Lake: ACID-compliant storage for batch & streaming data.
  • Scalable Clusters: Auto-scaling Spark clusters for big data workloads.
  • MLflow Integration: Experiment tracking and model management.
  • Structured Streaming: Real-time data processing.
  • Notebooks: Support for Python, SQL, Scala, and R in a collaborative workspace.
  • Data Lakehouse: Unified architecture for structured and unstructured data.
  • Integrations: Works with AWS S3, Azure Data Lake Storage, Kafka, and more.

5 Use Cases with Technical Implementation

Each use case below lists its cluster type / runtime and notebook name, followed by the technical implementation.

1. ETL Pipeline | Standard Spark Cluster | Notebook: ETL_Pipeline
# Load JSON from S3
df = spark.read.json("s3://my-bucket/raw_logs/")
# Clean data
df_clean = df.select("userId","eventType","timestamp").filter(df.eventType!="ignore")
# Save as Delta
df_clean.write.format("delta").mode("overwrite").save("/mnt/delta/clean_logs")
2. Real-Time Streaming | Databricks ML / Spark Streaming Runtime | Notebook: Streaming_IoT
# Read from Kafka
streaming_df = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers","kafka:9092")\
    .option("subscribe","iot_sensors").load()

# Parse JSON & filter anomalies
from pyspark.sql.functions import from_json, col
json_schema = "sensorId STRING, temperature DOUBLE"  # DDL schema so temperature parses as a number
parsed_df = streaming_df.select(from_json(col("value").cast("string"), json_schema).alias("data")).select("data.*")
anomalies = parsed_df.filter(col("temperature") > 100)

# Write to console
query = anomalies.writeStream.outputMode("append").format("console").start()
query.awaitTermination()
3. ML Model Training | Databricks ML Runtime | Notebook: ML_Churn_Model
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

data = spark.read.format("delta").load("/mnt/delta/customer_data")
assembler = VectorAssembler(inputCols=["age","account_age","num_transactions"], outputCol="features")
rf = RandomForestClassifier(featuresCol="features", labelCol="churn")
pipeline = Pipeline(stages=[assembler, rf])
model = pipeline.fit(data)
model.write().overwrite().save("/mnt/models/churn_model")
4. Data Analytics / Exploration | Standard Spark Cluster | Notebook: Sales_Analysis
import matplotlib.pyplot as plt

sales_df = spark.read.format("delta").load("/mnt/delta/sales_data")
sales_pd = sales_df.groupBy("month").sum("revenue").toPandas()
plt.plot(sales_pd["month"], sales_pd["sum(revenue)"])
plt.title("Monthly Sales Trend")
plt.xlabel("Month")
plt.ylabel("Revenue")
plt.show()
5. Data Lakehouse / Delta Lake | Standard Spark Cluster | Notebook: Delta_Lakehouse
# Load CSV & JSON
csv_df = spark.read.format("csv").option("header","true").load("/mnt/raw/csv_data")
json_df = spark.read.format("json").load("/mnt/raw/json_data")

# Combine & save as Delta
combined_df = csv_df.unionByName(json_df)
combined_df.write.format("delta").mode("overwrite").save("/mnt/delta/combined_data")

# SQL query
spark.sql("SELECT count(*) FROM delta.`/mnt/delta/combined_data`").show()

Technical Flow in Databricks

  1. Create Cluster → select runtime (Spark / ML / GPU if needed).
  2. Mount storage → S3, ADLS, or DBFS.
  3. Create Notebook → Python / SQL / Scala.
  4. Write code → ETL, ML, Streaming, Analytics.
  5. Attach Notebook to Cluster → Run & Monitor.
  6. Save results → Delta Tables, ML Models, Visualizations.

GCP Dataflow Flex Template with Python UDF

This guide demonstrates how to build and launch a Dataflow Flex Template in Python using a custom UDF. The entire workflow is handled in Python without using any gcloud CLI commands for launching.


1. Folder Structure

Organize your files as follows:

dataflow_flex_udf/
├── main.py
├── my_udf/
│   └── custom_udf.py
├── requirements.txt
├── metadata.json
├── build_flex_template.py
└── launch_flex_template.py

2. Python UDF

Create your UDF function in my_udf/custom_udf.py:


def my_uppercase_udf(text: str) -> str:
    """
    Example UDF to convert text to uppercase
    """
    return text.upper()
---

3. Dataflow Pipeline (main.py)

This pipeline reads input text from GCS, applies the UDF, and writes output back to GCS:


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from my_udf.custom_udf import my_uppercase_udf

def run():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", required=True)
    args, pipeline_args = parser.parse_known_args()

    options = PipelineOptions(pipeline_args, save_main_session=True)

    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read" >> beam.io.ReadFromText(args.input)
            | "ApplyUDF" >> beam.Map(my_uppercase_udf)
            | "Write" >> beam.io.WriteToText(args.output)
        )

if __name__ == "__main__":
    run()
---

4. Requirements File (requirements.txt)

apache-beam[gcp]==2.57.0
---

5. Metadata File (metadata.json)

Define the parameters for the Flex Template:


{
  "name": "Python UDF Flex Template",
  "description": "Run Python Beam pipeline with UDF",
  "parameters": [
    { "name": "input", "label": "Input File", "helpText": "GCS path to input" },
    { "name": "output", "label": "Output File", "helpText": "GCS path to output" }
  ]
}
---

6. Build Flex Template in Python (build_flex_template.py)

This script builds the Flex Template JSON using Google’s Python base image:


import subprocess

# Configuration
BUCKET = "your-bucket"
TEMPLATE_JSON_GCS = f"gs://{BUCKET}/templates/python_udf_template.json"
MAIN_PY = "main.py"
REQUIREMENTS = "requirements.txt"
PYTHON_FILES = "."
METADATA = "metadata.json"
LAUNCHER_IMAGE = "gcr.io/dataflow-templates-base/python3-template-launcher-base"

# Build command
cmd = [
    "gcloud", "dataflow", "flex-template", "build", TEMPLATE_JSON_GCS,
    f"--image={LAUNCHER_IMAGE}",
    "--sdk-language=PYTHON",
    f"--metadata-file={METADATA}",
    f"--python-requirements-file={REQUIREMENTS}",
    f"--python-entry-point={MAIN_PY}",
    f"--python-files={PYTHON_FILES}"
]

# Execute
subprocess.run(cmd, check=True)
print("Flex Template JSON created at:", TEMPLATE_JSON_GCS)
Note: Even though we use Python here, building the template requires calling gcloud to generate the JSON. Launching can be done fully via Python API.
---

7. Launch Flex Template via Python (launch_flex_template.py)


import json
import uuid
from googleapiclient.discovery import build
from google.oauth2 import service_account

# Config
PROJECT_ID = "your-project-id"
REGION = "us-central1"
BUCKET = "your-bucket"
TEMPLATE_JSON_GCS = f"gs://{BUCKET}/templates/python_udf_template.json"
SERVICE_ACCOUNT_FILE = "service-account.json"
INPUT_FILE = f"gs://{BUCKET}/input/input.txt"
OUTPUT_FILE = f"gs://{BUCKET}/output/result"

# Authenticate
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

dataflow = build("dataflow", "v1b3", credentials=credentials)

# Unique job name
job_name = f"flex-udf-job-{uuid.uuid4().hex[:6]}"

# Launch request body
request = dataflow.projects().locations().flexTemplates().launch(
    projectId=PROJECT_ID,
    location=REGION,
    body={
        "launchParameter": {
            "containerSpecGcsPath": TEMPLATE_JSON_GCS,
            "jobName": job_name,
            "parameters": {"input": INPUT_FILE, "output": OUTPUT_FILE},
            "environment": {"tempLocation": f"gs://{BUCKET}/temp"}
        }
    }
)

response = request.execute()
print(json.dumps(response, indent=2))
---

8. IAM Permissions Required

  • For building template / launching: roles/dataflow.admin, roles/storage.admin
  • For Dataflow workers: roles/dataflow.worker, roles/storage.objectViewer, roles/storage.objectCreator, roles/logging.logWriter
---

9. How It Works

  1. Build Flex Template JSON: package pipeline + UDF
  2. Upload to GCS: store template JSON
  3. Launch via Python API: start Dataflow job
  4. Pipeline executes: reads input, applies UDF, writes output
---

10. Benefits

  • No custom Docker needed — uses Google-provided Python base image
  • Full Python automation — build + launch + monitoring
  • Supports complex UDFs and multiple Python modules
  • Integrates easily into CI/CD pipelines

GCP Dataflow — Classic Python Template with UDF in GCS (Step-by-Step)

Complete, beginner-friendly guide: which files to upload, exact commands, launcher script, pipeline code, IAM roles, troubleshooting, and a final pre-launch checklist.

1. Quick Overview

In the Classic Dataflow approach you keep these pieces separate:

  • main.py — your Beam pipeline (this must be uploaded to GCS; workers download & run it).
  • udf/my_udf.py — optional user-defined function (upload to GCS if main.py loads it from GCS).
  • launch_dataflow.py — local script that calls the Dataflow REST API to create the job (runs on your laptop/CI; do not upload).
Note: The launch script only submits the job. The pipeline code is what runs on Dataflow worker VMs and must be in Cloud Storage (GCS).

2. Replace placeholders first

Before you run any command, decide and/or replace these values:

  • PROJECT_ID → your GCP project id
  • REGION → Dataflow region (e.g. us-east4)
  • BUCKET → a GCS bucket name (unique), e.g. my-dataflow-bucket-12345
  • SERVICE_ACCOUNT_FILE → path to service account JSON (optional if using gcloud auth)
  • SERVICE_ACCOUNT_EMAIL → email of the SA used by Dataflow workers (optional)

3. Local folder structure

my-dataflow-project/
├── main.py                # Beam pipeline (uploaded to GCS)
├── udf/
│   └── my_udf.py          # UDF (uploaded to GCS if loaded from GCS)
└── launch_dataflow.py     # job submitter (run locally)

4. File contents — copy these exact examples

a) udf/my_udf.py

# simple example UDF
def process(line):
    # transform input line to upper-case
    return line.upper()

b) main.py — pipeline that downloads UDF from GCS

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import importlib.util
import os
import tempfile

class LoadUDF(beam.DoFn):
    def __init__(self, udf_gcs_path):
        self.udf_gcs_path = udf_gcs_path

    def setup(self):
        # download udf file from GCS to local path
        local_tmp = tempfile.mkdtemp()
        local_path = os.path.join(local_tmp, "udf.py")

        # Use gsutil to copy the file (workers must have gsutil available)
        os.system(f"gsutil cp {self.udf_gcs_path} {local_path}")

        spec = importlib.util.spec_from_file_location("udf", local_path)
        self.udf = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(self.udf)

    def process(self, element):
        yield self.udf.process(element)

def run():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", required=True)
    parser.add_argument("--udf_file", required=True)
    args, pipeline_args = parser.parse_known_args()

    options = PipelineOptions(pipeline_args, save_main_session=True)

    with beam.Pipeline(options=options) as p:
        (p
         | "Read" >> beam.io.ReadFromText(args.input)
         | "ApplyUDF" >> beam.ParDo(LoadUDF(args.udf_file))
         | "Write" >> beam.io.WriteToText(args.output)
        )

if __name__ == "__main__":
    run()

c) launch_dataflow.py — job submitter using Dataflow REST client

import uuid
import json
from google.oauth2 import service_account
from googleapiclient.discovery import build

PROJECT_ID = "YOUR_PROJECT_ID"
REGION = "us-east4"
BUCKET = "YOUR_BUCKET_NAME"
SERVICE_ACCOUNT_FILE = "service-account.json"  # or leave to use gcloud default auth

MAIN_PY_GCS = f"gs://{BUCKET}/pipeline/main.py"
UDF_GCS = f"gs://{BUCKET}/udf/my_udf.py"
TEMP_LOCATION = f"gs://{BUCKET}/temp"
STAGING_LOCATION = f"gs://{BUCKET}/staging"

JOB_NAME = f"classic-udf-{uuid.uuid4().hex[:6]}"

# If using service account file
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

dataflow = build("dataflow", "v1b3", credentials=credentials)

job_body = {
    "jobName": JOB_NAME,
    "parameters": {
        "input": f"gs://{BUCKET}/input/input.txt",
        "output": f"gs://{BUCKET}/output/result",
        "udf_file": UDF_GCS
    },
    "environment": {
        "tempLocation": TEMP_LOCATION,
        "stagingLocation": STAGING_LOCATION,
        "serviceAccountEmail": f"{PROJECT_ID}@appspot.gserviceaccount.com"
    }
}

request = dataflow.projects().locations().templates().launch(
    projectId=PROJECT_ID,
    location=REGION,
    gcsPath=MAIN_PY_GCS,
    body=job_body
)

response = request.execute()
print(json.dumps(response, indent=2))
Tip: Replace the placeholder values (PROJECT_ID, BUCKET, service account file path) before running the launcher script.

5. Create the GCS bucket (if not exists)

gcloud config set project YOUR_PROJECT_ID
gsutil mb -l us-east4 gs://YOUR_BUCKET_NAME

Use the same region as your Dataflow region for lower latency and fewer surprises.

6. Upload files to GCS (exact commands)

# upload pipeline and udf
gsutil cp main.py gs://YOUR_BUCKET_NAME/pipeline/main.py
gsutil cp udf/my_udf.py gs://YOUR_BUCKET_NAME/udf/my_udf.py

# create an example input file and upload it
echo "hello world" > input.txt
gsutil cp input.txt gs://YOUR_BUCKET_NAME/input/input.txt

7. IAM & Service Account (minimum permissions)

The launcher (the account that creates jobs) and the worker service account require appropriate roles:

Entity | Minimum Roles
Launcher (account that calls the Dataflow API) | roles/dataflow.admin (or permissions to create Dataflow jobs)
Worker service account (used by Dataflow workers):
  • roles/dataflow.worker
  • roles/storage.objectViewer (read UDF & inputs)
  • roles/storage.objectCreator or roles/storage.objectAdmin (if writing outputs)
# Example: grant worker SA the storage.objectViewer role
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID \
  --member="serviceAccount:YOUR_WORKER_SA_EMAIL" \
  --role="roles/storage.objectViewer"

8. Launch the job (two ways)

A) Using the Python launcher (recommended)

Edit launch_dataflow.py with correct placeholders, then run:

python launch_dataflow.py

B) Using gcloud CLI (alternate)

gcloud dataflow jobs run my-job-name \
  --gcs-location=gs://YOUR_BUCKET_NAME/pipeline/main.py \
  --region=us-east4 \
  --parameters=input=gs://YOUR_BUCKET_NAME/input/input.txt,output=gs://YOUR_BUCKET_NAME/output/result,udf_file=gs://YOUR_BUCKET_NAME/udf/my_udf.py
Note: The Python launcher calls the Dataflow REST API and returns richer JSON job details. Use whichever method fits your workflow.

9. Monitor job and logs

  • GCP Console → Dataflow (select region) — job graph & status.
  • Cloud Logging — filter logs by Dataflow job id or resource.type="dataflow_step".
  • Check worker logs for startup errors (missing dependencies, permission issues reading GCS, or gsutil not found).
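
For quick checks from a terminal or CI, you can also pull job messages with the same REST client used in launch_dataflow.py; a minimal sketch, assuming the dataflow client, PROJECT_ID, and REGION from that script, with the job ID taken from the launch response:

def print_job_messages(job_id, min_importance="JOB_MESSAGE_WARNING"):
    """Print recent Dataflow job messages at or above the given importance."""
    response = (
        dataflow.projects()
        .locations()
        .jobs()
        .messages()
        .list(
            projectId=PROJECT_ID,
            location=REGION,
            jobId=job_id,
            minimumImportance=min_importance,
        )
        .execute()
    )
    for msg in response.get("jobMessages", []):
        print(msg.get("time"), msg.get("messageImportance"), msg.get("messageText"))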

10. Common errors & fixes

  • Permission denied reading gs://...
    → Ensure worker SA has storage.objectViewer.
  • Job stuck in STARTING
    → Verify tempLocation & stagingLocation exist and are writable; check project quotas and region availability.
  • ModuleNotFoundError inside UDF
    → Ensure your UDF is a single Python file or that the worker image contains required libs. For more complex deps use Flex Templates (container).
  • gsutil not found on worker
    → Some worker images lack gsutil. If so either vendor the UDF into the pipeline bundle or use a Flex Template with a container that includes gsutil.

11. Optional: test locally (quick sanity)

Run the pipeline locally to validate behavior (you may need to adapt LoadUDF to accept local paths):

python main.py --input input.txt --output local-output --udf_file udf/my_udf.py

This helps verify logic before uploading to GCS and launching on Dataflow.

12. Final pre-launch checklist

  • [ ] main.py uploaded to gs://BUCKET/pipeline/main.py
  • [ ] udf/my_udf.py uploaded to gs://BUCKET/udf/my_udf.py (if referenced)
  • [ ] Input file uploaded to gs://BUCKET/input/input.txt
  • [ ] TEMP and STAGING set to gs://BUCKET/temp and gs://BUCKET/staging
  • [ ] Launcher configured with correct PROJECT_ID and SERVICE_ACCOUNT_FILE
  • [ ] IAM roles granted to launcher SA and worker SA
Written for beginners — adapt paths and service accounts to your project's conventions. Always avoid uploading sensitive keys directly into buckets that are publicly accessible.

Monday, 20 October 2025

Tokenizer

🧩 What is a Tokenizer?

A tokenizer is a text preprocessing tool used in NLP (Natural Language Processing) that converts human-readable text into numbers so that models like BERT or DistilBERT can understand it.

💬 Why do we need it?

Machine learning models cannot understand raw text like:

"I love this movie!"

So, we must convert text → tokens → numbers (IDs).

🔤 Step-by-step example

Let’s see what a tokenizer does with DistilBERT.

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

text = "I love this movie!"
tokens = tokenizer.tokenize(text)
print(tokens)

📊 Output:

['i', 'love', 'this', 'movie', '!']

So, it breaks the sentence into small pieces called tokens.

🔢 Convert tokens to IDs

Next, we convert these words into numeric IDs (the model’s vocabulary).

ids = tokenizer.convert_tokens_to_ids(tokens)
print(ids)

Example:

[1045, 2293, 2023, 3185, 999]

Each number corresponds to a token in BERT’s vocabulary.
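
In practice you usually call the tokenizer object directly; it adds the special [CLS] and [SEP] tokens and returns the attention mask as well (the IDs shown in the comments are illustrative):

encoded = tokenizer(text)
print(encoded["input_ids"])        # e.g. [101, 1045, 2293, 2023, 3185, 999, 102]
print(encoded["attention_mask"])   # e.g. [1, 1, 1, 1, 1, 1, 1]

# Decode back to text to verify the round trip
print(tokenizer.decode(encoded["input_ids"]))   # "[CLS] i love this movie! [SEP]"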

Wednesday, 15 October 2025

GCP pubsub data and how to get it

GCP Pub/Sub Subscription JSON & Python Example

JSON File

{
  "protoPayload": {
    "@type": "type.googleapis.com/google.cloud.audit.AuditLog",
    "status": {},
    "authenticationInfo": {
      "principalEmail": "admin@gcp-project.iam.gserviceaccount.com",
      "serviceAccountDelegationInfo": []
    },
    "requestMetadata": {
      "callerIp": "35.203.120.14",
      "callerSuppliedUserAgent": "google-cloud-sdk gcloud/463.0.0"
    },
    "serviceName": "pubsub.googleapis.com",
    "methodName": "google.pubsub.v1.Subscriber.CreateSubscription",
    "resourceName": "projects/my-gcp-project/subscriptions/bq_export_subscription",
    "request": {
      "@type": "type.googleapis.com/google.pubsub.v1.Subscription",
      "name": "projects/my-gcp-project/subscriptions/bq_export_subscription",
      "topic": "projects/my-gcp-project/topics/ticket-updates",
      "bigqueryConfig": {
        "table": "projects/my-gcp-project/datasets/pubsub_exports/tables/ticket_updates",
        "writeMetadata": true,
        "useTopicSchema": true,
        "dropUnknownFields": false,
        "state": "ACTIVE",
        "serviceAccountEmail": "pubsub-bq-writer@my-gcp-project.iam.gserviceaccount.com"
      },
      "ackDeadlineSeconds": 30,
      "retainAckedMessages": false,
      "expirationPolicy": {
        "ttl": "2678400s"
      },
      "messageRetentionDuration": "604800s",
      "enableMessageOrdering": false,
      "labels": {
        "env": "prod",
        "team": "data-engineering"
      }
    },
    "response": {
      "@type": "type.googleapis.com/google.pubsub.v1.Subscription",
      "name": "projects/my-gcp-project/subscriptions/bq_export_subscription",
      "topic": "projects/my-gcp-project/topics/ticket-updates",
      "bigqueryConfig": {
        "table": "projects/my-gcp-project/datasets/pubsub_exports/tables/ticket_updates",
        "state": "ACTIVE"
      }
    }
  },
  "insertId": "px1a12bc3d4e56",
  "resource": {
    "type": "pubsub_subscription",
    "labels": {
      "subscription_id": "bq_export_subscription",
      "project_id": "my-gcp-project"
    }
  },
  "timestamp": "2025-10-15T22:03:19.123Z",
  "severity": "INFO",
  "logName": "projects/my-gcp-project/logs/cloudaudit.googleapis.com%2Factivity",
  "operation": {
    "id": "operation-1234567890",
    "producer": "pubsub.googleapis.com",
    "last": true
  },
  "receiveTimestamp": "2025-10-15T22:03:20.456Z"
}

Python Code

from google.cloud import pubsub_v1

def get_subscription_details(project_id: str, subscription_id: str):
    """
    Retrieves a Pub/Sub subscription and prints its BigQuery service account email (if configured).
    """
    # Create a Subscriber client
    subscriber = pubsub_v1.SubscriberClient()

    # Build the fully qualified subscription name
    subscription_path = subscriber.subscription_path(project_id, subscription_id)
    print(f"Fetching subscription: {subscription_path}")

    try:
        # Get subscription details
        subscription = subscriber.get_subscription(request={"subscription": subscription_path})

        print("\n✅ Subscription Details:")
        print(f"Name: {subscription.name}")
        print(f"Topic: {subscription.topic}")
        print(f"Ack Deadline: {subscription.ack_deadline_seconds} seconds")

        # Check for BigQuery configuration
        if subscription.bigquery_config and subscription.bigquery_config.service_account_email:
            print(f"BigQuery Table: {subscription.bigquery_config.table}")
            print(f"Service Account Email: {subscription.bigquery_config.service_account_email}")
        else:
            print("No BigQuery configuration or service account email found for this subscription.")

    except Exception as e:
        print(f"❌ Error fetching subscription: {e}")


if __name__ == "__main__":
    PROJECT_ID = "my-gcp-project"
    SUBSCRIPTION_ID = "bq_export_subscription"
    get_subscription_details(PROJECT_ID, SUBSCRIPTION_ID)

Wednesday, 17 September 2025

Day 1

main.tf


provider "google" {
  project = var.project_id
  region  = var.region
}

# Enable required services
resource "google_project_service" "services" {
  for_each = toset([
    "pubsub.googleapis.com",
    "logging.googleapis.com",
    "cloudfunctions.googleapis.com"
  ])
  service = each.key
}

variable "project_id" {
  description = "Your GCP Project ID"
}

variable "region" {
  default = "us-central1"
}

# Pub/Sub topic that receives log events
resource "google_pubsub_topic" "log_topic" {
  name = "storage-policy-violations"
}

# Pub/Sub topic for SOC alerts
resource "google_pubsub_topic" "soc_alerts" {
  name = "soc-alerts"
}

# Log sink to capture public bucket IAM changes
resource "google_logging_project_sink" "storage_sink" {
  name        = "storage-public-bucket-sink"
  destination = "pubsub.googleapis.com/${google_pubsub_topic.log_topic.id}"

  # Filter: only when public access is granted
  filter = < <EOT
resource.type="gcs_bucket"
protoPayload.methodName="storage.setIamPermissions"
(protoPayload.serviceData.policyDelta.bindingDeltas.member="allUsers"
 OR protoPayload.serviceData.policyDelta.bindingDeltas.member="allAuthenticatedUsers")
EOT

  unique_writer_identity = true
}

# Give sink permission to publish
resource "google_pubsub_topic_iam_member" "sink_pub" {
  topic  = google_pubsub_topic.log_topic.name
  role   = "roles/pubsub.publisher"
  member = google_logging_project_sink.storage_sink.writer_identity
}

# Storage bucket for function code
resource "google_storage_bucket" "function_bucket" {
  name          = "${var.project_id}-function-src"
  location      = var.region
  force_destroy = true
}

# Upload function zip
resource "google_storage_bucket_object" "function_source" {
  name   = "function-source.zip"
  bucket = google_storage_bucket.function_bucket.name
  source = "function-source.zip"
}

# Cloud Function
resource "google_cloudfunctions_function" "notify_soc" {
  name        = "storage-public-alert"
  runtime     = "python39"
  region      = var.region
  entry_point = "process_pubsub"

  source_archive_bucket = google_storage_bucket.function_bucket.name
  source_archive_object = google_storage_bucket_object.function_source.name

  event_trigger {
    event_type = "google.pubsub.topic.publish"
    resource   = google_pubsub_topic.log_topic.name
  }

  available_memory_mb = 256
  description         = "Notifies SOC when a bucket is made public"
}

# Allow function to publish to SOC topic
resource "google_pubsub_topic_iam_member" "function_pub" {
  topic  = google_pubsub_topic.soc_alerts.name
  role   = "roles/pubsub.publisher"
  member = "serviceAccount:${google_cloudfunctions_function.notify_soc.service_account_email}"
}




main.py
import base64
import json
from google.cloud import pubsub_v1

SOC_TOPIC = "soc-alerts"

def process_pubsub(event, context):
    """Triggered when a bucket is made public"""
    if "data" not in event:
        print("No data found in event")
        return

    # Decode log entry
    payload = base64.b64decode(event["data"]).decode("utf-8")
    try:
        log_entry = json.loads(payload)
    except Exception as e:
        print(f"Could not parse log entry: {e}")
        return

    bucket_name = log_entry.get("resource", {}).get("labels", {}).get("bucket_name", "unknown")

    # Create alert message
    message = {
        "alert": "PUBLIC_BUCKET_DETECTED",
        "bucket": bucket_name,
        "log": log_entry
    }

    # Publish to SOC topic
    publisher = pubsub_v1.PublisherClient()
    project_id = log_entry.get("resource", {}).get("labels", {}).get("project_id", "")
    topic_path = publisher.topic_path(project_id, SOC_TOPIC)

    publisher.publish(topic_path, json.dumps(message).encode("utf-8"))
    print(f"⚠️ SOC ALERT: Public bucket detected -> {bucket_name}")
    

  

Monday, 15 September 2025

AWS Data & ETL Training Master Deck

AWS Data & ETL Training Master Deck (Editable)

10-Day instructor-led hands-on training — outline & slides

Day 1: AWS Basics & Account Setup

  • Slide 1: Title, Duration, Instructor
    Course title slide showing Day 1, total duration for the session, and instructor name.
  • Slide 2: Agenda & Learning Objectives
    List the day's agenda and measurable learning objectives (account setup, billing monitoring, MFA, AWS infra concepts).
  • Slide 3: What is Cloud & Why AWS?
    High-level cloud concepts, benefits of cloud vs on-prem, reasons to choose AWS (services, scale, ecosystem).
  • Slide 4: AWS Global Infrastructure Diagram
    Diagram illustrating Regions, Availability Zones, and Edge Locations with brief notes on use-cases (latency, fault-isolation).
  • Slide 5: AWS Account Setup Steps (screenshots)
    Step-by-step account creation guidance with placeholders for screenshots: sign-up, billing info, support plan, root account safety.
  • Slide 6: Hands-on Demo: Billing alarm, MFA
    Step-by-step technical tasks students must perform in the lab:
    1. Enable IAM Billing Access — Console: Account settings → activate IAM access to billing info.
    2. Create CloudWatch Billing Alarm — Console: CloudWatch → Alarms → Create Alarm → Metric: Billing → Total Estimated Charge; set threshold (e.g. $5) → create SNS topic for email notifications → subscribe student email.
    3. Enable MFA on Root/Users — Console: IAM → Users → select user (or root) → Security credentials → Manage MFA → choose Virtual MFA → scan QR with Authenticator app (Google Authenticator/Authy) → verify codes.
    4. Test Access — Demonstrate logging in with an IAM user and validate MFA prompts; verify billing alarm notification by temporarily lowering threshold or using simulated billing metric if available.
    # Example AWS CLI (for reference - optional)
    aws cloudwatch put-metric-alarm \
      --alarm-name "EstimatedChargesAlarm" \
      --metric-name "EstimatedCharges" \
      --namespace "AWS/Billing" \
      --statistic Maximum \
      --period 21600 \
      --evaluation-periods 1 \
      --threshold 5 \
      --comparison-operator GreaterThanOrEqualToThreshold \
      --dimensions Name=Currency,Value=USD \
      --alarm-actions arn:aws:sns:us-east-1:123456789012:BillingAlerts
  • Slide 7: Summary & Q&A
    Recap key takeaways: cloud fundamentals, AWS infra, account safety practices (MFA, billing alarms). Open floor for questions.

Day 2: IAM & Security

  • Slide 1: Agenda & Objectives
    Outline of day: IAM concepts, hands-on user & group creation, policies, best practices.
  • Slide 2: IAM Concepts (Users, Groups, Roles, Policies)
    Explain IAM building blocks: Users, Groups, Roles, Policies, trust vs permissions.
  • Slide 3: IAM Architecture Diagram
    Diagram showing relationship between identities, roles, STS, and resources.
  • Slide 4: Hands-on: Create IAM user/group, attach policy
    Lab steps for students:
    1. Create an IAM group (e.g., etl-developers).
    2. Create an IAM user (e.g., student01) and add to group.
    3. Create and attach an inline or managed policy (least-privilege example: S3 read/write to a specific bucket).
    4. Test access using AWS CLI with generated access key (recommend temporary credentials or role-based cross-account testing).
  • Slide 5: Best Practices: Least Privilege, MFA
    Guidelines: use roles for services, avoid root, enable MFA, rotate keys, use IAM Access Analyzer, and log with CloudTrail.
  • Slide 6: Summary & Q&A
    Recap and Q&A.

Day 3: Amazon S3 Basics

  • Slide 1: Agenda & Objectives
    Intro to S3, storage classes, basic operations, versioning & lifecycle.
  • Slide 2: S3 Overview (Buckets, Objects, Storage Classes)
    Explain buckets, objects, keys, metadata, and storage classes (Standard, Intelligent-Tiering, IA, Glacier).
  • Slide 3: Versioning & Lifecycle Diagram
    Diagram and examples of versioning and lifecycle rules to transition objects to cheaper storage.
  • Slide 4: Hands-on: Create bucket, upload/download objects
    Lab steps: create bucket, set bucket policy, upload/download via console and CLI, enable versioning.
  • Slide 5: Summary & Q&A
    Recap and Q&A.

Day 4: Amazon S3 Advanced

  • Slide 1: Agenda & Objectives
    Encryption, bucket policies, event notifications and integration with Lambda/SNS/SQS.
  • Slide 2: Encryption & Security (SSE-S3, SSE-KMS, ACL, Bucket Policy)
    Explain server-side encryption options, KMS keys, ACLs vs bucket policies, and public access blocks.
  • Slide 3: Event Notifications Diagram (S3 → Lambda/SNS/SQS)
    Diagram showing S3 event notification flows to Lambda, SNS, and SQS for processing pipelines.
  • Slide 4: Hands-on: Trigger Lambda on S3 upload
    Lab: create Lambda function, add S3 trigger, upload object to test invocation, view CloudWatch logs.
  • Slide 5: Summary & Q&A
    Recap and Q&A.

Day 5: Amazon RDS

  • Slide 1: Agenda & Objectives
    Relational databases on AWS, engines, HA patterns, backups and restores.
  • Slide 2: RDS Overview (Engines, Multi-AZ, Read Replica)
    Discuss supported engines (MySQL, PostgreSQL, Aurora), Multi-AZ, read replicas, and failover behavior.
  • Slide 3: Security & VPC integration Diagram
    Diagram showing RDS inside VPC, subnets, SGs, route for application access, and IAM authentication options.
  • Slide 4: Hands-on: Launch RDS instance, connect & query
    Lab: launch a small RDS instance (free tier if available), configure security group, connect via psql/mysql client, run sample queries.
  • Slide 5: Summary & Q&A
    Recap and Q&A.

Day 6: AWS Glue Basics & Data Catalog

  • Slide 1: Agenda & Objectives
    Intro to Glue, Data Catalog, Crawlers, Jobs and Studio.
  • Slide 2: Glue Architecture Diagram
    Architecture showing Glue interacting with S3, Catalog, and compute (Glue jobs).
  • Slide 3: Glue Components (Catalog, Crawler, Jobs, Studio)
    Explain each component and how they fit into ETL workflows.
  • Slide 4: Hands-on: Catalog S3 CSV/JSON → Glue table
    Lab: create a Glue Crawler to catalogue S3 files and validate the Glue table schema.
  • Slide 5: Query with Athena
    Show how to query Glue cataloged tables using Athena.
  • Slide 6: Summary & Q&A
    Recap and Q&A.

Day 7: AWS Glue Advanced & PySpark ETL

  • Slide 1: Agenda & Objectives
    Advanced Glue topics and PySpark-based ETL jobs.
  • Slide 2: DynamicFrame vs DataFrame Diagram
    Explain differences, when to use DynamicFrame (schema flexibility) vs DataFrame (performance / Spark APIs).
  • Slide 3: PySpark ETL Transformations (filter, join, aggregate)
    Common transformations with examples and notes about performance and partitioning.
  • Slide 4: Hands-on Demo: CSV → Parquet → RDS
    Lab: run a PySpark job to convert CSV to Parquet, partition data, and (optionally) push results to RDS.
  • Slide 5: Sample PySpark ETL Job (code snippet)
    Include a short PySpark snippet in the slide for students to review and run (full code in appendix).
    # PySpark (Glue) snippet - pseudocode
    from pyspark.sql.functions import to_date, col

    df = spark.read.csv("s3://bucket/raw/data.csv", header=True)
    df = (df.filter("status = 'active'")
            .withColumn("event_date", to_date(col("timestamp"))))
    df.write.partitionBy("event_date").parquet("s3://bucket/processed/")
  • Slide 6: Integration with Athena
    Show how Athena can query the Parquet output using Glue catalog partitions.
  • Slide 7: Summary & Q&A
    Recap and Q&A.

Day 8: Amazon Athena

  • Slide 1: Agenda & Objectives
    Introduce Athena, cost model, and best practices for querying data lakes.
  • Slide 2: Athena Overview & Cost Model
    Explain pay-per-query model (data scanned), partitioning, compression, and reducing cost.
  • Slide 3: Querying Glue tables (SELECT, GROUP BY, partitions)
    Examples for common SQL queries over Glue catalog tables and partition-aware queries.
  • Slide 4: Hands-on: Athena SQL Queries
    Lab: run sample queries, test performance, and measure scanned bytes for cost awareness.
  • Slide 5: Summary & Q&A
    Recap and Q&A.

Day 9: AWS Lambda & CloudWatch

  • Slide 1: Agenda & Objectives
    Serverless compute basics, event-driven architecture, monitoring & observability.
  • Slide 2: Lambda Lifecycle Diagram
    Diagram: cold start, container reuse, concurrency limits.
  • Slide 3: Triggers: S3, Glue, RDS
    Examples of event sources and patterns to invoke Lambda for ETL steps.
  • Slide 4: CloudWatch Metrics, Logs, Alarms
    How to instrument Lambda with logs, custom metrics, and alarms for failure/latency.
  • Slide 5: Hands-on: Lambda triggered by S3
    Lab: deploy a Python Lambda, configure S3 trigger, upload object to test, observe CloudWatch logs.
  • Slide 6: Sample Python Lambda Code
    Example code snippet to include on slide:
    # sample lambda handler
    def handler(event, context):
        for record in event['Records']:
            key = record['s3']['object']['key']
            # process object (e.g., read, transform, write)
            print(f"Processing {key}")
  • Slide 7: Summary & Q&A
    Recap and Q&A.

Day 10: Capstone Project & Wrap-Up

  • Slide 1: Agenda & Objectives
    Overview of final integrated pipeline and evaluation criteria for the capstone.
  • Slide 2: End-to-End ETL Pipeline Diagram (S3 → Glue → Athena → RDS)
    A diagram showing full flow: data ingest → catalog → transform → query → store and monitor.
  • Slide 3: Step-by-Step Demo Script
    Steps for the instructor & students to follow:
    1. Upload CSV to S3
    2. Glue Crawler → Catalog
    3. Glue PySpark ETL → Parquet
    4. Athena Queries
    5. Optional: Load into RDS
    6. CloudWatch Monitoring
  • Slide 4: Summary of Key Takeaways
    Highlight the major learnings from the course and recommended next steps/resources.
  • Slide 5: Final Q&A
    Open discussion, feedback, and next steps for continued learning.
Generated outline • Editable master deck for instructor use — add diagrams, screenshots and code files as needed.

Wednesday, 3 September 2025

Terraform Commands


 

Init & Setup
  terraform init → Initialize Terraform working directory
  terraform init -reconfigure → Reinitialize and ignore previous backend configs
  terraform init -upgrade → Reinitialize and upgrade providers/modules
  terraform get → Download and update modules

Planning
  terraform plan → Show planned changes
  terraform plan -out=tfplan → Save execution plan to a file

Apply / Destroy
  terraform apply → Apply changes with confirmation
  terraform apply tfplan → Apply using a saved plan file
  terraform apply -auto-approve → Apply without manual approval
  terraform destroy → Destroy infrastructure with confirmation
  terraform destroy -auto-approve → Destroy without confirmation
  terraform destroy -target=aws_instance.example → Destroy specific resource

Validate & Format
  terraform validate → Validate configuration syntax
  terraform fmt → Format Terraform files
  terraform fmt -recursive → Format files in all subdirectories

Output
  terraform output → Show output variables
  terraform output -json → Show outputs in JSON format

State Management
  terraform show → Show full state or plan content
  terraform state list → List all resources in the state file
  terraform state show <resource> → Show specific resource details
  terraform state pull → Download current state file
  terraform state push → Upload local state file (used with care)
  terraform refresh → Update state with real infrastructure
  terraform taint <resource> → Mark a resource for recreation
  terraform untaint <resource> → Remove taint from a resource

Workspace Management
  terraform workspace list → List all workspaces
  terraform workspace new <name> → Create new workspace (e.g., dev, prod)
  terraform workspace select <name> → Switch to another workspace
  terraform workspace delete <name> → Delete a workspace

Debugging & Visuals
  TF_LOG=DEBUG terraform plan → Enable debug logging
  TF_LOG_PATH=log.txt terraform apply → Save logs to a file
  terraform graph | dot -Tpng > graph.png → Visualize resource graph (Graphviz needed)

Terraform Cloud
  terraform login → Authenticate to Terraform Cloud
  terraform logout → Remove local credentials
  terraform state push → Manually upload state file to remote

Saturday, 30 August 2025

GCP-TF-Log-sync-CloudFunction

GCP Public Bucket Alert Setup

🚨 Google Cloud – Public Bucket Alert (Terraform + Cloud Function)

Main.tf

provider "google" {
  project = var.project_id
  region  = var.region
}

# Enable required services
resource "google_project_service" "services" {
  for_each = toset([
    "pubsub.googleapis.com",
    "logging.googleapis.com",
    "cloudfunctions.googleapis.com"
  ])
  service = each.key
}

# Pub/Sub topic that receives log events
resource "google_pubsub_topic" "log_topic" {
  name = "storage-policy-violations"
}

# Pub/Sub topic for SOC alerts
resource "google_pubsub_topic" "soc_alerts" {
  name = "soc-alerts"
}

# Log sink to capture public bucket IAM changes
resource "google_logging_project_sink" "storage_sink" {
  name        = "storage-public-bucket-sink"
  destination = "pubsub.googleapis.com/${google_pubsub_topic.log_topic.id}"

  # Filter: only when public access is granted
  filter = <<EOT
resource.type="gcs_bucket"
protoPayload.methodName="storage.setIamPermissions"
(protoPayload.serviceData.policyDelta.bindingDeltas.member="allUsers"
 OR protoPayload.serviceData.policyDelta.bindingDeltas.member="allAuthenticatedUsers")
EOT

  unique_writer_identity = true
}

variables.tf

variable "project_id" {
  description = "Your GCP Project ID"
}
variable "region" {
  default     = "us-central1"
}

🔹 Cloud Function Code (main.py)

import base64
import json
from google.cloud import pubsub_v1

SOC_TOPIC = "soc-alerts"

def process_pubsub(event, context):
    """Triggered when a bucket is made public"""
    if "data" not in event:
        print("No data found in event")
        return

    # Decode log entry
    payload = base64.b64decode(event["data"]).decode("utf-8")
    try:
        log_entry = json.loads(payload)
    except Exception as e:
        print(f"Could not parse log entry: {e}")
        return

    bucket_name = log_entry.get("resource", {}).get("labels", {}).get("bucket_name", "unknown")

    # Create alert message
    message = {
        "alert": "PUBLIC_BUCKET_DETECTED",
        "bucket": bucket_name,
        "log": log_entry
    }

    # Publish to SOC topic
    publisher = pubsub_v1.PublisherClient()
    project_id = log_entry.get("resource", {}).get("labels", {}).get("project_id", "")
    topic_path = publisher.topic_path(project_id, SOC_TOPIC)

    publisher.publish(topic_path, json.dumps(message).encode("utf-8"))
    print(f"⚠️ SOC ALERT: Public bucket detected -> {bucket_name}")

requirements.txt

google-cloud-pubsub

🔹 Windows Packaging

Compress-Archive -Path main.py, requirements.txt -DestinationPath function-source.zip -Force

🔹 Deployment Steps

  1. Enable APIs:
     gcloud services enable pubsub.googleapis.com logging.googleapis.com cloudfunctions.googleapis.com
  2. Deploy Terraform:
     terraform init
     terraform apply
  3. Test by making a bucket public:
     gsutil iam ch allUsers:objectViewer gs://<your-bucket>

→ This will trigger Cloud Logging → Pub/Sub → Cloud Function → SOC Pub/Sub topic.
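
Before changing real bucket IAM, the function logic can also be exercised locally by handing it a base64-encoded fake log entry; a rough sketch with placeholder names (note it will still try to publish to the soc-alerts topic, so run it with valid credentials or stub the publisher):

import base64
import json

import main  # the Cloud Function module above

# Minimal fake Cloud Logging entry for a bucket made public (placeholder values).
fake_log = {
    "resource": {
        "labels": {"bucket_name": "test-bucket", "project_id": "my-gcp-project"}
    },
    "protoPayload": {"methodName": "storage.setIamPermissions"},
}

event = {"data": base64.b64encode(json.dumps(fake_log).encode("utf-8"))}
main.process_pubsub(event, context=None)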

✅ Result

This setup works exactly like AWS S3 Public Bucket Alerts, but implemented in Google Cloud.

Terraform - main.tf (Full)

main.tf


provider "google" {
  project = var.project_id
  region  = var.region
}

# Enable required services
resource "google_project_service" "services" {
  for_each = toset([
    "pubsub.googleapis.com",
    "logging.googleapis.com",
    "cloudfunctions.googleapis.com"
  ])
  service = each.key
}

# Pub/Sub topic that receives log events
resource "google_pubsub_topic" "log_topic" {
  name = "storage-policy-violations"
}

# Pub/Sub topic for SOC alerts
resource "google_pubsub_topic" "soc_alerts" {
  name = "soc-alerts"
}

# Log sink to capture public bucket IAM changes
resource "google_logging_project_sink" "storage_sink" {
  name        = "storage-public-bucket-sink"
  destination = "pubsub.googleapis.com/${google_pubsub_topic.log_topic.id}"

  # Filter: only when public access is granted
  filter = <<EOT
resource.type="gcs_bucket"
protoPayload.methodName="storage.setIamPermissions"
(protoPayload.serviceData.policyDelta.bindingDeltas.member="allUsers"
 OR protoPayload.serviceData.policyDelta.bindingDeltas.member="allAuthenticatedUsers")
EOT

  unique_writer_identity = true
}

# Give sink permission to publish
resource "google_pubsub_topic_iam_member" "sink_pub" {
  topic  = google_pubsub_topic.log_topic.name
  role   = "roles/pubsub.publisher"
  member = google_logging_project_sink.storage_sink.writer_identity
}

# Storage bucket for function code
resource "google_storage_bucket" "function_bucket" {
  name          = "${var.project_id}-function-src"
  location      = var.region
  force_destroy = true
}

# Upload function zip
resource "google_storage_bucket_object" "function_source" {
  name   = "function-source.zip"
  bucket = google_storage_bucket.function_bucket.name
  source = "function-source.zip"
}

# Cloud Function
resource "google_cloudfunctions_function" "notify_soc" {
  name        = "storage-public-alert"
  runtime     = "python39"
  region      = var.region
  entry_point = "process_pubsub"

  source_archive_bucket = google_storage_bucket.function_bucket.name
  source_archive_object = google_storage_bucket_object.function_source.name

  event_trigger {
    event_type = "google.pubsub.topic.publish"
    resource   = google_pubsub_topic.log_topic.name
  }

  available_memory_mb = 256
  description         = "Notifies SOC when a bucket is made public"
}

# Allow function to publish to SOC topic
resource "google_pubsub_topic_iam_member" "function_pub" {
  topic  = google_pubsub_topic.soc_alerts.name
  role   = "roles/pubsub.publisher"
  member = "serviceAccount:${google_cloudfunctions_function.notify_soc.service_account_email}"
}
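
The main.tf above references var.project_id and var.region, but their declarations are not shown; a minimal variables.tf sketch (names inferred from the usage in main.tf):

variable "project_id" {
  description = "Project that hosts the alerting resources"
  type        = string
}

variable "region" {
  description = "Region for the Cloud Function and the function source bucket"
  type        = string
}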
  

Thursday, 28 August 2025

GCP 1

GCP Terraform Example — Compute + Storage + Firewall

GCP Terraform Example

This page contains a ready-to-use Terraform configuration that creates a small environment on Google Cloud: a Storage Bucket, a Firewall rule (like an AWS security group), and a small Compute Engine VM (e2-micro) suitable for testing. The configuration intentionally uses values that are easy to change for compliance or security.

Important: Replace YOUR_PROJECT_ID and ensure key.json points to your service account JSON credentials. Keep credentials secret and do not commit them to source control.

Terraform configuration

terraform {
  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "~> 5.0"
    }
  }
}

provider "google" {
  project     = "YOUR_PROJECT_ID"
  region      = "us-central1"      # free tier region
  zone        = "us-central1-a"    # free tier zone
  credentials = file("key.json")   # your service account key
}

# --------------------------
# Storage Bucket (Free Tier)
# --------------------------
resource "google_storage_bucket" "demo_bucket" {
  name     = "my-demo-bucket-${random_id.rand.hex}"
  location = "US"

  storage_class = "STANDARD"
  force_destroy = true
  uniform_bucket_level_access = true
}

# --------------------------
# Firewall (Like Security Group)
# --------------------------
resource "google_compute_firewall" "default_allow_ssh" {
  name    = "allow-ssh"
  network = "default"

  allow {
    protocol = "tcp"
    ports    = ["22"]
  }

  source_ranges = ["0.0.0.0/0"] # 🚨 Open SSH to world (not safe for prod)
  target_tags   = ["ssh-allowed"]
}

# --------------------------
# Compute Instance (Free Tier)
# --------------------------
resource "google_compute_instance" "demo_vm" {
  name         = "demo-vm"
  machine_type = "e2-micro"  # ✅ Always Free tier machine type
  zone         = "us-central1-a"

  tags = ["ssh-allowed"]

  boot_disk {
    initialize_params {
      image = "debian-cloud/debian-11"
      size  = 30   # ✅ Free tier gives you 30GB Persistent Disk
    }
  }

  network_interface {
    network = "default"
    access_config {
      # Ephemeral public IP (free)
    }
  }

  metadata_startup_script = <<-EOT
    #!/bin/bash
    echo "Hello from Terraform VM" > /var/tmp/startup.txt
  EOT
}

# --------------------------
# Random ID for bucket name
# --------------------------
resource "random_id" "rand" {
  byte_length = 4
}
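
Optionally, an output block makes the VM's ephemeral public IP easy to read after apply (a small addition, not part of the original configuration):

output "vm_external_ip" {
  description = "Ephemeral public IP of demo-vm"
  value       = google_compute_instance.demo_vm.network_interface[0].access_config[0].nat_ip
}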

Quick run instructions

  1. Install and configure GCP SDK / Terraform.
  2. Place your service-account JSON next to main.tf as key.json, or update credentials path.
  3. Initialize Terraform:
    terraform init
  4. Preview changes:
    terraform plan -out=tfplan
  5. Apply (create resources):
    terraform apply tfplan
  6. Cleanup:
    terraform destroy -auto-approve
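
Between steps 5 and 6, you can confirm the VM came up and ran the startup script (assuming gcloud is authenticated for the same project):

gcloud compute ssh demo-vm --zone=us-central1-a --command="cat /var/tmp/startup.txt"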

Fields & notes

  • machine_type = "e2-micro": Always-free eligible machine in some regions (use us-central1).
  • source_ranges = ["0.0.0.0/0"]: Opens SSH to the world; acceptable for quick tests but change to your IP for safety.
  • force_destroy = true: Allows bucket deletion even when it contains objects; useful for cleanup automation.
  • credentials = file("key.json"): Terraform reads your service account key directly; no need to run gcloud auth (unless you want to).

Safety tips

  • Prefer restricting SSH source_ranges to your IP (e.g. ["203.0.113.4/32"]).
  • Verify billing is enabled on the project; free-tier still requires billing account attached.
  • Do not commit key.json to version control.


OPA basics

OPA AWS Terraform Policy - And, Or, Not

OPA AWS Terraform Policy Example (And, Or, Not)

This example demonstrates how to express AND, OR, and NOT logic in Rego v1 syntax for AWS Terraform plans. Rego has no infix and/or operators: AND is a sequence of expressions in one rule body, OR is written as multiple bodies for the same rule (or as set membership with the in keyword), and NOT uses the not keyword.
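
Before the Terraform-specific policies, here is a minimal, self-contained sketch of the three patterns (the input fields are made up for illustration):

package example

# AND: every expression in the body must hold
deny contains "public and unencrypted" if {
  input.acl == "public-read"
  not input.encrypted
}

# OR: define the same rule again with a second body
deny contains "remote admin port open" if {
  input.port == 22
}

deny contains "remote admin port open" if {
  input.port == 3389
}

# OR (compact form): set membership with `in`
deny contains "remote admin port open (compact)" if {
  input.port in {22, 3389}
}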

1. Mock Terraform Plan JSON (aws-plan.json)

{
  "resource_changes": [
    {
      "address": "aws_s3_bucket.demo",
      "type": "aws_s3_bucket",
      "change": {
        "after": {
          "acl": "public-read",
          "versioning": { "enabled": false },
          "server_side_encryption_configuration": null
        }
      }
    },
    {
      "address": "aws_instance.demo",
      "type": "aws_instance",
      "change": {
        "after": {
          "instance_type": "t2.micro",
          "associate_public_ip_address": true,
          "ebs_optimized": false,
          "ebs_block_device": [
            { "device_name": "/dev/sda1", "encrypted": false }
          ]
        }
      }
    },
    {
      "address": "aws_security_group.demo",
      "type": "aws_security_group",
      "change": {
        "after": {
          "ingress": [
            { "from_port": 22, "to_port": 22, "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"] }
          ]
        }
      }
    }
  ]
}

2. S3 Policy (policy/s3.rego)

package terraform.s3

# Deny if ACL is public OR versioning is not enabled.
# OR in Rego: one rule body per alternative; both add to the same deny set.
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_s3_bucket"
  rc.change.after.acl == "public-read"
  msg := sprintf("S3 bucket %s is public OR lacks versioning", [rc.address])
}

deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_s3_bucket"
  not rc.change.after.versioning.enabled
  msg := sprintf("S3 bucket %s is public OR lacks versioning", [rc.address])
}

# Deny if bucket is public AND server-side encryption is missing.
# AND in Rego: list the expressions one per line in the same body.
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_s3_bucket"
  rc.change.after.acl == "public-read"
  not rc.change.after.server_side_encryption_configuration
  msg := sprintf("S3 bucket %s is public AND unencrypted", [rc.address])
}

3. EC2 Policy (policy/ec2.rego)

package terraform.ec2

# Deny if instance type is t2.micro OR it has a public IP
# (OR: one rule body per alternative)
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_instance"
  rc.change.after.instance_type == "t2.micro"
  msg := sprintf("EC2 %s is t2.micro OR has public IP", [rc.address])
}

deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_instance"
  rc.change.after.associate_public_ip_address
  msg := sprintf("EC2 %s is t2.micro OR has public IP", [rc.address])
}

# Deny if instance is NOT EBS optimized
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_instance"
  not rc.change.after.ebs_optimized
  msg := sprintf("EC2 %s is not EBS optimized", [rc.address])
}

# Deny if any EBS volume is NOT encrypted
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_instance"
  vol := rc.change.after.ebs_block_device[_]
  not vol.encrypted
  msg := sprintf("EC2 %s has unencrypted volume %s", [rc.address, vol.device_name])
}

4. Security Group Policy (policy/sg.rego)

package terraform.sg

# Deny if SG allows SSH OR RDP from the world
# (the OR over ports is expressed with set membership and the `in` keyword)
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_security_group"
  ing := rc.change.after.ingress[_]
  ing.from_port in {22, 3389}
  ing.cidr_blocks[_] == "0.0.0.0/0"
  msg := sprintf("Security Group %s allows SSH or RDP from world", [rc.address])
}

5. Run OPA Evaluation

opa eval -i aws-plan.json \
-d policy/s3.rego \
-d policy/ec2.rego \
-d policy/sg.rego \
"data.terraform"

Expected violations:

  • S3 bucket is public OR lacks versioning
  • S3 bucket is public AND unencrypted
  • EC2 is t2.micro OR has public IP
  • EC2 is not EBS optimized
  • EC2 has unencrypted volume
  • Security Group allows SSH or RDP from world

Wednesday, 27 August 2025

Policies use the new OPA 1.0+ syntax

Advanced OPA Policies for AWS Terraform (Mock Plan)

Advanced OPA Policies for AWS Terraform (Mock Plan)

This guide demonstrates complex OPA (Open Policy Agent) policies for AWS Terraform plans using a mock plan JSON. It includes cross-resource checks (EC2 ↔ Security Groups, EC2 ↔ EBS), S3 best practices, IAM least privilege, a single policy entrypoint to evaluate everything at once, and runner scripts (Python & PowerShell).

1) Directory Layout

C:\OPA_Advanced\
│
├── terraform-plan.json     # Mock Terraform plan JSON (intentionally violating several policies)
└── policy\
    ├── main.rego           # Single entrypoint aggregating all denials
    ├── ec2_complex.rego    # EC2 + SG + EBS cross checks
    ├── s3_complex.rego     # S3 best-practice checks
    ├── iam_complex.rego    # IAM least privilege checks
    └── sg_simple.rego      # SG hygiene (used by EC2 cross-checks)
Note: This is a mock plan; keys/IDs are simplified so cross-references are easy. In real plans, resource IDs are computed and you’ll often join using address, type, and name or inspect after_unknown and data sources.
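
For a real plan, where resource_changes is an array rather than a map, a join helper might look like this sketch (field names follow the standard terraform show -json output; adapt as needed):

sg_by_address(addr) := sg if {
  some rc in input.resource_changes
  rc.type == "aws_security_group"
  rc.address == addr
  sg := rc
}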

2) Mock Terraform Plan JSON (terraform-plan.json)

This plan intentionally violates multiple controls to showcase policy failures.

{
  "format_version": "0.1",
  "terraform_version": "1.13.1",
  "resource_changes": {
    "aws_s3_bucket.app_bucket": {
      "type": "aws_s3_bucket",
      "name": "app_bucket",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "bucket": "my-app-public-bucket",
          "acl": "public-read",
          "versioning": { "enabled": false },
          "server_side_encryption_configuration": null,
          "tags": { "Team": "Platform" }
        }
      }
    },

    "aws_security_group.web": {
      "type": "aws_security_group",
      "name": "web",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "name": "web-sg",
          "description": "Web security group",
          "ingress": [
            { "from_port": 22,   "to_port": 22,   "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"] },
            { "from_port": 3389, "to_port": 3389, "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"] },
            { "from_port": 80,   "to_port": 80,   "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"] }
          ],
          "egress": [
            { "from_port": 0, "to_port": 0, "protocol": "-1", "cidr_blocks": ["0.0.0.0/0"] }
          ],
          "tags": { "Environment": "Prod" }
        }
      }
    },

    "aws_iam_role.ec2_role": {
      "type": "aws_iam_role",
      "name": "ec2_role",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "name": "ec2-role",
          "assume_role_policy": {
            "Version": "2012-10-17",
            "Statement": [
              { "Effect": "Allow", "Principal": { "Service": "*" }, "Action": "sts:AssumeRole" }
            ]
          },
          "tags": { "Owner": "Alice" }
        }
      }
    },

    "aws_iam_policy.too_broad": {
      "type": "aws_iam_policy",
      "name": "too_broad",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "name": "AllowEverything",
          "policy": {
            "Version": "2012-10-17",
            "Statement": [
              { "Effect": "Allow", "Action": "*", "Resource": "*" }
            ]
          }
        }
      }
    },

    "aws_instance.web1": {
      "type": "aws_instance",
      "name": "web1",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "ami": "ami-12345678",
          "instance_type": "t3.small",
          "ebs_optimized": false,
          "associate_public_ip_address": true,
          "iam_instance_profile": "ec2-role",
          "vpc_security_group_ids": ["aws_security_group.web"],
          "ebs_block_device": [
            { "device_name": "/dev/sda1", "volume_size": 30, "volume_type": "standard", "encrypted": false }
          ],
          "tags": { "Environment": "Prod", "Service": "frontend" }
        }
      }
    }
  }
}

3) OPA Policies (OPA v1.0+ syntax)

3.1 Main Aggregator (policy/main.rego)

Single entrypoint so you can evaluate everything at once.

package terraform

import data.terraform.ec2_complex
import data.terraform.s3_complex
import data.terraform.iam_complex
import data.terraform.sg_simple

# Aggregate all denials into one set
deny contains msg if { msg := ec2_complex.deny[_] }
deny contains msg if { msg := s3_complex.deny[_] }
deny contains msg if { msg := iam_complex.deny[_] }
deny contains msg if { msg := sg_simple.deny[_] }

3.2 EC2 + SG + EBS Cross Checks (policy/ec2_complex.rego)

package terraform.ec2_complex

# Helper: iterate all EC2 instances in the plan
ec2s contains ec2 if {
  some r
  ec2 := input.resource_changes[r]
  ec2.type == "aws_instance"
}

# Helper: look up SG resource by "id" (mock uses address-style id)
sg_by_id(id) := sg if {
  sg := input.resource_changes[id]
  sg.type == "aws_security_group"
}

# --- Deny rules ---

# 1) EC2 must be EBS-optimized
deny contains msg if {
  ec2 := ec2s[_]
  not ec2.change.after.ebs_optimized
  msg := sprintf("EC2 %v is not EBS optimized", [ec2.name])
}

# 2) EC2 must not have public IP
deny contains msg if {
  ec2 := ec2s[_]
  ec2.change.after.associate_public_ip_address
  msg := sprintf("EC2 %v has a public IP assigned", [ec2.name])
}

# 3) EC2 must have IAM instance profile (role)
deny contains msg if {
  ec2 := ec2s[_]
  not ec2.change.after.iam_instance_profile
  msg := sprintf("EC2 %v does not have an IAM instance profile attached", [ec2.name])
}

# 4) All attached EBS volumes must be encrypted
deny contains msg if {
  ec2 := ec2s[_]
  vol := ec2.change.after.ebs_block_device[_]
  not vol.encrypted
  msg := sprintf("EC2 %v has unencrypted EBS volume %v", [ec2.name, vol.device_name])
}

# 5) If Environment=Prod, EBS volume types must be gp3 or io1
deny contains msg if {
  ec2 := ec2s[_]
  ec2.change.after.tags.Environment == "Prod"
  vol := ec2.change.after.ebs_block_device[_]
  not vol.volume_type in {"gp3", "io1"}
  msg := sprintf("EC2 %v in Prod has non-compliant EBS type %v on %v", [ec2.name, vol.volume_type, vol.device_name])
}

# 6) EC2's attached SGs must not allow SSH (22) or RDP (3389) from 0.0.0.0/0
deny contains msg if {
  ec2 := ec2s[_]
  sg_id := ec2.change.after.vpc_security_group_ids[_]
  sg := sg_by_id(sg_id)
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 22
  ing.to_port == 22
  msg := sprintf("EC2 %v allows SSH (22) from 0.0.0.0/0 via SG %v", [ec2.name, sg.name])
}

deny contains msg if {
  ec2 := ec2s[_]
  sg_id := ec2.change.after.vpc_security_group_ids[_]
  sg := sg_by_id(sg_id)
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 3389
  ing.to_port == 3389
  msg := sprintf("EC2 %v allows RDP (3389) from 0.0.0.0/0 via SG %v", [ec2.name, sg.name])
}

3.3 S3 Best Practices (policy/s3_complex.rego)

package terraform.s3_complex

# Helper: all S3 buckets
buckets contains b if {
  some r
  b := input.resource_changes[r]
  b.type == "aws_s3_bucket"
}

# Require versioning
deny contains msg if {
  b := buckets[_]
  not b.change.after.versioning.enabled
  msg := sprintf("S3 bucket %v: versioning is not enabled", [b.name])
}

# Require server-side encryption
deny contains msg if {
  b := buckets[_]
  not b.change.after.server_side_encryption_configuration
  msg := sprintf("S3 bucket %v: server-side encryption not configured", [b.name])
}

# Block public ACLs
deny contains msg if {
  b := buckets[_]
  b.change.after.acl == "public-read"
  msg := sprintf("S3 bucket %v: ACL is public-read", [b.name])
}

# Require mandatory tags (Environment and Owner)
deny contains msg if {
  b := buckets[_]
  not b.change.after.tags.Environment
  msg := sprintf("S3 bucket %v: missing required tag 'Environment'", [b.name])
}

deny contains msg if {
  b := buckets[_]
  not b.change.after.tags.Owner
  msg := sprintf("S3 bucket %v: missing required tag 'Owner'", [b.name])
}

3.4 IAM Least Privilege (policy/iam_complex.rego)

package terraform.iam_complex

# Helper: roles and policies
roles contains r if {
  some k
  r := input.resource_changes[k]
  r.type == "aws_iam_role"
}

policies contains p if {
  some k
  p := input.resource_changes[k]
  p.type == "aws_iam_policy"
}

# 1) AssumeRole principal must not be wildcard
deny contains msg if {
  r := roles[_]
  stmt := r.change.after.assume_role_policy.Statement[_]
  stmt.Principal.Service == "*"
  msg := sprintf("IAM Role %v: assume-role Principal.Service is wildcard '*'", [r.name])
}

# 2) Managed policy statements must not have Action '*'
deny contains msg if {
  p := policies[_]
  stmt := p.change.after.policy.Statement[_]
  stmt.Action == "*"
  msg := sprintf("IAM Policy %v: uses Action '*'", [p.name])
}

# 3) Managed policy statements must not have Resource '*'
deny contains msg if {
  p := policies[_]
  stmt := p.change.after.policy.Statement[_]
  stmt.Resource == "*"
  msg := sprintf("IAM Policy %v: uses Resource '*'", [p.name])
}

3.5 Security Group Hygiene (Standalone) (policy/sg_simple.rego)

package terraform.sg_simple

# Helper: all SGs
sgs contains sg if {
  some r
  sg := input.resource_changes[r]
  sg.type == "aws_security_group"
}

# Disallow 0.0.0.0/0 for SSH and RDP anywhere in the plan (defense in depth)
deny contains msg if {
  sg := sgs[_]
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 22
  ing.to_port == 22
  msg := sprintf("SG %v allows SSH (22) from 0.0.0.0/0", [sg.name])
}

deny contains msg if {
  sg := sgs[_]
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 3389
  ing.to_port == 3389
  msg := sprintf("SG %v allows RDP (3389) from 0.0.0.0/0", [sg.name])
}

4) Python Runner (opa_check.py)

import subprocess, json, os, sys

plan = "terraform-plan.json"
policy_dir = "policy"

# Evaluate a single entrypoint: data.terraform.deny (from main.rego)
cmd = ["opa", "eval", "-i", plan, "-d", policy_dir, "--format", "json", "data.terraform.deny"]

try:
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
except subprocess.CalledProcessError as e:
    print("OPA evaluation failed:", e.stderr or e.stdout)
    sys.exit(2)

data = json.loads(result.stdout)

violations = []
for res in data.get("result", []):
    for expr in res.get("expressions", []):
        val = expr.get("value")
        if isinstance(val, list):
            violations.extend(val)

if violations:
    print("❌ Policy violations found:")
    for v in violations:
        print("-", v)
    sys.exit(1)
else:
    print("✅ All policies passed.")
    sys.exit(0)
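
Run it from the directory that contains terraform-plan.json and policy\ (opa must be on PATH); the script exits non-zero when violations are found, which makes it easy to wire into CI:

python opa_check.py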

5) PowerShell Runner (opa_check.ps1)

$Plan = "C:\OPA_Advanced\terraform-plan.json"
$PolicyDir = "C:\OPA_Advanced\policy"

$OpaArgs = @("eval", "-i", $Plan, "-d", $PolicyDir, "--format", "json", "data.terraform.deny")

try {
  # Splat the argument array to the native opa binary
  $OutRaw = & opa @OpaArgs
} catch {
  Write-Error "OPA eval failed. Ensure opa.exe is in PATH and the input files exist."
  exit 2
}

# Join the output lines so ConvertFrom-Json parses the full JSON document (needed on Windows PowerShell 5.1)
$Out = ($OutRaw -join "`n") | ConvertFrom-Json
$Violations = @()

foreach ($r in $Out.result) {
  foreach ($e in $r.expressions) {
    if ($e.value) { $Violations += $e.value }
  }
}

if ($Violations.Count -gt 0) {
  Write-Host "❌ Policy violations found:" -ForegroundColor Red
  $Violations | ForEach-Object { Write-Host "- $_" -ForegroundColor Yellow }
  exit 1
} else {
  Write-Host "✅ All policies passed." -ForegroundColor Green
  exit 0
}

6) How to Run

  1. Create C:\OPA_Advanced and the policy\ folder.
  2. Save the JSON and the four .rego files (plus main.rego) into the paths above.
  3. Run a quick syntax check (OPA 1.0+):
    opa check policy\
  4. Evaluate (a pretty-printed variant is shown after this list):
    opa eval -i terraform-plan.json -d policy "data.terraform.deny"
  5. Or use the provided Python/PowerShell scripts.
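
For step 4, the pretty output format is easier to read interactively:

opa eval -i terraform-plan.json -d policy --format pretty "data.terraform.deny"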

7) Expected Violations (from this Mock Plan)

  • S3: versioning disabled
  • S3: encryption missing
  • S3: ACL public-read
  • S3: missing tags Environment and Owner
  • IAM Role: assume-role principal uses wildcard *
  • IAM Policy: Action is *
  • IAM Policy: Resource is *
  • EC2: not EBS optimized
  • EC2: public IP assigned
  • EC2: unencrypted EBS volume /dev/sda1
  • EC2 (Prod): non-compliant EBS volume type standard
  • EC2/SG: SSH (22) from 0.0.0.0/0
  • EC2/SG: RDP (3389) from 0.0.0.0/0
  • SG global: (defense in depth) open SSH/RDP

8) Tips for Real Plans

  • Real plan JSON often nests values and uses computed IDs; join resources via type/name or address (commands for producing a real plan JSON are shown after this list).
  • Inspect after_unknown if values are computed and not known at plan time.
  • Consider separate warn vs deny sets for advisory controls.
  • Add default allow := true style patterns if using allow/deny models together.
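
For reference, a real plan JSON for these policies is produced with the standard Terraform workflow (run inside the Terraform project directory):

terraform plan -out=tfplan
terraform show -json tfplan > terraform-plan.json
opa eval -i terraform-plan.json -d policy "data.terraform.deny"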