Complete Mock Terraform Plan JSON (s3-plan.json)
{
"format_version": "0.1",
"terraform_version": "1.13.1",
"resource_changes": {
"aws_s3_bucket.example": {
"type": "aws_s3_bucket",
"name": "example",
"change": {
"actions": ["create"],
"before": null,
"after": {
"bucket": "my-opa-test-bucket-12345",
"acl": "public-read",
"versioning": { "enabled": false },
"server_side_encryption_configuration": null,
"force_destroy": false,
"tags": { "Environment": "Dev", "Owner": "Alice" },
"lifecycle_rule": [
{ "id": "log-expire", "enabled": true, "expiration": { "days": 365 } }
],
"logging": { "target_bucket": "log-bucket", "target_prefix": "s3logs/" },
"website": null
}
}
}
}
}
Explanation of Fields
| Field | Purpose / Relevance |
|---|---|
| acl | Access control; triggers public-read policy |
| versioning.enabled | Must be true for compliance |
| server_side_encryption_configuration | Must exist for compliance |
| force_destroy | Optional; determines bucket deletion behavior |
| tags | Optional; useful for governance rules |
| lifecycle_rule | Optional; could be used in advanced policies |
| logging | Optional; compliance with logging requirements |
| website | Optional; could block public website hosting |
Complete OPA Policy (s3.rego)
package terraform.s3
# Default: no deny messages
default deny = []
# 1. Disallow public S3 buckets
deny[msg] {
some resource
input.resource_changes[resource].type == "aws_s3_bucket"
acl := input.resource_changes[resource].change.after.acl
acl == "public-read"
msg := sprintf("Bucket %v has public-read ACL", [input.resource_changes[resource].name])
}
# 2. Require versioning
deny[msg] {
some resource
input.resource_changes[resource].type == "aws_s3_bucket"
not input.resource_changes[resource].change.after.versioning.enabled
msg := sprintf("Bucket %v does not have versioning enabled", [input.resource_changes[resource].name])
}
# 3. Require server-side encryption
deny[msg] {
some resource
input.resource_changes[resource].type == "aws_s3_bucket"
not input.resource_changes[resource].change.after.server_side_encryption_configuration
msg := sprintf("Bucket %v does not have server-side encryption enabled", [input.resource_changes[resource].name])
}
# 4. Optional: Require specific tags
deny[msg] {
some resource
input.resource_changes[resource].type == "aws_s3_bucket"
tags := input.resource_changes[resource].change.after.tags
not ("Owner" in tags)
msg := sprintf("Bucket %v does not have an 'Owner' tag", [input.resource_changes[resource].name])
}
# 5. Optional: Require lifecycle rule
deny[msg] {
some resource
input.resource_changes[resource].type == "aws_s3_bucket"
rules := input.resource_changes[resource].change.after.lifecycle_rule
count(rules) == 0
msg := sprintf("Bucket %v does not have a lifecycle rule defined", [input.resource_changes[resource].name])
}
# 6. Optional: Require logging
deny[msg] {
some resource
input.resource_changes[resource].type == "aws_s3_bucket"
logging := input.resource_changes[resource].change.after.logging
logging == null
msg := sprintf("Bucket %v does not have logging enabled", [input.resource_changes[resource].name])
}
How to Test
- Save
s3-plan.jsonin your project directory. - Save
s3.regoinpolicy\folder. - Run:
opa eval -i s3-plan.json -d policy\s3.rego "data.terraform.s3.deny"
Expected output:
{
"result": [
{
"expressions": [
{
"value": [
"Bucket aws_s3_bucket.example has public-read ACL",
"Bucket aws_s3_bucket.example does not have versioning enabled",
"Bucket aws_s3_bucket.example does not have server-side encryption enabled"
]
}
]
}
]
}
Optional rules (tags, lifecycle, logging) will also trigger if null/missing.
Using Python subprocess to call OPA CLI
import subprocess
import json
# Path to Terraform plan JSON
plan_file = "s3-plan.json"
# Path to OPA policy
policy_file = "policy/s3.rego"
# OPA eval command
cmd = [
"opa", "eval",
"-i", plan_file,
"-d", policy_file,
"data.terraform.s3.deny",
"--format", "json"
]
# Run the command
result = subprocess.run(cmd, capture_output=True, text=True)
# Parse output
opa_output = json.loads(result.stdout)
# Print results
violations = opa_output["result"][0]["expressions"][0]["value"]
if violations:
print("❌ Policy violations found:")
for v in violations:
print("-", v)
else:
print("✅ All policies passed.")
PowerShell OPA Test Script
# -----------------------------
# PowerShell OPA Test Script
# -----------------------------
# Paths (update if needed)
$PlanFile = "C:\OPA_S3\s3-plan.json"
$PolicyFile = "C:\OPA_S3\policy\s3.rego"
# OPA command
$OpaCommand = @(
"opa", "eval",
"-i", $PlanFile,
"-d", $PolicyFile,
"data.terraform.s3.deny",
"--format", "json"
)
# Run OPA eval
try {
$OpaOutputRaw = & $OpaCommand
} catch {
Write-Error "Failed to run OPA. Make sure opa.exe is in your PATH."
exit 1
}
# Parse JSON output
$OpaOutput = $OpaOutputRaw | ConvertFrom-Json
# Extract violations
$Violations = $OpaOutput.result[0].expressions[0].value
# Display results
if ($Violations.Count -gt 0) {
Write-Host "❌ Policy violations found:" -ForegroundColor Red
foreach ($v in $Violations) {
Write-Host "- $v" -ForegroundColor Yellow
}
# Optional: Stop execution if violations found
Write-Host "Aborting Terraform apply due to policy violations..." -ForegroundColor Red
exit 1
} else {
Write-Host "✅ All policies passed." -ForegroundColor Green
No comments:
Post a Comment