🧩 utils/event_generator.py
This Python utility automates creation and cleanup of Google Cloud Pub/Sub, BigQuery, and Cloud Storage resources. It also allows you to publish sample messages for testing event-driven architectures.
📘 Full Python Code
import logging
import os
from datetime import timedelta

from google.cloud import pubsub_v1, bigquery, storage
from google.api_core.exceptions import AlreadyExists, Conflict, NotFound

class EventGenerator:
    """Creates, exercises, and tears down Pub/Sub, BigQuery, and Cloud Storage resources."""

    def __init__(self, project_id, topic_id, subscription_id, region="US"):
        self.project_id = project_id
        self.topic_id = topic_id
        self.subscription_id = subscription_id
        self.region = region
        self.topic_path = f"projects/{project_id}/topics/{topic_id}"
        self.subscription_path = f"projects/{project_id}/subscriptions/{subscription_id}"
        self.publisher = pubsub_v1.PublisherClient()
        self.subscriber = pubsub_v1.SubscriberClient()
        self.bq_client = bigquery.Client(project=project_id)
        self.storage_client = storage.Client(project=project_id)
        self.service_account_email = os.getenv(
            "PUBSUB_SERVICE_ACCOUNT_EMAIL",
            f"citi-pubsub-writer@{project_id}.iam.gserviceaccount.com",
        )
        # Track what this instance created so delete_resources() only
        # removes resources it owns.
        self.created_dataset = None
        self.created_table = None
        self.created_bucket = None

    def create_bigquery_resources(self, dataset_id="pubsub_audit", table_id="subscription_events"):
        """Create the dataset and table that back a BigQuery subscription."""
        dataset_ref = bigquery.DatasetReference(self.project_id, dataset_id)
        table_ref = dataset_ref.table(table_id)
        try:
            dataset = bigquery.Dataset(dataset_ref)
            dataset.location = self.region
            dataset = self.bq_client.create_dataset(dataset)
            logging.info(f"✅ Created BigQuery dataset: {dataset.dataset_id}")
        except Conflict:
            logging.info(f"BigQuery dataset {dataset_id} already exists.")
        # Columns Pub/Sub expects when write_metadata is enabled on a
        # schemaless topic; the message payload lands in the "data" column.
        schema = [
            bigquery.SchemaField("subscription_name", "STRING"),
            bigquery.SchemaField("message_id", "STRING"),
            bigquery.SchemaField("publish_time", "TIMESTAMP"),
            bigquery.SchemaField("data", "STRING"),
            bigquery.SchemaField("attributes", "STRING"),
            bigquery.SchemaField("ordering_key", "STRING"),
        ]
        try:
            table = bigquery.Table(table_ref, schema=schema)
            self.bq_client.create_table(table)
            logging.info(f"✅ Created BigQuery table: {table_id}")
        except Conflict:
            logging.info(f"BigQuery table {table_id} already exists.")
        self.created_dataset = dataset_id
        self.created_table = table_id
        # BigQueryConfig.table expects "{project}.{dataset}.{table}".
        return f"{self.project_id}.{dataset_id}.{table_id}"

    def create_storage_bucket(self, bucket_name="pubsub-events-bucket"):
        """Create the bucket that backs a Cloud Storage subscription."""
        bucket = self.storage_client.bucket(bucket_name)
        try:
            bucket.location = self.region
            self.storage_client.create_bucket(bucket)
            logging.info(f"✅ Created Cloud Storage bucket: {bucket_name}")
        except Conflict:
            logging.info(f"Cloud Storage bucket {bucket_name} already exists.")
        self.created_bucket = bucket_name
        # CloudStorageConfig.bucket expects the bare bucket name, no prefix.
        return bucket_name

    def create_topic(self):
        try:
            self.publisher.create_topic(request={"name": self.topic_path})
            logging.info(f"✅ Created Pub/Sub topic: {self.topic_id}")
        except AlreadyExists:
            logging.info(f"Pub/Sub topic {self.topic_id} already exists.")

    def create_subscription(self, delivery="bigquery"):
        """Create an export subscription.

        Pub/Sub allows at most one delivery type per subscription, so the
        BigQuery and Cloud Storage configs are mutually exclusive; pick one
        via the delivery argument.
        """
        request_data = {
            "name": self.subscription_path,
            "topic": self.topic_path,
        }
        if delivery == "bigquery":
            request_data["bigquery_config"] = {
                "table": self.create_bigquery_resources(),
                "write_metadata": True,
                "service_account_email": self.service_account_email,
            }
        elif delivery == "cloud_storage":
            request_data["cloud_storage_config"] = {
                "bucket": self.create_storage_bucket(),
                "filename_prefix": "events/",
                "filename_suffix": ".json",
                "max_bytes": 5_000_000,
                "max_duration": timedelta(seconds=300),
                "service_account_email": self.service_account_email,
            }
        else:
            raise ValueError(f"Unsupported delivery type: {delivery}")
        try:
            self.subscriber.create_subscription(request=request_data)
            logging.info(f"✅ Created Pub/Sub subscription: {self.subscription_id}")
        except AlreadyExists:
            logging.info(f"Subscription {self.subscription_id} already exists.")

    def publish_event(self, message: str, attributes=None):
        attributes = attributes or {}
        data = message.encode("utf-8")
        future = self.publisher.publish(self.topic_path, data, **attributes)
        msg_id = future.result()
        logging.info(f"📤 Published message ID: {msg_id}")
        return msg_id

    def delete_resources(self):
        try:
            self.subscriber.delete_subscription(request={"subscription": self.subscription_path})
            logging.info(f"🗑️ Deleted Pub/Sub subscription: {self.subscription_id}")
        except NotFound:
            logging.warning(f"Subscription {self.subscription_id} not found.")
        except Exception as e:
            logging.warning(f"Failed to delete subscription: {e}")
        try:
            self.publisher.delete_topic(request={"topic": self.topic_path})
            logging.info(f"🗑️ Deleted Pub/Sub topic: {self.topic_id}")
        except NotFound:
            logging.warning(f"Topic {self.topic_id} not found.")
        except Exception as e:
            logging.warning(f"Failed to delete topic: {e}")
        if self.created_dataset:
            try:
                self.bq_client.delete_dataset(
                    dataset=self.created_dataset,
                    delete_contents=True,
                    not_found_ok=True,
                )
                logging.info(f"🗑️ Deleted BigQuery dataset: {self.created_dataset}")
            except Exception as e:
                logging.warning(f"Failed to delete BigQuery dataset {self.created_dataset}: {e}")
        if self.created_bucket:
            try:
                bucket = self.storage_client.bucket(self.created_bucket)
                bucket.delete(force=True)
                logging.info(f"🗑️ Deleted Cloud Storage bucket: {self.created_bucket}")
            except Exception as e:
                logging.warning(f"Failed to delete Cloud Storage bucket {self.created_bucket}: {e}")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    project_id = os.getenv("GOOGLE_CLOUD_PROJECT", "your-gcp-project-id")
    topic_id = "demo-topic"
    subscription_id = "demo-subscription"

    generator = EventGenerator(project_id, topic_id, subscription_id)
    generator.create_topic()
    generator.create_subscription()
    generator.publish_event("Sample event for audit", {"env": "dev"})

    # Uncomment to clean up:
    # generator.delete_resources()
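
Once the BigQuery subscription has delivered a few messages, you can check the audit table directly. A minimal sketch, reusing the script's default dataset and table names (pubsub_audit.subscription_events) and the default project from the environment:

from google.cloud import bigquery

client = bigquery.Client()
query = """
    SELECT message_id, data, publish_time
    FROM `pubsub_audit.subscription_events`
    ORDER BY publish_time DESC
    LIMIT 10
"""
# Rows appear only after Pub/Sub has flushed messages into the table.
for row in client.query(query).result():
    print(row.message_id, row.data, row.publish_time)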
⚙️ How It Works
Resource | Create Method | Delete Method
---|---|---
Pub/Sub Topic | create_topic() | delete_resources()
Pub/Sub Subscription | create_subscription() | delete_resources()
BigQuery Dataset/Table | create_bigquery_resources() | delete_resources()
Cloud Storage Bucket | create_storage_bucket() | delete_resources()
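
Because delete_resources() only removes what a given instance recorded as created, wrapping a run in try/finally gives you throwaway test resources that are cleaned up even when publishing fails. A minimal sketch (the topic and subscription names here are illustrative):

import os

generator = EventGenerator(
    os.getenv("GOOGLE_CLOUD_PROJECT"), "scratch-topic", "scratch-subscription"
)
try:
    generator.create_topic()
    generator.create_subscription()
    for i in range(5):
        generator.publish_event(f"event-{i}", {"env": "test"})
finally:
    # Deletes the subscription, topic, and any dataset this run created.
    generator.delete_resources()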
🧠 Run Instructions
export GOOGLE_CLOUD_PROJECT=my-gcp-project
export PUBSUB_SERVICE_ACCOUNT_EMAIL=citi-pubsub-writer@my-gcp-project.iam.gserviceaccount.com
python utils/event_generator.py
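
The script assumes the three client libraries are already installed; if not:

pip install google-cloud-pubsub google-cloud-bigquery google-cloud-storage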
✅ Example Output
✅ Created Pub/Sub topic: demo-topic
✅ Created BigQuery dataset: pubsub_audit
✅ Created BigQuery table: subscription_events
✅ Created Pub/Sub subscription: demo-subscription
📤 Published message ID: 481027638463
🗑️ Deleted Pub/Sub subscription: demo-subscription
🗑️ Deleted Pub/Sub topic: demo-topic
🗑️ Deleted BigQuery dataset: pubsub_audit
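
To confirm the resources exist after a run (with the cleanup step still commented out), the standard gcloud and bq CLIs work against the demo defaults:

gcloud pubsub topics describe demo-topic
gcloud pubsub subscriptions describe demo-subscription
bq show pubsub_audit.subscription_events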
Note: The script uses the official Google Cloud client libraries, which call real googleapis.com endpoints. Make sure you have run gcloud auth application-default login before executing, so Application Default Credentials are available.
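
As a quick sanity check that credentials resolve before running the generator, you can ask google.auth (installed alongside the client libraries) directly. A minimal sketch:

import google.auth

# Raises DefaultCredentialsError if Application Default Credentials are missing.
credentials, project = google.auth.default()
print(f"ADC resolved for project: {project}")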