Complete Mock Terraform Plan JSON (s3-plan.json)
    {
      "format_version": "0.1",
      "terraform_version": "1.13.1",
      "resource_changes": [
        {
          "address": "aws_s3_bucket.example",
          "type": "aws_s3_bucket",
          "name": "example",
          "change": {
            "actions": ["create"],
            "before": null,
            "after": {
              "bucket": "my-opa-test-bucket-12345",
              "acl": "public-read",
              "versioning": { "enabled": false },
              "server_side_encryption_configuration": null,
              "force_destroy": false,
              "tags": { "Environment": "Dev", "Owner": "Alice" },
              "lifecycle_rule": [
                { "id": "log-expire", "enabled": true, "expiration": { "days": 365 } }
              ],
              "logging": { "target_bucket": "log-bucket", "target_prefix": "s3logs/" },
              "website": null
            }
          }
        }
      ]
    }

Note: resource_changes is written as a list here because that is the shape terraform show -json produces; each entry carries its own address, type and name.
Explanation of Fields
| Field | Purpose / Relevance |
|---|---|
| acl | Access control; triggers public-read policy |
| versioning.enabled | Must be true for compliance |
| server_side_encryption_configuration | Must exist for compliance |
| force_destroy | Optional; determines bucket deletion behavior |
| tags | Optional; useful for governance rules |
| lifecycle_rule | Optional; could be used in advanced policies |
| logging | Optional; compliance with logging requirements |
| website | Optional; could block public website hosting |
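To make it concrete how these fields are reached inside the plan, here is a minimal Python sketch that walks resource_changes and collects the attributes from the table for each aws_s3_bucket. The helper names `iter_changes` and `bucket_fields` are hypothetical (they are not part of Terraform or OPA), and the sketch assumes resource_changes is either a list or an object keyed by address.

```python
def iter_changes(plan):
    """Yield resource change entries whether resource_changes is a list or a dict."""
    changes = plan.get("resource_changes") or []
    if isinstance(changes, dict):
        changes = changes.values()
    yield from changes


def bucket_fields(plan):
    """Return {bucket name: compliance-relevant attributes} for planned S3 buckets."""
    summary = {}
    for rc in iter_changes(plan):
        if rc.get("type") != "aws_s3_bucket":
            continue
        # change.after is null for deletions, so fall back to an empty dict
        after = (rc.get("change") or {}).get("after") or {}
        versioning = after.get("versioning") or {}
        summary[rc.get("name")] = {
            "acl": after.get("acl"),
            "versioning_enabled": bool(versioning.get("enabled")),
            "encrypted": after.get("server_side_encryption_configuration") is not None,
            "tags": after.get("tags") or {},
            "has_logging": after.get("logging") is not None,
        }
    return summary


# Trimmed-down stand-in for s3-plan.json (illustrative data only)
sample_plan = {
    "resource_changes": [
        {
            "type": "aws_s3_bucket",
            "name": "example",
            "change": {
                "after": {
                    "acl": "public-read",
                    "versioning": {"enabled": False},
                    "server_side_encryption_configuration": None,
                    "tags": {"Owner": "Alice"},
                    "logging": {"target_bucket": "log-bucket"},
                }
            },
        }
    ]
}

print(bucket_fields(sample_plan))
```

Note that a JSON null (None in Python) is treated as "not configured", which matters for server_side_encryption_configuration and logging.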
Complete OPA Policy (s3.rego)
    package terraform.s3

    # Requires OPA 0.59 or later (import rego.v1); this syntax is also the default in OPA 1.x.
    import rego.v1

    # deny is a partial set: it is already an empty set when no rule matches,
    # so no "default deny" is declared (a default would conflict with the rules below).

    # 1. Disallow public S3 buckets
    deny contains msg if {
        some rc in input.resource_changes
        rc.type == "aws_s3_bucket"
        rc.change.after.acl == "public-read"
        msg := sprintf("Bucket %v has public-read ACL", [rc.name])
    }

    # 2. Require versioning (fires when enabled is false or the block is missing)
    deny contains msg if {
        some rc in input.resource_changes
        rc.type == "aws_s3_bucket"
        not rc.change.after.versioning.enabled
        msg := sprintf("Bucket %v does not have versioning enabled", [rc.name])
    }

    # 3. Require server-side encryption
    # null is not "false" in Rego, so "not <null value>" never fires;
    # compare against null explicitly and treat a missing key the same way.
    deny contains msg if {
        some rc in input.resource_changes
        rc.type == "aws_s3_bucket"
        object.get(rc.change.after, "server_side_encryption_configuration", null) == null
        msg := sprintf("Bucket %v does not have server-side encryption enabled", [rc.name])
    }

    # 4. Optional: Require specific tags
    # "x in object" tests values, not keys, so look the key up directly.
    deny contains msg if {
        some rc in input.resource_changes
        rc.type == "aws_s3_bucket"
        not rc.change.after.tags.Owner
        msg := sprintf("Bucket %v does not have an 'Owner' tag", [rc.name])
    }

    # 5. Optional: Require lifecycle rule (missing, null, or empty list all fail)
    has_lifecycle_rule(after) if count(after.lifecycle_rule) > 0

    deny contains msg if {
        some rc in input.resource_changes
        rc.type == "aws_s3_bucket"
        not has_lifecycle_rule(rc.change.after)
        msg := sprintf("Bucket %v does not have a lifecycle rule defined", [rc.name])
    }

    # 6. Optional: Require logging (missing or null)
    deny contains msg if {
        some rc in input.resource_changes
        rc.type == "aws_s3_bucket"
        object.get(rc.change.after, "logging", null) == null
        msg := sprintf("Bucket %v does not have logging enabled", [rc.name])
    }
How to Test
- Save s3-plan.json in your project directory.
- Save s3.rego in the policy\ folder.
- Run:

    opa eval -i s3-plan.json -d policy\s3.rego "data.terraform.s3.deny"
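Expected output (abbreviated; OPA also reports text and location for each expression, and the order of the messages may differ):

    {
      "result": [
        {
          "expressions": [
            {
              "value": [
                "Bucket example has public-read ACL",
                "Bucket example does not have versioning enabled",
                "Bucket example does not have server-side encryption enabled"
              ]
            }
          ]
        }
      ]
    }

The messages are built from the resource's name field, so the bucket appears as example rather than aws_s3_bucket.example.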
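The optional rules (tags, lifecycle, logging) produce no messages for this plan because the bucket has an Owner tag, a lifecycle rule, and a logging block. They are intended to fire when the corresponding attribute is null or missing.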
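To see when each check fires without running OPA, the six rules can be mirrored in plain Python. This is only an illustrative re-implementation of the intended logic, not how OPA evaluates Rego; `deny_messages` and `mock_after` are hypothetical names, and `mock_after` copies the change.after block of the mock plan.

```python
def deny_messages(after, name):
    """Plain-Python mirror of the six checks; `after` is change.after for one bucket."""
    msgs = []
    if after.get("acl") == "public-read":
        msgs.append(f"Bucket {name} has public-read ACL")
    if not (after.get("versioning") or {}).get("enabled"):
        msgs.append(f"Bucket {name} does not have versioning enabled")
    if after.get("server_side_encryption_configuration") is None:
        msgs.append(f"Bucket {name} does not have server-side encryption enabled")
    if "Owner" not in (after.get("tags") or {}):
        msgs.append(f"Bucket {name} does not have an 'Owner' tag")
    if not after.get("lifecycle_rule"):
        msgs.append(f"Bucket {name} does not have a lifecycle rule defined")
    if after.get("logging") is None:
        msgs.append(f"Bucket {name} does not have logging enabled")
    return msgs


# change.after from the mock plan
mock_after = {
    "bucket": "my-opa-test-bucket-12345",
    "acl": "public-read",
    "versioning": {"enabled": False},
    "server_side_encryption_configuration": None,
    "force_destroy": False,
    "tags": {"Environment": "Dev", "Owner": "Alice"},
    "lifecycle_rule": [{"id": "log-expire", "enabled": True, "expiration": {"days": 365}}],
    "logging": {"target_bucket": "log-bucket", "target_prefix": "s3logs/"},
    "website": None,
}

# Same bucket with the optional attributes removed
stripped = {k: v for k, v in mock_after.items()
            if k not in ("tags", "lifecycle_rule", "logging")}

print(len(deny_messages(mock_after, "example")))  # 3: only the mandatory checks fire
print(len(deny_messages(stripped, "example")))    # 6: the optional checks fire as well
```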
Using Python subprocess to call OPA CLI
    import json
    import subprocess
    import sys

    # Path to Terraform plan JSON
    plan_file = "s3-plan.json"
    # Path to OPA policy
    policy_file = "policy/s3.rego"

    # OPA eval command
    cmd = [
        "opa", "eval",
        "-i", plan_file,
        "-d", policy_file,
        "data.terraform.s3.deny",
        "--format", "json",
    ]

    # Run the command
    result = subprocess.run(cmd, capture_output=True, text=True)

    # Stop early if OPA itself failed (bad path, policy syntax error, ...)
    if result.returncode != 0:
        sys.exit(f"opa eval failed:\n{result.stderr or result.stdout}")

    # Parse output
    opa_output = json.loads(result.stdout)

    # Print results
    violations = opa_output["result"][0]["expressions"][0]["value"]
    if violations:
        print("❌ Policy violations found:")
        for v in violations:
            print("-", v)
    else:
        print("✅ All policies passed.")
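If the script is meant to gate a CI step, it can help to separate output parsing from the subprocess call so that the parsing can be tested without OPA installed. The sketch below is one way to do that; `extract_violations` and `exit_code_for` are hypothetical helper names, and the sample string only imitates the output shape shown under "Expected output".

```python
import json


def extract_violations(opa_stdout):
    """Return the list of deny messages from `opa eval --format json` output."""
    data = json.loads(opa_stdout)
    results = data.get("result")
    if not results:
        # Fail closed: a missing or empty result most likely means the query
        # path matched nothing, not that the plan is compliant.
        raise ValueError("OPA returned no result for the query")
    return list(results[0]["expressions"][0]["value"])


def exit_code_for(violations):
    """Map violations to a process exit code: 0 when clean, 1 otherwise."""
    return 1 if violations else 0


# Illustrative stand-in for result.stdout
sample_stdout = json.dumps(
    {"result": [{"expressions": [{"value": ["Bucket example has public-read ACL"]}]}]}
)

violations = extract_violations(sample_stdout)
print(violations, exit_code_for(violations))
```

In the script, `extract_violations(result.stdout)` would replace the manual indexing, and `sys.exit(exit_code_for(violations))` would make the pipeline step fail when violations are present.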
PowerShell OPA Test Script
    # -----------------------------
    # PowerShell OPA Test Script
    # -----------------------------

    # Paths (update if needed)
    $PlanFile = "C:\OPA_S3\s3-plan.json"
    $PolicyFile = "C:\OPA_S3\policy\s3.rego"

    # Arguments for opa eval. The call operator (&) expects the command name
    # separately, so only the arguments go in the array and are splatted below.
    $OpaArgs = @(
        "eval",
        "-i", $PlanFile,
        "-d", $PolicyFile,
        "data.terraform.s3.deny",
        "--format", "json"
    )

    # Run OPA eval
    try {
        $OpaOutputRaw = & opa @OpaArgs
    } catch {
        Write-Error "Failed to run OPA. Make sure opa.exe is in your PATH."
        exit 1
    }

    # Stop if OPA ran but reported an error (bad path, policy syntax error, ...)
    if ($LASTEXITCODE -ne 0) {
        Write-Error "opa eval exited with code $LASTEXITCODE."
        exit 1
    }

    # Parse JSON output (join the output lines into a single string first)
    $OpaOutput = $OpaOutputRaw | Out-String | ConvertFrom-Json

    # Extract violations (force an array so .Count also works for a single message)
    $Violations = @($OpaOutput.result[0].expressions[0].value)

    # Display results
    if ($Violations.Count -gt 0) {
        Write-Host "❌ Policy violations found:" -ForegroundColor Red
        foreach ($v in $Violations) {
            Write-Host "- $v" -ForegroundColor Yellow
        }
        # Optional: Stop execution if violations found
        Write-Host "Aborting Terraform apply due to policy violations..." -ForegroundColor Red
        exit 1
    } else {
        Write-Host "✅ All policies passed." -ForegroundColor Green
    }