Saturday, 30 August 2025

GCP-TF-Log-sync-CloudFunction

GCP Public Bucket Alert Setup

🚨 Google Cloud – Public Bucket Alert (Terraform + Cloud Function)

main.tf (excerpt; the complete configuration appears under "Terraform - main.tf (Full)" below)

provider "google" {
  project = var.project_id
  region  = var.region
}

# Enable required services
resource "google_project_service" "services" {
  for_each = toset([
    "pubsub.googleapis.com",
    "logging.googleapis.com",
    "cloudfunctions.googleapis.com"
  ])
  service = each.key
}

# Pub/Sub topic that receives log events
resource "google_pubsub_topic" "log_topic" {
  name = "storage-policy-violations"
}

# Pub/Sub topic for SOC alerts
resource "google_pubsub_topic" "soc_alerts" {
  name = "soc-alerts"
}

# Log sink to capture public bucket IAM changes
resource "google_logging_project_sink" "storage_sink" {
  name        = "storage-public-bucket-sink"
  destination = "pubsub.googleapis.com/${google_pubsub_topic.log_topic.id}"

  # Filter: only when public access is granted
  filter = <<EOT
resource.type="gcs_bucket"
protoPayload.methodName="storage.setIamPermissions"
(protoPayload.serviceData.policyDelta.bindingDeltas.member="allUsers"
 OR protoPayload.serviceData.policyDelta.bindingDeltas.member="allAuthenticatedUsers")
EOT

  unique_writer_identity = true
}

variables.tf

variable "project_id" {
  description = "Your GCP Project ID"
}
variable "region" {
  default     = "us-central1"
}

🔹 Cloud Function Code (main.py)

import base64
import json
from google.cloud import pubsub_v1

SOC_TOPIC = "soc-alerts"

def process_pubsub(event, context):
    """Triggered when a bucket is made public"""
    if "data" not in event:
        print("No data found in event")
        return

    # Decode log entry
    payload = base64.b64decode(event["data"]).decode("utf-8")
    try:
        log_entry = json.loads(payload)
    except Exception as e:
        print(f"Could not parse log entry: {e}")
        return

    bucket_name = log_entry.get("resource", {}).get("labels", {}).get("bucket_name", "unknown")

    # Create alert message
    message = {
        "alert": "PUBLIC_BUCKET_DETECTED",
        "bucket": bucket_name,
        "log": log_entry
    }

    # Publish to SOC topic
    publisher = pubsub_v1.PublisherClient()
    project_id = log_entry.get("resource", {}).get("labels", {}).get("project_id", "")
    topic_path = publisher.topic_path(project_id, SOC_TOPIC)

    publisher.publish(topic_path, json.dumps(message).encode("utf-8"))
    print(f"⚠️ SOC ALERT: Public bucket detected -> {bucket_name}")

requirements.txt

google-cloud-pubsub

🔹 Windows Packaging

Compress-Archive -Path main.py, requirements.txt -DestinationPath function-source.zip -Force

🔹 Deployment Steps

  1. Enable the required APIs:
    gcloud services enable pubsub.googleapis.com logging.googleapis.com cloudfunctions.googleapis.com
  2. Deploy the Terraform configuration:
    terraform init
    terraform apply
  3. Test by making a bucket public:
    gsutil iam ch allUsers:objectViewer gs://<your-bucket>

→ This will trigger Cloud Logging → Pub/Sub → Cloud Function → SOC Pub/Sub topic.

✅ Result

This setup is the Google Cloud equivalent of AWS S3 public-bucket alerting, built from Cloud Logging, Pub/Sub, and a Cloud Function.

Terraform - main.tf (Full)

main.tf


provider "google" {
  project = var.project_id
  region  = var.region
}

# Enable required services
resource "google_project_service" "services" {
  for_each = toset([
    "pubsub.googleapis.com",
    "logging.googleapis.com",
    "cloudfunctions.googleapis.com"
  ])
  service = each.key
}

# Pub/Sub topic that receives log events
resource "google_pubsub_topic" "log_topic" {
  name = "storage-policy-violations"
}

# Pub/Sub topic for SOC alerts
resource "google_pubsub_topic" "soc_alerts" {
  name = "soc-alerts"
}

# Log sink to capture public bucket IAM changes
resource "google_logging_project_sink" "storage_sink" {
  name        = "storage-public-bucket-sink"
  destination = "pubsub.googleapis.com/${google_pubsub_topic.log_topic.id}"

  # Filter: only when public access is granted
  filter = <<EOT
resource.type="gcs_bucket"
protoPayload.methodName="storage.setIamPermissions"
(protoPayload.serviceData.policyDelta.bindingDeltas.member="allUsers"
 OR protoPayload.serviceData.policyDelta.bindingDeltas.member="allAuthenticatedUsers")
EOT

  unique_writer_identity = true
}

# Give sink permission to publish
resource "google_pubsub_topic_iam_member" "sink_pub" {
  topic  = google_pubsub_topic.log_topic.name
  role   = "roles/pubsub.publisher"
  member = google_logging_project_sink.storage_sink.writer_identity
}

# Storage bucket for function code
resource "google_storage_bucket" "function_bucket" {
  name          = "${var.project_id}-function-src"
  location      = var.region
  force_destroy = true
}

# Upload function zip
resource "google_storage_bucket_object" "function_source" {
  name   = "function-source.zip"
  bucket = google_storage_bucket.function_bucket.name
  source = "function-source.zip"
}

# Cloud Function
resource "google_cloudfunctions_function" "notify_soc" {
  name        = "storage-public-alert"
  runtime     = "python39"
  region      = var.region
  entry_point = "process_pubsub"

  source_archive_bucket = google_storage_bucket.function_bucket.name
  source_archive_object = google_storage_bucket_object.function_source.name

  event_trigger {
    event_type = "google.pubsub.topic.publish"
    resource   = google_pubsub_topic.log_topic.name
  }

  available_memory_mb = 256
  description         = "Notifies SOC when a bucket is made public"
}

# Allow function to publish to SOC topic
resource "google_pubsub_topic_iam_member" "function_pub" {
  topic  = google_pubsub_topic.soc_alerts.name
  role   = "roles/pubsub.publisher"
  member = "serviceAccount:${google_cloudfunctions_function.notify_soc.service_account_email}"
}
  

Thursday, 28 August 2025

GCP 1

GCP Terraform Example — Compute + Storage + Firewall

GCP Terraform Example

This page contains a ready-to-use Terraform configuration that creates a small environment on Google Cloud: a Storage Bucket, a Firewall rule (like an AWS security group), and a small Compute Engine VM (e2-micro) suitable for testing. The configuration intentionally uses values that are easy to change for compliance or security.

Important: Replace YOUR_PROJECT_ID and ensure key.json points to your service account JSON credentials. Keep credentials secret and do not commit them to source control.

Terraform configuration

terraform {
  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "~> 5.0"
    }
  }
}

provider "google" {
  project     = "YOUR_PROJECT_ID"
  region      = "us-central1"      # free tier region
  zone        = "us-central1-a"    # free tier zone
  credentials = file("key.json")   # your service account key
}

# --------------------------
# Storage Bucket (Free Tier)
# --------------------------
resource "google_storage_bucket" "demo_bucket" {
  name     = "my-demo-bucket-${random_id.rand.hex}"
  location = "US"

  storage_class = "STANDARD"
  force_destroy = true
  uniform_bucket_level_access = true
}

# --------------------------
# Firewall (Like Security Group)
# --------------------------
resource "google_compute_firewall" "default_allow_ssh" {
  name    = "allow-ssh"
  network = "default"

  allow {
    protocol = "tcp"
    ports    = ["22"]
  }

  source_ranges = ["0.0.0.0/0"] # 🚨 Open SSH to world (not safe for prod)
  target_tags   = ["ssh-allowed"]
}

# --------------------------
# Compute Instance (Free Tier)
# --------------------------
resource "google_compute_instance" "demo_vm" {
  name         = "demo-vm"
  machine_type = "e2-micro"  # ✅ Always Free tier machine type
  zone         = "us-central1-a"

  tags = ["ssh-allowed"]

  boot_disk {
    initialize_params {
      image = "debian-cloud/debian-11"
      size  = 30   # ✅ Free tier gives you 30GB Persistent Disk
    }
  }

  network_interface {
    network = "default"
    access_config {
      # Ephemeral public IP (free)
    }
  }

  metadata_startup_script = <<-EOT
    #!/bin/bash
    echo "Hello from Terraform VM" > /var/tmp/startup.txt
  EOT
}

# --------------------------
# Random ID for bucket name
# --------------------------
resource "random_id" "rand" {
  byte_length = 4
}

Quick run instructions

  1. Install and configure GCP SDK / Terraform.
  2. Place your service-account JSON next to main.tf as key.json, or update credentials path.
  3. Initialize Terraform:
    terraform init
  4. Preview changes:
    terraform plan -out=tfplan
  5. Apply (create resources):
    terraform apply tfplan
  6. Cleanup:
    terraform destroy -auto-approve

Fields & notes

  • machine_type = "e2-micro": Always-Free eligible machine type in some regions (use us-central1).
  • source_ranges = ["0.0.0.0/0"]: opens SSH to the world; acceptable for quick tests, but change it to your own IP for safety.
  • force_destroy = true: allows bucket deletion even when it contains objects; useful for cleanup automation.
  • credentials = file("key.json"): Terraform reads your service account key directly, so there is no need to run gcloud auth (unless you want to).

Safety tips

  • Prefer restricting SSH source_ranges to your IP (e.g. ["203.0.113.4/32"]).
  • Verify billing is enabled on the project; free-tier still requires billing account attached.
  • Do not commit key.json to version control.


OPA basics

OPA AWS Terraform Policy - And, Or, Not

OPA AWS Terraform Policy Example (And, Or, Not)

This example demonstrates how to use and, or, and not operators in Rego v1 syntax for AWS Terraform plans.
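
Before the example, a quick note on how these operators map onto Rego: there is no infix "or", so a disjunction is written as two rules with the same head (or, for plain values, as set membership with "in"); a conjunction is simply several expressions in one rule body; and "not" negates a single expression. A minimal illustrative sketch (the package and input fields below are invented for demonstration):

package example

# OR: two rules with the same head fire on either condition.
risky contains "small_or_public" if { input.instance_type == "t2.micro" }
risky contains "small_or_public" if { input.public_ip == true }

# AND: listing expressions in one body requires all of them to hold.
# NOT: negates the single expression that follows it.
risky contains "public_and_unencrypted" if {
  input.acl == "public-read"
  not input.encrypted
}

# OR over plain values can also be written with set membership.
risky contains "admin_port_open" if { input.port in {22, 3389} }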

1. Mock Terraform Plan JSON (aws-plan.json)

{
  "resource_changes": [
    {
      "address": "aws_s3_bucket.demo",
      "type": "aws_s3_bucket",
      "change": {
        "after": {
          "acl": "public-read",
          "versioning": { "enabled": false },
          "server_side_encryption_configuration": null
        }
      }
    },
    {
      "address": "aws_instance.demo",
      "type": "aws_instance",
      "change": {
        "after": {
          "instance_type": "t2.micro",
          "associate_public_ip_address": true,
          "ebs_optimized": false,
          "ebs_block_device": [
            { "device_name": "/dev/sda1", "encrypted": false }
          ]
        }
      }
    },
    {
      "address": "aws_security_group.demo",
      "type": "aws_security_group",
      "change": {
        "after": {
          "ingress": [
            { "from_port": 22, "to_port": 22, "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"] }
          ]
        }
      }
    }
  ]
}

2. S3 Policy (policy/s3.rego)

package terraform.s3

# OR: a disjunction is written as two rules with the same head.
# Deny if ACL is public ...
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_s3_bucket"
  rc.change.after.acl == "public-read"
  msg := sprintf("S3 bucket %s is public OR lacks versioning", [rc.address])
}

# ... OR versioning is not enabled
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_s3_bucket"
  not rc.change.after.versioning.enabled
  msg := sprintf("S3 bucket %s is public OR lacks versioning", [rc.address])
}

# AND: a conjunction is simply several expressions in one rule body.
# Deny if server-side encryption is missing AND the bucket is public.
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_s3_bucket"
  rc.change.after.acl == "public-read"
  not rc.change.after.server_side_encryption_configuration
  msg := sprintf("S3 bucket %s is public AND unencrypted", [rc.address])
}

3. EC2 Policy (policy/ec2.rego)

package terraform.ec2

# OR: two rules with the same head.
# Deny if instance type is t2.micro ...
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_instance"
  rc.change.after.instance_type == "t2.micro"
  msg := sprintf("EC2 %s is t2.micro OR has public IP", [rc.address])
}

# ... OR the instance has a public IP
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_instance"
  rc.change.after.associate_public_ip_address
  msg := sprintf("EC2 %s is t2.micro OR has public IP", [rc.address])
}

# NOT: negate a single expression.
# Deny if the instance is NOT EBS optimized.
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_instance"
  not rc.change.after.ebs_optimized
  msg := sprintf("EC2 %s is not EBS optimized", [rc.address])
}

# Deny if any EBS volume is NOT encrypted.
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_instance"
  vol := rc.change.after.ebs_block_device[_]
  not vol.encrypted
  msg := sprintf("EC2 %s has unencrypted volume %s", [rc.address, vol.device_name])
}

4. Security Group Policy (policy/sg.rego)

package terraform.sg

# Deny if the SG allows SSH OR RDP from the world.
# OR over plain values can be written with set membership ("in").
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_security_group"
  ing := rc.change.after.ingress[_]
  ing.from_port in {22, 3389}
  ing.cidr_blocks[_] == "0.0.0.0/0"
  msg := sprintf("Security Group %s allows SSH or RDP from world", [rc.address])
}

5. Run OPA Evaluation

opa eval -i aws-plan.json \
-d policy/s3.rego \
-d policy/ec2.rego \
-d policy/sg.rego \
"data.terraform"

Expected violations:

  • S3 bucket is public OR lacks versioning
  • S3 bucket is public AND unencrypted
  • EC2 is t2.micro OR has public IP
  • EC2 is not EBS optimized
  • EC2 has unencrypted volume
  • Security Group allows SSH or RDP from world

Wednesday, 27 August 2025

Policies use the new OPA 1.0+ syntax

Advanced OPA Policies for AWS Terraform (Mock Plan)

Advanced OPA Policies for AWS Terraform (Mock Plan)

This guide demonstrates complex OPA (Open Policy Agent) policies for AWS Terraform plans using a mock plan JSON. It includes cross-resource checks (EC2 ↔ Security Groups, EC2 ↔ EBS), S3 best practices, IAM least privilege, a single policy entrypoint to evaluate everything at once, and runner scripts (Python & PowerShell).

1) Directory Layout

C:\OPA_Advanced\
│
├── terraform-plan.json     # Mock Terraform plan JSON (intentionally violating several policies)
└── policy\
    ├── main.rego           # Single entrypoint aggregating all denials
    ├── ec2_complex.rego    # EC2 + SG + EBS cross checks
    ├── s3_complex.rego     # S3 best-practice checks
    ├── iam_complex.rego    # IAM least privilege checks
    └── sg_simple.rego      # SG hygiene (used by EC2 cross-checks)
Note: This is a mock plan; keys/IDs are simplified so cross-references are easy. In real plans, resource IDs are computed and you’ll often join using address, type, and name or inspect after_unknown and data sources.
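
For instance, in a real plan the resource_changes value is an array and each element carries address, type, and name fields, so joins are done by matching those fields rather than by using the address as a map key. A small sketch of such helpers, assuming the standard Terraform plan JSON layout (not part of the project files above):

package terraform.helpers

# Look up a resource change by its Terraform address in an array-style plan,
# e.g. by_address("aws_security_group.web").
by_address(addr) := rc if {
  rc := input.resource_changes[_]
  rc.address == addr
}

# All resource changes of a given type, e.g. of_type("aws_instance").
of_type(t) := {rc | rc := input.resource_changes[_]; rc.type == t}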

2) Mock Terraform Plan JSON (terraform-plan.json)

This plan intentionally violates multiple controls to showcase policy failures.

{
  "format_version": "0.1",
  "terraform_version": "1.13.1",
  "resource_changes": {
    "aws_s3_bucket.app_bucket": {
      "type": "aws_s3_bucket",
      "name": "app_bucket",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "bucket": "my-app-public-bucket",
          "acl": "public-read",
          "versioning": { "enabled": false },
          "server_side_encryption_configuration": null,
          "tags": { "Team": "Platform" }
        }
      }
    },

    "aws_security_group.web": {
      "type": "aws_security_group",
      "name": "web",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "name": "web-sg",
          "description": "Web security group",
          "ingress": [
            { "from_port": 22,   "to_port": 22,   "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"] },
            { "from_port": 3389, "to_port": 3389, "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"] },
            { "from_port": 80,   "to_port": 80,   "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"] }
          ],
          "egress": [
            { "from_port": 0, "to_port": 0, "protocol": "-1", "cidr_blocks": ["0.0.0.0/0"] }
          ],
          "tags": { "Environment": "Prod" }
        }
      }
    },

    "aws_iam_role.ec2_role": {
      "type": "aws_iam_role",
      "name": "ec2_role",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "name": "ec2-role",
          "assume_role_policy": {
            "Version": "2012-10-17",
            "Statement": [
              { "Effect": "Allow", "Principal": { "Service": "*" }, "Action": "sts:AssumeRole" }
            ]
          },
          "tags": { "Owner": "Alice" }
        }
      }
    },

    "aws_iam_policy.too_broad": {
      "type": "aws_iam_policy",
      "name": "too_broad",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "name": "AllowEverything",
          "policy": {
            "Version": "2012-10-17",
            "Statement": [
              { "Effect": "Allow", "Action": "*", "Resource": "*" }
            ]
          }
        }
      }
    },

    "aws_instance.web1": {
      "type": "aws_instance",
      "name": "web1",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "ami": "ami-12345678",
          "instance_type": "t3.small",
          "ebs_optimized": false,
          "associate_public_ip_address": true,
          "iam_instance_profile": "ec2-role",
          "vpc_security_group_ids": ["aws_security_group.web"],
          "ebs_block_device": [
            { "device_name": "/dev/sda1", "volume_size": 30, "volume_type": "standard", "encrypted": false }
          ],
          "tags": { "Environment": "Prod", "Service": "frontend" }
        }
      }
    }
  }
}

3) OPA Policies (OPA v1.0+ syntax)

3.1 Main Aggregator (policy/main.rego)

Single entrypoint so you can evaluate everything at once.

package terraform

import data.terraform.ec2_complex
import data.terraform.s3_complex
import data.terraform.iam_complex
import data.terraform.sg_simple

# Aggregate all denials into one set
deny contains msg if { msg := ec2_complex.deny[_] }
deny contains msg if { msg := s3_complex.deny[_] }
deny contains msg if { msg := iam_complex.deny[_] }
deny contains msg if { msg := sg_simple.deny[_] }

3.2 EC2 + SG + EBS Cross Checks (policy/ec2_complex.rego)

package terraform.ec2_complex

# Helper: iterate all EC2 instances in the plan
ec2s contains ec2 if {
  some r
  ec2 := input.resource_changes[r]
  ec2.type == "aws_instance"
}

# Helper: look up SG resource by "id" (mock uses address-style id)
sg_by_id(id) := sg if {
  sg := input.resource_changes[id]
  sg.type == "aws_security_group"
}

# --- Deny rules ---

# 1) EC2 must be EBS-optimized
deny contains msg if {
  ec2 := ec2s[_]
  not ec2.change.after.ebs_optimized
  msg := sprintf("EC2 %v is not EBS optimized", [ec2.name])
}

# 2) EC2 must not have public IP
deny contains msg if {
  ec2 := ec2s[_]
  ec2.change.after.associate_public_ip_address
  msg := sprintf("EC2 %v has a public IP assigned", [ec2.name])
}

# 3) EC2 must have IAM instance profile (role)
deny contains msg if {
  ec2 := ec2s[_]
  not ec2.change.after.iam_instance_profile
  msg := sprintf("EC2 %v does not have an IAM instance profile attached", [ec2.name])
}

# 4) All attached EBS volumes must be encrypted
deny contains msg if {
  ec2 := ec2s[_]
  vol := ec2.change.after.ebs_block_device[_]
  not vol.encrypted
  msg := sprintf("EC2 %v has unencrypted EBS volume %v", [ec2.name, vol.device_name])
}

# 5) If Environment=Prod, EBS volume types must be gp3 or io1
deny contains msg if {
  ec2 := ec2s[_]
  ec2.change.after.tags.Environment == "Prod"
  vol := ec2.change.after.ebs_block_device[_]
  vol.volume_type != "gp3"
  vol.volume_type != "io1"
  msg := sprintf("EC2 %v in Prod has non-compliant EBS type %v on %v", [ec2.name, vol.volume_type, vol.device_name])
}

# 6) EC2's attached SGs must not allow SSH (22) or RDP (3389) from 0.0.0.0/0
deny contains msg if {
  ec2 := ec2s[_]
  sg_id := ec2.change.after.vpc_security_group_ids[_]
  sg := sg_by_id(sg_id)
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 22
  ing.to_port == 22
  msg := sprintf("EC2 %v allows SSH (22) from 0.0.0.0/0 via SG %v", [ec2.name, sg.name])
}

deny contains msg if {
  ec2 := ec2s[_]
  sg_id := ec2.change.after.vpc_security_group_ids[_]
  sg := sg_by_id(sg_id)
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 3389
  ing.to_port == 3389
  msg := sprintf("EC2 %v allows RDP (3389) from 0.0.0.0/0 via SG %v", [ec2.name, sg.name])
}

3.3 S3 Best Practices (policy/s3_complex.rego)

package terraform.s3_complex

# Helper: all S3 buckets
buckets contains b if {
  some r
  b := input.resource_changes[r]
  b.type == "aws_s3_bucket"
}

# Require versioning
deny contains msg if {
  b := buckets[_]
  not b.change.after.versioning.enabled
  msg := sprintf("S3 bucket %v: versioning is not enabled", [b.name])
}

# Require server-side encryption
deny contains msg if {
  b := buckets[_]
  not b.change.after.server_side_encryption_configuration
  msg := sprintf("S3 bucket %v: server-side encryption not configured", [b.name])
}

# Block public ACLs
deny contains msg if {
  b := buckets[_]
  b.change.after.acl == "public-read"
  msg := sprintf("S3 bucket %v: ACL is public-read", [b.name])
}

# Require mandatory tags (Environment and Owner)
deny contains msg if {
  b := buckets[_]
  not b.change.after.tags.Environment
  msg := sprintf("S3 bucket %v: missing required tag 'Environment'", [b.name])
}

deny contains msg if {
  b := buckets[_]
  not b.change.after.tags.Owner
  msg := sprintf("S3 bucket %v: missing required tag 'Owner'", [b.name])
}

3.4 IAM Least Privilege (policy/iam_complex.rego)

package terraform.iam_complex

# Helper: roles and policies
roles contains r if {
  some k
  r := input.resource_changes[k]
  r.type == "aws_iam_role"
}

policies contains p if {
  some k
  p := input.resource_changes[k]
  p.type == "aws_iam_policy"
}

# 1) AssumeRole principal must not be wildcard
deny contains msg if {
  r := roles[_]
  stmt := r.change.after.assume_role_policy.Statement[_]
  stmt.Principal.Service == "*"
  msg := sprintf("IAM Role %v: assume-role Principal.Service is wildcard '*'", [r.name])
}

# 2) Managed policy statements must not have Action '*'
deny contains msg if {
  p := policies[_]
  stmt := p.change.after.policy.Statement[_]
  stmt.Action == "*"
  msg := sprintf("IAM Policy %v: uses Action '*'", [p.name])
}

# 3) Managed policy statements must not have Resource '*'
deny contains msg if {
  p := policies[_]
  stmt := p.change.after.policy.Statement[_]
  stmt.Resource == "*"
  msg := sprintf("IAM Policy %v: uses Resource '*'", [p.name])
}

3.5 Security Group Hygiene (Standalone) (policy/sg_simple.rego)

package terraform.sg_simple

# Helper: all SGs
sgs contains sg if {
  some r
  sg := input.resource_changes[r]
  sg.type == "aws_security_group"
}

# Disallow 0.0.0.0/0 for SSH and RDP anywhere in the plan (defense in depth)
deny contains msg if {
  sg := sgs[_]
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 22
  ing.to_port == 22
  msg := sprintf("SG %v allows SSH (22) from 0.0.0.0/0", [sg.name])
}

deny contains msg if {
  sg := sgs[_]
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 3389
  ing.to_port == 3389
  msg := sprintf("SG %v allows RDP (3389) from 0.0.0.0/0", [sg.name])
}

4) Python Runner (opa_check.py)

import subprocess, json, os, sys

plan = "terraform-plan.json"
policy_dir = "policy"

# Evaluate a single entrypoint: data.terraform.deny (from main.rego)
cmd = ["opa", "eval", "-i", plan, "-d", policy_dir, "--format", "json", "data.terraform.deny"]

try:
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
except subprocess.CalledProcessError as e:
    print("OPA evaluation failed:", e.stderr or e.stdout)
    sys.exit(2)

data = json.loads(result.stdout)

violations = []
for res in data.get("result", []):
    for expr in res.get("expressions", []):
        val = expr.get("value")
        if isinstance(val, list):
            violations.extend(val)

if violations:
    print("❌ Policy violations found:")
    for v in violations:
        print("-", v)
    sys.exit(1)
else:
    print("✅ All policies passed.")
    sys.exit(0)

5) PowerShell Runner (opa_check.ps1)

$Plan = "C:\OPA_Advanced\terraform-plan.json"
$PolicyDir = "C:\OPA_Advanced\policy"

$OpaArgs = @("eval", "-i", $Plan, "-d", $PolicyDir, "--format", "json", "data.terraform.deny")

try {
  # Splat the argument array so the call operator receives the executable and its arguments separately.
  $OutRaw = & opa @OpaArgs
} catch {
  Write-Error "OPA eval failed. Ensure opa.exe in PATH and inputs exist."
  exit 2
}

$Out = $OutRaw | ConvertFrom-Json
$Violations = @()

foreach ($r in $Out.result) {
  foreach ($e in $r.expressions) {
    if ($e.value) { $Violations += $e.value }
  }
}

if ($Violations.Count -gt 0) {
  Write-Host "❌ Policy violations found:" -ForegroundColor Red
  $Violations | ForEach-Object { Write-Host "- $_" -ForegroundColor Yellow }
  exit 1
} else {
  Write-Host "✅ All policies passed." -ForegroundColor Green
  exit 0
}

6) How to Run

  1. Create C:\OPA_Advanced and the policy\ folder.
  2. Save the JSON and the four .rego files (plus main.rego) into the paths above.
  3. Run a quick syntax check (OPA 1.0+):
    opa check policy\
  4. Evaluate:
    opa eval -i terraform-plan.json -d policy "data.terraform.deny"
  5. Or use the provided Python/PowerShell scripts.
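
Policies can also be exercised with OPA's built-in test runner before wiring them into a pipeline. Below is a hedged sketch of a unit test; the file name policy\s3_complex_test.rego and the mock input are assumptions for illustration, not part of the layout above. Run it with: opa test policy

package terraform.s3_complex_test

import data.terraform.s3_complex

# Hypothetical minimal plan fragment, keyed the same way as the mock plan above.
mock_plan := {"resource_changes": {"aws_s3_bucket.app_bucket": {
  "type": "aws_s3_bucket",
  "name": "app_bucket",
  "change": {"after": {"acl": "public-read"}}
}}}

# The public-read rule should produce at least one denial for this input.
test_public_acl_denied if {
  msgs := s3_complex.deny with input as mock_plan
  count(msgs) > 0
}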

7) Expected Violations (from this Mock Plan)

  • S3: versioning disabled
  • S3: encryption missing
  • S3: ACL public-read
  • S3: missing tag Owner
  • IAM Role: assume-role principal uses wildcard *
  • IAM Policy: Action is *
  • IAM Policy: Resource is *
  • EC2: not EBS optimized
  • EC2: public IP assigned
  • EC2: unencrypted EBS volume /dev/sda1
  • EC2 (Prod): non-compliant EBS volume type standard
  • EC2/SG: SSH (22) from 0.0.0.0/0
  • EC2/SG: RDP (3389) from 0.0.0.0/0
  • SG global: (defense in depth) open SSH/RDP

8) Tips for Real Plans

  • Real plan JSON often nests values and uses computed IDs; join resources via type/name or address.
  • Inspect after_unknown if values are computed and not known at plan time.
  • Consider separate warn vs deny sets for advisory controls.
  • Add default allow := true style patterns if using allow/deny models together.
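
As an illustration of the warn-versus-deny split, here is a sketch of an advisory rule that sits next to the existing deny entrypoint (the CostCenter tag is an invented example; the rule could live in a hypothetical policy/warn.rego):

package terraform

# Advisory only: query data.terraform.warn separately; it does not have to block an apply.
warn contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_s3_bucket"
  not rc.change.after.tags.CostCenter
  msg := sprintf("S3 bucket %v: consider adding a 'CostCenter' tag", [rc.name])
}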

GCP - OPA basics

GCP Terraform + OPA Policy Validation

GCP Terraform Infrastructure with OPA Policy Validation

This document demonstrates how to create GCP infrastructure (Compute, Storage, Firewall) using Terraform, validate it with Open Policy Agent (OPA), and run Terraform apply only if policies pass.

1. Directory Structure

GCP_OPA_Project/
│
├── terraform/              # Terraform configuration
│   └── main.tf
├── policy/                 # OPA policies
│   ├── main.rego
│   ├── gcs.rego
│   ├── compute.rego
│   ├── firewall.rego
│   └── iam_gcp.rego
├── plan.json               # Terraform plan in JSON (generated)
└── validate_apply_gcp.py  # Python orchestrator script

2. Terraform Configuration (main.tf)

terraform {
  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "~> 5.0"
    }
  }
}

provider "google" {
  project     = "YOUR_PROJECT_ID"
  region      = "us-central1"
  zone        = "us-central1-a"
  credentials = file("c:\\test\\credial\\key.json")
}

resource "random_id" "rand" { byte_length = 4 }

resource "google_storage_bucket" "demo_bucket" {
  name     = "my-demo-bucket-${random_id.rand.hex}"
  location = "US"
  storage_class = "STANDARD"
  force_destroy = true
  uniform_bucket_level_access = true
}

resource "google_compute_firewall" "default_allow_ssh" {
  name    = "allow-ssh"
  network = "default"
  allow {
    protocol = "tcp"
    ports    = ["22"]
  }
  source_ranges = ["0.0.0.0/0"]
  target_tags   = ["ssh-allowed"]
}

resource "google_compute_instance" "demo_vm" {
  name         = "demo-vm"
  machine_type = "e2-micro"
  zone         = "us-central1-a"
  tags = ["ssh-allowed"]
  boot_disk {
    initialize_params {
      image = "debian-cloud/debian-11"
      size  = 30
    }
  }
  network_interface {
    network = "default"
    access_config { }
  }
  metadata_startup_script = <<-EOT
    #!/bin/bash
    echo "Hello from Terraform VM" > /var/tmp/startup.txt
  EOT
}

3. OPA Policies

Main Aggregator (policy/main.rego)

package terraform

import data.terraform.gcs
import data.terraform.compute
import data.terraform.firewall
import data.terraform.iam_gcp

deny contains msg if { msg := gcs.deny[_] }
deny contains msg if { msg := compute.deny[_] }
deny contains msg if { msg := firewall.deny[_] }
deny contains msg if { msg := iam_gcp.deny[_] }

GCS Bucket (policy/gcs.rego)

package terraform.gcs
buckets contains b if { some r; b := input.resource_changes[r]; b.type == "google_storage_bucket" }

deny contains msg if { 
  b := buckets[_]; 
  b.change.after.acl == "public-read"; 
  msg := sprintf("GCS bucket %v: acl is public-read", [b.name]) 
}

deny contains msg if { 
  b := buckets[_]; 
  not b.change.after.versioning.enabled; 
  msg := sprintf("GCS bucket %v: versioning not enabled", [b.name]) 
}

deny contains msg if { 
  b := buckets[_]; 
  not b.change.after.encryption; 
  msg := sprintf("GCS bucket %v: encryption not configured", [b.name]) 
}

deny contains msg if { 
  b := buckets[_]; 
  not b.change.after.uniform_bucket_level_access; 
  msg := sprintf("GCS bucket %v: uniform_bucket_level_access not enabled", [b.name]) 
}

deny contains msg if { 
  b := buckets[_]; 
  not b.change.after.labels.Owner; 
  msg := sprintf("GCS bucket %v: missing 'Owner' label", [b.name]) 
}

deny contains msg if { 
  b := buckets[_]; 
  not b.change.after.labels.Environment; 
  msg := sprintf("GCS bucket %v: missing 'Environment' label", [b.name]) 
}

Compute Instance (policy/compute.rego)

package terraform.compute
instances contains i if { some r; i := input.resource_changes[r]; i.type == "google_compute_instance" }
disallowed_types := {"f1-micro","g1-small"}

deny contains msg if { inst := instances[_]; inst.change.after.machine_type in disallowed_types; msg := sprintf("Compute %v: disallowed machine type %v", [inst.name, inst.change.after.machine_type]) }
deny contains msg if { inst := instances[_]; not inst.change.after.service_account; msg := sprintf("Compute %v: missing service_account", [inst.name]) }
deny contains msg if { inst := instances[_]; nic := inst.change.after.network_interface[_]; ac := nic.access_config[_]; ac != null; msg := sprintf("Compute %v: has external IP", [inst.name]) }
deny contains msg if { inst := instances[_]; bd := inst.change.after.boot_disk; bd != null; not bd[0].disk_encryption_key; msg := sprintf("Compute %v: boot disk not encrypted", [inst.name]) }
deny contains msg if { inst := instances[_]; meta := inst.change.after.metadata_startup_script; contains(meta, "curl"); contains(meta, "bash"); msg := sprintf("Compute %v: startup script uses curl|bash pattern", [inst.name]) }
deny contains msg if { inst := instances[_]; not inst.change.after.labels.Environment; msg := sprintf("Compute %v: missing label 'Environment'", [inst.name]) }

Firewall (policy/firewall.rego)

package terraform.firewall
fws contains f if { some r; f := input.resource_changes[r]; f.type == "google_compute_firewall" }

deny contains msg if { fw := fws[_]; rule := fw.change.after; rule.allowed[_].protocol=="tcp"; rule.allowed[_].ports[_]=="22"; rule.source_ranges[_]=="0.0.0.0/0"; msg := sprintf("Firewall %v allows SSH 22 from 0.0.0.0/0", [fw.name]) }
deny contains msg if { fw := fws[_]; rule := fw.change.after; rule.allowed[_].protocol=="tcp"; rule.allowed[_].ports[_]=="3389"; rule.source_ranges[_]=="0.0.0.0/0"; msg := sprintf("Firewall %v allows RDP 3389 from 0.0.0.0/0", [fw.name]) }
deny contains msg if { fw := fws[_]; rule := fw.change.after; rule.allowed[_].protocol=="all"; rule.source_ranges[_]=="0.0.0.0/0"; msg := sprintf("Firewall %v allows all traffic from 0.0.0.0/0", [fw.name]) }

IAM / Service Account (policy/iam_gcp.rego)

package terraform.iam_gcp
service_accounts contains s if { some r; s := input.resource_changes[r]; s.type == "google_service_account" }
iam_bindings contains b if { some r; b := input.resource_changes[r]; b.type == "google_project_iam_binding" }

deny contains msg if { sa := service_accounts[_]; not sa.change.after.display_name; msg := sprintf("Service Account %v: missing display_name", [sa.name]) }
deny contains msg if { b := iam_bindings[_]; member := b.change.after.members[_]; member == "allUsers"; msg := sprintf("IAM binding %v grants role %v to allUsers", [b.name, b.change.after.role]) }
deny contains msg if { b := iam_bindings[_]; b.change.after.role == "roles/owner"; msg := sprintf("IAM binding %v uses broad role roles/owner", [b.name]) }

4. Python Orchestrator (validate_apply_gcp.py)

import subprocess, json, os, shutil, sys

TERRAFORM_DIR = "terraform"
PLAN_BIN = "tfplan"
PLAN_JSON = "plan.json"
POLICY_DIR = "policy"

def run(cmd, cwd=None, check=True):
    print("👉", " ".join(cmd))
    proc = subprocess.run(cmd, capture_output=True, text=True, cwd=cwd)
    if proc.returncode != 0 and check:
        print("❌ Command failed:", " ".join(cmd))
        print("STDOUT:", proc.stdout)
        print("STDERR:", proc.stderr)
        sys.exit(proc.returncode)
    return proc.stdout

def terraform_plan_and_show():
    run(["terraform", "init", "-input=false"], cwd=TERRAFORM_DIR)
    run(["terraform", "plan", "-out", PLAN_BIN, "-input=false"], cwd=TERRAFORM_DIR)
    out = run(["terraform", "show", "-json", os.path.join(TERRAFORM_DIR, PLAN_BIN)], cwd=None)
    with open(PLAN_JSON, "w") as f: f.write(out)

def opa_eval():
    opa_path = shutil.which("opa")
    if not opa_path:
        print("❌ opa not found in PATH"); sys.exit(1)
    cmd = [opa_path,"eval","-i",PLAN_JSON,"-d",POLICY_DIR,"--format","json","data.terraform.deny"]
    out = run(cmd)
    return json.loads(out)

def extract_violations(opa_json):
    violations = []
    for item in opa_json.get("result",[]):
        for expr in item.get("expressions",[]):
            val = expr.get("value")
            if isinstance(val,list): violations.extend(val)
    return violations

def terraform_apply():
    run(["terraform","apply","-auto-approve"], cwd=TERRAFORM_DIR)

def main():
    terraform_plan_and_show()
    opa_json = opa_eval()
    violations = extract_violations(opa_json)
    if violations:
        print("\n❌ Policy violations found:")
        for v in violations: print(" -",v)
        print("🚫 Aborting terraform apply.")
        sys.exit(1)
    else:
        print("\n✅ No policy violations. Applying Terraform...")
        terraform_apply()

if __name__=="__main__":
    main()

5. Workflow Summary

  1. Put Terraform code in terraform/.
  2. Put Rego policies in policy/.
  3. Run python validate_apply_gcp.py.
  4. The script will generate plan.json, evaluate OPA policies, and abort if violations exist.
  5. If clean, it will automatically apply Terraform to create GCP infrastructure.

TF and OPA

AWS + Terraform + OPA — Full Project (HTML)

AWS + Terraform + OPA — Full Project (mock, failing by design)

This file contains everything you need to run an automated pipeline locally: Terraform configuration (non-compliant by value), OPA Rego policies (OPA v1.0+ syntax), a single policy entrypoint, and runner scripts (Python + PowerShell). The TF config has all properties present so you can toggle values to change behavior from fail to pass.


Project layout (what to create locally)

aws-opa-tf/
├── main.tf               # Terraform config (all resources present — intentionally non-compliant values)
├── deploy.py             # Python orchestrator (plan → json → opa eval → apply)
├── opa_check.ps1         # PowerShell runner (optional)
└── policy/
    ├── main.rego         # aggregator -> data.terraform.deny
    ├── ec2_complex.rego
    ├── s3_complex.rego
    ├── iam_complex.rego
    └── sg_simple.rego

1) Terraform (non-compliant but complete) — main.tf

All resources are fully defined. Values chosen here intentionally violate the policy rules (so OPA will report violations). When you want to pass, just update the flagged values in the comments.

// main.tf
provider "aws" {
  region = "us-east-1"
}

# ---------- S3 (complete, but non-compliant values) ----------
resource "aws_s3_bucket" "bad_bucket" {
  bucket = "opa-violation-bucket-12345"
  acl    = "public-read"            # ❌ non-compliant: should be "private"
  versioning {
    enabled = false                 # ❌ non-compliant: should be true
  }
  # encryption block present but we will treat as missing by policy (simulate misconfigured)
  server_side_encryption_configuration {
    rule {
      apply_server_side_encryption_by_default {
        sse_algorithm = "AES256"
      }
    }
  }

  tags = {
    Environment = "Dev"
    Team        = "Platform"
  }
}

# ---------- IAM Role (present but invalid trust policy) ----------
resource "aws_iam_role" "bad_role" {
  name = "bad-role"

  # intentionally empty object (invalid trust) to trigger policy check
  assume_role_policy = jsonencode({})   # ❌ non-compliant: should have Version and Statement
}

# ---------- IAM Policy (too broad) ----------
resource "aws_iam_policy" "too_broad" {
  name   = "AllowEverything"
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = "*",      # ❌ non-compliant
        Resource = "*"     # ❌ non-compliant
      }
    ]
  })
}

# ---------- Security Group (open SSH/RDP) ----------
resource "aws_security_group" "bad_sg" {
  name   = "bad-sg"
  vpc_id = "vpc-12345678"  # replace for real runs

  ingress {
    from_port   = 22
    to_port     = 22
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]   # ❌ non-compliant: open SSH
  }

  ingress {
    from_port   = 3389
    to_port     = 3389
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]   # ❌ non-compliant: open RDP
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

# ---------- EC2 Instance (present but misconfigured) ----------
resource "aws_instance" "bad_ec2" {
  ami           = "ami-12345678"         # replace for real runs
  instance_type = "t2.micro"             # ❌ disallowed type per policy
  subnet_id     = "subnet-12345678"      # replace for real runs
  vpc_security_group_ids = [aws_security_group.bad_sg.id]
  iam_instance_profile   = aws_iam_role.bad_role.name

  ebs_optimized = false                  # ❌ should be true
  ebs_block_device {
    device_name = "/dev/sda1"
    volume_size = 30
    volume_type = "standard"             # ❌ non-compliant for Prod (policy expects gp3/io1)
    encrypted   = false                  # ❌ must be true
  }

  tags = {
    Environment = "Prod"
    Service     = "frontend"
  }
}

2) OPA policies (OPA v1.0+ syntax) — put these files in policy/

All policies use deny contains msg if { ... } style so they work with OPA 1.0+.

2.1 Aggregator — policy/main.rego

package terraform

import data.terraform.ec2_complex as ec2_complex
import data.terraform.s3_complex as s3_complex
import data.terraform.iam_complex as iam_complex
import data.terraform.sg_simple as sg_simple

# Aggregate all denials into one entrypoint:
deny contains msg if { msg := ec2_complex.deny[_] }
deny contains msg if { msg := s3_complex.deny[_] }
deny contains msg if { msg := iam_complex.deny[_] }
deny contains msg if { msg := sg_simple.deny[_] }

2.2 EC2 + EBS + SG cross checks — policy/ec2_complex.rego

package terraform.ec2_complex

# Helper: find all aws_instance resources
instances contains inst if {
  some r
  inst := input.resource_changes[r]
  inst.type == "aws_instance"
}

# Helper: find SG by address/name (mock plan uses address-like id)
sg_by_id(id) := sg if {
  sg := input.resource_changes[id]
  sg.type == "aws_security_group"
}

# 1) EBS optimized required
deny contains msg if {
  inst := instances[_]
  not inst.change.after.ebs_optimized
  msg := sprintf("EC2 %v is not EBS optimized", [inst.name])
}

# 2) No public IP
deny contains msg if {
  inst := instances[_]
  inst.change.after.associate_public_ip_address
  msg := sprintf("EC2 %v has a public IP assigned", [inst.name])
}

# 3) IAM instance profile must be attached
deny contains msg if {
  inst := instances[_]
  not inst.change.after.iam_instance_profile
  msg := sprintf("EC2 %v does not have an IAM instance profile", [inst.name])
}

# 4) EBS volumes must be encrypted
deny contains msg if {
  inst := instances[_]
  vol := inst.change.after.ebs_block_device[_]
  not vol.encrypted
  msg := sprintf("EC2 %v has unencrypted EBS volume %v", [inst.name, vol.device_name])
}

# 5) For Prod environment, EBS types must be gp3 or io1
deny contains msg if {
  inst := instances[_]
  inst.change.after.tags.Environment == "Prod"
  vol := inst.change.after.ebs_block_device[_]
  vol.volume_type != "gp3"
  vol.volume_type != "io1"
  msg := sprintf("EC2 %v in Prod has non-compliant EBS type %v on %v", [inst.name, vol.volume_type, vol.device_name])
}

# 6) Check attached SGs: no SSH/RDP from 0.0.0.0/0
deny contains msg if {
  inst := instances[_]
  sg_id := inst.change.after.vpc_security_group_ids[_]
  sg := sg_by_id(sg_id)
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 22
  ing.to_port == 22
  msg := sprintf("EC2 %v allows SSH (22) from 0.0.0.0/0 via SG %v", [inst.name, sg.name])
}

deny contains msg if {
  inst := instances[_]
  sg_id := inst.change.after.vpc_security_group_ids[_]
  sg := sg_by_id(sg_id)
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 3389
  ing.to_port == 3389
  msg := sprintf("EC2 %v allows RDP (3389) from 0.0.0.0/0 via SG %v", [inst.name, sg.name])
}

2.3 S3 best practices — policy/s3_complex.rego

package terraform.s3_complex

# all buckets
buckets contains b if {
  some r
  b := input.resource_changes[r]
  b.type == "aws_s3_bucket"
}

deny contains msg if {
  b := buckets[_]
  b.change.after.acl == "public-read"
  msg := sprintf("S3 bucket %v: ACL is public-read", [b.name])
}

deny contains msg if {
  b := buckets[_]
  not b.change.after.versioning.enabled
  msg := sprintf("S3 bucket %v: versioning is not enabled", [b.name])
}

deny contains msg if {
  b := buckets[_]
  not b.change.after.server_side_encryption_configuration
  msg := sprintf("S3 bucket %v: server-side encryption not configured", [b.name])
}

# require tag Owner for operational traceability
deny contains msg if {
  b := buckets[_]
  not b.change.after.tags.Owner
  msg := sprintf("S3 bucket %v: missing required tag 'Owner'", [b.name])
}

2.4 IAM least privilege — policy/iam_complex.rego

package terraform.iam_complex

# helpers
roles contains r if {
  some k
  r := input.resource_changes[k]
  r.type == "aws_iam_role"
}
policies contains p if {
  some k
  p := input.resource_changes[k]
  p.type == "aws_iam_policy"
}

# 1) assume_role_policy must be an object with Version and Statement
deny contains msg if {
  r := roles[_]
  # missing Version or Statement -> deny
  not r.change.after.assume_role_policy.Version
  msg := sprintf("IAM Role %v: assume_role_policy missing 'Version'", [r.name])
}

deny contains msg if {
  r := roles[_]
  not r.change.after.assume_role_policy.Statement
  msg := sprintf("IAM Role %v: assume_role_policy missing 'Statement'", [r.name])
}

# 2) Policy statements must not use wildcard Action or Resource
deny contains msg if {
  p := policies[_]
  stmt := p.change.after.policy.Statement[_]
  stmt.Action == "*"
  msg := sprintf("IAM Policy %v: Statement uses Action '*'", [p.name])
}

deny contains msg if {
  p := policies[_]
  stmt := p.change.after.policy.Statement[_]
  stmt.Resource == "*"
  msg := sprintf("IAM Policy %v: Statement uses Resource '*'", [p.name])
}

2.5 SG hygiene (standalone) — policy/sg_simple.rego

package terraform.sg_simple

# all security groups
sgs contains sg if {
  some r
  sg := input.resource_changes[r]
  sg.type == "aws_security_group"
}

# deny open SSH/RDP anywhere
deny contains msg if {
  sg := sgs[_]
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 22
  ing.to_port == 22
  msg := sprintf("SG %v allows SSH (22) from 0.0.0.0/0", [sg.name])
}

deny contains msg if {
  sg := sgs[_]
  ing := sg.change.after.ingress[_]
  ing.cidr_blocks[_] == "0.0.0.0/0"
  ing.from_port == 3389
  ing.to_port == 3389
  msg := sprintf("SG %v allows RDP (3389) from 0.0.0.0/0", [sg.name])
}

3) Python orchestrator — deploy.py

Place this at project root. It runs terraform plan → writes plan.json → runs opa eval (using the single policy folder) → aborts or applies based on violations.

#!/usr/bin/env python3
# deploy.py
import subprocess, json, os, sys

PLAN_FILE = "plan.tfplan"
PLAN_JSON = "plan.json"
POLICY_DIR = "policy"

def run_cmd(cmd, check=True):
    print("👉", " ".join(cmd))
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if check and proc.returncode != 0:
        print("❌ Command failed:", " ".join(cmd))
        print(proc.stderr or proc.stdout)
        sys.exit(proc.returncode)
    return proc.stdout

def terraform_plan():
    run_cmd(["terraform", "init", "-input=false"])
    run_cmd(["terraform", "plan", "-out", PLAN_FILE, "-input=false"])
    plan_json = run_cmd(["terraform", "show", "-json", PLAN_FILE])
    with open(PLAN_JSON, "w") as f:
        f.write(plan_json)

def opa_eval():
    # Single entrypoint: data.terraform.deny (from policy/main.rego aggregator)
    cmd = ["opa", "eval", "-i", PLAN_JSON, "-d", POLICY_DIR, "--format", "json", "data.terraform.deny"]
    out = run_cmd(cmd)
    return json.loads(out)

def extract_violations(opa_json):
    violations = []
    for item in opa_json.get("result", []):
        for expr in item.get("expressions", []):
            val = expr.get("value")
            if isinstance(val, list):
                violations.extend(val)
    return violations

def terraform_apply():
    run_cmd(["terraform", "apply", "-auto-approve"])

if __name__ == "__main__":
    terraform_plan()
    opa_json = opa_eval()
    violations = extract_violations(opa_json)
    if violations:
        print("\n\033[91m❌ Policy violations detected:\033[0m")
        for v in violations:
            print(" -", v)
        print("\n🚫 Aborting. Fix policy violations and run again.")
        sys.exit(1)
    else:
        print("\n\033[92m✅ All policies passed. Applying infra...\033[0m")
        terraform_apply()

4) PowerShell runner (optional) — opa_check.ps1

# opa_check.ps1 (optional)
$Plan = "C:\path\to\aws-opa-tf\plan.json"
$Policy = "C:\path\to\aws-opa-tf\policy"

$cmd = @("opa","eval","-i",$Plan,"-d",$Policy,"--format","json","data.terraform.deny")
try {
  $raw = & $cmd
} catch {
  Write-Error "OPA eval failed. Ensure opa.exe in PATH."
  exit 2
}
$out = $raw | ConvertFrom-Json
$violations = @()
foreach ($r in $out.result) {
  foreach ($e in $r.expressions) {
    if ($e.value) { $violations += $e.value }
  }
}
if ($violations.Count -gt 0) {
  Write-Host "❌ Policy violations:" -ForegroundColor Red
  $violations | ForEach-Object { Write-Host " - $_" -ForegroundColor Yellow }
  exit 1
} else {
  Write-Host "✅ All policies passed." -ForegroundColor Green
  exit 0
}

5) Quick help / run checklist

  1. Install prerequisites: Terraform, OPA (v1.0+), Python 3.x, AWS CLI (configured).
  2. Create folder aws-opa-tf and paste files: main.tf, deploy.py, policy/*.rego.
  3. Replace placeholder VPC/subnet/AMI values in main.tf with real ones for apply (or leave them if you only plan to run plan+OPA check).
  4. Run the pipeline: python deploy.py. This will:
    • run terraform plan and produce plan.json
    • run opa eval -d policy -i plan.json data.terraform.deny
    • if violations → shows them and exits; if none → runs terraform apply -auto-approve
  5. To test a passing run: change TF values to the compliant ones (see comments in main.tf), then rerun python deploy.py.

6) Expected violations (with the provided non-compliant TF values)

  • S3: ACL is public-read, versioning is false, SSE missing — fail
  • IAM Role: assume_role_policy is empty/invalid — fail
  • IAM Policy: allows Action: * and Resource: * — fail
  • Security Group: SSH/RDP open to 0.0.0.0/0 — fail
  • EC2: disallowed instance type, not EBS optimized, unencrypted EBS, Prod uses non-gp3/io1 — fail

Notes & small gotchas

  • OPA 1.0+ requires the deny contains msg if { ... } pattern (no mixing of default assignments with contains). The policies above follow that format.
  • When using real Terraform plans, resource addresses and IDs can be different (e.g. computed IDs). The Rego helpers above assume a simplified mock plan structure where the security group reference is the resource address (e.g. aws_security_group.bad_sg). For real plans, you may need to match by address fields or resource instance keys.
  • If some values are computed at apply time, check for after_unknown in the JSON and handle accordingly (you may treat unknowns as violations or warnings depending on policy).
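
For the after_unknown case, one option is a dedicated rule that flags computed values so they can be re-checked after apply. A sketch under the assumption of the standard array-style plan JSON (this hypothetical package would need to be queried directly or added to the main.rego aggregator):

package terraform.unknowns

# Flag S3 buckets whose ACL is not known at plan time (computed at apply).
deny contains msg if {
  rc := input.resource_changes[_]
  rc.type == "aws_s3_bucket"
  rc.change.after_unknown.acl
  msg := sprintf("S3 bucket %v: ACL is unknown at plan time; re-check after apply", [rc.address])
}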


Terraform + OPA Validation with Python

This Python script validates a Terraform plan using multiple OPA policies (S3, EC2, IAM, Security Group) and applies the infrastructure only if there are no policy violations.

Python Script (opa_tf_apply.py)

import subprocess
import json
import os
import shutil
import sys

# ----------------------------
# Configuration
# ----------------------------
plan_file = "terraform-plan.json"
policy_dir = "policy"

# Change working directory if needed
os.chdir(os.path.dirname(os.path.abspath(__file__)))

# ----------------------------
# Check if OPA exists
# ----------------------------
opa_path = shutil.which("opa")
if not opa_path:
    print("❌ OPA executable not found in PATH. Please install OPA and add it to PATH.")
    sys.exit(1)

# ----------------------------
# Collect all Rego policy files
# ----------------------------
rego_files = [os.path.join(policy_dir, f) for f in os.listdir(policy_dir) if f.endswith(".rego")]
if not rego_files:
    print(f"❌ No Rego files found in '{policy_dir}'. Please add policy files.")
    sys.exit(1)

# ----------------------------
# Build OPA eval command
# ----------------------------
# Query the aggregated entrypoint (assumes policy/main.rego defines data.terraform.deny, as in the project above)
cmd = [opa_path, "eval", "-i", plan_file, "--format", "json", "data.terraform.deny"]
for rego in rego_files:
    cmd.extend(["-d", rego])

# ----------------------------
# Run OPA eval
# ----------------------------
print("🔎 OPA validation start...")

try:
    result = subprocess.run(cmd, capture_output=True, text=True)
except Exception as e:
    print("❌ Error running OPA:", str(e))
    sys.exit(1)

# ----------------------------
# Check return code
# ----------------------------
if result.returncode != 0:
    print("❌ OPA command failed.")
    print("STDOUT:", result.stdout)
    print("STDERR:", result.stderr)
    sys.exit(1)

# ----------------------------
# Parse OPA output
# ----------------------------
try:
    opa_output = json.loads(result.stdout)
except json.JSONDecodeError:
    print("❌ Failed to parse OPA JSON output:")
    print(result.stdout)
    sys.exit(1)

violations = []
for res in opa_output.get("result", []):
    for expr in res.get("expressions", []):
        violations.extend(expr.get("value", []))

# ----------------------------
# Display violations
# ----------------------------
if violations:
    print("❌ Policy violations found:")
    for v in violations:
        print("-", v)
    sys.exit(1)
else:
    print("✅ All policies passed. Proceeding with Terraform apply...")

    # ----------------------------
    # Run Terraform apply
    # ----------------------------
    try:
        tf_apply = subprocess.run(["terraform", "apply", "-auto-approve"], capture_output=True, text=True)
        print(tf_apply.stdout)
        if tf_apply.returncode != 0:
            print("❌ Terraform apply failed:")
            print(tf_apply.stderr)
            sys.exit(1)
        else:
            print("✅ Terraform infrastructure created successfully!")
    except Exception as e:
        print("❌ Error running Terraform apply:", str(e))
        sys.exit(1)

Tuesday, 26 August 2025

Centralize OPA policy

OPA Policy Validation for Multi-Resource Terraform Plan (Mock)

OPA Policy Validation for Mock Terraform Plan

This document demonstrates how to validate a mock Terraform plan JSON containing multiple resources (EC2, S3, Security Group, and IAM) using Open Policy Agent (OPA). It includes JSON input, Rego policies, and both Python and PowerShell scripts.

1. Directory Structure

C:\OPA_Mock_Project\
│
├── terraform-plan.json     # Mock Terraform plan JSON for all resources
└── policy\                 # OPA policies
    ├── s3.rego
    ├── ec2.rego
    ├── iam.rego
    └── sg.rego

2. Mock Terraform Plan JSON (terraform-plan.json)

{
  "format_version": "0.1",
  "terraform_version": "1.13.1",
  "resource_changes": {
    "aws_s3_bucket.example": {
      "type": "aws_s3_bucket",
      "name": "example",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "bucket": "my-opa-test-bucket-12345",
          "acl": "public-read",
          "versioning": {"enabled": false},
          "server_side_encryption_configuration": null
        }
      }
    },
    "aws_instance.example_ec2": {
      "type": "aws_instance",
      "name": "example_ec2",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "ami": "ami-12345678",
          "instance_type": "t2.micro",
          "ebs_optimized": false,
          "associate_public_ip_address": true,
          "iam_instance_profile": "my-ec2-role",
          "vpc_security_group_ids": ["sg-12345678"],
          "ebs_block_device": [
            {"device_name": "/dev/sda1", "volume_size": 30, "encrypted": false},
            {"device_name": "/dev/sdb", "volume_size": 50, "encrypted": true}
          ],
          "tags": {"Environment": "Dev"}
        }
      }
    },
    "aws_security_group.example_sg": {
      "type": "aws_security_group",
      "name": "example_sg",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "ingress": [
            {"from_port": 22, "to_port": 22, "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"]}
          ]
        }
      }
    },
    "aws_iam_role.example_role": {
      "type": "aws_iam_role",
      "name": "example_role",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "assume_role_policy": {
            "Version": "2012-10-17",
            "Statement": [
              {"Action":"sts:AssumeRole","Effect":"Allow","Principal":{"Service":"ec2.amazonaws.com"}}
            ]
          }
        }
      }
    }
  }
}

3. OPA Policies

S3 Policy (policy/s3.rego)

package terraform.s3

# Disallow public-read ACL
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_s3_bucket"
  input.resource_changes[resource].change.after.acl == "public-read"
  msg := sprintf("Bucket %v has public-read ACL", [input.resource_changes[resource].name])
}

# Require versioning enabled
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_s3_bucket"
  not input.resource_changes[resource].change.after.versioning.enabled
  msg := sprintf("Bucket %v does not have versioning enabled", [input.resource_changes[resource].name])
}

# Require encryption
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_s3_bucket"
  not input.resource_changes[resource].change.after.server_side_encryption_configuration
  msg := sprintf("Bucket %v does not have server-side encryption", [input.resource_changes[resource].name])
}

EC2 Policy (policy/ec2.rego)

package terraform.ec2

# Disallow t2.micro instances
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_instance"
  input.resource_changes[resource].change.after.instance_type == "t2.micro"
  msg := sprintf("Instance %v uses disallowed type t2.micro", [input.resource_changes[resource].name])
}

# Require EBS optimization
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_instance"
  not input.resource_changes[resource].change.after.ebs_optimized
  msg := sprintf("Instance %v is not EBS optimized", [input.resource_changes[resource].name])
}

# Disallow public IP
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_instance"
  input.resource_changes[resource].change.after.associate_public_ip_address
  msg := sprintf("Instance %v has a public IP assigned", [input.resource_changes[resource].name])
}

# EBS volumes must be encrypted
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_instance"
  volume := input.resource_changes[resource].change.after.ebs_block_device[_]
  not volume.encrypted
  msg := sprintf("Instance %v has unencrypted volume %v", [input.resource_changes[resource].name, volume.device_name])
}

Security Group Policy (policy/sg.rego)

package terraform.sg

# Disallow open ingress 0.0.0.0/0
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_security_group"
  ingress := input.resource_changes[resource].change.after.ingress[_]
  ingress.cidr_blocks[_] == "0.0.0.0/0"
  msg := sprintf("Security Group %v has open ingress to 0.0.0.0/0", [input.resource_changes[resource].name])
}

IAM Policy (policy/iam.rego)

package terraform.iam

import rego.v1

# Require assume role policy
deny contains msg if {
  some resource
  input.resource_changes[resource].type == "aws_iam_role"
  not input.resource_changes[resource].change.after.assume_role_policy
  msg := sprintf("IAM Role %v does not have an assume role policy", [input.resource_changes[resource].name])
}

4. Python Script (opa_check.py)

import subprocess
import json
import os

plan_file = "terraform-plan.json"
policy_dir = "policy"

rego_files = [os.path.join(policy_dir, f) for f in os.listdir(policy_dir) if f.endswith(".rego")]

cmd = ["opa", "eval", "-i", plan_file, "--format", "json", "data"]
for rego in rego_files:
    cmd.extend(["-d", rego])

result = subprocess.run(cmd, capture_output=True, text=True)
opa_output = json.loads(result.stdout)

violations = []
for res in opa_output["result"]:
    for expr in res["expressions"]:
        if expr["value"]:
            violations.extend(expr["value"])

if violations:
    print("❌ Policy violations found:")
    for v in violations:
        print("-", v)
else:
    print("✅ All policies passed.")

5. PowerShell Script (opa_check.ps1)

$PlanFile = "C:\OPA_Mock_Project\terraform-plan.json"
$PolicyFolder = "C:\OPA_Mock_Project\policy"

$RegoFiles = Get-ChildItem -Path $PolicyFolder -Filter *.rego | ForEach-Object { $_.FullName }

$OpaCommand = @("opa", "eval", "-i", $PlanFile, "--format", "json", "data")
foreach ($rego in $RegoFiles) { $OpaCommand += @("-d", $rego) }

try {
    $OpaOutputRaw = & $OpaCommand
} catch {
    Write-Error "Failed to run OPA. Ensure opa.exe is in PATH."
    exit 1
}

$OpaOutput = $OpaOutputRaw | ConvertFrom-Json
$Violations = @()
foreach ($res in $OpaOutput.result) {
    foreach ($expr in $res.expressions) {
        foreach ($pkg in $expr.value.PSObject.Properties) {
            if ($pkg.Value.deny) { $Violations += $pkg.Value.deny }
        }
    }
}

if ($Violations.Count -gt 0) {
    Write-Host "❌ Policy violations found:" -ForegroundColor Red
    foreach ($v in $Violations) { Write-Host "- $v" -ForegroundColor Yellow }
    exit 1
} else {
    Write-Host "✅ All policies passed." -ForegroundColor Green
}

6. Expected Violations for This Mock Plan

  • S3 bucket has public-read ACL
  • S3 bucket does not have versioning enabled
  • S3 bucket does not have server-side encryption
  • EC2 instance uses disallowed type t2.micro
  • EC2 instance is not EBS optimized
  • EC2 instance has a public IP assigned
  • EC2 instance has unencrypted volume /dev/sda1
  • Security Group has open ingress 0.0.0.0/0
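
To confirm these expectations automatically, a small pytest-style sketch along the following lines can run OPA against the mock plan and assert on the messages. This is only a sketch: it assumes opa is on PATH and uses the terraform-plan.json and policy\ layout described above.

# test_policies.py - minimal sketch; assumes opa is on PATH and the mock
# plan plus the policy\ folder from this post exist in the working directory.
import json
import subprocess


def run_opa(plan_file, query, policy_dir="policy"):
    """Run `opa eval` for a single query and return the deny messages."""
    cmd = ["opa", "eval", "-i", plan_file, "-d", policy_dir,
           "--format", "json", query]
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    output = json.loads(result.stdout)
    return output["result"][0]["expressions"][0]["value"]


def test_s3_bucket_violations():
    msgs = run_opa("terraform-plan.json", "data.terraform.s3.deny")
    assert any("public-read ACL" in m for m in msgs)
    assert any("versioning" in m for m in msgs)


def test_ec2_instance_violations():
    msgs = run_opa("terraform-plan.json", "data.terraform.ec2.deny")
    assert any("t2.micro" in m for m in msgs)
    assert any("unencrypted volume /dev/sda1" in m for m in msgs)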

EC2 mock and OPA

EC2 Terraform OPA Policy Test


1. Mock Terraform Plan JSON (ec2-plan.json)

{
  "format_version": "0.1",
  "terraform_version": "1.13.1",
  "resource_changes": {
    "aws_instance.example": {
      "type": "aws_instance",
      "name": "example",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "ami": "ami-12345678",
          "instance_type": "t2.micro",
          "ebs_optimized": false,
          "associate_public_ip_address": true,
          "iam_instance_profile": "my-ec2-role",
          "vpc_security_group_ids": ["sg-12345678", "sg-87654321"],
          "tags": {"Environment": "Dev"},
          "ebs_block_device": [
            {"device_name": "/dev/sda1", "volume_size": 30, "encrypted": false},
            {"device_name": "/dev/sdb", "volume_size": 50, "encrypted": true}
          ]
        }
      }
    }
  }
}

Explanation of Fields

Field                          Purpose / Relevance
instance_type                  Must follow allowed instance types
ebs_optimized                  Must be true for performance
associate_public_ip_address    Must be false for private instances
iam_instance_profile           Must be attached for proper permissions
vpc_security_group_ids         Must include required security groups
ebs_block_device               Volumes must be encrypted
tags                           Optional; can enforce Owner or Environment
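
To see exactly what the policy rules below will read, you can load the mock plan and print the change.after fields from this table. The sketch below assumes the ec2-plan.json above; note that this mock keys resource_changes by address, whereas a real terraform show -json plan uses a list, so adjust the loop accordingly.

# Quick sanity check: print the fields that ec2.rego will inspect.
import json

with open("ec2-plan.json") as f:
    plan = json.load(f)

for address, rc in plan["resource_changes"].items():
    if rc["type"] != "aws_instance":
        continue
    after = rc["change"]["after"]
    print(address)
    print("  instance_type:", after.get("instance_type"))
    print("  ebs_optimized:", after.get("ebs_optimized"))
    print("  associate_public_ip_address:", after.get("associate_public_ip_address"))
    print("  iam_instance_profile:", after.get("iam_instance_profile"))
    print("  vpc_security_group_ids:", after.get("vpc_security_group_ids"))
    for vol in after.get("ebs_block_device", []):
        print("  volume:", vol["device_name"], "encrypted =", vol["encrypted"])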

2. OPA Policy (ec2.rego)

package terraform.ec2

# 1. Disallow t2.micro instances
deny[msg] {
  some resource
  input.resource_changes[resource].type == "aws_instance"
  input.resource_changes[resource].change.after.instance_type == "t2.micro"
  msg := sprintf("Instance %v uses disallowed type t2.micro", [input.resource_changes[resource].name])
}

# 2. Require EBS optimization
deny[msg] {
  some resource
  input.resource_changes[resource].type == "aws_instance"
  not input.resource_changes[resource].change.after.ebs_optimized
  msg := sprintf("Instance %v is not EBS optimized", [input.resource_changes[resource].name])
}

# 3. Disallow public IP
deny[msg] {
  some resource
  input.resource_changes[resource].type == "aws_instance"
  input.resource_changes[resource].change.after.associate_public_ip_address
  msg := sprintf("Instance %v has a public IP assigned", [input.resource_changes[resource].name])
}

# 4. Require IAM role
deny[msg] {
  some resource
  input.resource_changes[resource].type == "aws_instance"
  not input.resource_changes[resource].change.after.iam_instance_profile
  msg := sprintf("Instance %v does not have an IAM role attached", [input.resource_changes[resource].name])
}

# 5. Require Security Groups
deny[msg] {
  some resource
  input.resource_changes[resource].type == "aws_instance"
  sg_ids := input.resource_changes[resource].change.after.vpc_security_group_ids
  count(sg_ids) == 0
  msg := sprintf("Instance %v does not have any Security Groups attached", [input.resource_changes[resource].name])
}

# 6. EBS volumes must be encrypted
deny[msg] {
  some resource
  input.resource_changes[resource].type == "aws_instance"
  volume := input.resource_changes[resource].change.after.ebs_block_device[_]
  not volume.encrypted
  msg := sprintf("Instance %v has unencrypted volume %v", [input.resource_changes[resource].name, volume.device_name])
}

# 7. Optional: Require Owner tag
deny[msg] {
  some resource
  input.resource_changes[resource].type == "aws_instance"
  tags := input.resource_changes[resource].change.after.tags
  not ("Owner" in tags)
  msg := sprintf("Instance %v does not have an 'Owner' tag", [input.resource_changes[resource].name])
}

3. Python Script to Call OPA

import subprocess
import json

plan_file = "ec2-plan.json"
policy_file = "policy/ec2.rego"

cmd = [
    "opa", "eval",
    "-i", plan_file,
    "-d", policy_file,
    "data.terraform.ec2.deny",
    "--format", "json"
]

result = subprocess.run(cmd, capture_output=True, text=True)
opa_output = json.loads(result.stdout)

violations = opa_output["result"][0]["expressions"][0]["value"]

if violations:
    print("❌ Policy violations found:")
    for v in violations:
        print("-", v)
else:
    print("✅ All policies passed.")

4. PowerShell Script to Call OPA

# -----------------------------
# PowerShell OPA Test Script for EC2 with EBS, IAM, SG
# -----------------------------

$PlanFile = "C:\OPA_EC2\ec2-plan.json"
$PolicyFile = "C:\OPA_EC2\policy\ec2.rego"

$OpaArgs = @(
    "eval",
    "-i", $PlanFile,
    "-d", $PolicyFile,
    "data.terraform.ec2.deny",
    "--format", "json"
)

try {
    $OpaOutputRaw = & opa @OpaArgs
} catch {
    Write-Error "Failed to run OPA. Make sure opa.exe is in your PATH."
    exit 1
}

$OpaOutput = $OpaOutputRaw | ConvertFrom-Json
$Violations = $OpaOutput.result[0].expressions[0].value

if ($Violations.Count -gt 0) {
    Write-Host "❌ Policy violations found:" -ForegroundColor Red
    foreach ($v in $Violations) {
        Write-Host "- $v" -ForegroundColor Yellow
    }
    Write-Host "Aborting Terraform apply due to policy violations..." -ForegroundColor Red
    exit 1
} else {
    Write-Host "✅ All policies passed." -ForegroundColor Green
}

5. Expected Output for This Mock JSON

❌ Policy violations found:
- Instance aws_instance.example uses disallowed type t2.micro
- Instance aws_instance.example is not EBS optimized
- Instance aws_instance.example has a public IP assigned
- Instance aws_instance.example has unencrypted volume /dev/sda1
- Instance aws_instance.example does not have an 'Owner' tag

s3-plan.json with all possible values and OPA policy

OPA Terraform S3 Policy Test

Complete Mock Terraform Plan JSON (s3-plan.json)

{
  "format_version": "0.1",
  "terraform_version": "1.13.1",
  "resource_changes": {
    "aws_s3_bucket.example": {
      "type": "aws_s3_bucket",
      "name": "example",
      "change": {
        "actions": ["create"],
        "before": null,
        "after": {
          "bucket": "my-opa-test-bucket-12345",
          "acl": "public-read", 
          "versioning": { "enabled": false },
          "server_side_encryption_configuration": null,
          "force_destroy": false,
          "tags": { "Environment": "Dev", "Owner": "Alice" },
          "lifecycle_rule": [
            { "id": "log-expire", "enabled": true, "expiration": { "days": 365 } }
          ],
          "logging": { "target_bucket": "log-bucket", "target_prefix": "s3logs/" },
          "website": null
        }
      }
    }
  }
}

Explanation of Fields

Field                                   Purpose / Relevance
acl                                     Access control; triggers public-read policy
versioning.enabled                      Must be true for compliance
server_side_encryption_configuration    Must exist for compliance
force_destroy                           Optional; determines bucket deletion behavior
tags                                    Optional; useful for governance rules
lifecycle_rule                          Optional; could be used in advanced policies
logging                                 Optional; compliance with logging requirements
website                                 Optional; could block public website hosting

Complete OPA Policy (s3.rego)

package terraform.s3

# 1. Disallow public S3 buckets
deny[msg] {
  some resource
  input.resource_changes[resource].type == "aws_s3_bucket"
  acl := input.resource_changes[resource].change.after.acl
  acl == "public-read"
  msg := sprintf("Bucket %v has public-read ACL", [input.resource_changes[resource].name])
}

# 2. Require versioning
deny[msg] {
  some resource
  input.resource_changes[resource].type == "aws_s3_bucket"
  not input.resource_changes[resource].change.after.versioning.enabled
  msg := sprintf("Bucket %v does not have versioning enabled", [input.resource_changes[resource].name])
}

# 3. Require server-side encryption
deny[msg] {
  some resource
  input.resource_changes[resource].type == "aws_s3_bucket"
  not input.resource_changes[resource].change.after.server_side_encryption_configuration
  msg := sprintf("Bucket %v does not have server-side encryption enabled", [input.resource_changes[resource].name])
}

# 4. Optional: Require specific tags
deny[msg] {
  some resource
  input.resource_changes[resource].type == "aws_s3_bucket"
  tags := input.resource_changes[resource].change.after.tags
  not ("Owner" in tags)
  msg := sprintf("Bucket %v does not have an 'Owner' tag", [input.resource_changes[resource].name])
}

# 5. Optional: Require lifecycle rule
deny[msg] {
  some resource
  input.resource_changes[resource].type == "aws_s3_bucket"
  rules := input.resource_changes[resource].change.after.lifecycle_rule
  count(rules) == 0
  msg := sprintf("Bucket %v does not have a lifecycle rule defined", [input.resource_changes[resource].name])
}

# 6. Optional: Require logging
deny[msg] {
  some resource
  input.resource_changes[resource].type == "aws_s3_bucket"
  logging := input.resource_changes[resource].change.after.logging
  logging == null
  msg := sprintf("Bucket %v does not have logging enabled", [input.resource_changes[resource].name])
}

How to Test

  1. Save s3-plan.json in your project directory.
  2. Save s3.rego in policy\ folder.
  3. Run:
  4. opa eval -i s3-plan.json -d policy\s3.rego "data.terraform.s3.deny"

    Expected output:

    {
      "result": [
        {
          "expressions": [
            {
              "value": [
                "Bucket aws_s3_bucket.example has public-read ACL",
                "Bucket aws_s3_bucket.example does not have versioning enabled",
                "Bucket aws_s3_bucket.example does not have server-side encryption enabled"
              ]
            }
          ]
        }
      ]
    }

    Optional rules (tags, lifecycle, logging) will also trigger if null/missing.

    Using Python subprocess to call OPA CLI

    import subprocess
    import json
    
    # Path to Terraform plan JSON
    plan_file = "s3-plan.json"
    
    # Path to OPA policy
    policy_file = "policy/s3.rego"
    
    # OPA eval command
    cmd = [
        "opa", "eval",
        "-i", plan_file,
        "-d", policy_file,
        "data.terraform.s3.deny",
        "--format", "json"
    ]
    
    # Run the command
    result = subprocess.run(cmd, capture_output=True, text=True)
    
    # Parse output
    opa_output = json.loads(result.stdout)
    
    # Print results
    violations = opa_output["result"][0]["expressions"][0]["value"]
    if violations:
        print("❌ Policy violations found:")
        for v in violations:
            print("-", v)
    else:
        print("✅ All policies passed.")

    PowerShell OPA Test Script

    # -----------------------------
    # PowerShell OPA Test Script
    # -----------------------------
    
    # Paths (update if needed)
    $PlanFile = "C:\OPA_S3\s3-plan.json"
    $PolicyFile = "C:\OPA_S3\policy\s3.rego"
    
    # OPA command
    $OpaArgs = @(
        "eval",
        "-i", $PlanFile,
        "-d", $PolicyFile,
        "data.terraform.s3.deny",
        "--format", "json"
    )
    
    # Run OPA eval
    try {
        $OpaOutputRaw = & opa @OpaArgs
    } catch {
        Write-Error "Failed to run OPA. Make sure opa.exe is in your PATH."
        exit 1
    }
    
    # Parse JSON output
    $OpaOutput = $OpaOutputRaw | ConvertFrom-Json
    
    # Extract violations
    $Violations = $OpaOutput.result[0].expressions[0].value
    
    # Display results
    if ($Violations.Count -gt 0) {
        Write-Host "❌ Policy violations found:" -ForegroundColor Red
        foreach ($v in $Violations) {
            Write-Host "- $v" -ForegroundColor Yellow
        }
    
        # Optional: Stop execution if violations found
        Write-Host "Aborting Terraform apply due to policy violations..." -ForegroundColor Red
        exit 1
    } else {
        Write-Host "✅ All policies passed." -ForegroundColor Green

Monday, 25 August 2025

OPA Policy Enforcement for AWS S3 using Terraform

OPA + Terraform S3 Policy Guide (Windows)


1. Prerequisites

Tools & Versions

Tool / Software Recommended Version (as of Sept 2025)
Terraform CLI 1.13.1
OPA (Open Policy Agent) 0.67.0
AWS CLI v2 Latest (v2.x.y)

Install Steps

(A) Install Terraform

  1. Download: Terraform for Windows (v1.13.1)
  2. Extract the .zip to C:\Terraform
  3. Add C:\Terraform to System PATH:
    • Search “Environment Variables”
    • Edit System variables → Path → New → C:\Terraform
  4. Verify installation:
    terraform -version

(B) Install OPA

  1. Download: OPA Windows Release (v0.67.0)
  2. Extract opa_windows_amd64.exe (rename to opa.exe) to C:\OPA
  3. Add C:\OPA to System PATH
  4. Verify installation:
    opa version

(C) Install AWS CLI v2

  1. Download: AWS CLI v2 MSI Installer
  2. Run installer
  3. Configure:
    aws configure
    Enter:
    • Access Key
    • Secret Key
    • Region (e.g., us-east-1)
    • Output format: json

2. Create Terraform Configuration

Directory structure:

C:\OPA_S3\
│
├── main.tf               # Terraform configuration (AWS provider & S3 bucket)
├── variables.tf          # (Optional) Variables file
├── outputs.tf            # (Optional) Outputs file
├── s3.tfplan             # Terraform binary plan file (auto-generated)
├── s3-plan.json          # JSON plan for OPA (auto-generated)
│
└── policy\               # Folder for OPA policies
    └── s3.rego           # OPA policy for S3 compliance

Create main.tf inside C:\OPA_S3:

provider "aws" {
  region = "us-east-1"
}

resource "aws_s3_bucket" "example" {
  bucket = "my-opa-test-bucket-12345"
  acl    = "public-read" # <-- This will violate the policy
}

3. Generate Terraform Plan

cd C:\OPA_S3
terraform init
terraform plan -out=s3.tfplan
terraform show -json s3.tfplan > s3-plan.json

This creates a machine-readable s3-plan.json for OPA.


4. Create OPA Policy

Create folder C:\OPA_S3\policy and file s3.rego:

package terraform.s3

# Rule: Disallow public S3 buckets
deny[msg] {
  some resource
  input.resource_changes[resource].type == "aws_s3_bucket"
  acl := input.resource_changes[resource].change.after.acl
  acl == "public-read"
  msg := sprintf("Bucket %v has public-read ACL", [input.resource_changes[resource].name])
}

# Rule: Require versioning
deny[msg] {
  some resource
  input.resource_changes[resource].type == "aws_s3_bucket"
  not input.resource_changes[resource].change.after.versioning.enabled
  msg := sprintf("Bucket %v does not have versioning enabled", [input.resource_changes[resource].name])
}

# Rule: Require server-side encryption
deny[msg] {
  some resource
  input.resource_changes[resource].type == "aws_s3_bucket"
  not input.resource_changes[resource].change.after.server_side_encryption_configuration
  msg := sprintf("Bucket %v does not have server-side encryption enabled", [input.resource_changes[resource].name])
}

5. Evaluate Policy with OPA

opa eval -i s3-plan.json -d policy\s3.rego "data.terraform.s3.deny"

Expected output example:

{
  "result": [
    {
      "expressions": [
        {
          "value": [
            "Bucket aws_s3_bucket.example has public-read ACL",
            "Bucket aws_s3_bucket.example does not have versioning enabled",
            "Bucket aws_s3_bucket.example does not have server-side encryption enabled"
          ]
        }
      ]
    }
  ]
}

If the list is empty ([]), your Terraform code passes all policies.


6. Enforce the Policy

Integration examples:

  • CI/CD pipeline: Fail the job if deny messages exist.
  • Pre-apply hook: Block terraform apply if violations are found.

Example PowerShell snippet:

if ((opa eval -i s3-plan.json -d policy\s3.rego "data.terraform.s3.deny" | ConvertFrom-Json).result[0].expressions[0].value.Count -gt 0) {
    Write-Host "Policy violations found! Aborting..."
    exit 1
}
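
An equivalent pre-apply gate can be written in Python. The sketch below is only an outline: it assumes terraform and opa are on PATH, that terraform init has already run in C:\OPA_S3, and that the policy lives at policy/s3.rego as described above.

# Pre-apply gate (sketch): plan, evaluate the S3 policy, abort on violations.
import json
import subprocess
import sys

# Generate the binary plan and its JSON representation.
subprocess.run(["terraform", "plan", "-out=s3.tfplan"], check=True)
plan_json = subprocess.run(["terraform", "show", "-json", "s3.tfplan"],
                           capture_output=True, text=True, check=True).stdout

with open("s3-plan.json", "w") as f:
    f.write(plan_json)

# Evaluate the S3 policy against the plan JSON.
result = subprocess.run(
    ["opa", "eval", "-i", "s3-plan.json", "-d", "policy/s3.rego",
     "--format", "json", "data.terraform.s3.deny"],
    capture_output=True, text=True, check=True)
violations = json.loads(result.stdout)["result"][0]["expressions"][0]["value"]

if violations:
    print("Policy violations found! Aborting...")
    for v in violations:
        print("-", v)
    sys.exit(1)

# No violations: apply the saved plan.
subprocess.run(["terraform", "apply", "s3.tfplan"], check=True)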

7. How the Policy Works

  • Reads Terraform plan (s3-plan.json → contains resource_changes array).
  • Loops through resources with some resource.
  • Checks only aws_s3_bucket type.
  • Validates:
    • ACL should not be public-read
    • Versioning must be enabled
    • Server-side encryption must be enabled
  • Adds messages to deny list if violations are found.

Saturday, 9 August 2025

Create Google Pub/Sub Subscription with BigQuery and Cloud Storage

Event Generator – GCP Pub/Sub, BigQuery, and Cloud Storage Utility

🧩 utils/event_generator.py

This Python utility automates creation and cleanup of Google Cloud Pub/Sub, BigQuery, and Cloud Storage resources. It also allows you to publish sample messages for testing event-driven architectures.

📘 Full Python Code

import logging
import os
from google.cloud import pubsub_v1, bigquery, storage
from google.api_core.exceptions import Conflict, NotFound


class EventGenerator:
    def __init__(self, project_id, topic_id, subscription_id, region="US"):
        self.project_id = project_id
        self.topic_id = topic_id
        self.subscription_id = subscription_id
        self.region = region

        self.topic_path = f"projects/{project_id}/topics/{topic_id}"
        self.subscription_path = f"projects/{project_id}/subscriptions/{subscription_id}"

        self.publisher = pubsub_v1.PublisherClient()
        self.subscriber = pubsub_v1.SubscriberClient()
        self.bq_client = bigquery.Client(project=project_id)
        self.storage_client = storage.Client(project=project_id)

        self.service_account_email = os.getenv(
            "PUBSUB_SERVICE_ACCOUNT_EMAIL",
            f"citi-pubsub-writer@{project_id}.iam.gserviceaccount.com",
        )

        self.created_dataset = None
        self.created_table = None
        self.created_bucket = None

    def create_bigquery_resources(self, dataset_id="pubsub_audit", table_id="subscription_events"):
        dataset_ref = self.bq_client.dataset(dataset_id)
        table_ref = dataset_ref.table(table_id)
        try:
            dataset = bigquery.Dataset(dataset_ref)
            dataset.location = self.region
            dataset = self.bq_client.create_dataset(dataset)
            logging.info(f"✅ Created BigQuery dataset: {dataset.dataset_id}")
        except Conflict:
            logging.info(f"BigQuery dataset {dataset_id} already exists.")

        schema = [
            bigquery.SchemaField("message_id", "STRING"),
            bigquery.SchemaField("data", "STRING"),
            bigquery.SchemaField("attributes", "STRING"),
            bigquery.SchemaField("publish_time", "TIMESTAMP"),
        ]
        try:
            table = bigquery.Table(table_ref, schema=schema)
            self.bq_client.create_table(table)
            logging.info(f"✅ Created BigQuery table: {table_id}")
        except Conflict:
            logging.info(f"BigQuery table {table_id} already exists.")

        self.created_dataset = dataset_id
        self.created_table = table_id
        return f"projects/{self.project_id}/datasets/{dataset_id}/tables/{table_id}"

    def create_storage_bucket(self, bucket_name="pubsub-events-bucket"):
        bucket = self.storage_client.bucket(bucket_name)
        try:
            bucket.location = self.region
            self.storage_client.create_bucket(bucket)
            logging.info(f"✅ Created Cloud Storage bucket: {bucket_name}")
        except Conflict:
            logging.info(f"Cloud Storage bucket {bucket_name} already exists.")
        self.created_bucket = bucket_name
        return f"projects/_/buckets/{bucket_name}"

    def create_topic(self):
        try:
            self.publisher.create_topic(request={"name": self.topic_path})
            logging.info(f"✅ Created Pub/Sub topic: {self.topic_id}")
        except Conflict:
            # google.api_core raises AlreadyExists (a Conflict) for duplicate topics
            logging.info(f"Pub/Sub topic {self.topic_id} already exists.")

    def create_subscription(self):
        bq_table = self.create_bigquery_resources()
        gcs_bucket = self.create_storage_bucket()

        bigquery_config = {
            "table": bq_table,
            # The topic has no schema, so write the raw message plus the
            # metadata columns (message_id, data, attributes, publish_time).
            "use_topic_schema": False,
            "write_metadata": True,
            "service_account_email": self.service_account_email,
        }

        cloud_storage_config = {
            "bucket": gcs_bucket,
            "filename_prefix": "events/",
            "filename_suffix": ".json",
            "max_bytes": 5000000,
            "max_duration": "300s",
            "service_account_email": self.service_account_email,
        }

        request_data = {
            "name": self.subscription_path,
            "topic": self.topic_path,
            "enable_message_ordering": True,
            "bigquery_config": bigquery_config,
            "cloud_storage_config": cloud_storage_config,
        }

        try:
            self.subscriber.create_subscription(request=request_data)
            logging.info(f"✅ Created Pub/Sub subscription: {self.subscription_id}")
        except Conflict:
            logging.info(f"Subscription {self.subscription_id} already exists.")

    def publish_event(self, message: str, attributes=None):
        attributes = attributes or {}
        data = message.encode("utf-8")
        future = self.publisher.publish(self.topic_path, data, **attributes)
        msg_id = future.result()
        logging.info(f"📤 Published message ID: {msg_id}")
        return msg_id

    def delete_resources(self):
        try:
            self.subscriber.delete_subscription(request={"subscription": self.subscription_path})
            logging.info(f"🗑️ Deleted Pub/Sub subscription: {self.subscription_id}")
        except NotFound:
            logging.warning(f"Subscription {self.subscription_id} not found.")
        except Exception as e:
            logging.warning(f"Failed to delete subscription: {e}")

        try:
            self.publisher.delete_topic(request={"topic": self.topic_path})
            logging.info(f"🗑️ Deleted Pub/Sub topic: {self.topic_id}")
        except NotFound:
            logging.warning(f"Topic {self.topic_id} not found.")
        except Exception as e:
            logging.warning(f"Failed to delete topic: {e}")

        if self.created_dataset:
            try:
                self.bq_client.delete_dataset(
                    dataset=self.created_dataset,
                    delete_contents=True,
                    not_found_ok=True,
                )
                logging.info(f"🗑️ Deleted BigQuery dataset: {self.created_dataset}")
            except Exception as e:
                logging.warning(f"Failed to delete BigQuery dataset {self.created_dataset}: {e}")

        if self.created_bucket:
            try:
                bucket = self.storage_client.bucket(self.created_bucket)
                bucket.delete(force=True)
                logging.info(f"🗑️ Deleted Cloud Storage bucket: {self.created_bucket}")
            except Exception as e:
                logging.warning(f"Failed to delete Cloud Storage bucket {self.created_bucket}: {e}")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    project_id = os.getenv("GOOGLE_CLOUD_PROJECT", "your-gcp-project-id")
    topic_id = "demo-topic"
    subscription_id = "demo-subscription"

    generator = EventGenerator(project_id, topic_id, subscription_id)
    generator.create_topic()
    generator.create_subscription()
    generator.publish_event("Sample event for audit", {"env": "dev"})

    # Uncomment to clean up:
    # generator.delete_resources()

⚙️ How It Works

Resource                  Create Method                   Delete Method
Pub/Sub Topic             create_topic()                  delete_resources()
Pub/Sub Subscription      create_subscription()           delete_resources()
BigQuery Dataset/Table    create_bigquery_resources()     delete_resources()
Cloud Storage Bucket      create_storage_bucket()         delete_resources()

🧠 Run Instructions

export GOOGLE_CLOUD_PROJECT=my-gcp-project
export PUBSUB_SERVICE_ACCOUNT_EMAIL=citi-pubsub-writer@my-gcp-project.iam.gserviceaccount.com

python utils/event_generator.py

✅ Example Output

✅ Created Pub/Sub topic: demo-topic
✅ Created BigQuery dataset: pubsub_audit
✅ Created BigQuery table: subscription_events
✅ Created Cloud Storage bucket: pubsub-events-bucket
✅ Created Pub/Sub subscription: demo-subscription
📤 Published message ID: 481027638463
🗑️ Deleted Pub/Sub subscription: demo-subscription
🗑️ Deleted Pub/Sub topic: demo-topic
🗑️ Deleted BigQuery dataset: pubsub_audit
🗑️ Deleted Cloud Storage bucket: pubsub-events-bucket
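
Once a few events have been published, you can confirm they reached the BigQuery sink with a short query. This is a sketch assuming the default pubsub_audit.subscription_events names used above; rows may take a short while to appear after publishing.

# Check the most recent rows written by the BigQuery subscription.
from google.cloud import bigquery

client = bigquery.Client()
query = """
    SELECT message_id, data, publish_time
    FROM `pubsub_audit.subscription_events`
    ORDER BY publish_time DESC
    LIMIT 10
"""
for row in client.query(query).result():
    print(row.message_id, row.publish_time, row.data)
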
Note: The script uses the official Google Cloud client libraries, which call the real googleapis.com v1 endpoints. Ensure you have completed gcloud auth application-default login before running it.
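
A quick way to confirm application-default credentials are in place before running the generator (a small sketch using google-auth, which the client libraries above already depend on):

# Verify application-default credentials before running the generator.
import google.auth
from google.auth.exceptions import DefaultCredentialsError

try:
    credentials, project = google.auth.default()
    print(f"Using application-default credentials for project: {project}")
except DefaultCredentialsError:
    print("No application-default credentials found. "
          "Run: gcloud auth application-default login")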