Wednesday, 7 January 2026

Unauthorized Pub/Sub Publish Detection

Unauthorized Pub/Sub Publish Detection

Python: Unauthorized Pub/Sub Publish Detection


import json

def get_principal(proto: dict) -> str:
    """
    Extract the caller identity from a protoPayload mapping.

    Looks under ``authenticationInfo`` for ``principalEmail`` first and
    falls back to ``principalSubject``. The result is always lower-cased,
    so the fallback for a missing identity is the string ``'unknown'``.

    Args:
        proto: The ``protoPayload`` dict of a log entry. A value that is
            not a dict is treated as carrying no identity.

    Returns:
        The lower-cased principal, or ``'unknown'`` when neither field
        holds a non-empty string.
    """
    auth = proto.get("authenticationInfo") if isinstance(proto, dict) else None
    if not isinstance(auth, dict):
        # Covers an absent key, an explicit null and unexpected types. A
        # ``.get(..., {})`` default only handles the absent-key case, so a
        # null value would otherwise raise AttributeError.
        return "unknown"

    # Preference order matters: e-mail first, then subject.
    for field in ("principalEmail", "principalSubject"):
        value = auth.get(field)
        if isinstance(value, str) and value:
            return value.lower()
    return "unknown"


def is_unauthorized_publish(event: dict) -> bool:
    """
    Decide whether a log entry is a successful non-Dataflow publish to a DLT topic.

    Returns True only when all of the following hold:
    - ``protoPayload.methodName`` is ``google.pubsub.v1.Publisher.Publish``
    - the last path segment of ``protoPayload.resourceName`` starts with
      ``pst-`` and ends with ``siem-logs-security-logs-dlt``
    - ``protoPayload.status.code`` is 0, or is absent / null
    - the principal does not contain ``dataflow``

    Malformed input (non-dict event or payload, non-string resource name,
    non-dict status) yields False instead of raising.

    Args:
        event: A log entry as a dict carrying a ``protoPayload`` mapping.

    Returns:
        True if the entry matches the detection, otherwise False.
    """
    if not isinstance(event, dict):
        return False
    proto = event.get("protoPayload")
    if not isinstance(proto, dict):
        return False

    # 1. Only Pub/Sub publish calls are of interest.
    if proto.get("methodName") != "google.pubsub.v1.Publisher.Publish":
        return False

    # 2. Topic id must look like pst-<env>-siem-logs-security-logs-dlt.
    resource = proto.get("resourceName")
    if not isinstance(resource, str):
        return False
    topic_id = resource.rsplit("/", 1)[-1]
    if not topic_id.startswith("pst-"):
        return False
    if not topic_id.endswith("siem-logs-security-logs-dlt"):
        return False

    # 3. The request must have succeeded. A missing or null ``status`` or
    #    ``code`` is read as 0 (OK): JSON-serialized protobuf messages omit
    #    default-valued fields, so a successful call may carry ``"status": {}``.
    status = proto.get("status")
    if status is None:
        status = {}
    if not isinstance(status, dict):
        return False
    if (status.get("code") or 0) != 0:
        return False

    # 4. Exclude Dataflow service accounts. The lookup runs last so that
    #    non-matching events never pay for it.
    try:
        principal = get_principal(proto)
    except (AttributeError, TypeError):
        # Identity could not be read from a malformed authenticationInfo; an
        # unidentifiable caller is not a known Dataflow account.
        principal = ""

    # NOTE(review): a bare substring match also excludes any principal that
    # merely contains "dataflow" in its name; consider an explicit allow-list.
    # "sq-dataflow" / "sa-dataflow" are already covered by this one check.
    return "dataflow" not in principal


# ------------------ TEST PAYLOADS ------------------

def _sample_event(method_name, principal_email):
    """Build a fresh sample event for the prod DLT topic with status code 0."""
    return {
        "protoPayload": {
            "methodName": method_name,
            "resourceName": "projects/my-project/topics/pst-prod-siem-logs-security-logs-dlt",
            "authenticationInfo": {
                "principalEmail": principal_email
            },
            "status": {
                "code": 0
            }
        }
    }


# Expected result: True (ordinary user publishing to the DLT topic)
test_payload_1 = _sample_event(
    "google.pubsub.v1.Publisher.Publish",
    "user@example.com",
)

# Expected result: False (principal is a Dataflow service account)
test_payload_2 = _sample_event(
    "google.pubsub.v1.Publisher.Publish",
    "service-1234567890@dataflow-service-producer-prod.iam.gserviceaccount.com",
)

# Expected result: False (method is not Publish)
test_payload_3 = _sample_event(
    "google.pubsub.v1.Publisher.CreateTopic",
    "user@example.com",
)

# ------------------ TEST EXECUTION ------------------
# Run every sample through the detector and print the verdict, numbering from 1.
_samples = (test_payload_1, test_payload_2, test_payload_3)
for number, sample in enumerate(_samples, start=1):
    print(f"Test Payload {number}: {is_unauthorized_publish(sample)}")

No comments:

Post a Comment