Wednesday, 17 September 2025

Day 1

main.tf


provider "google" {
  project = var.project_id
  region  = var.region
}

# Enable required services
resource "google_project_service" "services" {
  for_each = toset([
    "pubsub.googleapis.com",
    "logging.googleapis.com",
    "cloudfunctions.googleapis.com"
  ])
  service = each.key
}

    variable "project_id" {
  description = "Your GCP Project ID"
}
variable "region" {
  default     = "us-central1"
}
    
# Pub/Sub topic that receives log events
resource "google_pubsub_topic" "log_topic" {
  name = "storage-policy-violations"
}

# Pub/Sub topic for SOC alerts
resource "google_pubsub_topic" "soc_alerts" {
  name = "soc-alerts"
}

# Log sink to capture public bucket IAM changes
resource "google_logging_project_sink" "storage_sink" {
  name        = "storage-public-bucket-sink"
  destination = "pubsub.googleapis.com/${google_pubsub_topic.log_topic.id}"

  # Filter: only when public access is granted
  filter = <<EOT
resource.type="gcs_bucket"
protoPayload.methodName="storage.setIamPermissions"
(protoPayload.serviceData.policyDelta.bindingDeltas.member="allUsers"
 OR protoPayload.serviceData.policyDelta.bindingDeltas.member="allAuthenticatedUsers")
EOT

  unique_writer_identity = true
}

# Give sink permission to publish
resource "google_pubsub_topic_iam_member" "sink_pub" {
  topic  = google_pubsub_topic.log_topic.name
  role   = "roles/pubsub.publisher"
  member = google_logging_project_sink.storage_sink.writer_identity
}

# Storage bucket for function code
resource "google_storage_bucket" "function_bucket" {
  name          = "${var.project_id}-function-src"
  location      = var.region
  force_destroy = true
}

# Upload function zip
resource "google_storage_bucket_object" "function_source" {
  name   = "function-source.zip"
  bucket = google_storage_bucket.function_bucket.name
  source = "function-source.zip"
}

# Cloud Function
resource "google_cloudfunctions_function" "notify_soc" {
  name        = "storage-public-alert"
  runtime     = "python39"
  region      = var.region
  entry_point = "process_pubsub"

  source_archive_bucket = google_storage_bucket.function_bucket.name
  source_archive_object = google_storage_bucket_object.function_source.name

  event_trigger {
    event_type = "google.pubsub.topic.publish"
    resource   = google_pubsub_topic.log_topic.name
  }

  available_memory_mb = 256
  description         = "Notifies SOC when a bucket is made public"
}

# Allow function to publish to SOC topic
resource "google_pubsub_topic_iam_member" "function_pub" {
  topic  = google_pubsub_topic.soc_alerts.name
  role   = "roles/pubsub.publisher"
  member = "serviceAccount:${google_cloudfunctions_function.notify_soc.service_account_email}"
}




main.py
import base64
import json
from google.cloud import pubsub_v1

SOC_TOPIC = "soc-alerts"

def process_pubsub(event, context):
    """Triggered when a bucket is made public"""
    if "data" not in event:
        print("No data found in event")
        return

    # Decode log entry
    payload = base64.b64decode(event["data"]).decode("utf-8")
    try:
        log_entry = json.loads(payload)
    except Exception as e:
        print(f"Could not parse log entry: {e}")
        return

    bucket_name = log_entry.get("resource", {}).get("labels", {}).get("bucket_name", "unknown")

    # Create alert message
    message = {
        "alert": "PUBLIC_BUCKET_DETECTED",
        "bucket": bucket_name,
        "log": log_entry
    }

    # Publish to SOC topic
    publisher = pubsub_v1.PublisherClient()
    project_id = log_entry.get("resource", {}).get("labels", {}).get("project_id", "")
    topic_path = publisher.topic_path(project_id, SOC_TOPIC)

    publisher.publish(topic_path, json.dumps(message).encode("utf-8"))
    print(f"⚠️ SOC ALERT: Public bucket detected -> {bucket_name}")
    
    #python -c "import pathlib, shutil; [shutil.rmtree(p) for p in pathlib.Path('.').rglob('__pycache__')]"
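
To sanity-check the handler before deploying, a small local harness can fake the Pub/Sub envelope the function receives. This is an assumption-laden sketch: it still calls the real publisher, so either have GCP credentials configured and the soc-alerts topic created, or stub out PublisherClient first.

# local_test.py - hypothetical local harness for process_pubsub (not part of the deployment)
import base64
import json

from main import process_pubsub

# Minimal fake of a Cloud Logging entry as delivered via the Pub/Sub trigger
log_entry = {
    "resource": {"labels": {"bucket_name": "demo-bucket", "project_id": "my-project"}},
    "protoPayload": {"methodName": "storage.setIamPermissions"},
}
event = {"data": base64.b64encode(json.dumps(log_entry).encode("utf-8")).decode("utf-8")}

process_pubsub(event, context=None)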

  

Monday, 15 September 2025

AWS Data & ETL Training Master Deck

AWS Data & ETL Training Master Deck (Editable)

10-Day instructor-led hands-on training — outline & slides

Day 1: AWS Basics & Account Setup

  • Slide 1: Title, Duration, Instructor
    Course title slide showing Day 1, total duration for the session, and instructor name.
  • Slide 2: Agenda & Learning Objectives
    List the day's agenda and measurable learning objectives (account setup, billing monitoring, MFA, AWS infra concepts).
  • Slide 3: What is Cloud & Why AWS?
    High-level cloud concepts, benefits of cloud vs on-prem, reasons to choose AWS (services, scale, ecosystem).
  • Slide 4: AWS Global Infrastructure Diagram
    Diagram illustrating Regions, Availability Zones, and Edge Locations with brief notes on use-cases (latency, fault-isolation).
  • Slide 5: AWS Account Setup Steps (screenshots)
    Step-by-step account creation guidance with placeholders for screenshots: sign-up, billing info, support plan, root account safety.
  • Slide 6: Hands-on Demo: Billing alarm, MFA
    Step-by-step technical tasks students must perform in the lab:
    1. Enable IAM Billing Access — Console: Account settings → activate IAM access to billing info.
    2. Create CloudWatch Billing Alarm — Console: CloudWatch → Alarms → Create Alarm → Metric: Billing → Total Estimated Charge; set threshold (e.g. $5) → create SNS topic for email notifications → subscribe student email.
    3. Enable MFA on Root/Users — Console: IAM → Users → select user (or root) → Security credentials → Manage MFA → choose Virtual MFA → scan QR with Authenticator app (Google Authenticator/Authy) → verify codes.
    4. Test Access — Demonstrate logging in with an IAM user and validate MFA prompts; verify billing alarm notification by temporarily lowering threshold or using simulated billing metric if available.
    # Example AWS CLI (for reference - optional; a boto3 sketch follows this day's outline)
    aws cloudwatch put-metric-alarm \
      --alarm-name "EstimatedChargesAlarm" \
      --metric-name "EstimatedCharges" \
      --namespace "AWS/Billing" \
      --statistic Maximum \
      --period 21600 \
      --evaluation-periods 1 \
      --threshold 5 \
      --comparison-operator GreaterThanOrEqualToThreshold \
      --dimensions Name=Currency,Value=USD \
      --alarm-actions arn:aws:sns:us-east-1:123456789012:BillingAlerts
  • Slide 7: Summary & Q&A
    Recap key takeaways: cloud fundamentals, AWS infra, account safety practices (MFA, billing alarms). Open floor for questions.
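For reference, a boto3 version of the Slide 6 billing alarm. This is a sketch only; the SNS topic ARN is a placeholder and must already exist.

# create_billing_alarm.py - hypothetical boto3 equivalent of the CLI example above
import boto3

# Billing metrics are only published in us-east-1
cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")

cloudwatch.put_metric_alarm(
    AlarmName="EstimatedChargesAlarm",
    MetricName="EstimatedCharges",
    Namespace="AWS/Billing",
    Statistic="Maximum",
    Period=21600,
    EvaluationPeriods=1,
    Threshold=5,
    ComparisonOperator="GreaterThanOrEqualToThreshold",
    Dimensions=[{"Name": "Currency", "Value": "USD"}],
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:BillingAlerts"],  # placeholder topic ARN
)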

Day 2: IAM & Security

  • Slide 1: Agenda & Objectives
    Outline of day: IAM concepts, hands-on user & group creation, policies, best practices.
  • Slide 2: IAM Concepts (Users, Groups, Roles, Policies)
    Explain IAM building blocks: Users, Groups, Roles, Policies, trust vs permissions.
  • Slide 3: IAM Architecture Diagram
    Diagram showing relationship between identities, roles, STS, and resources.
  • Slide 4: Hands-on: Create IAM user/group, attach policy
    Lab steps for students (a boto3 sketch of these steps follows this day's outline):
    1. Create an IAM group (e.g., etl-developers).
    2. Create an IAM user (e.g., student01) and add to group.
    3. Create and attach an inline or managed policy (least-privilege example: S3 read/write to a specific bucket).
    4. Test access using AWS CLI with generated access key (recommend temporary credentials or role-based cross-account testing).
  • Slide 5: Best Practices: Least Privilege, MFA
    Guidelines: use roles for services, avoid root, enable MFA, rotate keys, use IAM Access Analyzer, and log with CloudTrail.
  • Slide 6: Summary & Q&A
    Recap and Q&A.
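A boto3 sketch of the Day 2 lab steps; the group, user, and bucket ARN below are placeholders, not course-mandated values.

# day2_iam_lab.py - hypothetical script mirroring the lab steps above
import json

import boto3

iam = boto3.client("iam")

iam.create_group(GroupName="etl-developers")
iam.create_user(UserName="student01")
iam.add_user_to_group(GroupName="etl-developers", UserName="student01")

# Least-privilege inline policy: S3 read/write limited to one bucket (placeholder ARN)
policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["s3:GetObject", "s3:PutObject"],
        "Resource": "arn:aws:s3:::my-training-bucket/*",
    }],
}
iam.put_group_policy(
    GroupName="etl-developers",
    PolicyName="etl-s3-readwrite",
    PolicyDocument=json.dumps(policy),
)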

Day 3: Amazon S3 Basics

  • Slide 1: Agenda & Objectives
    Intro to S3, storage classes, basic operations, versioning & lifecycle.
  • Slide 2: S3 Overview (Buckets, Objects, Storage Classes)
    Explain buckets, objects, keys, metadata, and storage classes (Standard, Intelligent-Tiering, IA, Glacier).
  • Slide 3: Versioning & Lifecycle Diagram
    Diagram and examples of versioning and lifecycle rules to transition objects to cheaper storage.
  • Slide 4: Hands-on: Create bucket, upload/download objects
    Lab steps: create bucket, set bucket policy, upload/download via console and CLI, enable versioning.
  • Slide 5: Summary & Q&A
    Recap and Q&A.

Day 4: Amazon S3 Advanced

  • Slide 1: Agenda & Objectives
    Encryption, bucket policies, event notifications and integration with Lambda/SNS/SQS.
  • Slide 2: Encryption & Security (SSE-S3, SSE-KMS, ACL, Bucket Policy)
    Explain server-side encryption options, KMS keys, ACLs vs bucket policies, and public access blocks.
  • Slide 3: Event Notifications Diagram (S3 → Lambda/SNS/SQS)
    Diagram showing S3 event notification flows to Lambda, SNS, and SQS for processing pipelines.
  • Slide 4: Hands-on: Trigger Lambda on S3 upload
    Lab: create Lambda function, add S3 trigger, upload object to test invocation, view CloudWatch logs.
  • Slide 5: Summary & Q&A
    Recap and Q&A.

Day 5: Amazon RDS

  • Slide 1: Agenda & Objectives
    Relational databases on AWS, engines, HA patterns, backups and restores.
  • Slide 2: RDS Overview (Engines, Multi-AZ, Read Replica)
    Discuss supported engines (MySQL, PostgreSQL, Aurora), Multi-AZ, read replicas, and failover behavior.
  • Slide 3: Security & VPC integration Diagram
    Diagram showing RDS inside VPC, subnets, SGs, route for application access, and IAM authentication options.
  • Slide 4: Hands-on: Launch RDS instance, connect & query
    Lab: launch a small RDS instance (free tier if available), configure security group, connect via psql/mysql client, run sample queries.
  • Slide 5: Summary & Q&A
    Recap and Q&A.

Day 6: AWS Glue Basics & Data Catalog

  • Slide 1: Agenda & Objectives
    Intro to Glue, Data Catalog, Crawlers, Jobs and Studio.
  • Slide 2: Glue Architecture Diagram
    Architecture showing Glue interacting with S3, Catalog, and compute (Glue jobs).
  • Slide 3: Glue Components (Catalog, Crawler, Jobs, Studio)
    Explain each component and how they fit into ETL workflows.
  • Slide 4: Hands-on: Catalog S3 CSV/JSON → Glue table
    Lab: create a Glue Crawler to catalogue S3 files and validate the Glue table schema.
  • Slide 5: Query with Athena
    Show how to query Glue cataloged tables using Athena.
  • Slide 6: Summary & Q&A
    Recap and Q&A.

Day 7: AWS Glue Advanced & PySpark ETL

  • Slide 1: Agenda & Objectives
    Advanced Glue topics and PySpark-based ETL jobs.
  • Slide 2: DynamicFrame vs DataFrame Diagram
    Explain differences, when to use DynamicFrame (schema flexibility) vs DataFrame (performance / Spark APIs).
  • Slide 3: PySpark ETL Transformations (filter, join, aggregate)
    Common transformations with examples and notes about performance and partitioning.
  • Slide 4: Hands-on Demo: CSV → Parquet → RDS
    Lab: run a PySpark job to convert CSV to Parquet, partition data, and (optionally) push results to RDS.
  • Slide 5: Sample PySpark ETL Job (code snippet)
    Include a short PySpark snippet in the slide for students to review and run (full code in appendix).
    # PySpark (Glue) snippet - pseudocode
    df = spark.read.csv("s3://bucket/raw/data.csv", header=True)
    df = df.filter("status = 'active'") \
           .withColumn("event_date", to_date(col("timestamp")))
    df.write.partitionBy("event_date").parquet("s3://bucket/processed/")
  • Slide 6: Integration with Athena
    Show how Athena can query the Parquet output using Glue catalog partitions.
  • Slide 7: Summary & Q&A
    Recap and Q&A.

Day 8: Amazon Athena

  • Slide 1: Agenda & Objectives
    Introduce Athena, cost model, and best practices for querying data lakes.
  • Slide 2: Athena Overview & Cost Model
    Explain pay-per-query model (data scanned), partitioning, compression, and reducing cost.
  • Slide 3: Querying Glue tables (SELECT, GROUP BY, partitions)
    Examples for common SQL queries over Glue catalog tables and partition-aware queries.
  • Slide 4: Hands-on: Athena SQL Queries
    Lab: run sample queries, test performance, and measure scanned bytes for cost awareness.
  • Slide 5: Summary & Q&A
    Recap and Q&A.

Day 9: AWS Lambda & CloudWatch

  • Slide 1: Agenda & Objectives
    Serverless compute basics, event-driven architecture, monitoring & observability.
  • Slide 2: Lambda Lifecycle Diagram
    Diagram: cold start, container reuse, concurrency limits.
  • Slide 3: Triggers: S3, Glue, RDS
    Examples of event sources and patterns to invoke Lambda for ETL steps.
  • Slide 4: CloudWatch Metrics, Logs, Alarms
    How to instrument Lambda with logs, custom metrics, and alarms for failure/latency.
  • Slide 5: Hands-on: Lambda triggered by S3
    Lab: deploy a Python Lambda, configure S3 trigger, upload object to test, observe CloudWatch logs.
  • Slide 6: Sample Python Lambda Code
    Example code snippet to include on slide:
    # sample lambda handler
    def handler(event, context):
        for record in event['Records']:
            key = record['s3']['object']['key']
            # process object (e.g., read, transform, write)
            print(f"Processing {key}")
  • Slide 7: Summary & Q&A
    Recap and Q&A.

Day 10: Capstone Project & Wrap-Up

  • Slide 1: Agenda & Objectives
    Overview of final integrated pipeline and evaluation criteria for the capstone.
  • Slide 2: End-to-End ETL Pipeline Diagram (S3 → Glue → Athena → RDS)
    A diagram showing full flow: data ingest → catalog → transform → query → store and monitor.
  • Slide 3: Step-by-Step Demo Script
    Steps for the instructor & students to follow:
    1. Upload CSV to S3
    2. Glue Crawler → Catalog
    3. Glue PySpark ETL → Parquet
    4. Athena Queries
    5. Optional: Load into RDS
    6. CloudWatch Monitoring
  • Slide 4: Summary of Key Takeaways
    Highlight the major learnings from the course and recommended next steps/resources.
  • Slide 5: Final Q&A
    Open discussion, feedback, and next steps for continued learning.
Generated outline • Editable master deck for instructor use — add diagrams, screenshots and code files as needed.

Wednesday, 3 September 2025

Terraform Commands


 

Init & Setup
  terraform init                                   Initialize Terraform working directory
  terraform init -reconfigure                      Reinitialize and ignore previous backend configs
  terraform init -upgrade                          Reinitialize and upgrade providers/modules
  terraform get                                    Download and update modules

Planning
  terraform plan                                   Show planned changes
  terraform plan -out=tfplan                       Save execution plan to a file

Apply/Destroy
  terraform apply                                  Apply changes with confirmation
  terraform apply tfplan                           Apply using a saved plan file
  terraform apply -auto-approve                    Apply without manual approval
  terraform destroy                                Destroy infrastructure with confirmation
  terraform destroy -auto-approve                  Destroy without confirmation
  terraform destroy -target=aws_instance.example   Destroy specific resource

Validate & Format
  terraform validate                               Validate configuration syntax
  terraform fmt                                    Format Terraform files
  terraform fmt -recursive                         Format files in all subdirectories

Output
  terraform output                                 Show output variables
  terraform output -json                           Show outputs in JSON format

State Management
  terraform show                                   Show full state or plan content
  terraform state list                             List all resources in the state file
  terraform state show <resource>                  Show specific resource details
  terraform state pull                             Download current state file
  terraform state push                             Upload local state file (used with care)
  terraform refresh                                Update state with real infrastructure
  terraform taint <resource>                       Mark a resource for recreation
  terraform untaint <resource>                     Remove taint from a resource

Workspace Management
  terraform workspace list                         List all workspaces
  terraform workspace new <name>                   Create new workspace (e.g., dev, prod)
  terraform workspace select <name>                Switch to another workspace
  terraform workspace delete <name>                Delete a workspace

Debugging & Visuals
  TF_LOG=DEBUG terraform plan                      Enable debug logging
  TF_LOG_PATH=log.txt terraform apply              Save logs to a file
  terraform graph | dot -Tpng > graph.png          Visualize resource graph (Graphviz needed)

Terraform Cloud
  terraform login                                  Authenticate to Terraform Cloud
  terraform logout                                 Remove local credentials
  terraform state push                             Manually upload state file to remote
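A small Python helper in the spirit of the orchestrator scripts later in these notes, chaining plan -out with show -json so the saved plan can be fed to policy checks. This is a sketch; the working directory and file names are assumptions.

# plan_to_json.py - hypothetical wrapper around the commands above
import json
import subprocess

def plan_to_json(tf_dir="."):
    subprocess.run(["terraform", "init", "-input=false"], cwd=tf_dir, check=True)
    subprocess.run(["terraform", "plan", "-out=tfplan", "-input=false"], cwd=tf_dir, check=True)
    show = subprocess.run(
        ["terraform", "show", "-json", "tfplan"],
        cwd=tf_dir, capture_output=True, text=True, check=True,
    )
    return json.loads(show.stdout)

if __name__ == "__main__":
    plan = plan_to_json()
    print(len(plan.get("resource_changes", [])), "resource changes planned")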

Saturday, 30 August 2025

GCP-TF-Log-sync-CloudFunction

GCP Public Bucket Alert Setup

🚨 Google Cloud – Public Bucket Alert (Terraform + Cloud Function)

Main.tf

provider "google" {
  project = var.project_id
  region  = var.region
}

# Enable required services
resource "google_project_service" "services" {
  for_each = toset([
    "pubsub.googleapis.com",
    "logging.googleapis.com",
    "cloudfunctions.googleapis.com"
  ])
  service = each.key
}

# Pub/Sub topic that receives log events
resource "google_pubsub_topic" "log_topic" {
  name = "storage-policy-violations"
}

# Pub/Sub topic for SOC alerts
resource "google_pubsub_topic" "soc_alerts" {
  name = "soc-alerts"
}

# Log sink to capture public bucket IAM changes
resource "google_logging_project_sink" "storage_sink" {
  name        = "storage-public-bucket-sink"
  destination = "pubsub.googleapis.com/${google_pubsub_topic.log_topic.id}"

  # Filter: only when public access is granted
  filter = <<EOT
resource.type="gcs_bucket"
protoPayload.methodName="storage.setIamPermissions"
(protoPayload.serviceData.policyDelta.bindingDeltas.member="allUsers"
 OR protoPayload.serviceData.policyDelta.bindingDeltas.member="allAuthenticatedUsers")
EOT

  unique_writer_identity = true
}

variables.tf

variable "project_id" {
  description = "Your GCP Project ID"
}
variable "region" {
  default     = "us-central1"
}

🔹 Cloud Function Code (main.py)

import base64
import json
from google.cloud import pubsub_v1

SOC_TOPIC = "soc-alerts"

def process_pubsub(event, context):
    """Triggered when a bucket is made public"""
    if "data" not in event:
        print("No data found in event")
        return

    # Decode log entry
    payload = base64.b64decode(event["data"]).decode("utf-8")
    try:
        log_entry = json.loads(payload)
    except Exception as e:
        print(f"Could not parse log entry: {e}")
        return

    bucket_name = log_entry.get("resource", {}).get("labels", {}).get("bucket_name", "unknown")

    # Create alert message
    message = {
        "alert": "PUBLIC_BUCKET_DETECTED",
        "bucket": bucket_name,
        "log": log_entry
    }

    # Publish to SOC topic
    publisher = pubsub_v1.PublisherClient()
    project_id = log_entry.get("resource", {}).get("labels", {}).get("project_id", "")
    topic_path = publisher.topic_path(project_id, SOC_TOPIC)

    publisher.publish(topic_path, json.dumps(message).encode("utf-8"))
    print(f"⚠️ SOC ALERT: Public bucket detected -> {bucket_name}")

requirements.txt

google-cloud-pubsub

🔹 Windows Packaging

Compress-Archive -Path main.py, requirements.txt -DestinationPath function-source.zip -Force
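On non-Windows machines (or in CI), a minimal Python equivalent of the packaging step:

# package_function.py - hypothetical cross-platform alternative to Compress-Archive
import zipfile

with zipfile.ZipFile("function-source.zip", "w", zipfile.ZIP_DEFLATED) as zf:
    zf.write("main.py")
    zf.write("requirements.txt")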

🔹 Deployment Steps

  1. Enable APIs:
     gcloud services enable pubsub.googleapis.com logging.googleapis.com cloudfunctions.googleapis.com
  2. Deploy Terraform:
     terraform init
     terraform apply
  3. Test by making a bucket public:
     gsutil iam ch allUsers:objectViewer gs://<your-bucket>

→ This will trigger Cloud Logging → Pub/Sub → Cloud Function → SOC Pub/Sub topic.
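To confirm the alert actually landed on the SOC topic, messages can be pulled with a short script. This is a sketch: it assumes a subscription such as soc-alerts-sub exists on the soc-alerts topic, which the Terraform above does not create.

# check_soc_alerts.py - hypothetical verification helper
from google.cloud import pubsub_v1

project_id = "YOUR_PROJECT_ID"  # placeholder
subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path(project_id, "soc-alerts-sub")  # assumed subscription

response = subscriber.pull(request={"subscription": sub_path, "max_messages": 5})
for received in response.received_messages:
    print(received.message.data.decode("utf-8"))

if response.received_messages:
    subscriber.acknowledge(request={
        "subscription": sub_path,
        "ack_ids": [m.ack_id for m in response.received_messages],
    })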

✅ Result

This setup mirrors AWS S3 public-bucket alerting, implemented with native Google Cloud services (Cloud Logging, Pub/Sub, Cloud Functions).

Terraform - main.tf (Full)

main.tf


provider "google" {
  project = var.project_id
  region  = var.region
}

# Enable required services
resource "google_project_service" "services" {
  for_each = toset([
    "pubsub.googleapis.com",
    "logging.googleapis.com",
    "cloudfunctions.googleapis.com"
  ])
  service = each.key
}

# Pub/Sub topic that receives log events
resource "google_pubsub_topic" "log_topic" {
  name = "storage-policy-violations"
}

# Pub/Sub topic for SOC alerts
resource "google_pubsub_topic" "soc_alerts" {
  name = "soc-alerts"
}

# Log sink to capture public bucket IAM changes
resource "google_logging_project_sink" "storage_sink" {
  name        = "storage-public-bucket-sink"
  destination = "pubsub.googleapis.com/${google_pubsub_topic.log_topic.id}"

  # Filter: only when public access is granted
  filter = <<EOT
resource.type="gcs_bucket"
protoPayload.methodName="storage.setIamPermissions"
(protoPayload.serviceData.policyDelta.bindingDeltas.member="allUsers"
 OR protoPayload.serviceData.policyDelta.bindingDeltas.member="allAuthenticatedUsers")
EOT

  unique_writer_identity = true
}

# Give sink permission to publish
resource "google_pubsub_topic_iam_member" "sink_pub" {
  topic  = google_pubsub_topic.log_topic.name
  role   = "roles/pubsub.publisher"
  member = google_logging_project_sink.storage_sink.writer_identity
}

# Storage bucket for function code
resource "google_storage_bucket" "function_bucket" {
  name          = "${var.project_id}-function-src"
  location      = var.region
  force_destroy = true
}

# Upload function zip
resource "google_storage_bucket_object" "function_source" {
  name   = "function-source.zip"
  bucket = google_storage_bucket.function_bucket.name
  source = "function-source.zip"
}

# Cloud Function
resource "google_cloudfunctions_function" "notify_soc" {
  name        = "storage-public-alert"
  runtime     = "python39"
  region      = var.region
  entry_point = "process_pubsub"

  source_archive_bucket = google_storage_bucket.function_bucket.name
  source_archive_object = google_storage_bucket_object.function_source.name

  event_trigger {
    event_type = "google.pubsub.topic.publish"
    resource   = google_pubsub_topic.log_topic.name
  }

  available_memory_mb = 256
  description         = "Notifies SOC when a bucket is made public"
}

# Allow function to publish to SOC topic
resource "google_pubsub_topic_iam_member" "function_pub" {
  topic  = google_pubsub_topic.soc_alerts.name
  role   = "roles/pubsub.publisher"
  member = "serviceAccount:${google_cloudfunctions_function.notify_soc.service_account_email}"
}
  

Thursday, 28 August 2025

GCP 1

GCP Terraform Example — Compute + Storage + Firewall

GCP Terraform Example

This page contains a ready-to-use Terraform configuration that creates a small environment on Google Cloud: a Storage Bucket, a Firewall rule (like an AWS security group), and a small Compute Engine VM (e2-micro) suitable for testing. The configuration intentionally uses values that are easy to change for compliance or security.

Important: Replace YOUR_PROJECT_ID and ensure key.json points to your service account JSON credentials. Keep credentials secret and do not commit them to source control.

Terraform configuration

terraform {
  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "~> 5.0"
    }
  }
}

provider "google" {
  project     = "YOUR_PROJECT_ID"
  region      = "us-central1"      # free tier region
  zone        = "us-central1-a"    # free tier zone
  credentials = file("key.json")   # your service account key
}

# --------------------------
# Storage Bucket (Free Tier)
# --------------------------
resource "google_storage_bucket" "demo_bucket" {
  name     = "my-demo-bucket-${random_id.rand.hex}"
  location = "US"

  storage_class = "STANDARD"
  force_destroy = true
  uniform_bucket_level_access = true
}

# --------------------------
# Firewall (Like Security Group)
# --------------------------
resource "google_compute_firewall" "default_allow_ssh" {
  name    = "allow-ssh"
  network = "default"

  allow {
    protocol = "tcp"
    ports    = ["22"]
  }

  source_ranges = ["0.0.0.0/0"] # 🚨 Open SSH to world (not safe for prod)
  target_tags   = ["ssh-allowed"]
}

# --------------------------
# Compute Instance (Free Tier)
# --------------------------
resource "google_compute_instance" "demo_vm" {
  name         = "demo-vm"
  machine_type = "e2-micro"  # ✅ Always Free tier machine type
  zone         = "us-central1-a"

  tags = ["ssh-allowed"]

  boot_disk {
    initialize_params {
      image = "debian-cloud/debian-11"
      size  = 30   # ✅ Free tier gives you 30GB Persistent Disk
    }
  }

  network_interface {
    network = "default"
    access_config {
      # Ephemeral public IP (free)
    }
  }

  metadata_startup_script = <<-EOT
    #!/bin/bash
    echo "Hello from Terraform VM" > /var/tmp/startup.txt
  EOT
}

# --------------------------
# Random ID for bucket name
# --------------------------
resource "random_id" "rand" {
  byte_length = 4
}

Quick run instructions

  1. Install and configure GCP SDK / Terraform.
  2. Place your service-account JSON next to main.tf as key.json, or update credentials path.
  3. Initialize Terraform:
    terraform init
  4. Preview changes:
    terraform plan -out=tfplan
  5. Apply (create resources):
    terraform apply tfplan
  6. Cleanup:
    terraform destroy -auto-approve

Fields & notes

Field                              Notes
machine_type = "e2-micro"          Always-free eligible machine in some regions (use us-central1).
source_ranges = ["0.0.0.0/0"]      Opens SSH to the world — acceptable for quick tests but change to your IP for safety.
force_destroy = true               Allows bucket deletion even when it contains objects — useful for cleanup automation.
credentials = file("key.json")     Terraform reads your service account key directly — no need to run gcloud auth (unless you want to).

Safety tips

  • Prefer restricting SSH source_ranges to your IP (e.g. ["203.0.113.4/32"]).
  • Verify billing is enabled on the project; free-tier still requires billing account attached.
  • Do not commit key.json to version control.


OPA basics

OPA AWS Terraform Policy - And, Or, Not

OPA AWS Terraform Policy Example (And, Or, Not)

This example demonstrates how to express AND, OR, and NOT logic in Rego v1 syntax for AWS Terraform plans: AND as multiple expressions in one rule body, OR as multiple rule bodies (or an in membership check), and NOT with the not keyword.

1. Mock Terraform Plan JSON (aws-plan.json)

{
  "resource_changes": [
    {
      "address": "aws_s3_bucket.demo",
      "type": "aws_s3_bucket",
      "change": {
        "after": {
          "acl": "public-read",
          "versioning": { "enabled": false },
          "server_side_encryption_configuration": null
        }
      }
    },
    {
      "address": "aws_instance.demo",
      "type": "aws_instance",
      "change": {
        "after": {
          "instance_type": "t2.micro",
          "associate_public_ip_address": true,
          "ebs_optimized": false,
          "ebs_block_device": [
            { "device_name": "/dev/sda1", "encrypted": false }
          ]
        }
      }
    },
    {
      "address": "aws_security_group.demo",
      "type": "aws_security_group",
      "change": {
        "after": {
          "ingress": [
            { "from_port": 22, "to_port": 22, "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"] }
          ]
        }
      }
    }
  ]
}

2. S3 Policy (policy/s3.rego)

package terraform.s3

# OR: express each alternative as its own rule body with the same message
# Deny if ACL is public OR versioning not enabled
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_s3_bucket"
  rc.change.after.acl == "public-read"
  msg := sprintf("S3 bucket %s is public OR lacks versioning", [rc.address])
}

deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_s3_bucket"
  not rc.change.after.versioning.enabled
  msg := sprintf("S3 bucket %s is public OR lacks versioning", [rc.address])
}

# AND: conjunction is simply multiple expressions in one rule body
# Deny if bucket is public AND server-side encryption is missing
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_s3_bucket"
  rc.change.after.acl == "public-read"
  not rc.change.after.server_side_encryption_configuration
  msg := sprintf("S3 bucket %s is public AND unencrypted", [rc.address])
}

3. EC2 Policy (policy/ec2.rego)

package terraform.ec2

# OR: express each alternative as its own rule body with the same message
# Deny if instance type is t2.micro OR has a public IP
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_instance"
  rc.change.after.instance_type == "t2.micro"
  msg := sprintf("EC2 %s is t2.micro OR has public IP", [rc.address])
}

deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_instance"
  rc.change.after.associate_public_ip_address
  msg := sprintf("EC2 %s is t2.micro OR has public IP", [rc.address])
}

# NOT: negate a single expression with the not keyword
# Deny if instance is NOT EBS optimized
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_instance"
  not rc.change.after.ebs_optimized
  msg := sprintf("EC2 %s is not EBS optimized", [rc.address])
}

# Deny if any EBS volume is NOT encrypted
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_instance"
  vol := rc.change.after.ebs_block_device[_]
  not vol.encrypted
  msg := sprintf("EC2 %s has unencrypted volume %s", [rc.address, vol.device_name])
}

4. Security Group Policy (policy/sg.rego)

package terraform.sg

# OR over values: a membership check with the in keyword
# Deny if SG allows SSH OR RDP from world
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_security_group"
  ing := rc.change.after.ingress[_]
  ing.from_port in {22, 3389}
  ing.cidr_blocks[_] == "0.0.0.0/0"
  msg := sprintf("Security Group %s allows SSH or RDP from world", [rc.address])
}

5. Run OPA Evaluation

opa eval -i aws-plan.json \
-d policy/s3.rego \
-d policy/ec2.rego \
-d policy/sg.rego \
"data.terraform"

Expected violations:

  • S3 bucket is public OR lacks versioning
  • S3 bucket is public AND unencrypted
  • EC2 is t2.micro OR has public IP
  • EC2 is not EBS optimized
  • EC2 has unencrypted volume
  • Security Group allows SSH or RDP from world
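To run the same evaluation from a script instead of the shell, a minimal Python wrapper (same file paths as above) can flatten every package's deny set:

# run_opa.py - hypothetical wrapper for the opa eval command in step 5
import json
import subprocess

cmd = [
    "opa", "eval", "-i", "aws-plan.json",
    "-d", "policy/s3.rego", "-d", "policy/ec2.rego", "-d", "policy/sg.rego",
    "--format", "json", "data.terraform",
]
out = subprocess.run(cmd, capture_output=True, text=True, check=True)
doc = json.loads(out.stdout)

violations = []
for res in doc.get("result", []):
    for expr in res.get("expressions", []):
        # expr["value"] looks like {"s3": {"deny": [...]}, "ec2": {...}, "sg": {...}}
        for pkg in (expr.get("value") or {}).values():
            violations.extend(pkg.get("deny", []))

for v in violations:
    print("-", v)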

Wednesday, 27 August 2025

Policies use the new OPA 1.0+ syntax

Advanced OPA Policies for AWS Terraform (Mock Plan)

This guide demonstrates complex OPA (Open Policy Agent) policies for AWS Terraform plans using a mock plan JSON. It includes cross-resource checks (EC2 ↔ Security Groups, EC2 ↔ EBS), S3 best practices, IAM least privilege, a single policy entrypoint to evaluate everything at once, and runner scripts (Python & PowerShell).

1) Directory Layout

C:\OPA_Advanced\
│
├── terraform-plan.json     # Mock Terraform plan JSON (intentionally violating several policies)
└── policy\
    ├── main.rego           # Single entrypoint aggregating all denials
    ├── ec2_complex.rego    # EC2 + SG + EBS cross checks
    ├── s3_complex.rego     # S3 best-practice checks
    ├── iam_complex.rego    # IAM least privilege checks
    └── sg_simple.rego      # SG hygiene (used by EC2 cross-checks)
Note: This is a mock plan; keys/IDs are simplified so cross-references are easy. In real plans, resource IDs are computed and you’ll often join using address, type, and name or inspect after_unknown and data sources.
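In a real plan, resource_changes is a list, so the join described in the note usually starts by indexing it by address. A sketch (file name and attribute paths are assumptions about a typical plan):

# index_plan.py - hypothetical example of joining resources by address in a real plan
import json

with open("terraform-plan.json") as f:
    plan = json.load(f)

# Real plans expose resource_changes as a list of objects with "address", "type", "name"
by_address = {rc["address"]: rc for rc in plan.get("resource_changes", [])}

for rc in plan.get("resource_changes", []):
    if rc["type"] != "aws_instance":
        continue
    after = rc.get("change", {}).get("after") or {}
    for sg_ref in after.get("vpc_security_group_ids") or []:
        sg = by_address.get(sg_ref)  # resolves only if the reference is itself a plan address
        if sg:
            print(rc["address"], "uses", sg["address"])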

2) Mock Terraform Plan JSON (terraform-plan.json)

This plan intentionally violates multiple controls to showcase policy failures.

{
  "format_version": "0.1",
  "terraform_version": "1.13.1",
  "resource_changes": {
    "aws_s3_bucket.app_bucket": {
      "type": "aws_s3_bucket",
      "name": "app_bucket",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "bucket": "my-app-public-bucket",
          "acl": "public-read",
          "versioning": { "enabled": false },
          "server_side_encryption_configuration": null,
          "tags": { "Team": "Platform" }
        }
      }
    },

    "aws_security_group.web": {
      "type": "aws_security_group",
      "name": "web",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "name": "web-sg",
          "description": "Web security group",
          "ingress": [
            { "from_port": 22,   "to_port": 22,   "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"] },
            { "from_port": 3389, "to_port": 3389, "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"] },
            { "from_port": 80,   "to_port": 80,   "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"] }
          ],
          "egress": [
            { "from_port": 0, "to_port": 0, "protocol": "-1", "cidr_blocks": ["0.0.0.0/0"] }
          ],
          "tags": { "Environment": "Prod" }
        }
      }
    },

    "aws_iam_role.ec2_role": {
      "type": "aws_iam_role",
      "name": "ec2_role",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "name": "ec2-role",
          "assume_role_policy": {
            "Version": "2012-10-17",
            "Statement": [
              { "Effect": "Allow", "Principal": { "Service": "*" }, "Action": "sts:AssumeRole" }
            ]
          },
          "tags": { "Owner": "Alice" }
        }
      }
    },

    "aws_iam_policy.too_broad": {
      "type": "aws_iam_policy",
      "name": "too_broad",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "name": "AllowEverything",
          "policy": {
            "Version": "2012-10-17",
            "Statement": [
              { "Effect": "Allow", "Action": "*", "Resource": "*" }
            ]
          }
        }
      }
    },

    "aws_instance.web1": {
      "type": "aws_instance",
      "name": "web1",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "ami": "ami-12345678",
          "instance_type": "t3.small",
          "ebs_optimized": false,
          "associate_public_ip_address": true,
          "iam_instance_profile": "ec2-role",
          "vpc_security_group_ids": ["aws_security_group.web"],
          "ebs_block_device": [
            { "device_name": "/dev/sda1", "volume_size": 30, "volume_type": "standard", "encrypted": false }
          ],
          "tags": { "Environment": "Prod", "Service": "frontend" }
        }
      }
    }
  }
}

3) OPA Policies (OPA v1.0+ syntax)

3.1 Main Aggregator (policy/main.rego)

Single entrypoint so you can evaluate everything at once.

package terraform

import data.terraform.ec2_complex
import data.terraform.s3_complex
import data.terraform.iam_complex
import data.terraform.sg_simple

# Aggregate all denials into one set
deny contains msg if { msg := ec2_complex.deny[_] }
deny contains msg if { msg := s3_complex.deny[_] }
deny contains msg if { msg := iam_complex.deny[_] }
deny contains msg if { msg := sg_simple.deny[_] }

3.2 EC2 + SG + EBS Cross Checks (policy/ec2_complex.rego)

package terraform.ec2_complex

# Helper: iterate all EC2 instances in the plan
ec2s[ec2] if {
  some r
  ec2 := input.resource_changes[r]
  ec2.type == "aws_instance"
}

# Helper: look up SG resource by "id" (mock uses address-style id)
sg_by_id(id) := sg if {
  sg := input.resource_changes[id]
  sg.type == "aws_security_group"
}

# --- Deny rules ---

# 1) EC2 must be EBS-optimized
deny contains msg if {
  ec2 := ec2s[_]
  not ec2.change.after.ebs_optimized
  msg := sprintf("EC2 %v is not EBS optimized", [ec2.name])
}

# 2) EC2 must not have public IP
deny contains msg if {
  ec2 := ec2s[_]
  ec2.change.after.associate_public_ip_address
  msg := sprintf("EC2 %v has a public IP assigned", [ec2.name])
}

# 3) EC2 must have IAM instance profile (role)
deny contains msg if {
  ec2 := ec2s[_]
  not ec2.change.after.iam_instance_profile
  msg := sprintf("EC2 %v does not have an IAM instance profile attached", [ec2.name])
}

# 4) All attached EBS volumes must be encrypted
deny contains msg if {
  ec2 := ec2s[_]
  vol := ec2.change.after.ebs_block_device[_]
  not vol.encrypted
  msg := sprintf("EC2 %v has unencrypted EBS volume %v", [ec2.name, vol.device_name])
}

# 5) If Environment=Prod, EBS volume types must be gp3 or io1
deny contains msg if {
  ec2 := ec2s[_]
  ec2.change.after.tags.Environment == "Prod"
  vol := ec2.change.after.ebs_block_device[_]
  not vol.volume_type in {"gp3", "io1"}
  msg := sprintf("EC2 %v in Prod has non-compliant EBS type %v on %v", [ec2.name, vol.volume_type, vol.device_name])
}

# 6) EC2's attached SGs must not allow SSH (22) or RDP (3389) from 0.0.0.0/0
deny contains msg if {
  ec2 := ec2s[_]
  sg_id := ec2.change.after.vpc_security_group_ids[_]
  sg := sg_by_id(sg_id)
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 22
  ing.to_port == 22
  msg := sprintf("EC2 %v allows SSH (22) from 0.0.0.0/0 via SG %v", [ec2.name, sg.name])
}

deny contains msg if {
  ec2 := ec2s[_]
  sg_id := ec2.change.after.vpc_security_group_ids[_]
  sg := sg_by_id(sg_id)
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 3389
  ing.to_port == 3389
  msg := sprintf("EC2 %v allows RDP (3389) from 0.0.0.0/0 via SG %v", [ec2.name, sg.name])
}

3.3 S3 Best Practices (policy/s3_complex.rego)

package terraform.s3_complex

# Helper: all S3 buckets
buckets[b] if {
  some r
  b := input.resource_changes[r]
  b.type == "aws_s3_bucket"
}

# Require versioning
deny contains msg if {
  b := buckets[_]
  not b.change.after.versioning.enabled
  msg := sprintf("S3 bucket %v: versioning is not enabled", [b.name])
}

# Require server-side encryption
deny contains msg if {
  b := buckets[_]
  not b.change.after.server_side_encryption_configuration
  msg := sprintf("S3 bucket %v: server-side encryption not configured", [b.name])
}

# Block public ACLs
deny contains msg if {
  b := buckets[_]
  b.change.after.acl == "public-read"
  msg := sprintf("S3 bucket %v: ACL is public-read", [b.name])
}

# Require mandatory tags (Environment and Owner)
deny contains msg if {
  b := buckets[_]
  not b.change.after.tags.Environment
  msg := sprintf("S3 bucket %v: missing required tag 'Environment'", [b.name])
}

deny contains msg if {
  b := buckets[_]
  not b.change.after.tags.Owner
  msg := sprintf("S3 bucket %v: missing required tag 'Owner'", [b.name])
}

3.4 IAM Least Privilege (policy/iam_complex.rego)

package terraform.iam_complex

# Helper: roles and policies
roles[r] if {
  some k
  r := input.resource_changes[k]
  r.type == "aws_iam_role"
}

policies[p] if {
  some k
  p := input.resource_changes[k]
  p.type == "aws_iam_policy"
}

# 1) AssumeRole principal must not be wildcard
deny contains msg if {
  r := roles[_]
  stmt := r.change.after.assume_role_policy.Statement[_]
  stmt.Principal.Service == "*"
  msg := sprintf("IAM Role %v: assume-role Principal.Service is wildcard '*'", [r.name])
}

# 2) Managed policy statements must not have Action '*'
deny contains msg if {
  p := policies[_]
  stmt := p.change.after.policy.Statement[_]
  stmt.Action == "*"
  msg := sprintf("IAM Policy %v: uses Action '*'", [p.name])
}

# 3) Managed policy statements must not have Resource '*'
deny contains msg if {
  p := policies[_]
  stmt := p.change.after.policy.Statement[_]
  stmt.Resource == "*"
  msg := sprintf("IAM Policy %v: uses Resource '*'", [p.name])
}

3.5 Security Group Hygiene (Standalone) (policy/sg_simple.rego)

package terraform.sg_simple

# Helper: all SGs
sgs[sg] if {
  some r
  sg := input.resource_changes[r]
  sg.type == "aws_security_group"
}

# Disallow 0.0.0.0/0 for SSH and RDP anywhere in the plan (defense in depth)
deny contains msg if {
  sg := sgs[_]
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 22
  ing.to_port == 22
  msg := sprintf("SG %v allows SSH (22) from 0.0.0.0/0", [sg.name])
}

deny contains msg if {
  sg := sgs[_]
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 3389
  ing.to_port == 3389
  msg := sprintf("SG %v allows RDP (3389) from 0.0.0.0/0", [sg.name])
}

4) Python Runner (opa_check.py)

import subprocess, json, os, sys

plan = "terraform-plan.json"
policy_dir = "policy"

# Evaluate a single entrypoint: data.terraform.deny (from main.rego)
cmd = ["opa", "eval", "-i", plan, "-d", policy_dir, "--format", "json", "data.terraform.deny"]

try:
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
except subprocess.CalledProcessError as e:
    print("OPA evaluation failed:", e.stderr or e.stdout)
    sys.exit(2)

data = json.loads(result.stdout)

violations = []
for res in data.get("result", []):
    for expr in res.get("expressions", []):
        val = expr.get("value")
        if isinstance(val, list):
            violations.extend(val)

if violations:
    print("❌ Policy violations found:")
    for v in violations:
        print("-", v)
    sys.exit(1)
else:
    print("✅ All policies passed.")
    sys.exit(0)

5) PowerShell Runner (opa_check.ps1)

$Plan = "C:\OPA_Advanced\terraform-plan.json"
$PolicyDir = "C:\OPA_Advanced\policy"

$OpaArgs = @("eval", "-i", $Plan, "-d", $PolicyDir, "--format", "json", "data.terraform.deny")

try {
  $OutRaw = & opa @OpaArgs
} catch {
  Write-Error "OPA eval failed. Ensure opa.exe in PATH and inputs exist."
  exit 2
}

$Out = $OutRaw | ConvertFrom-Json
$Violations = @()

foreach ($r in $Out.result) {
  foreach ($e in $r.expressions) {
    if ($e.value) { $Violations += $e.value }
  }
}

if ($Violations.Count -gt 0) {
  Write-Host "❌ Policy violations found:" -ForegroundColor Red
  $Violations | ForEach-Object { Write-Host "- $_" -ForegroundColor Yellow }
  exit 1
} else {
  Write-Host "✅ All policies passed." -ForegroundColor Green
  exit 0
}

6) How to Run

  1. Create C:\OPA_Advanced and the policy\ folder.
  2. Save the JSON and the four .rego files (plus main.rego) into the paths above.
  3. Run a quick syntax check (OPA 1.0+):
    opa check policy\
  4. Evaluate:
    opa eval -i terraform-plan.json -d policy "data.terraform.deny"
  5. Or use the provided Python/PowerShell scripts.

7) Expected Violations (from this Mock Plan)

  • S3: versioning disabled
  • S3: encryption missing
  • S3: ACL public-read
  • S3: missing tag Owner
  • IAM Role: assume-role principal uses wildcard *
  • IAM Policy: Action is *
  • IAM Policy: Resource is *
  • EC2: not EBS optimized
  • EC2: public IP assigned
  • EC2: unencrypted EBS volume /dev/sda1
  • EC2 (Prod): non-compliant EBS volume type standard
  • EC2/SG: SSH (22) from 0.0.0.0/0
  • EC2/SG: RDP (3389) from 0.0.0.0/0
  • SG global: (defense in depth) open SSH/RDP

8) Tips for Real Plans

  • Real plan JSON often nests values and uses computed IDs; join resources via type/name or address.
  • Inspect after_unknown if values are computed and not known at plan time.
  • Consider separate warn vs deny sets for advisory controls.
  • Add default allow := true style patterns if using allow/deny models together.
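For the after_unknown tip, a quick way to see which attributes are still unknown at plan time (and therefore better suited to a warn rule than a hard deny) is to scan that field. A sketch against a real plan.json:

# list_unknowns.py - hypothetical helper for inspecting after_unknown
import json

with open("plan.json") as f:
    plan = json.load(f)

for rc in plan.get("resource_changes", []):
    unknown = rc.get("change", {}).get("after_unknown", {}) or {}
    # Only report top-level attributes flagged True; nested structures are skipped here
    computed = [attr for attr, flag in unknown.items() if flag is True]
    if computed:
        print(f"{rc['address']}: computed at apply time -> {', '.join(computed)}")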

GCP - OPA basics

GCP Terraform + OPA Policy Validation

GCP Terraform Infrastructure with OPA Policy Validation

This document demonstrates how to create GCP infrastructure (Compute, Storage, Firewall) using Terraform, validate it with Open Policy Agent (OPA), and run Terraform apply only if policies pass.

1. Directory Structure

GCP_OPA_Project/
│
├── terraform/              # Terraform configuration
│   └── main.tf
├── policy/                 # OPA policies
│   ├── main.rego
│   ├── gcs.rego
│   ├── compute.rego
│   ├── firewall.rego
│   └── iam_gcp.rego
├── plan.json               # Terraform plan in JSON (generated)
└── validate_apply_gcp.py  # Python orchestrator script

2. Terraform Configuration (main.tf)

terraform {
  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "~> 5.0"
    }
  }
}

provider "google" {
  project     = "YOUR_PROJECT_ID"
  region      = "us-central1"
  zone        = "us-central1-a"
  credentials = file("c:\\test\\credial\\key.json")
}

resource "random_id" "rand" { byte_length = 4 }

resource "google_storage_bucket" "demo_bucket" {
  name     = "my-demo-bucket-${random_id.rand.hex}"
  location = "US"
  storage_class = "STANDARD"
  force_destroy = true
  uniform_bucket_level_access = true
}

resource "google_compute_firewall" "default_allow_ssh" {
  name    = "allow-ssh"
  network = "default"
  allow {
    protocol = "tcp"
    ports    = ["22"]
  }
  source_ranges = ["0.0.0.0/0"]
  target_tags   = ["ssh-allowed"]
}

resource "google_compute_instance" "demo_vm" {
  name         = "demo-vm"
  machine_type = "e2-micro"
  zone         = "us-central1-a"
  tags = ["ssh-allowed"]
  boot_disk {
    initialize_params {
      image = "debian-cloud/debian-11"
      size  = 30
    }
  }
  network_interface {
    network = "default"
    access_config { }
  }
  metadata_startup_script = <<-EOT
    #!/bin/bash
    echo "Hello from Terraform VM" > /var/tmp/startup.txt
  EOT
}

3. OPA Policies

Main Aggregator (policy/main.rego)

package terraform

import data.terraform.gcs
import data.terraform.compute
import data.terraform.firewall
import data.terraform.iam_gcp

deny contains msg if { msg := gcs.deny[_] }
deny contains msg if { msg := compute.deny[_] }
deny contains msg if { msg := firewall.deny[_] }
deny contains msg if { msg := iam_gcp.deny[_] }

GCS Bucket (policy/gcs.rego)

package terraform.gcs
buckets[b] if { some r; b := input.resource_changes[r]; b.type == "google_storage_bucket" }

deny contains msg if { 
  b := buckets[_]; 
  b.change.after.acl == "public-read"; 
  msg := sprintf("GCS bucket %v: acl is public-read", [b.name]) 
}

deny contains msg if { 
  b := buckets[_]; 
  not b.change.after.versioning.enabled; 
  msg := sprintf("GCS bucket %v: versioning not enabled", [b.name]) 
}

deny contains msg if { 
  b := buckets[_]; 
  not b.change.after.encryption; 
  msg := sprintf("GCS bucket %v: encryption not configured", [b.name]) 
}

deny contains msg if { 
  b := buckets[_]; 
  not b.change.after.uniform_bucket_level_access; 
  msg := sprintf("GCS bucket %v: uniform_bucket_level_access not enabled", [b.name]) 
}

deny contains msg if { 
  b := buckets[_]; 
  not b.change.after.labels.Owner; 
  msg := sprintf("GCS bucket %v: missing 'Owner' label", [b.name]) 
}

deny contains msg if { 
  b := buckets[_]; 
  not b.change.after.labels.Environment; 
  msg := sprintf("GCS bucket %v: missing 'Environment' label", [b.name]) 
}

Compute Instance (policy/compute.rego)

package terraform.compute
instances[i] if { some r; i := input.resource_changes[r]; i.type == "google_compute_instance" }
disallowed_types := {"f1-micro","g1-small"}

deny contains msg if { inst := instances[_]; inst.change.after.machine_type in disallowed_types; msg := sprintf("Compute %v: disallowed machine type %v", [inst.name, inst.change.after.machine_type]) }
deny contains msg if { inst := instances[_]; not inst.change.after.service_account; msg := sprintf("Compute %v: missing service_account", [inst.name]) }
deny contains msg if { inst := instances[_]; nic := inst.change.after.network_interface[_]; ac := nic.access_config[_]; ac != null; msg := sprintf("Compute %v: has external IP", [inst.name]) }
deny contains msg if { inst := instances[_]; bd := inst.change.after.boot_disk; bd != null; not bd[0].disk_encryption_key; msg := sprintf("Compute %v: boot disk not encrypted", [inst.name]) }
deny contains msg if { inst := instances[_]; meta := inst.change.after.metadata_startup_script; contains(meta, "curl"); contains(meta, "bash"); msg := sprintf("Compute %v: startup script uses curl|bash pattern", [inst.name]) }
deny contains msg if { inst := instances[_]; not inst.change.after.labels.Environment; msg := sprintf("Compute %v: missing label 'Environment'", [inst.name]) }

Firewall (policy/firewall.rego)

package terraform.firewall
fws[f] if { some r; f := input.resource_changes[r]; f.type == "google_compute_firewall" }

deny contains msg if { fw := fws[_]; rule := fw.change.after; rule.allowed[_].protocol=="tcp"; rule.allowed[_].ports[_]=="22"; rule.source_ranges[_]=="0.0.0.0/0"; msg := sprintf("Firewall %v allows SSH 22 from 0.0.0.0/0", [fw.name]) }
deny contains msg if { fw := fws[_]; rule := fw.change.after; rule.allowed[_].protocol=="tcp"; rule.allowed[_].ports[_]=="3389"; rule.source_ranges[_]=="0.0.0.0/0"; msg := sprintf("Firewall %v allows RDP 3389 from 0.0.0.0/0", [fw.name]) }
deny contains msg if { fw := fws[_]; rule := fw.change.after; rule.allowed[_].protocol=="all"; rule.source_ranges[_]=="0.0.0.0/0"; msg := sprintf("Firewall %v allows all traffic from 0.0.0.0/0", [fw.name]) }

IAM / Service Account (policy/iam_gcp.rego)

package terraform.iam_gcp
service_accounts[s] if { some r; s := input.resource_changes[r]; s.type == "google_service_account" }
iam_bindings[b] if { some r; b := input.resource_changes[r]; b.type == "google_project_iam_binding" }

deny contains msg if { sa := service_accounts[_]; not sa.change.after.display_name; msg := sprintf("Service Account %v: missing display_name", [sa.name]) }
deny contains msg if { b := iam_bindings[_]; member := b.change.after.members[_]; member == "allUsers"; msg := sprintf("IAM binding %v grants role %v to allUsers", [b.name, b.change.after.role]) }
deny contains msg if { b := iam_bindings[_]; b.change.after.role == "roles/owner"; msg := sprintf("IAM binding %v uses broad role roles/owner", [b.name]) }

4. Python Orchestrator (validate_apply_gcp.py)

import subprocess, json, os, shutil, sys

TERRAFORM_DIR = "terraform"
PLAN_BIN = "tfplan"
PLAN_JSON = "plan.json"
POLICY_DIR = "policy"

def run(cmd, cwd=None, check=True):
    print("šŸ‘‰", " ".join(cmd))
    proc = subprocess.run(cmd, capture_output=True, text=True, cwd=cwd)
    if proc.returncode != 0 and check:
        print("❌ Command failed:", " ".join(cmd))
        print("STDOUT:", proc.stdout)
        print("STDERR:", proc.stderr)
        sys.exit(proc.returncode)
    return proc.stdout

def terraform_plan_and_show():
    run(["terraform", "init", "-input=false"], cwd=TERRAFORM_DIR)
    run(["terraform", "plan", "-out", PLAN_BIN, "-input=false"], cwd=TERRAFORM_DIR)
    out = run(["terraform", "show", "-json", os.path.join(TERRAFORM_DIR, PLAN_BIN)], cwd=None)
    with open(PLAN_JSON, "w") as f: f.write(out)

def opa_eval():
    opa_path = shutil.which("opa")
    if not opa_path:
        print("❌ opa not found in PATH"); sys.exit(1)
    cmd = [opa_path,"eval","-i",PLAN_JSON,"-d",POLICY_DIR,"--format","json","data.terraform.deny"]
    out = run(cmd)
    return json.loads(out)

def extract_violations(opa_json):
    violations = []
    for item in opa_json.get("result",[]):
        for expr in item.get("expressions",[]):
            val = expr.get("value")
            if isinstance(val,list): violations.extend(val)
    return violations

def terraform_apply():
    run(["terraform","apply","-auto-approve"], cwd=TERRAFORM_DIR)

def main():
    terraform_plan_and_show()
    opa_json = opa_eval()
    violations = extract_violations(opa_json)
    if violations:
        print("\n❌ Policy violations found:")
        for v in violations: print(" -",v)
        print("🚫 Aborting terraform apply.")
        sys.exit(1)
    else:
        print("\n✅ No policy violations. Applying Terraform...")
        terraform_apply()

if __name__=="__main__":
    main()

5. Workflow Summary

  1. Put Terraform code in terraform/.
  2. Put Rego policies in policy/.
  3. Run python validate_apply_gcp.py.
  4. The script will generate plan.json, evaluate OPA policies, and abort if violations exist.
  5. If clean, it will automatically apply Terraform to create GCP infrastructure.

TF and OPA

AWS + Terraform + OPA — Full Project (HTML)

AWS + Terraform + OPA — Full Project (mock, failing by design)

This file contains everything you need to run an automated pipeline locally: Terraform configuration (non-compliant by value), OPA Rego policies (OPA v1.0+ syntax), a single policy entrypoint, and runner scripts (Python + PowerShell). The TF config has all properties present so you can toggle values to change behavior from fail to pass.


Project layout (what to create locally)

aws-opa-tf/
├── main.tf               # Terraform config (all resources present — intentionally non-compliant values)
├── deploy.py             # Python orchestrator (plan → json → opa eval → apply)
├── opa_check.ps1         # PowerShell runner (optional)
└── policy/
    ├── main.rego         # aggregator -> data.terraform.deny
    ├── ec2_complex.rego
    ├── s3_complex.rego
    ├── iam_complex.rego
    └── sg_simple.rego

1) Terraform (non-compliant but complete) — main.tf

All resources are fully defined. Values chosen here intentionally violate the policy rules (so OPA will report violations). When you want to pass, just update the flagged values in the comments.

// main.tf
provider "aws" {
  region = "us-east-1"
}

# ---------- S3 (complete, but non-compliant values) ----------
resource "aws_s3_bucket" "bad_bucket" {
  bucket = "opa-violation-bucket-12345"
  acl    = "public-read"            # ❌ non-compliant: should be "private"
  versioning {
    enabled = false                 # ❌ non-compliant: should be true
  }
  # encryption block present but we will treat as missing by policy (simulate misconfigured)
  server_side_encryption_configuration {
    rule {
      apply_server_side_encryption_by_default {
        sse_algorithm = "AES256"
      }
    }
  }

  tags = {
    Environment = "Dev"
    Team        = "Platform"
  }
}

# ---------- IAM Role (present but invalid trust policy) ----------
resource "aws_iam_role" "bad_role" {
  name = "bad-role"

  # intentionally empty object (invalid trust) to trigger policy check
  assume_role_policy = jsonencode({})   # ❌ non-compliant: should have Version and Statement
}

# ---------- IAM Policy (too broad) ----------
resource "aws_iam_policy" "too_broad" {
  name   = "AllowEverything"
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = "*",      # ❌ non-compliant
        Resource = "*"     # ❌ non-compliant
      }
    ]
  })
}

# ---------- Security Group (open SSH/RDP) ----------
resource "aws_security_group" "bad_sg" {
  name   = "bad-sg"
  vpc_id = "vpc-12345678"  # replace for real runs

  ingress {
    from_port   = 22
    to_port     = 22
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]   # ❌ non-compliant: open SSH
  }

  ingress {
    from_port   = 3389
    to_port     = 3389
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]   # ❌ non-compliant: open RDP
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

# ---------- EC2 Instance (present but misconfigured) ----------
resource "aws_instance" "bad_ec2" {
  ami           = "ami-12345678"         # replace for real runs
  instance_type = "t2.micro"             # ❌ disallowed type per policy
  subnet_id     = "subnet-12345678"      # replace for real runs
  vpc_security_group_ids = [aws_security_group.bad_sg.id]
  iam_instance_profile   = aws_iam_role.bad_role.name

  ebs_optimized = false                  # ❌ should be true
  ebs_block_device {
    device_name = "/dev/sda1"
    volume_size = 30
    volume_type = "standard"             # ❌ non-compliant for Prod (policy expects gp3/io1)
    encrypted   = false                  # ❌ must be true
  }

  tags = {
    Environment = "Prod"
    Service     = "frontend"
  }
}

2) OPA policies (OPA v1.0+ syntax) — put these files in policy/

All policies use deny contains msg if { ... } style so they work with OPA 1.0+.

2.1 Aggregator — policy/main.rego

package terraform

import data.terraform.ec2_complex as ec2_complex
import data.terraform.s3_complex as s3_complex
import data.terraform.iam_complex as iam_complex
import data.terraform.sg_simple as sg_simple

# Aggregate all denials into one entrypoint:
deny contains msg if { msg := ec2_complex.deny[_] }
deny contains msg if { msg := s3_complex.deny[_] }
deny contains msg if { msg := iam_complex.deny[_] }
deny contains msg if { msg := sg_simple.deny[_] }

2.2 EC2 + EBS + SG cross checks — policy/ec2_complex.rego

package terraform.ec2_complex

# Helper: find all aws_instance resources
instances[i] if {
  some r
  inst := input.resource_changes[r]
  inst.type == "aws_instance"
  i = inst
}

# Helper: find SG by address/name (mock plan uses address-like id)
sg_by_id(id) := sg if {
  sg := input.resource_changes[id]
  sg.type == "aws_security_group"
}

# 1) EBS optimized required
deny contains msg if {
  inst := instances[_]
  not inst.change.after.ebs_optimized
  msg := sprintf("EC2 %v is not EBS optimized", [inst.name])
}

# 2) No public IP
deny contains msg if {
  inst := instances[_]
  inst.change.after.associate_public_ip_address
  msg := sprintf("EC2 %v has a public IP assigned", [inst.name])
}

# 3) IAM instance profile must be attached
deny contains msg if {
  inst := instances[_]
  not inst.change.after.iam_instance_profile
  msg := sprintf("EC2 %v does not have an IAM instance profile", [inst.name])
}

# 4) EBS volumes must be encrypted
deny contains msg if {
  inst := instances[_]
  vol := inst.change.after.ebs_block_device[_]
  not vol.encrypted
  msg := sprintf("EC2 %v has unencrypted EBS volume %v", [inst.name, vol.device_name])
}

# 5) For Prod environment, EBS types must be gp3 or io1
deny contains msg if {
  inst := instances[_]
  inst.change.after.tags.Environment == "Prod"
  vol := inst.change.after.ebs_block_device[_]
  not vol.volume_type in {"gp3", "io1"}
  msg := sprintf("EC2 %v in Prod has non-compliant EBS type %v on %v", [inst.name, vol.volume_type, vol.device_name])
}

# 6) Check attached SGs: no SSH/RDP from 0.0.0.0/0
deny contains msg if {
  inst := instances[_]
  sg_id := inst.change.after.vpc_security_group_ids[_]
  sg := sg_by_id(sg_id)
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 22
  ing.to_port == 22
  msg := sprintf("EC2 %v allows SSH (22) from 0.0.0.0/0 via SG %v", [inst.name, sg.name])
}

deny contains msg if {
  inst := instances[_]
  sg_id := inst.change.after.vpc_security_group_ids[_]
  sg := sg_by_id(sg_id)
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 3389
  ing.to_port == 3389
  msg := sprintf("EC2 %v allows RDP (3389) from 0.0.0.0/0 via SG %v", [inst.name, sg.name])
}

2.3 S3 best practices — policy/s3_complex.rego

package terraform.s3_complex

# all buckets
buckets contains b if {
  some r
  b := input.resource_changes[r]
  b.type == "aws_s3_bucket"
}

deny contains msg if {
  b := buckets[_]
  b.change.after.acl == "public-read"
  msg := sprintf("S3 bucket %v: ACL is public-read", [b.name])
}

deny contains msg if {
  b := buckets[_]
  not b.change.after.versioning.enabled
  msg := sprintf("S3 bucket %v: versioning is not enabled", [b.name])
}

deny contains msg if {
  b := buckets[_]
  not b.change.after.server_side_encryption_configuration
  msg := sprintf("S3 bucket %v: server-side encryption not configured", [b.name])
}

# require tag Owner for operational traceability
deny contains msg if {
  b := buckets[_]
  not b.change.after.tags.Owner
  msg := sprintf("S3 bucket %v: missing required tag 'Owner'", [b.name])
}

2.4 IAM least privilege — policy/iam_complex.rego

package terraform.iam_complex

# helpers
roles contains r if {
  some k
  r := input.resource_changes[k]
  r.type == "aws_iam_role"
}

policies contains p if {
  some k
  p := input.resource_changes[k]
  p.type == "aws_iam_policy"
}

# 1) assume_role_policy must be an object with Version and Statement
deny contains msg if {
  r := roles[_]
  # missing Version or Statement -> deny
  not r.change.after.assume_role_policy.Version
  msg := sprintf("IAM Role %v: assume_role_policy missing 'Version'", [r.name])
}

deny contains msg if {
  r := roles[_]
  not r.change.after.assume_role_policy.Statement
  msg := sprintf("IAM Role %v: assume_role_policy missing 'Statement'", [r.name])
}

# 2) Policy statements must not use wildcard Action or Resource
deny contains msg if {
  p := policies[_]
  stmt := p.change.after.policy.Statement[_]
  stmt.Action == "*"
  msg := sprintf("IAM Policy %v: Statement uses Action '*'", [p.name])
}

deny contains msg if {
  p := policies[_]
  stmt := p.change.after.policy.Statement[_]
  stmt.Resource == "*"
  msg := sprintf("IAM Policy %v: Statement uses Resource '*'", [p.name])
}

2.5 SG hygiene (standalone) — policy/sg_simple.rego

package terraform.sg_simple

# all security groups
sgs contains sg if {
  some r
  sg := input.resource_changes[r]
  sg.type == "aws_security_group"
}

# deny open SSH/RDP anywhere
deny contains msg if {
  sg := sgs[_]
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 22
  ing.to_port == 22
  msg := sprintf("SG %v allows SSH (22) from 0.0.0.0/0", [sg.name])
}

deny contains msg if {
  sg := sgs[_]
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 3389
  ing.to_port == 3389
  msg := sprintf("SG %v allows RDP (3389) from 0.0.0.0/0", [sg.name])
}

3) Python orchestrator — deploy.py

Place this at project root. It runs terraform plan → writes plan.json → runs opa eval (using the single policy folder) → aborts or applies based on violations.

#!/usr/bin/env python3
# deploy.py
import subprocess, json, os, sys

PLAN_FILE = "plan.tfplan"
PLAN_JSON = "plan.json"
POLICY_DIR = "policy"

def run_cmd(cmd, check=True):
    print("šŸ‘‰", " ".join(cmd))
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if check and proc.returncode != 0:
        print("❌ Command failed:", " ".join(cmd))
        print(proc.stderr or proc.stdout)
        sys.exit(proc.returncode)
    return proc.stdout

def terraform_plan():
    run_cmd(["terraform", "init", "-input=false"])
    run_cmd(["terraform", "plan", "-out", PLAN_FILE, "-input=false"])
    plan_json = run_cmd(["terraform", "show", "-json", PLAN_FILE])
    with open(PLAN_JSON, "w") as f:
        f.write(plan_json)

def opa_eval():
    # Single entrypoint: data.terraform.deny (from policy/main.rego aggregator)
    cmd = ["opa", "eval", "-i", PLAN_JSON, "-d", POLICY_DIR, "--format", "json", "data.terraform.deny"]
    out = run_cmd(cmd)
    return json.loads(out)

def extract_violations(opa_json):
    violations = []
    for item in opa_json.get("result", []):
        for expr in item.get("expressions", []):
            val = expr.get("value")
            if isinstance(val, list):
                violations.extend(val)
    return violations

def terraform_apply():
    run_cmd(["terraform", "apply", "-auto-approve"])

if __name__ == "__main__":
    terraform_plan()
    opa_json = opa_eval()
    violations = extract_violations(opa_json)
    if violations:
        print("\n\033[91m❌ Policy violations detected:\033[0m")
        for v in violations:
            print(" -", v)
        print("\n🚫 Aborting. Fix policy violations and run again.")
        sys.exit(1)
    else:
        print("\n\033[92m✅ All policies passed. Applying infra...\033[0m")
        terraform_apply()

4) PowerShell runner (optional) — opa_check.ps1

# opa_check.ps1 (optional)
$Plan = "C:\path\to\aws-opa-tf\plan.json"
$Policy = "C:\path\to\aws-opa-tf\policy"

$OpaExe  = "opa"
$OpaArgs = @("eval","-i",$Plan,"-d",$Policy,"--format","json","data.terraform.deny")
try {
  $raw = & $OpaExe @OpaArgs
} catch {
  Write-Error "OPA eval failed. Ensure opa.exe is in PATH."
  exit 2
}
$out = ($raw | Out-String | ConvertFrom-Json)
$violations = @()
foreach ($r in $out.result) {
  foreach ($e in $r.expressions) {
    if ($e.value) { $violations += $e.value }
  }
}
if ($violations.Count -gt 0) {
  Write-Host "❌ Policy violations:" -ForegroundColor Red
  $violations | ForEach-Object { Write-Host " - $_" -ForegroundColor Yellow }
  exit 1
} else {
  Write-Host "✅ All policies passed." -ForegroundColor Green
  exit 0
}

5) Quick help / run checklist

  1. Install prerequisites: Terraform, OPA (v1.0+), Python 3.x, AWS CLI (configured).
  2. Create folder aws-opa-tf and paste files: main.tf, deploy.py, policy/*.rego.
  3. Replace placeholder VPC/subnet/AMI values in main.tf with real ones for apply (or leave them if you only plan to run plan+OPA check).
  4. Run the pipeline: python deploy.py. This will:
    • run terraform plan and produce plan.json
    • run opa eval -d policy -i plan.json data.terraform.deny
    • if violations → shows them and exits; if none → runs terraform apply -auto-approve
  5. To test a passing run: change TF values to the compliant ones (see comments in main.tf), then rerun python deploy.py.

6) Expected violations (with the provided non-compliant TF values)

  • S3: ACL is public-read, versioning is false, SSE missing — fail
  • IAM Role: assume_role_policy is empty/invalid — fail
  • IAM Policy: allows Action: * and Resource: * — fail
  • Security Group: SSH/RDP open to 0.0.0.0/0 — fail
  • EC2: disallowed instance type, not EBS optimized, unencrypted EBS, Prod uses non-gp3/io1 — fail

Notes & small gotchas

  • OPA 1.0+ requires multi-value rules to be written in the deny contains msg if { ... } style; the older deny[msg] { ... } form no longer parses by default. The policies above follow that format.
  • When using real Terraform plans, resource addresses and IDs can differ (e.g. computed IDs). The Rego helpers above assume a simplified mock plan structure where resource_changes is an object keyed by resource address (e.g. aws_security_group.bad_sg). For real plans, where resource_changes is a list, match on the address field or resource instance keys instead — see the sketch after this list.
  • If some values are computed at apply time, check for after_unknown in the plan JSON and decide whether unknowns should be treated as violations or warnings; the sketch below includes a warn rule for that case.
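
For the last two points, here is a minimal sketch of how the helpers might be adapted to a real `terraform show -json` plan, where resource_changes is a list and each entry carries an address field. The package name, the warn rule, and the after_unknown handling are assumptions to be tuned against an actual plan.json:

package terraform.real_plan_example

# Instances from a list-shaped resource_changes (real plan output)
instances contains inst if {
  some inst in input.resource_changes
  inst.type == "aws_instance"
}

# Look up a security group by its resource address (e.g. "aws_security_group.bad_sg")
sg_by_address(addr) := sg if {
  some sg in input.resource_changes
  sg.type == "aws_security_group"
  sg.address == addr
}

# In real plans, aws_iam_policy.policy is a JSON string; parse it before inspecting statements
deny contains msg if {
  some p in input.resource_changes
  p.type == "aws_iam_policy"
  doc := json.unmarshal(p.change.after.policy)
  some stmt in doc.Statement
  stmt.Action == "*"
  msg := sprintf("IAM Policy %v: Statement uses Action '*'", [p.address])
}

# Surface attributes that are still unknown at plan time as warnings rather than denials
warn contains msg if {
  some inst in input.resource_changes
  inst.type == "aws_instance"
  some field, unknown in inst.change.after_unknown
  unknown == true
  msg := sprintf("EC2 %v: attribute %q is not known until apply", [inst.address, field])
}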

Possible follow-ups:

  • A clean/compliant version of main.tf to toggle quickly between fail and pass runs.
  • Unit tests that run OPA against multiple plan JSON variants — a minimal Rego test sketch follows this list.
  • Adapting the Rego rules to the exact plan JSON that terraform show -json produces in your environment, by tuning them against a sample plan.json.
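
As a starting point for the unit-test idea, a minimal Rego test sketch (hypothetical file name and mock values; assumes the helpers use contains as above; run with opa test policy/ -v):

# policy/ec2_complex_test.rego
package terraform.ec2_complex_test

import data.terraform.ec2_complex

# Minimal mock plan: one instance that is not EBS optimized
mock_plan := {
  "resource_changes": {
    "aws_instance.bad_ec2": {
      "type": "aws_instance",
      "name": "bad_ec2",
      "change": {"after": {
        "ebs_optimized": false,
        "iam_instance_profile": "some-profile",
        "tags": {"Environment": "Dev"}
      }}
    }
  }
}

test_not_ebs_optimized_is_denied if {
  msgs := ec2_complex.deny with input as mock_plan
  "EC2 bad_ec2 is not EBS optimized" in msgs
}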

Terraform + OPA Validation with Python

This Python script validates a Terraform plan using multiple OPA policies (S3, EC2, IAM, Security Group) and applies the infrastructure only if there are no policy violations.

Python Script (opa_tf_apply.py)

import subprocess
import json
import os
import shutil
import sys

# ----------------------------
# Configuration
# ----------------------------
plan_file = "terraform-plan.json"
policy_dir = "policy"

# Change working directory if needed
os.chdir(os.path.dirname(os.path.abspath(__file__)))

# ----------------------------
# Check if OPA exists
# ----------------------------
opa_path = shutil.which("opa")
if not opa_path:
    print("❌ OPA executable not found in PATH. Please install OPA and add it to PATH.")
    sys.exit(1)

# ----------------------------
# Collect all Rego policy files
# ----------------------------
rego_files = [os.path.join(policy_dir, f) for f in os.listdir(policy_dir) if f.endswith(".rego")]
if not rego_files:
    print(f"❌ No Rego files found in '{policy_dir}'. Please add policy files.")
    sys.exit(1)

# ----------------------------
# Build OPA eval command
# ----------------------------
# Query the aggregated entrypoint (data.terraform.deny from policy/main.rego)
cmd = [opa_path, "eval", "-i", plan_file, "--format", "json", "data.terraform.deny"]
for rego in rego_files:
    cmd.extend(["-d", rego])

# ----------------------------
# Run OPA eval
# ----------------------------
print("šŸ”Ž OPA validation start...")

try:
    result = subprocess.run(cmd, capture_output=True, text=True)
except Exception as e:
    print("❌ Error running OPA:", str(e))
    sys.exit(1)

# ----------------------------
# Check return code
# ----------------------------
if result.returncode != 0:
    print("❌ OPA command failed.")
    print("STDOUT:", result.stdout)
    print("STDERR:", result.stderr)
    sys.exit(1)

# ----------------------------
# Parse OPA output
# ----------------------------
try:
    opa_output = json.loads(result.stdout)
except json.JSONDecodeError:
    print("❌ Failed to parse OPA JSON output:")
    print(result.stdout)
    sys.exit(1)

violations = []
for res in opa_output.get("result", []):
    for expr in res.get("expressions", []):
        violations.extend(expr.get("value", []))

# ----------------------------
# Display violations
# ----------------------------
if violations:
    print("❌ Policy violations found:")
    for v in violations:
        print("-", v)
    sys.exit(1)
else:
    print("✅ All policies passed. Proceeding with Terraform apply...")

    # ----------------------------
    # Run Terraform apply
    # ----------------------------
    try:
        tf_apply = subprocess.run(["terraform", "apply", "-auto-approve"], capture_output=True, text=True)
        print(tf_apply.stdout)
        if tf_apply.returncode != 0:
            print("❌ Terraform apply failed:")
            print(tf_apply.stderr)
            sys.exit(1)
        else:
            print("✅ Terraform infrastructure created successfully!")
    except Exception as e:
        print("❌ Error running Terraform apply:", str(e))
        sys.exit(1)

Tuesday, 26 August 2025

Centralize OPA policy

OPA Policy Validation for a Multi-Resource Mock Terraform Plan

This document demonstrates how to validate a mock Terraform plan JSON containing multiple resources (EC2, S3, Security Group, and IAM) using Open Policy Agent (OPA). It includes JSON input, Rego policies, and both Python and PowerShell scripts.

1. Directory Structure

C:\OPA_Mock_Project\
│
├── terraform-plan.json     # Mock Terraform plan JSON for all resources
└── policy\                 # OPA policies
    ├── s3.rego
    ├── ec2.rego
    ├── iam.rego
    └── sg.rego

2. Mock Terraform Plan JSON (terraform-plan.json)

{
  "format_version": "0.1",
  "terraform_version": "1.13.1",
  "resource_changes": {
    "aws_s3_bucket.example": {
      "type": "aws_s3_bucket",
      "name": "example",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "bucket": "my-opa-test-bucket-12345",
          "acl": "public-read",
          "versioning": {"enabled": false},
          "server_side_encryption_configuration": null
        }
      }
    },
    "aws_instance.example_ec2": {
      "type": "aws_instance",
      "name": "example_ec2",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "ami": "ami-12345678",
          "instance_type": "t2.micro",
          "ebs_optimized": false,
          "associate_public_ip_address": true,
          "iam_instance_profile": "my-ec2-role",
          "vpc_security_group_ids": ["sg-12345678"],
          "ebs_block_device": [
            {"device_name": "/dev/sda1", "volume_size": 30, "encrypted": false},
            {"device_name": "/dev/sdb", "volume_size": 50, "encrypted": true}
          ],
          "tags": {"Environment": "Dev"}
        }
      }
    },
    "aws_security_group.example_sg": {
      "type": "aws_security_group",
      "name": "example_sg",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "ingress": [
            {"from_port": 22, "to_port": 22, "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"]}
          ]
        }
      }
    },
    "aws_iam_role.example_role": {
      "type": "aws_iam_role",
      "name": "example_role",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "assume_role_policy": {
            "Version": "2012-10-17",
            "Statement": [
              {"Action":"sts:AssumeRole","Effect":"Allow","Principal":{"Service":"ec2.amazonaws.com"}}
            ]
          }
        }
      }
    }
  }
}

3. OPA Policies

S3 Policy (policy/s3.rego)

package terraform.s3

# Disallow public-read ACL
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_s3_bucket"
  input.resource_changes[resource].change.after.acl == "public-read"
  msg := sprintf("Bucket %v has public-read ACL", [input.resource_changes[resource].name])
}

# Require versioning enabled
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_s3_bucket"
  not input.resource_changes[resource].change.after.versioning.enabled
  msg := sprintf("Bucket %v does not have versioning enabled", [input.resource_changes[resource].name])
}

# Require encryption
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_s3_bucket"
  not input.resource_changes[resource].change.after.server_side_encryption_configuration
  msg := sprintf("Bucket %v does not have server-side encryption", [input.resource_changes[resource].name])
}

EC2 Policy (policy/ec2.rego)

package terraform.ec2

# Disallow t2.micro instances
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_instance"
  input.resource_changes[resource].change.after.instance_type == "t2.micro"
  msg := sprintf("Instance %v uses disallowed type t2.micro", [input.resource_changes[resource].name])
}

# Require EBS optimization
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_instance"
  not input.resource_changes[resource].change.after.ebs_optimized
  msg := sprintf("Instance %v is not EBS optimized", [input.resource_changes[resource].name])
}

# Disallow public IP
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_instance"
  input.resource_changes[resource].change.after.associate_public_ip_address
  msg := sprintf("Instance %v has a public IP assigned", [input.resource_changes[resource].name])
}

# EBS volumes must be encrypted
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_instance"
  volume := input.resource_changes[resource].change.after.ebs_block_device[_]
  not volume.encrypted
  msg := sprintf("Instance %v has unencrypted volume %v", [input.resource_changes[resource].name, volume.device_name])
}

Security Group Policy (policy/sg.rego)

package terraform.sg

# Disallow open ingress 0.0.0.0/0
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_security_group"
  ingress := input.resource_changes[resource].change.after.ingress[_]
  ingress.cidr_blocks[_] == "0.0.0.0/0"
  msg := sprintf("Security Group %v has open ingress to 0.0.0.0/0", [input.resource_changes[resource].name])
}

IAM Policy (policy/iam.rego)

package terraform.iam

# Require assume role policy
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_iam_role"
  not input.resource_changes[resource].change.after.assume_role_policy
  msg := sprintf("IAM Role %v does not have an assume role policy", [input.resource_changes[resource].name])
}

4. Python Script (opa_check.py)

import subprocess
import json
import os

plan_file = "terraform-plan.json"
policy_dir = "policy"

rego_files = [os.path.join(policy_dir, f) for f in os.listdir(policy_dir) if f.endswith(".rego")]

cmd = ["opa", "eval", "-i", plan_file, "--format", "json", "data"]
for rego in rego_files:
    cmd.extend(["-d", rego])

result = subprocess.run(cmd, capture_output=True, text=True)
opa_output = json.loads(result.stdout)

violations = []
for res in opa_output["result"]:
    for expr in res["expressions"]:
        if expr["value"]:
            violations.extend(expr["value"])

if violations:
    print("❌ Policy violations found:")
    for v in violations:
        print("-", v)
else:
    print("✅ All policies passed.")

5. PowerShell Script (opa_check.ps1)

$PlanFile = "C:\OPA_Mock_Project\terraform-plan.json"
$PolicyFolder = "C:\OPA_Mock_Project\policy"

$RegoFiles = Get-ChildItem -Path $PolicyFolder -Filter *.rego | ForEach-Object { $_.FullName }

# Query the shared "terraform" namespace so each package's deny set can be collected
$OpaArgs = @("eval", "-i", $PlanFile, "--format", "json", "data.terraform")
foreach ($rego in $RegoFiles) { $OpaArgs += @("-d", $rego) }

try {
    $OpaOutputRaw = & "opa" @OpaArgs
} catch {
    Write-Error "Failed to run OPA. Ensure opa.exe is in PATH."
    exit 1
}

$OpaOutput = ($OpaOutputRaw | Out-String | ConvertFrom-Json)
$Violations = @()
foreach ($res in $OpaOutput.result) {
    foreach ($expr in $res.expressions) {
        # $expr.value is an object keyed by package (s3, ec2, iam, sg); collect each deny set
        foreach ($pkg in $expr.value.PSObject.Properties) {
            if ($pkg.Value.deny) { $Violations += $pkg.Value.deny }
        }
    }
}

if ($Violations.Count -gt 0) {
    Write-Host "❌ Policy violations found:" -ForegroundColor Red
    foreach ($v in $Violations) { Write-Host "- $v" -ForegroundColor Yellow }
    exit 1
} else {
    Write-Host "✅ All policies passed." -ForegroundColor Green
}

6. Expected Violations for This Mock Plan

  • S3 bucket has public-read ACL
  • S3 bucket does not have versioning enabled
  • S3 bucket does not have server-side encryption
  • EC2 instance uses disallowed type t2.micro
  • EC2 instance is not EBS optimized
  • EC2 instance has a public IP assigned
  • EC2 instance has unencrypted volume /dev/sda1
  • Security Group has open ingress 0.0.0.0/0