Saturday, 9 August 2025

Create Google Pub/Sub subscriptions with BigQuery and Cloud Storage

Event Generator – GCP Pub/Sub, BigQuery, and Cloud Storage Utility

🧩 utils/event_generator.py

This Python utility automates the creation and cleanup of Google Cloud Pub/Sub, BigQuery, and Cloud Storage resources. It provisions a topic plus two export subscriptions (one writing to BigQuery, one to Cloud Storage, since a subscription supports only one delivery type) and lets you publish sample messages for testing event-driven architectures.

📘 Full Python Code

import logging
import os

from google.api_core.exceptions import AlreadyExists, Conflict, NotFound
from google.cloud import bigquery, pubsub_v1, storage
from google.protobuf import duration_pb2


class EventGenerator:
    def __init__(self, project_id, topic_id, subscription_id, region="US"):
        self.project_id = project_id
        self.topic_id = topic_id
        self.subscription_id = subscription_id
        self.region = region

        self.topic_path = f"projects/{project_id}/topics/{topic_id}"
        # A subscription supports exactly one delivery type, so the BigQuery
        # and Cloud Storage exports each get their own subscription.
        self.bq_subscription_path = f"projects/{project_id}/subscriptions/{subscription_id}-bq"
        self.gcs_subscription_path = f"projects/{project_id}/subscriptions/{subscription_id}-gcs"

        self.publisher = pubsub_v1.PublisherClient()
        self.subscriber = pubsub_v1.SubscriberClient()
        self.bq_client = bigquery.Client(project=project_id)
        self.storage_client = storage.Client(project=project_id)

        self.service_account_email = os.getenv(
            "PUBSUB_SERVICE_ACCOUNT_EMAIL",
            f"citi-pubsub-writer@{project_id}.iam.gserviceaccount.com",
        )

        self.created_dataset = None
        self.created_table = None
        self.created_bucket = None

    def create_bigquery_resources(self, dataset_id="pubsub_audit", table_id="subscription_events"):
        dataset_ref = bigquery.DatasetReference(self.project_id, dataset_id)
        table_ref = dataset_ref.table(table_id)
        try:
            dataset = bigquery.Dataset(dataset_ref)
            dataset.location = self.region
            dataset = self.bq_client.create_dataset(dataset)
            logging.info(f"✅ Created BigQuery dataset: {dataset.dataset_id}")
        except Conflict:
            logging.info(f"BigQuery dataset {dataset_id} already exists.")

        # Columns expected by a BigQuery subscription when write_metadata is enabled.
        schema = [
            bigquery.SchemaField("subscription_name", "STRING"),
            bigquery.SchemaField("message_id", "STRING"),
            bigquery.SchemaField("publish_time", "TIMESTAMP"),
            bigquery.SchemaField("data", "STRING"),
            bigquery.SchemaField("attributes", "JSON"),
        ]
        try:
            table = bigquery.Table(table_ref, schema=schema)
            self.bq_client.create_table(table)
            logging.info(f"✅ Created BigQuery table: {table_id}")
        except Conflict:
            logging.info(f"BigQuery table {table_id} already exists.")

        self.created_dataset = dataset_id
        self.created_table = table_id
        # BigQueryConfig.table takes the dotted form {project}.{dataset}.{table}.
        return f"{self.project_id}.{dataset_id}.{table_id}"

    def create_storage_bucket(self, bucket_name=None):
        # Bucket names are globally unique, so derive the default from the project id.
        bucket_name = bucket_name or f"{self.project_id}-pubsub-events"
        bucket = self.storage_client.bucket(bucket_name)
        try:
            self.storage_client.create_bucket(bucket, location=self.region)
            logging.info(f"✅ Created Cloud Storage bucket: {bucket_name}")
        except Conflict:
            logging.info(f"Cloud Storage bucket {bucket_name} already exists.")
        self.created_bucket = bucket_name
        # CloudStorageConfig.bucket expects the bare bucket name (no gs:// prefix).
        return bucket_name

    def create_topic(self):
        try:
            self.publisher.create_topic(request={"name": self.topic_path})
            logging.info(f"✅ Created Pub/Sub topic: {self.topic_id}")
        except AlreadyExists:
            logging.info(f"Pub/Sub topic {self.topic_id} already exists.")

    def create_subscription(self):
        bq_table = self.create_bigquery_resources()
        gcs_bucket = self.create_storage_bucket()

        # Pub/Sub allows exactly one delivery type per subscription, so the
        # BigQuery and Cloud Storage exports are attached to the topic as two
        # separate subscriptions.
        requests = [
            {
                "name": self.bq_subscription_path,
                "topic": self.topic_path,
                "enable_message_ordering": True,
                "bigquery_config": {
                    "table": bq_table,
                    # The demo topic has no schema attached, so write the raw
                    # payload plus the metadata columns instead of using
                    # use_topic_schema.
                    "write_metadata": True,
                    "service_account_email": self.service_account_email,
                },
            },
            {
                "name": self.gcs_subscription_path,
                "topic": self.topic_path,
                "enable_message_ordering": True,
                "cloud_storage_config": {
                    "bucket": gcs_bucket,
                    "filename_prefix": "events/",
                    "filename_suffix": ".json",
                    "max_bytes": 5000000,
                    # Duration fields need a Duration (or timedelta), not a string.
                    "max_duration": duration_pb2.Duration(seconds=300),
                    "service_account_email": self.service_account_email,
                },
            },
        ]

        for request in requests:
            try:
                self.subscriber.create_subscription(request=request)
                logging.info(f"✅ Created Pub/Sub subscription: {request['name']}")
            except AlreadyExists:
                logging.info(f"Subscription {request['name']} already exists.")

    def publish_event(self, message: str, attributes=None):
        attributes = attributes or {}
        data = message.encode("utf-8")
        future = self.publisher.publish(self.topic_path, data, **attributes)
        msg_id = future.result()
        logging.info(f"📤 Published message ID: {msg_id}")
        return msg_id

    def delete_resources(self):
        for subscription_path in (self.bq_subscription_path, self.gcs_subscription_path):
            try:
                self.subscriber.delete_subscription(request={"subscription": subscription_path})
                logging.info(f"🗑️ Deleted Pub/Sub subscription: {subscription_path}")
            except NotFound:
                logging.warning(f"Subscription {subscription_path} not found.")
            except Exception as e:
                logging.warning(f"Failed to delete subscription: {e}")

        try:
            self.publisher.delete_topic(request={"topic": self.topic_path})
            logging.info(f"🗑️ Deleted Pub/Sub topic: {self.topic_id}")
        except NotFound:
            logging.warning(f"Topic {self.topic_id} not found.")
        except Exception as e:
            logging.warning(f"Failed to delete topic: {e}")

        if self.created_dataset:
            try:
                self.bq_client.delete_dataset(
                    dataset=self.created_dataset,
                    delete_contents=True,
                    not_found_ok=True,
                )
                logging.info(f"🗑️ Deleted BigQuery dataset: {self.created_dataset}")
            except Exception as e:
                logging.warning(f"Failed to delete BigQuery dataset {self.created_dataset}: {e}")

        if self.created_bucket:
            try:
                bucket = self.storage_client.bucket(self.created_bucket)
                # force=True deletes contained objects first (the client caps this at 256).
                bucket.delete(force=True)
                logging.info(f"🗑️ Deleted Cloud Storage bucket: {self.created_bucket}")
            except Exception as e:
                logging.warning(f"Failed to delete Cloud Storage bucket {self.created_bucket}: {e}")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    project_id = os.getenv("GOOGLE_CLOUD_PROJECT", "your-gcp-project-id")
    topic_id = "demo-topic"
    subscription_id = "demo-subscription"

    generator = EventGenerator(project_id, topic_id, subscription_id)
    generator.create_topic()
    generator.create_subscription()
    generator.publish_event("Sample event for audit", {"env": "dev"})

    # Uncomment to clean up:
    # generator.delete_resources()
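
Since both subscriptions set enable_message_ordering, note that ordered delivery only takes effect when messages are published with an ordering key from a publisher configured for ordering; publish_event above does not set one. A minimal sketch of ordered publishing, separate from the class, with a hypothetical ordering key:

from google.cloud import pubsub_v1

# Ordering must be enabled on the publisher client as well as the subscription.
publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
)
topic_path = publisher.topic_path("my-gcp-project", "demo-topic")

for i in range(3):
    future = publisher.publish(
        topic_path,
        f"ordered event {i}".encode("utf-8"),
        ordering_key="customer-123",  # messages sharing a key arrive in publish order
    )
    print(future.result())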

⚙️ How It Works

Resource                  Create Method                   Delete Method
Pub/Sub Topic             create_topic()                  delete_resources()
Pub/Sub Subscription      create_subscription()           delete_resources()
BigQuery Dataset/Table    create_bigquery_resources()     delete_resources()
Cloud Storage Bucket      create_storage_bucket()         delete_resources()
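
The class can also be driven from other code instead of the __main__ block. A minimal sketch, with hypothetical topic and subscription names:

from utils.event_generator import EventGenerator

gen = EventGenerator("my-gcp-project", "orders-topic", "orders-sub", region="US")
gen.create_topic()
gen.create_subscription()  # provisions BigQuery, Cloud Storage, and both subscriptions

# Publish a few test events with attributes.
for i in range(5):
    gen.publish_event(f'{{"order_id": {i}}}', {"env": "dev", "source": "batch"})

gen.delete_resources()  # tear everything down again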

🧠 Run Instructions

export GOOGLE_CLOUD_PROJECT=my-gcp-project
export PUBSUB_SERVICE_ACCOUNT_EMAIL=citi-pubsub-writer@my-gcp-project.iam.gserviceaccount.com

python utils/event_generator.py
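
After a run, you can confirm that events reached both sinks. A minimal verification sketch, assuming the default pubsub_audit.subscription_events table and the <project>-pubsub-events bucket created above; Cloud Storage objects only appear once max_duration or max_bytes is reached:

from google.cloud import bigquery, storage

project = "my-gcp-project"

# Rows land in the table as the BigQuery subscription delivers them.
bq = bigquery.Client(project=project)
query = f"""
    SELECT message_id, data, publish_time
    FROM `{project}.pubsub_audit.subscription_events`
    ORDER BY publish_time DESC
    LIMIT 10
"""
for row in bq.query(query).result():
    print(row.message_id, row.data, row.publish_time)

# Exported files accumulate under the events/ prefix.
gcs = storage.Client(project=project)
for blob in gcs.list_blobs(f"{project}-pubsub-events", prefix="events/"):
    print(blob.name, blob.size)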

✅ Example Output (with the cleanup call uncommented)

✅ Created Pub/Sub topic: demo-topic
✅ Created BigQuery dataset: pubsub_audit
✅ Created BigQuery table: subscription_events
✅ Created Cloud Storage bucket: my-gcp-project-pubsub-events
✅ Created Pub/Sub subscription: projects/my-gcp-project/subscriptions/demo-subscription-bq
✅ Created Pub/Sub subscription: projects/my-gcp-project/subscriptions/demo-subscription-gcs
📤 Published message ID: 481027638463
🗑️ Deleted Pub/Sub subscription: projects/my-gcp-project/subscriptions/demo-subscription-bq
🗑️ Deleted Pub/Sub subscription: projects/my-gcp-project/subscriptions/demo-subscription-gcs
🗑️ Deleted Pub/Sub topic: demo-topic
🗑️ Deleted BigQuery dataset: pubsub_audit
🗑️ Deleted Cloud Storage bucket: my-gcp-project-pubsub-events
Note: The script uses the official Google Cloud client libraries, which call the live googleapis.com endpoints, so run gcloud auth application-default login before executing it. The service account set via PUBSUB_SERVICE_ACCOUNT_EMAIL also needs write access to the table and bucket (typically roles/bigquery.dataEditor and roles/storage.admin) before the export subscriptions can deliver.
