Advanced OPA Policies for AWS Terraform (Mock Plan)
This guide demonstrates complex OPA (Open Policy Agent) policies for AWS Terraform plans using a mock plan JSON. It includes cross-resource checks (EC2 ↔ Security Groups, EC2 ↔ EBS), S3 best practices, IAM least privilege, a single policy entrypoint to evaluate everything at once, and runner scripts (Python & PowerShell).
1) Directory Layout
C:\OPA_Advanced\
│
├── terraform-plan.json # Mock Terraform plan JSON (intentionally violating several policies)
└── policy\
├── main.rego # Single entrypoint aggregating all denials
├── ec2_complex.rego # EC2 + SG + EBS cross checks
├── s3_complex.rego # S3 best-practice checks
├── iam_complex.rego # IAM least privilege checks
└── sg_simple.rego # SG hygiene (used by EC2 cross-checks)
Note: This is a mock plan; keys/IDs are simplified so cross-references are easy. In real plans,
resource IDs are computed and you’ll often join using
address
, type
, and name
or inspect after_unknown
and data sources.
2) Mock Terraform Plan JSON (terraform-plan.json
)
This plan intentionally violates multiple controls to showcase policy failures.
{
"format_version": "0.1",
"terraform_version": "1.13.1",
"resource_changes": {
"aws_s3_bucket.app_bucket": {
"type": "aws_s3_bucket",
"name": "app_bucket",
"change": {
"actions": ["create"],
"before": null,
"after": {
"bucket": "my-app-public-bucket",
"acl": "public-read",
"versioning": { "enabled": false },
"server_side_encryption_configuration": null,
"tags": { "Team": "Platform" }
}
}
},
"aws_security_group.web": {
"type": "aws_security_group",
"name": "web",
"change": {
"actions": ["create"],
"before": null,
"after": {
"name": "web-sg",
"description": "Web security group",
"ingress": [
{ "from_port": 22, "to_port": 22, "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"] },
{ "from_port": 3389, "to_port": 3389, "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"] },
{ "from_port": 80, "to_port": 80, "protocol": "tcp", "cidr_blocks": ["0.0.0.0/0"] }
],
"egress": [
{ "from_port": 0, "to_port": 0, "protocol": "-1", "cidr_blocks": ["0.0.0.0/0"] }
],
"tags": { "Environment": "Prod" }
}
}
},
"aws_iam_role.ec2_role": {
"type": "aws_iam_role",
"name": "ec2_role",
"change": {
"actions": ["create"],
"before": null,
"after": {
"name": "ec2-role",
"assume_role_policy": {
"Version": "2012-10-17",
"Statement": [
{ "Effect": "Allow", "Principal": { "Service": "*" }, "Action": "sts:AssumeRole" }
]
},
"tags": { "Owner": "Alice" }
}
}
},
"aws_iam_policy.too_broad": {
"type": "aws_iam_policy",
"name": "too_broad",
"change": {
"actions": ["create"],
"before": null,
"after": {
"name": "AllowEverything",
"policy": {
"Version": "2012-10-17",
"Statement": [
{ "Effect": "Allow", "Action": "*", "Resource": "*" }
]
}
}
}
},
"aws_instance.web1": {
"type": "aws_instance",
"name": "web1",
"change": {
"actions": ["create"],
"before": null,
"after": {
"ami": "ami-12345678",
"instance_type": "t3.small",
"ebs_optimized": false,
"associate_public_ip_address": true,
"iam_instance_profile": "ec2-role",
"vpc_security_group_ids": ["aws_security_group.web"],
"ebs_block_device": [
{ "device_name": "/dev/sda1", "volume_size": 30, "volume_type": "standard", "encrypted": false }
],
"tags": { "Environment": "Prod", "Service": "frontend" }
}
}
}
}
}
3) OPA Policies (OPA v1.0+ syntax)
3.1 Main Aggregator (policy/main.rego
)
Single entrypoint so you can evaluate everything at once.
package terraform
import data.terraform.ec2_complex
import data.terraform.s3_complex
import data.terraform.iam_complex
import data.terraform.sg_simple
# Aggregate all denials into one set
deny contains msg if { msg := ec2_complex.deny[_] }
deny contains msg if { msg := s3_complex.deny[_] }
deny contains msg if { msg := iam_complex.deny[_] }
deny contains msg if { msg := sg_simple.deny[_] }
3.2 EC2 + SG + EBS Cross Checks (policy/ec2_complex.rego
)
package terraform.ec2_complex
# Helper: iterate all EC2 instances in the plan
ec2s[ec2] if {
some r
ec2 := input.resource_changes[r]
ec2.type == "aws_instance"
}
# Helper: look up SG resource by "id" (mock uses address-style id)
sg_by_id(id) := sg if {
sg := input.resource_changes[id]
sg.type == "aws_security_group"
}
# --- Deny rules ---
# 1) EC2 must be EBS-optimized
deny contains msg if {
ec2 := ec2s[_]
not ec2.change.after.ebs_optimized
msg := sprintf("EC2 %v is not EBS optimized", [ec2.name])
}
# 2) EC2 must not have public IP
deny contains msg if {
ec2 := ec2s[_]
ec2.change.after.associate_public_ip_address
msg := sprintf("EC2 %v has a public IP assigned", [ec2.name])
}
# 3) EC2 must have IAM instance profile (role)
deny contains msg if {
ec2 := ec2s[_]
not ec2.change.after.iam_instance_profile
msg := sprintf("EC2 %v does not have an IAM instance profile attached", [ec2.name])
}
# 4) All attached EBS volumes must be encrypted
deny contains msg if {
ec2 := ec2s[_]
vol := ec2.change.after.ebs_block_device[_]
not vol.encrypted
msg := sprintf("EC2 %v has unencrypted EBS volume %v", [ec2.name, vol.device_name])
}
# 5) If Environment=Prod, EBS volume types must be gp3 or io1
deny contains msg if {
ec2 := ec2s[_]
ec2.change.after.tags.Environment == "Prod"
vol := ec2.change.after.ebs_block_device[_]
not (vol.volume_type == "gp3" or vol.volume_type == "io1")
msg := sprintf("EC2 %v in Prod has non-compliant EBS type %v on %v", [ec2.name, vol.volume_type, vol.device_name])
}
# 6) EC2's attached SGs must not allow SSH (22) or RDP (3389) from 0.0.0.0/0
deny contains msg if {
ec2 := ec2s[_]
sg_id := ec2.change.after.vpc_security_group_ids[_]
sg := sg_by_id(sg_id)
ing := sg.change.after.ingress[_]
ing.cidr_blocks[_] == "0.0.0.0/0"
(ing.from_port == 22; ing.to_port == 22)
msg := sprintf("EC2 %v allows SSH (22) from 0.0.0.0/0 via SG %v", [ec2.name, sg.name])
}
deny contains msg if {
ec2 := ec2s[_]
sg_id := ec2.change.after.vpc_security_group_ids[_]
sg := sg_by_id(sg_id)
ing := sg.change.after.ingress[_]
ing.cidr_blocks[_] == "0.0.0.0/0"
(ing.from_port == 3389; ing.to_port == 3389)
msg := sprintf("EC2 %v allows RDP (3389) from 0.0.0.0/0 via SG %v", [ec2.name, sg.name])
}
3.3 S3 Best Practices (policy/s3_complex.rego
)
package terraform.s3_complex
# Helper: all S3 buckets
buckets[b] if {
some r
b := input.resource_changes[r]
b.type == "aws_s3_bucket"
}
# Require versioning
deny contains msg if {
b := buckets[_]
not b.change.after.versioning.enabled
msg := sprintf("S3 bucket %v: versioning is not enabled", [b.name])
}
# Require server-side encryption
deny contains msg if {
b := buckets[_]
not b.change.after.server_side_encryption_configuration
msg := sprintf("S3 bucket %v: server-side encryption not configured", [b.name])
}
# Block public ACLs
deny contains msg if {
b := buckets[_]
b.change.after.acl == "public-read"
msg := sprintf("S3 bucket %v: ACL is public-read", [b.name])
}
# Require mandatory tags (Environment and Owner)
deny contains msg if {
b := buckets[_]
not b.change.after.tags.Environment
msg := sprintf("S3 bucket %v: missing required tag 'Environment'", [b.name])
}
deny contains msg if {
b := buckets[_]
not b.change.after.tags.Owner
msg := sprintf("S3 bucket %v: missing required tag 'Owner'", [b.name])
}
3.4 IAM Least Privilege (policy/iam_complex.rego
)
package terraform.iam_complex
# Helper: roles and policies
roles[r] if {
some k
r := input.resource_changes[k]
r.type == "aws_iam_role"
}
policies[p] if {
some k
p := input.resource_changes[k]
p.type == "aws_iam_policy"
}
# 1) AssumeRole principal must not be wildcard
deny contains msg if {
r := roles[_]
stmt := r.change.after.assume_role_policy.Statement[_]
stmt.Principal.Service == "*"
msg := sprintf("IAM Role %v: assume-role Principal.Service is wildcard '*'", [r.name])
}
# 2) Managed policy statements must not have Action '*'
deny contains msg if {
p := policies[_]
stmt := p.change.after.policy.Statement[_]
stmt.Action == "*"
msg := sprintf("IAM Policy %v: uses Action '*'", [p.name])
}
# 3) Managed policy statements must not have Resource '*'
deny contains msg if {
p := policies[_]
stmt := p.change.after.policy.Statement[_]
stmt.Resource == "*"
msg := sprintf("IAM Policy %v: uses Resource '*'", [p.name])
}
3.5 Security Group Hygiene (Standalone) (policy/sg_simple.rego
)
package terraform.sg_simple
# Helper: all SGs
sgs[sg] if {
some r
sg := input.resource_changes[r]
sg.type == "aws_security_group"
}
# Disallow 0.0.0.0/0 for SSH and RDP anywhere in the plan (defense in depth)
deny contains msg if {
sg := sgs[_]
ing := sg.change.after.ingress[_]
ing.cidr_blocks[_] == "0.0.0.0/0"
(ing.from_port == 22; ing.to_port == 22)
msg := sprintf("SG %v allows SSH (22) from 0.0.0.0/0", [sg.name])
}
deny contains msg if {
sg := sgs[_]
ing := sg.change.after.ingress[_]
ing.cidr_blocks[_] == "0.0.0.0/0"
(ing.from_port == 3389; ing.to_port == 3389)
msg := sprintf("SG %v allows RDP (3389) from 0.0.0.0/0", [sg.name])
}
4) Python Runner (opa_check.py
)
import subprocess, json, os, sys
plan = "terraform-plan.json"
policy_dir = "policy"
# Evaluate a single entrypoint: data.terraform.deny (from main.rego)
cmd = ["opa", "eval", "-i", plan, "-d", policy_dir, "--format", "json", "data.terraform.deny"]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
except subprocess.CalledProcessError as e:
print("OPA evaluation failed:", e.stderr or e.stdout)
sys.exit(2)
data = json.loads(result.stdout)
violations = []
for res in data.get("result", []):
for expr in res.get("expressions", []):
val = expr.get("value")
if isinstance(val, list):
violations.extend(val)
if violations:
print("❌ Policy violations found:")
for v in violations:
print("-", v)
sys.exit(1)
else:
print("✅ All policies passed.")
sys.exit(0)
5) PowerShell Runner (opa_check.ps1
)
$Plan = "C:\OPA_Advanced\terraform-plan.json"
$PolicyDir = "C:\OPA_Advanced\policy"
$Cmd = @("opa", "eval", "-i", $Plan, "-d", $PolicyDir, "--format", "json", "data.terraform.deny")
try {
$OutRaw = & $Cmd
} catch {
Write-Error "OPA eval failed. Ensure opa.exe in PATH and inputs exist."
exit 2
}
$Out = $OutRaw | ConvertFrom-Json
$Violations = @()
foreach ($r in $Out.result) {
foreach ($e in $r.expressions) {
if ($e.value) { $Violations += $e.value }
}
}
if ($Violations.Count -gt 0) {
Write-Host "❌ Policy violations found:" -ForegroundColor Red
$Violations | ForEach-Object { Write-Host "- $_" -ForegroundColor Yellow }
exit 1
} else {
Write-Host "✅ All policies passed." -ForegroundColor Green
exit 0
}
6) How to Run
- Create
C:\OPA_Advanced
and thepolicy\
folder. - Save the JSON and the four
.rego
files (plusmain.rego
) into the paths above. - Run a quick syntax check (OPA 1.0+):
opa check policy\
- Evaluate:
opa eval -i terraform-plan.json -d policy "data.terraform.deny"
- Or use the provided Python/PowerShell scripts.
7) Expected Violations (from this Mock Plan)
- S3: versioning disabled
- S3: encryption missing
- S3: ACL public-read
- S3: missing tag
Owner
- IAM Role: assume-role principal uses wildcard
*
- IAM Policy:
Action
is*
- IAM Policy:
Resource
is*
- EC2: not EBS optimized
- EC2: public IP assigned
- EC2: unencrypted EBS volume
/dev/sda1
- EC2 (Prod): non-compliant EBS volume type
standard
- EC2/SG: SSH (22) from
0.0.0.0/0
- EC2/SG: RDP (3389) from
0.0.0.0/0
- SG global: (defense in depth) open SSH/RDP
8) Tips for Real Plans
- Real plan JSON often nests values and uses computed IDs; join resources via
type
/name
oraddress
. - Inspect
after_unknown
if values are computed and not known at plan time. - Consider separate
warn
vsdeny
sets for advisory controls. - Add
default allow := true
style patterns if using allow/deny models together.
No comments:
Post a Comment