AWS + Terraform + OPA — Full Project (HTML)
AWS + Terraform + OPA — Full Project (mock, failing by design)
This file contains everything you need to run an automated pipeline locally:
Terraform configuration (non-compliant by value), OPA Rego policies (OPA v1.0+ syntax),
a single policy entrypoint, and runner scripts (Python + PowerShell). The TF config has all properties present
so you can toggle values to change behavior from fail to pass.
Project layout (what to create locally)
aws-opa-tf/
├── main.tf # Terraform config (all resources present — intentionally non-compliant values)
├── deploy.py # Python orchestrator (plan → json → opa eval → apply)
├── opa_check.ps1 # PowerShell runner (optional)
└── policy/
├── main.rego # aggregator -> data.terraform.deny
├── ec2_complex.rego
├── s3_complex.rego
├── iam_complex.rego
└── sg_simple.rego
1) Terraform (non-compliant but complete) — main.tf
All resources are fully defined. Values chosen here intentionally violate the policy rules (so OPA will report violations).
When you want to pass, just update the flagged values in the comments.
// main.tf
provider "aws" {
region = "us-east-1"
}
# ---------- S3 (complete, but non-compliant values) ----------
resource "aws_s3_bucket" "bad_bucket" {
bucket = "opa-violation-bucket-12345"
acl = "public-read" # ❌ non-compliant: should be "private"
versioning {
enabled = false # ❌ non-compliant: should be true
}
# encryption block present but we will treat as missing by policy (simulate misconfigured)
server_side_encryption_configuration {
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "AES256"
}
}
}
tags = {
Environment = "Dev"
Team = "Platform"
}
}
# ---------- IAM Role (present but invalid trust policy) ----------
resource "aws_iam_role" "bad_role" {
name = "bad-role"
# intentionally empty object (invalid trust) to trigger policy check
assume_role_policy = jsonencode({}) # ❌ non-compliant: should have Version and Statement
}
# ---------- IAM Policy (too broad) ----------
resource "aws_iam_policy" "too_broad" {
name = "AllowEverything"
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Effect = "Allow",
Action = "*", # ❌ non-compliant
Resource = "*" # ❌ non-compliant
}
]
})
}
# ---------- Security Group (open SSH/RDP) ----------
resource "aws_security_group" "bad_sg" {
name = "bad-sg"
vpc_id = "vpc-12345678" # replace for real runs
ingress {
from_port = 22
to_port = 22
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"] # ❌ non-compliant: open SSH
}
ingress {
from_port = 3389
to_port = 3389
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"] # ❌ non-compliant: open RDP
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
}
# ---------- EC2 Instance (present but misconfigured) ----------
resource "aws_instance" "bad_ec2" {
ami = "ami-12345678" # replace for real runs
instance_type = "t2.micro" # ❌ disallowed type per policy
subnet_id = "subnet-12345678" # replace for real runs
vpc_security_group_ids = [aws_security_group.bad_sg.id]
iam_instance_profile = aws_iam_role.bad_role.name
ebs_optimized = false # ❌ should be true
ebs_block_device {
device_name = "/dev/sda1"
volume_size = 30
volume_type = "standard" # ❌ non-compliant for Prod (policy expects gp3/io1)
encrypted = false # ❌ must be true
}
tags = {
Environment = "Prod"
Service = "frontend"
}
}
2) OPA policies (OPA v1.0+ syntax) — put these files in policy/
All policies use the `deny contains msg if { ... }` style so they work with OPA v1.0+.
2.1 Aggregator — policy/main.rego
package terraform

import data.terraform.ec2_complex
import data.terraform.iam_complex
import data.terraform.s3_complex
import data.terraform.sg_simple

# Single policy entrypoint: data.terraform.deny is the union of every
# module's deny set, so runners only ever query one path.
deny contains msg if some msg in ec2_complex.deny

deny contains msg if some msg in s3_complex.deny

deny contains msg if some msg in iam_complex.deny

deny contains msg if some msg in sg_simple.deny
2.2 EC2 + EBS + SG cross checks — policy/ec2_complex.rego
package terraform.ec2_complex

# All aws_instance resources in the plan.
#
# BUG FIX: the previous `instances[i] if { ... }` form declares a partial
# OBJECT in OPA v1 (keys mapped to `true`), so `instances[_]` yielded
# `true` instead of the resource and every rule below silently matched
# nothing. `contains` declares the intended partial set.
instances contains inst if {
	some r
	inst := input.resource_changes[r]
	inst.type == "aws_instance"
}

# Look up a security group by its key in resource_changes (the mock plan
# keys entries by resource address, e.g. "aws_security_group.bad_sg").
sg_by_id(id) := sg if {
	sg := input.resource_changes[id]
	sg.type == "aws_security_group"
}

# Whitelisted EBS volume types for Prod workloads.
prod_volume_types := {"gp3", "io1"}

# 1) EBS optimized required
deny contains msg if {
	some inst in instances
	not inst.change.after.ebs_optimized
	msg := sprintf("EC2 %v is not EBS optimized", [inst.name])
}

# 2) No public IP
deny contains msg if {
	some inst in instances
	inst.change.after.associate_public_ip_address
	msg := sprintf("EC2 %v has a public IP assigned", [inst.name])
}

# 3) IAM instance profile must be attached
deny contains msg if {
	some inst in instances
	not inst.change.after.iam_instance_profile
	msg := sprintf("EC2 %v does not have an IAM instance profile", [inst.name])
}

# 4) EBS volumes must be encrypted
deny contains msg if {
	some inst in instances
	vol := inst.change.after.ebs_block_device[_]
	not vol.encrypted
	msg := sprintf("EC2 %v has unencrypted EBS volume %v", [inst.name, vol.device_name])
}

# 5) For Prod environment, EBS types must be gp3 or io1.
# BUG FIX: `not (a == x or b == y)` is not valid Rego; set membership
# expresses the whitelist instead.
deny contains msg if {
	some inst in instances
	inst.change.after.tags.Environment == "Prod"
	vol := inst.change.after.ebs_block_device[_]
	not prod_volume_types[vol.volume_type]
	msg := sprintf("EC2 %v in Prod has non-compliant EBS type %v on %v", [inst.name, vol.volume_type, vol.device_name])
}

# 6) Attached SGs must not expose SSH/RDP to the world.
# BUG FIX: `(expr1; expr2)` is not a valid grouping inside a rule body;
# the port conditions are plain AND-ed literals.
deny contains msg if {
	some inst in instances
	sg_id := inst.change.after.vpc_security_group_ids[_]
	sg := sg_by_id(sg_id)
	ing := sg.change.after.ingress[_]
	ing.cidr_blocks[_] == "0.0.0.0/0"
	ing.from_port == 22
	ing.to_port == 22
	msg := sprintf("EC2 %v allows SSH (22) from 0.0.0.0/0 via SG %v", [inst.name, sg.name])
}

deny contains msg if {
	some inst in instances
	sg_id := inst.change.after.vpc_security_group_ids[_]
	sg := sg_by_id(sg_id)
	ing := sg.change.after.ingress[_]
	ing.cidr_blocks[_] == "0.0.0.0/0"
	ing.from_port == 3389
	ing.to_port == 3389
	msg := sprintf("EC2 %v allows RDP (3389) from 0.0.0.0/0 via SG %v", [inst.name, sg.name])
}
2.3 S3 best practices — policy/s3_complex.rego
package terraform.s3_complex

# All aws_s3_bucket resources in the plan.
# BUG FIX: `buckets[b] if { ... }` declares a partial OBJECT in OPA v1,
# so iterating `buckets[_]` yielded `true` rather than the bucket and no
# rule below could ever fire; `contains` builds the intended set.
buckets contains b if {
	some r
	b := input.resource_changes[r]
	b.type == "aws_s3_bucket"
}

# ACL must not be public-read.
deny contains msg if {
	some b in buckets
	b.change.after.acl == "public-read"
	msg := sprintf("S3 bucket %v: ACL is public-read", [b.name])
}

# Versioning must be enabled.
deny contains msg if {
	some b in buckets
	not b.change.after.versioning.enabled
	msg := sprintf("S3 bucket %v: versioning is not enabled", [b.name])
}

# Server-side encryption must be configured.
deny contains msg if {
	some b in buckets
	not b.change.after.server_side_encryption_configuration
	msg := sprintf("S3 bucket %v: server-side encryption not configured", [b.name])
}

# Require tag Owner for operational traceability.
deny contains msg if {
	some b in buckets
	not b.change.after.tags.Owner
	msg := sprintf("S3 bucket %v: missing required tag 'Owner'", [b.name])
}
2.4 IAM least privilege — policy/iam_complex.rego
package terraform.iam_complex

# All aws_iam_role resources in the plan.
# BUG FIX: `roles[r] if { ... }` declared a partial OBJECT under OPA v1
# (values `true`), so the rules below never saw a resource; `contains`
# builds the intended set. Same fix for `policies` below.
roles contains r if {
	some k
	r := input.resource_changes[k]
	r.type == "aws_iam_role"
}

policies contains p if {
	some k
	p := input.resource_changes[k]
	p.type == "aws_iam_policy"
}

# Normalize a policy document: real Terraform plans serialize
# jsonencode(...) results as JSON strings, while a hand-written mock plan
# may carry objects directly. Accept both forms.
as_obj(x) := x if is_object(x)

as_obj(x) := json.unmarshal(x) if is_string(x)

# 1) assume_role_policy must be an object with Version and Statement.
deny contains msg if {
	some r in roles
	doc := as_obj(r.change.after.assume_role_policy)
	not doc.Version
	msg := sprintf("IAM Role %v: assume_role_policy missing 'Version'", [r.name])
}

deny contains msg if {
	some r in roles
	doc := as_obj(r.change.after.assume_role_policy)
	not doc.Statement
	msg := sprintf("IAM Role %v: assume_role_policy missing 'Statement'", [r.name])
}

# 2) Policy statements must not use wildcard Action or Resource.
deny contains msg if {
	some p in policies
	doc := as_obj(p.change.after.policy)
	stmt := doc.Statement[_]
	stmt.Action == "*"
	msg := sprintf("IAM Policy %v: Statement uses Action '*'", [p.name])
}

deny contains msg if {
	some p in policies
	doc := as_obj(p.change.after.policy)
	stmt := doc.Statement[_]
	stmt.Resource == "*"
	msg := sprintf("IAM Policy %v: Statement uses Resource '*'", [p.name])
}
2.5 SG hygiene (standalone) — policy/sg_simple.rego
package terraform.sg_simple

# All aws_security_group resources in the plan.
# BUG FIX: `sgs[sg] if { ... }` is a partial OBJECT in OPA v1, so
# iteration produced `true` instead of the resource; `contains` builds
# the intended set.
sgs contains sg if {
	some r
	sg := input.resource_changes[r]
	sg.type == "aws_security_group"
}

# Deny SSH open to the world.
# BUG FIX: `(expr1; expr2)` is not valid inside a rule body; the port
# checks are plain AND-ed literals.
deny contains msg if {
	some sg in sgs
	ing := sg.change.after.ingress[_]
	ing.cidr_blocks[_] == "0.0.0.0/0"
	ing.from_port == 22
	ing.to_port == 22
	msg := sprintf("SG %v allows SSH (22) from 0.0.0.0/0", [sg.name])
}

# Deny RDP open to the world.
deny contains msg if {
	some sg in sgs
	ing := sg.change.after.ingress[_]
	ing.cidr_blocks[_] == "0.0.0.0/0"
	ing.from_port == 3389
	ing.to_port == 3389
	msg := sprintf("SG %v allows RDP (3389) from 0.0.0.0/0", [sg.name])
}
3) Python orchestrator — deploy.py
Place this at project root. It runs terraform plan → writes plan.json → runs opa eval (using the single policy folder) → aborts or applies based on violations.
#!/usr/bin/env python3
# deploy.py — orchestrates terraform plan → plan JSON → OPA gate → apply.
import subprocess, json, os, sys
PLAN_FILE = "plan.tfplan"  # binary plan produced by `terraform plan -out`
PLAN_JSON = "plan.json"  # JSON rendering consumed by `opa eval -i`
POLICY_DIR = "policy"  # directory containing all *.rego policies
def run_cmd(cmd, check=True):
    """Echo and run *cmd*; on failure exit the process when *check* is set.

    Returns the command's captured stdout either way.
    """
    print("👉", " ".join(cmd))
    result = subprocess.run(cmd, capture_output=True, text=True)
    failed = result.returncode != 0
    if check and failed:
        print("❌ Command failed:", " ".join(cmd))
        # Prefer stderr for diagnostics; some tools write errors to stdout.
        print(result.stderr or result.stdout)
        sys.exit(result.returncode)
    return result.stdout
def terraform_plan():
    """Init, plan, and dump the plan as JSON into PLAN_JSON."""
    setup_steps = (
        ["terraform", "init", "-input=false"],
        ["terraform", "plan", "-out", PLAN_FILE, "-input=false"],
    )
    for step in setup_steps:
        run_cmd(step)
    rendered = run_cmd(["terraform", "show", "-json", PLAN_FILE])
    with open(PLAN_JSON, "w") as handle:
        handle.write(rendered)
def opa_eval():
    """Evaluate the single entrypoint data.terraform.deny against PLAN_JSON.

    Returns the parsed JSON document emitted by `opa eval`.
    """
    opa_cmd = [
        "opa", "eval",
        "-i", PLAN_JSON,
        "-d", POLICY_DIR,
        "--format", "json",
        "data.terraform.deny",
    ]
    return json.loads(run_cmd(opa_cmd))
def extract_violations(opa_json):
    """Flatten every list-valued expression in an OPA result into one list.

    Non-list expression values (e.g. booleans) are ignored, matching the
    shape `opa eval --format json` produces for set-valued queries.
    """
    return [
        violation
        for item in opa_json.get("result", [])
        for expr in item.get("expressions", [])
        if isinstance(expr.get("value"), list)
        for violation in expr["value"]
    ]
def terraform_apply():
    """Apply the previously planned configuration without prompting."""
    run_cmd(["terraform", "apply", "-auto-approve"])
if __name__ == "__main__":
terraform_plan()
opa_json = opa_eval()
violations = extract_violations(opa_json)
if violations:
print("\n\033[91m❌ Policy violations detected:\033[0m")
for v in violations:
print(" -", v)
print("\n🚫 Aborting. Fix policy violations and run again.")
sys.exit(1)
else:
print("\n\033[92m✅ All policies passed. Applying infra...\033[0m")
terraform_apply()
4) PowerShell runner (optional) — opa_check.ps1
# opa_check.ps1 (optional)
# Runs OPA against a pre-rendered plan.json and exits 1 on violations.
$Plan = "C:\path\to\aws-opa-tf\plan.json"
$Policy = "C:\path\to\aws-opa-tf\policy"

# BUG FIX: `& $cmd` on a whole array does not invoke anything — the call
# operator needs the command name, with the arguments passed (splatted)
# separately.
$OpaArgs = @("eval","-i",$Plan,"-d",$Policy,"--format","json","data.terraform.deny")
try {
    $raw = & opa @OpaArgs
} catch {
    Write-Error "OPA eval failed. Ensure opa.exe in PATH."
    exit 2
}
# BUG FIX: a native command that fails does not throw; it sets the exit
# code, which must be checked explicitly.
if ($LASTEXITCODE -ne 0) {
    Write-Error "OPA eval failed. Ensure opa.exe in PATH."
    exit 2
}

$out = $raw | ConvertFrom-Json
$violations = @()
foreach ($r in $out.result) {
    foreach ($e in $r.expressions) {
        if ($e.value) { $violations += $e.value }
    }
}
if ($violations.Count -gt 0) {
    Write-Host "❌ Policy violations:" -ForegroundColor Red
    $violations | ForEach-Object { Write-Host " - $_" -ForegroundColor Yellow }
    exit 1
} else {
    Write-Host "✅ All policies passed." -ForegroundColor Green
    exit 0
}
5) Quick help / run checklist
- Install prerequisites: Terraform, OPA (v1.0+), Python 3.x, AWS CLI (configured).
- Create a folder named `aws-opa-tf` and paste in the files: `main.tf`, `deploy.py`, `opa_check.ps1`, and `policy/*.rego`.
- Replace the placeholder VPC/subnet/AMI values in `main.tf` with real ones before applying (or leave them if you only intend to run plan + OPA check).
- Run the pipeline with `python deploy.py`. This will:
  - run `terraform plan` and produce `plan.json`
  - run `opa eval -d policy -i plan.json "data.terraform.deny"`
  - if there are violations, show them and exit; if none, run `terraform apply -auto-approve`
- To test a passing run: change the Terraform values to the compliant ones (see the comments in `main.tf`), then rerun `python deploy.py`.
6) Expected violations (with the provided non-compliant TF values)
- S3: ACL is `public-read`, versioning is false, SSE treated as missing — fail
- IAM Role: assume_role_policy is empty/invalid — fail
- IAM Policy: allows `Action: *` and `Resource: *` — fail
- Security Group: SSH/RDP open to `0.0.0.0/0` — fail
- EC2: disallowed instance type, not EBS optimized, unencrypted EBS, Prod uses non-gp3/io1 — fail
Notes & small gotchas
- OPA v1.0+ requires the `deny contains msg if { ... }` pattern for partial set rules; the legacy `deny[msg] { ... }` form is no longer accepted (and `deny[msg] if { ... }` defines a partial object, not a set). The policies above follow the required format.
- When using real Terraform plans, resource addresses and IDs can be different (e.g. computed IDs). The Rego helpers above assume a simplified mock plan structure where the security group reference is the resource address (e.g.
aws_security_group.bad_sg
). For real plans, you may need to match by address
fields or resource instance keys.
- If some values are computed at apply time, check for
after_unknown
in the JSON and handle accordingly (you may treat unknowns as violations or warnings depending on policy).
If you want, I can:
- Provide a clean/compliant version of
main.tf
so you can quickly toggle between fail/pass.
- Add unit tests (small Python tests) that run OPA against multiple plan JSON variants.
- Help you adapt the Rego rules to the exact real Terraform plan JSON output your environment produces (I can analyze a sample plan.json you produce and tune the rules).
Terraform + OPA Validation with Python
This Python script validates a Terraform plan using multiple OPA policies (S3, EC2, IAM, Security Group) and applies the infrastructure only if there are no policy violations.
Python Script (`opa_tf_apply.py`)
"""Validate a rendered Terraform plan with OPA, then apply if clean."""
import subprocess
import json
import os
import shutil
import sys

# ----------------------------
# Configuration
# ----------------------------
plan_file = "terraform-plan.json"
policy_dir = "policy"
# BUG FIX: `opa eval` requires a query argument; the previous command had
# none, so OPA exited with a usage error before evaluating anything.
opa_query = "data.terraform.deny"

# Run from the script's own directory so the relative paths above resolve.
os.chdir(os.path.dirname(os.path.abspath(__file__)))

# ----------------------------
# Check if OPA exists
# ----------------------------
opa_path = shutil.which("opa")
if not opa_path:
    print("❌ OPA executable not found in PATH. Please install OPA and add it to PATH.")
    sys.exit(1)

# ----------------------------
# Sanity-check the plan input
# ----------------------------
if not os.path.exists(plan_file):
    print(f"❌ Plan file '{plan_file}' not found. Run 'terraform show -json' first.")
    sys.exit(1)

# ----------------------------
# Collect all Rego policy files
# ----------------------------
rego_files = [os.path.join(policy_dir, f) for f in os.listdir(policy_dir) if f.endswith(".rego")]
if not rego_files:
    print(f"❌ No Rego files found in '{policy_dir}'. Please add policy files.")
    sys.exit(1)

# ----------------------------
# Build OPA eval command
# ----------------------------
cmd = [opa_path, "eval", "-i", plan_file, "--format", "json"]
for rego in rego_files:
    cmd.extend(["-d", rego])
cmd.append(opa_query)  # the query itself (see BUG FIX note above)

# ----------------------------
# Run OPA eval
# ----------------------------
print("🔎 OPA validation start...")
try:
    result = subprocess.run(cmd, capture_output=True, text=True)
except Exception as e:
    print("❌ Error running OPA:", str(e))
    sys.exit(1)

# ----------------------------
# Check return code
# ----------------------------
if result.returncode != 0:
    print("❌ OPA command failed.")
    print("STDOUT:", result.stdout)
    print("STDERR:", result.stderr)
    sys.exit(1)

# ----------------------------
# Parse OPA output
# ----------------------------
try:
    opa_output = json.loads(result.stdout)
except json.JSONDecodeError:
    print("❌ Failed to parse OPA JSON output:")
    print(result.stdout)
    sys.exit(1)

violations = []
for res in opa_output.get("result", []):
    for expr in res.get("expressions", []):
        value = expr.get("value")
        # BUG FIX: the expression value may be a scalar/object (e.g. `true`);
        # only list values are actual violation message sets.
        if isinstance(value, list):
            violations.extend(value)

# ----------------------------
# Display violations
# ----------------------------
if violations:
    print("❌ Policy violations found:")
    for v in violations:
        print("-", v)
    sys.exit(1)
else:
    print("✅ All policies passed. Proceeding with Terraform apply...")

# ----------------------------
# Run Terraform apply
# ----------------------------
try:
    tf_apply = subprocess.run(["terraform", "apply", "-auto-approve"], capture_output=True, text=True)
    print(tf_apply.stdout)
    if tf_apply.returncode != 0:
        print("❌ Terraform apply failed:")
        print(tf_apply.stderr)
        sys.exit(1)
    else:
        print("✅ Terraform infrastructure created successfully!")
except Exception as e:
    print("❌ Error running Terraform apply:", str(e))
    sys.exit(1)