Unauthorized Pub/Sub Publish Detection
Python: Unauthorized Pub/Sub Publish Detection
import json
def get_principal(proto: dict) -> str:
    """Return the lower-cased caller identity recorded in an audit payload.

    Reads ``proto["authenticationInfo"]`` and prefers ``principalEmail``,
    falling back to ``principalSubject`` when the e-mail is missing or empty.

    Args:
        proto: The ``protoPayload`` mapping of an audit log entry.

    Returns:
        The principal in lower case, or ``"unknown"`` when neither field
        holds a non-empty value or ``authenticationInfo`` is missing, null,
        or not a mapping.
    """
    auth = proto.get("authenticationInfo")
    if not isinstance(auth, dict):
        # A null or malformed section is treated like an absent one instead
        # of raising AttributeError on the ``.get`` calls below.
        auth = {}

    principal = (
        auth.get("principalEmail")
        or auth.get("principalSubject")
        or "UNKNOWN"
    )
    # Lower-casing also applies to the fallback, so it is returned as
    # "unknown" rather than "UNKNOWN".
    return principal.lower()
def is_unauthorized_publish(event: dict) -> bool:
    """Return True for a successful, non-Dataflow publish to the SIEM DLT topic.

    An event is flagged only when all of the following hold:

    * ``protoPayload.methodName`` is ``google.pubsub.v1.Publisher.Publish``;
    * ``protoPayload.resourceName`` ends with ``siem-logs-security-logs-dlt``
      and its last path segment starts with ``pst-``;
    * the call succeeded (``protoPayload.status.code`` is 0 or absent);
    * the caller's principal does not contain ``"dataflow"``.

    Args:
        event: A decoded audit log entry (a mapping with a ``protoPayload``).

    Returns:
        True when the event matches every condition above; False otherwise,
        including when the event is malformed (wrong types in the fields
        that are inspected).
    """
    try:
        proto = event.get("protoPayload") or {}

        if proto.get("methodName", "") != "google.pubsub.v1.Publisher.Publish":
            return False

        resource = proto.get("resourceName", "")
        if not resource.endswith("siem-logs-security-logs-dlt"):
            return False
        if not resource.rsplit("/", 1)[-1].startswith("pst-"):
            return False

        # google.rpc.Status uses code 0 for OK, and JSON-encoded audit logs
        # omit default values, so a successful call usually carries an empty
        # ``status``. A missing or null code must therefore count as success;
        # comparing ``None != 0`` would silently drop real successful calls.
        status = proto.get("status") or {}
        if (status.get("code") or 0) != 0:
            return False

        # The principal is resolved last: it is only needed once the event
        # is known to be a successful publish to the monitored topic.
        # NOTE(review): any principal containing "dataflow" is exempt
        # (this also covers the "sq-dataflow"/"sa-dataflow" names); consider
        # an exact allowlist if that is too permissive.
        return "dataflow" not in get_principal(proto)
    except (AttributeError, TypeError):
        # Malformed entry (e.g. a non-mapping payload or non-string resource
        # name): best-effort behaviour is to not flag it.
        return False
# Sample audit log entries used to exercise is_unauthorized_publish.

# Successful publish to the monitored topic by a regular user account.
test_payload_1 = {
    "protoPayload": {
        "methodName": "google.pubsub.v1.Publisher.Publish",
        "resourceName": "projects/my-project/topics/pst-prod-siem-logs-security-logs-dlt",
        "authenticationInfo": {"principalEmail": "user@example.com"},
        "status": {"code": 0},
    },
}

# Same publish, but issued by a principal whose name contains "dataflow".
test_payload_2 = {
    "protoPayload": {
        "methodName": "google.pubsub.v1.Publisher.Publish",
        "resourceName": "projects/my-project/topics/pst-prod-siem-logs-security-logs-dlt",
        "authenticationInfo": {
            "principalEmail": "service-1234567890@dataflow-service-producer-prod.iam.gserviceaccount.com",
        },
        "status": {"code": 0},
    },
}

# Same topic and user as the first sample, but a different method name.
test_payload_3 = {
    "protoPayload": {
        "methodName": "google.pubsub.v1.Publisher.CreateTopic",
        "resourceName": "projects/my-project/topics/pst-prod-siem-logs-security-logs-dlt",
        "authenticationInfo": {"principalEmail": "user@example.com"},
        "status": {"code": 0},
    },
}

# Run the detector over each sample and print its verdict, numbering from 1.
for i, payload in enumerate(
    (test_payload_1, test_payload_2, test_payload_3), start=1
):
    result = is_unauthorized_publish(payload)
    print(f"Test Payload {i}: {result}")
No comments:
Post a Comment