Sunday 3 November 2024

CloudFormation, Terraform, and CDK examples

A complete CloudFormation template (CFT) that creates VPC Interface Endpoints, a Route 53 Private Hosted Zone, and CNAME records for Kafka brokers within an AWS environment. Each endpoint gets a friendly DNS name (such as broker1.bgt.pulse.ABC) that points to the corresponding endpoint's DNS name within the VPC.

CloudFormation Template

AWSTemplateFormatVersion: '2010-09-09'
Description: CloudFormation template to create VPC endpoints and Route 53 resources for Kafka brokers in ABC environment.
 
Parameters:
  VpcId:
    Type: AWS::EC2::VPC::Id
    Default: "vpc-0example000000000"  # Replace with your default VPC ID
    Description: The ID of the VPC where the endpoints and Route 53 hosted zone will be created.
 
  SubnetIds:
    Type: List<AWS::EC2::Subnet::Id>
    Default: "subnet-0example000000000,subnet-0example000000000,subnet-0example000000000"  # Replace with your default subnet IDs (comma-delimited)
    Description: List of subnet IDs where the endpoints should be created. Must span 3 Availability Zones.
 
  Broker1ServiceName:
    Type: String
    Default: "com.amazonaws.vpc.us-east-1.vpce-svc-aaaaaaaaaaaa"
    Description: Service name for Broker1.
 
  Broker2ServiceName:
    Type: String
    Default: "com.amazonaws.vpc.us-east-1.vpce-svc-bbbbbb"
    Description: Service name for Broker2.
 
  Broker3ServiceName:
    Type: String
    Default: "com.amazonaws.vpc.us-east-1.vpce-svc-ccccccc"
    Description: Service name for Broker3.
 
  Dc1ServiceName:
    Type: String
    Default: "com.amazonaws.vpc.us-east-1.vpce-svc-ddddddd"
    Description: Service name for DC1.
 
Resources:
  # VPC Endpoints
  Broker1VPCEndpoint:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcEndpointType: Interface
      VpcId: !Ref VpcId
      ServiceName: !Ref Broker1ServiceName
      SubnetIds: !Ref SubnetIds
      PrivateDnsEnabled: true
      Tags:
        - Key: Name
          Value: broker1.bgt.pulse.ABC
 
  Broker2VPCEndpoint:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcEndpointType: Interface
      VpcId: !Ref VpcId
      ServiceName: !Ref Broker2ServiceName
      SubnetIds: !Ref SubnetIds
      PrivateDnsEnabled: true
      Tags:
        - Key: Name
          Value: broker2.bgt.pulse.ABC
 
  Broker3VPCEndpoint:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcEndpointType: Interface
      VpcId: !Ref VpcId
      ServiceName: !Ref Broker3ServiceName
      SubnetIds: !Ref SubnetIds
      PrivateDnsEnabled: true
      Tags:
        - Key: Name
          Value: broker3.bgt.pulse.ABC
 
  Dc1VPCEndpoint:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcEndpointType: Interface
      VpcId: !Ref VpcId
      ServiceName: !Ref Dc1ServiceName
      SubnetIds: !Ref SubnetIds
      PrivateDnsEnabled: true
      Tags:
        - Key: Name
          Value: dc1.bgt.pulse.ABC
 
  # Route 53 Private Hosted Zone
  PrivateHostedZone:
    Type: AWS::Route53::HostedZone
    Properties:
      Name: "bgt.pulse.ABC"  # Replace with your desired domain name
      VPCs:
        - VPCId: !Ref VpcId
          VPCRegion: "us-east-1"  # Change to your VPC region
      HostedZoneConfig:
        Comment: "Private hosted zone for Kafka brokers"
 
  # Route 53 DNS Records for VPC Endpoints
  Broker1DNSRecord:
    Type: AWS::Route53::RecordSet
    Properties:
      HostedZoneId: !Ref PrivateHostedZone
      Name: "broker1.bgt.pulse.ABC"
      Type: CNAME
      TTL: "300"
      ResourceRecords:
        # Each DnsEntries element is "hosted-zone-id:dns-name"; keep only the DNS name
        - !Select [1, !Split [":", !Select [0, !GetAtt Broker1VPCEndpoint.DnsEntries]]]
 
  Broker2DNSRecord:
    Type: AWS::Route53::RecordSet
    Properties:
      HostedZoneId: !Ref PrivateHostedZone
      Name: "broker2.bgt.pulse.ABC"
      Type: CNAME
      TTL: "300"
      ResourceRecords:
        - !Select [1, !Split [":", !Select [0, !GetAtt Broker2VPCEndpoint.DnsEntries]]]
 
  Broker3DNSRecord:
    Type: AWS::Route53::RecordSet
    Properties:
      HostedZoneId: !Ref PrivateHostedZone
      Name: "broker3.bgt.pulse.ABC"
      Type: CNAME
      TTL: "300"
      ResourceRecords:
        - !Select [1, !Split [":", !Select [0, !GetAtt Broker3VPCEndpoint.DnsEntries]]]
 
  Dc1DNSRecord:
    Type: AWS::Route53::RecordSet
    Properties:
      HostedZoneId: !Ref PrivateHostedZone
      Name: "dc1.bgt.pulse.ABC"
      Type: CNAME
      TTL: "300"
      ResourceRecords:
        - !Select [1, !Split [":", !Select [0, !GetAtt Dc1VPCEndpoint.DnsEntries]]]
 
Outputs:
  Broker1EndpointDNS:
    Description: The DNS name for broker1 endpoint
    Value: !Select [1, !Split [":", !Select [0, !GetAtt Broker1VPCEndpoint.DnsEntries]]]
 
  Broker2EndpointDNS:
    Description: The DNS name for broker2 endpoint
    Value: !Select [1, !Split [":", !Select [0, !GetAtt Broker2VPCEndpoint.DnsEntries]]]
 
  Broker3EndpointDNS:
    Description: The DNS name for broker3 endpoint
    Value: !Select [1, !Split [":", !Select [0, !GetAtt Broker3VPCEndpoint.DnsEntries]]]
 
  Dc1EndpointDNS:
    Description: The DNS name for dc1 endpoint
    Value: !Select [1, !Split [":", !Select [0, !GetAtt Dc1VPCEndpoint.DnsEntries]]]

 


Explanation of Key Components:

  1. VPC Endpoints (Broker1VPCEndpoint, Broker2VPCEndpoint, etc.): These resources create the VPC Interface Endpoints for each Kafka broker and dc1 service.
  2. Route 53 Private Hosted Zone (PrivateHostedZone): This hosted zone allows you to manage the DNS within your VPC for the specified domain (e.g., bgt.pulse.ABC).
  3. CNAME Records (Broker1DNSRecord, Broker2DNSRecord, etc.): Each record maps a user-friendly name (e.g., broker1.bgt.pulse.ABC) to the actual DNS of the VPC endpoint.
  4. Outputs: Provides the DNS names of each endpoint as output values after stack deployment.

Usage Notes

  • Update the default values (like VpcId, SubnetIds, and Service Names) as per your environment.
  • This template assumes you are deploying it in the us-east-1 region. Adjust VPCRegion if needed.

Deploying this CloudFormation stack will create VPC Interface Endpoints, a Route 53 private hosted zone, and DNS records within your AWS environment, enabling you to use friendly DNS names for each broker in your private VPC network.
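
As a minimal sketch (the file name kafka-endpoints.yaml and stack name are arbitrary choices, and the remaining parameters fall back to their defaults), deployment with the AWS CLI could look like this:

aws cloudformation deploy \
  --template-file kafka-endpoints.yaml \
  --stack-name kafka-broker-endpoints \
  --parameter-overrides VpcId=vpc-0example000000000

aws cloudformation describe-stacks \
  --stack-name kafka-broker-endpoints \
  --query "Stacks[0].Outputs"

The second command prints the endpoint DNS names exported in the Outputs section.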

 

A Terraform configuration that creates the same VPC Interface Endpoints, Route 53 Private Hosted Zone, and CNAME records for Kafka brokers within an AWS environment.

 

provider "aws" {

  region = "us-east-1"  # Adjust as needed

}

 

variable "vpc_id" {

  type    = string

  default = "vpc-0example000000000"  # Replace with your VPC ID

}

 

variable "subnet_ids" {

  type    = list(string)

  default = ["subnet-0example000000000", "subnet-0example000000000", "subnet-0example000000000"]  # Replace with your subnet IDs

}

 

variable "domain_name" {

  type    = string

  default = "bgt.pulse.ABC"  # Replace with your desired domain name

}

 

variable "broker1_service_name" {

  type    = string

  default = "com.amazonaws.vpc.us-east-1.vpce-svc-aaaaaaaaaaaa"  # Replace with your Broker1 service name

}

 

variable "broker2_service_name" {

  type    = string

  default = "com.amazonaws.vpc.us-east-1.vpce-svc-bbbbbb"  # Replace with your Broker2 service name

}

 

variable "broker3_service_name" {

  type    = string

  default = "com.amazonaws.vpc.us-east-1.vpce-svc-ccccccc"  # Replace with your Broker3 service name

}

 

variable "dc1_service_name" {

  type    = string

  default = "com.amazonaws.vpc.us-east-1.vpce-svc-ddddddd"  # Replace with your DC1 service name

}

 

# VPC Endpoints

resource "aws_vpc_endpoint" "broker1" {

  vpc_id             = var.vpc_id

  service_name       = var.broker1_service_name

  vpc_endpoint_type  = "Interface"

  subnet_ids         = var.subnet_ids

  private_dns_enabled = true

 

  tags = {

    Name = "broker1.${var.domain_name}"

  }

}

 

resource "aws_vpc_endpoint" "broker2" {

  vpc_id             = var.vpc_id

  service_name       = var.broker2_service_name

  vpc_endpoint_type  = "Interface"

  subnet_ids         = var.subnet_ids

  private_dns_enabled = true

 

  tags = {

    Name = "broker2.${var.domain_name}"

  }

}

 

resource "aws_vpc_endpoint" "broker3" {

  vpc_id             = var.vpc_id

  service_name       = var.broker3_service_name

  vpc_endpoint_type  = "Interface"

  subnet_ids         = var.subnet_ids

  private_dns_enabled = true

 

  tags = {

    Name = "broker3.${var.domain_name}"

  }

}

 

resource "aws_vpc_endpoint" "dc1" {

  vpc_id             = var.vpc_id

  service_name       = var.dc1_service_name

  vpc_endpoint_type  = "Interface"

  subnet_ids         = var.subnet_ids

  private_dns_enabled = true

 

  tags = {

    Name = "dc1.${var.domain_name}"

  }

}

 

# Route 53 Private Hosted Zone

resource "aws_route53_zone" "private_zone" {

  name = var.domain_name

  vpc {

    vpc_id = var.vpc_id

  }

}

 

# Route 53 CNAME Records for each VPC Endpoint

resource "aws_route53_record" "broker1_cname" {

  zone_id = aws_route53_zone.private_zone.zone_id

  name    = "broker1.${var.domain_name}"

  type    = "CNAME"

  ttl     = 300

  records = [aws_vpc_endpoint.broker1.dns_entry.0.dns_name]

}

 

resource "aws_route53_record" "broker2_cname" {

  zone_id = aws_route53_zone.private_zone.zone_id

  name    = "broker2.${var.domain_name}"

  type    = "CNAME"

  ttl     = 300

  records = [aws_vpc_endpoint.broker2.dns_entry.0.dns_name]

}

 

resource "aws_route53_record" "broker3_cname" {

  zone_id = aws_route53_zone.private_zone.zone_id

  name    = "broker3.${var.domain_name}"

  type    = "CNAME"

  ttl     = 300

  records = [aws_vpc_endpoint.broker3.dns_entry.0.dns_name]

}

 

resource "aws_route53_record" "dc1_cname" {

  zone_id = aws_route53_zone.private_zone.zone_id

  name    = "dc1.${var.domain_name}"

  type    = "CNAME"

  ttl     = 300

  records = [aws_vpc_endpoint.dc1.dns_entry.0.dns_name]

}

 

output "broker1_endpoint_dns" {

  description = "The DNS name for broker1 endpoint"

  value       = aws_vpc_endpoint.broker1.dns_entry.0.dns_name

}

 

output "broker2_endpoint_dns" {

  description = "The DNS name for broker2 endpoint"

  value       = aws_vpc_endpoint.broker2.dns_entry.0.dns_name

}

 

output "broker3_endpoint_dns" {

  description = "The DNS name for broker3 endpoint"

  value       = aws_vpc_endpoint.broker3.dns_entry.0.dns_name

}

 

output "dc1_endpoint_dns" {

  description = "The DNS name for dc1 endpoint"

  value       = aws_vpc_endpoint.dc1.dns_entry.0.dns_name

}

 

Explanation

  1. VPC Endpoints (aws_vpc_endpoint resources): Creates VPC Interface Endpoints for each Kafka broker and dc1, using the specified service_name, vpc_id, and subnet_ids.
  2. Route 53 Private Hosted Zone (aws_route53_zone): Creates a private hosted zone for the domain specified in domain_name, making it accessible only within the specified VPC.
  3. CNAME Records (aws_route53_record resources): Creates a CNAME record in the private hosted zone for each VPC endpoint (e.g., broker1.bgt.pulse.ABC), pointing it to the DNS name of the respective endpoint.
  4. Outputs: Provides the DNS names of each endpoint, allowing you to access them easily after deployment.

Usage Notes

  • Adjust vpc_id, subnet_ids, and service names to match your environment.
  • This configuration assumes the us-east-1 region; adjust the provider region if needed.

This Terraform configuration builds the same setup as the CloudFormation template above, letting you reach the Kafka brokers via user-friendly DNS names inside the VPC.
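
Assuming the configuration above is saved in its own working directory (for example as main.tf), the usual workflow applies:

terraform init    # download the AWS provider
terraform plan    # review the endpoints, hosted zone and records to be created
terraform apply   # create the resources and print the endpoint DNS name outputs

terraform output broker1_endpoint_dns   # read a single output afterwards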

 

An AWS CDK (Cloud Development Kit) stack written in Python that creates the same resources as the preceding Terraform and CloudFormation templates: VPC Interface Endpoints, a Route 53 Private Hosted Zone, and CNAME records for Kafka brokers.

 

AWS CDK (Python) Code

To get started, make sure you have AWS CDK installed, and initialize a Python CDK project by running:

 

cdk init app --language python

 

Then, add the following dependencies to requirements.txt:

aws-cdk-lib
constructs

 

Then, run pip install -r requirements.txt to install dependencies.

Next, create the following CDK stack in lib/my_kafka_stack.py:

from aws_cdk import (
    Fn,
    Stack,
    aws_ec2 as ec2,
    aws_route53 as route53,
)
from constructs import Construct


class MyKafkaStack(Stack):

    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Parameters for VPC, Subnets, and Service Names
        vpc_id = "vpc-0example000000000"  # Replace with your VPC ID
        subnet_ids = ["subnet-0example000000000", "subnet-0example000000000", "subnet-0example000000000"]  # Replace with your subnet IDs
        domain_name = "bgt.pulse.ABC"  # Replace with your domain name

        # Kafka Broker Service Names
        broker1_service_name = "com.amazonaws.vpce.us-east-1.vpce-svc-aaaaaaaaaaaa"  # Replace with actual service name
        broker2_service_name = "com.amazonaws.vpce.us-east-1.vpce-svc-bbbbbb"  # Replace with actual service name
        broker3_service_name = "com.amazonaws.vpce.us-east-1.vpce-svc-ccccccc"  # Replace with actual service name
        dc1_service_name = "com.amazonaws.vpce.us-east-1.vpce-svc-ddddddd"  # Replace with actual service name

        # Fetch existing VPC (from_lookup requires the stack to be created with an explicit env; see app.py below)
        vpc = ec2.Vpc.from_lookup(self, "ExistingVPC", vpc_id=vpc_id)

        # Import the subnets once and share the selection across all endpoints
        subnet_selection = ec2.SubnetSelection(
            subnets=[ec2.Subnet.from_subnet_id(self, f"Subnet{i + 1}", subnet_id)
                     for i, subnet_id in enumerate(subnet_ids)]
        )

        # VPC Endpoint for Broker 1
        broker1_endpoint = ec2.InterfaceVpcEndpoint(
            self, "Broker1Endpoint",
            vpc=vpc,
            service=ec2.InterfaceVpcEndpointService(broker1_service_name, 443),
            subnets=subnet_selection,
            private_dns_enabled=True
        )

        # VPC Endpoint for Broker 2
        broker2_endpoint = ec2.InterfaceVpcEndpoint(
            self, "Broker2Endpoint",
            vpc=vpc,
            service=ec2.InterfaceVpcEndpointService(broker2_service_name, 443),
            subnets=subnet_selection,
            private_dns_enabled=True
        )

        # VPC Endpoint for Broker 3
        broker3_endpoint = ec2.InterfaceVpcEndpoint(
            self, "Broker3Endpoint",
            vpc=vpc,
            service=ec2.InterfaceVpcEndpointService(broker3_service_name, 443),
            subnets=subnet_selection,
            private_dns_enabled=True
        )

        # VPC Endpoint for DC1
        dc1_endpoint = ec2.InterfaceVpcEndpoint(
            self, "DC1Endpoint",
            vpc=vpc,
            service=ec2.InterfaceVpcEndpointService(dc1_service_name, 443),
            subnets=subnet_selection,
            private_dns_enabled=True
        )

        # Route 53 Private Hosted Zone
        private_hosted_zone = route53.PrivateHostedZone(
            self, "PrivateHostedZone",
            zone_name=domain_name,
            vpc=vpc
        )

        # Each element of vpc_endpoint_dns_entries has the form "hosted-zone-id:dns-name",
        # so split it and keep only the DNS name for the CNAME target.
        def endpoint_dns_name(endpoint: ec2.InterfaceVpcEndpoint) -> str:
            return Fn.select(1, Fn.split(":", Fn.select(0, endpoint.vpc_endpoint_dns_entries)))

        # CNAME Records for each broker
        route53.CnameRecord(
            self, "Broker1Cname",
            zone=private_hosted_zone,
            record_name=f"broker1.{domain_name}",
            domain_name=endpoint_dns_name(broker1_endpoint)
        )

        route53.CnameRecord(
            self, "Broker2Cname",
            zone=private_hosted_zone,
            record_name=f"broker2.{domain_name}",
            domain_name=endpoint_dns_name(broker2_endpoint)
        )

        route53.CnameRecord(
            self, "Broker3Cname",
            zone=private_hosted_zone,
            record_name=f"broker3.{domain_name}",
            domain_name=endpoint_dns_name(broker3_endpoint)
        )

        route53.CnameRecord(
            self, "DC1Cname",
            zone=private_hosted_zone,
            record_name=f"dc1.{domain_name}",
            domain_name=endpoint_dns_name(dc1_endpoint)
        )

 

Steps to Deploy

  1. Save the code in lib/my_kafka_stack.py.
  2. In app.py, add the following to import and instantiate the stack:

 

import os

from aws_cdk import App, Environment

from my_kafka_stack import MyKafkaStack

app = App()

# Vpc.from_lookup needs a concrete account/region, so pass an explicit env.
MyKafkaStack(
    app, "MyKafkaStack",
    env=Environment(
        account=os.environ["CDK_DEFAULT_ACCOUNT"],
        region=os.environ["CDK_DEFAULT_REGION"],
    ),
)

app.synth()

 

  3. Deploy the stack:

cdk deploy
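
Once the stack is deployed, you can verify the records from an EC2 instance inside the VPC (the hostnames below are the example names used in this post):

dig +short broker1.bgt.pulse.ABC
dig +short broker2.bgt.pulse.ABC
dig +short broker3.bgt.pulse.ABC
dig +short dc1.bgt.pulse.ABC

Each name should resolve to the private IPs of the corresponding endpoint's network interfaces.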


Tuesday 29 October 2024

AWS CDK code to set up an Athena-Snowflake connector using a Lambda function and Lambda layer for the Snowflake JDBC driver

AWS CDK code to set up an Athena-Snowflake connector using a Lambda function and Lambda layer for the Snowflake JDBC driver. This solution also retrieves Snowflake credentials from AWS Secrets Manager.

To use this code, you'll need an existing AWS Secrets Manager secret that stores the Snowflake credentials. If you haven't created it yet, you can follow the Secrets Manager steps shown later in this post to create a secret with the necessary key-value pairs.


Step 1: Install AWS CDK and Dependencies

Make sure you have the AWS CDK installed. If not, install it first:

npm install -g aws-cdk


Next, create a new CDK app

mkdir athena-snowflake-connector
cd athena-snowflake-connector
cdk init app --language python

Install the AWS CDK library for Python (the imports below use CDK v2):

pip install aws-cdk-lib constructs

Step 2: Define the CDK Stack for the Athena-Snowflake Connector

Below is the CDK code in Python to create a Lambda function, a Lambda layer for the Snowflake JDBC driver, and IAM permissions to access Secrets Manager.

  1. Create a file named athena_snowflake_connector_stack.py in the lib folder (or modify lib/athena-snowflake-connector-stack.py if generated by CDK).
  2. Add the following code to the file.

from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_iam as iam,
    aws_secretsmanager as secretsmanager,
    aws_s3 as s3,
    Duration,
)
from constructs import Construct
import os

class AthenaSnowflakeConnectorStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Parameter: S3 bucket and key for the Snowflake JDBC driver
        s3_bucket_name = "my-s3-bucket"  # Replace with your S3 bucket name
        s3_key = "snowflake-jdbc-<version>.jar"  # Replace with the S3 key for the JDBC driver
        
        # Secrets Manager ARN for Snowflake credentials
        snowflake_secret_arn = "arn:aws:secretsmanager:your-region:123456789012:secret:snowflake/credentials"  # Replace with your Secret ARN

        # Create the Lambda Layer for Snowflake JDBC Driver
        snowflake_jdbc_layer = _lambda.LayerVersion(
            self, "SnowflakeJDBCLayer",
            code=_lambda.Code.from_bucket(
                bucket=s3.Bucket.from_bucket_name(self, "JDBCBucket", s3_bucket_name),
                key=s3_key
            ),
            compatible_runtimes=[_lambda.Runtime.PYTHON_3_8, _lambda.Runtime.PYTHON_3_9],
            description="Lambda layer for Snowflake JDBC driver"
        )

        # IAM Role for Lambda function
        lambda_role = iam.Role(
            self, "LambdaExecutionRole",
            assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"),
            inline_policies={
                "LambdaBasicExecution": iam.PolicyDocument(statements=[
                    iam.PolicyStatement(
                        actions=[
                            "logs:CreateLogGroup",
                            "logs:CreateLogStream",
                            "logs:PutLogEvents"
                        ],
                        resources=["*"]
                    ),
                    iam.PolicyStatement(
                        actions=[
                            "athena:StartQueryExecution",
                            "athena:GetQueryExecution",
                            "athena:GetQueryResults",
                            "s3:PutObject",
                            "s3:GetObject"
                        ],
                        resources=["*"]
                    ),
                    iam.PolicyStatement(
                        actions=["secretsmanager:GetSecretValue"],
                        resources=[snowflake_secret_arn]
                    )
                ])
            }
        )

        # Lambda Function for Athena-Snowflake connection
        athena_snowflake_connector = _lambda.Function(
            self, "AthenaSnowflakeConnector",
            runtime=_lambda.Runtime.PYTHON_3_8,
            handler="app.lambda_handler",
            code=_lambda.Code.from_asset("src"),  # Path to source code
            memory_size=1024,
            timeout=Duration.seconds(300),
            role=lambda_role,
            layers=[snowflake_jdbc_layer],
            environment={
                "SECRET_ARN": snowflake_secret_arn
            }
        )

        # Outputs
        self.athena_snowflake_connector = athena_snowflake_connector

Step 3: Create the Lambda Function Code (src/app.py)

In your CDK project directory, create a folder named src and add a file named app.py inside it with the following code. This Lambda function retrieves Snowflake credentials from Secrets Manager and establishes a connection to Snowflake.

src/app.py

import os
import json
import boto3
import snowflake.connector
from botocore.exceptions import ClientError

def get_snowflake_credentials(secret_arn):
    # Initialize Secrets Manager client
    client = boto3.client('secretsmanager')
    try:
        # Retrieve the secret value
        response = client.get_secret_value(SecretId=secret_arn)
        secret = json.loads(response['SecretString'])
        return secret
    except ClientError as e:
        print(f"Error retrieving secret: {e}")
        raise e

def lambda_handler(event, context):
    # Retrieve Snowflake credentials from Secrets Manager
    secret_arn = os.getenv('SECRET_ARN')
    credentials = get_snowflake_credentials(secret_arn)
    
    # Extract the SQL query from the event
    sql_query = event.get('query')
    if not sql_query:
        return {"error": "No query provided"}

    # Establish connection to Snowflake using retrieved credentials
    conn = snowflake.connector.connect(
        user=credentials['SNOWFLAKE_USER'],
        password=credentials['SNOWFLAKE_PASSWORD'],
        account=credentials['SNOWFLAKE_ACCOUNT'],
        warehouse=credentials['SNOWFLAKE_WAREHOUSE'],
        database=credentials['SNOWFLAKE_DATABASE'],
        schema=credentials['SNOWFLAKE_SCHEMA']
    )

    cursor = conn.cursor()
    try:
        # Execute the query
        cursor.execute(sql_query)
        # Fetch results
        results = cursor.fetchall()

        # Format the results for Athena: map column names to row values
        columns = [desc[0] for desc in cursor.description]
        response = [dict(zip(columns, row)) for row in results]
        return {"statusCode": 200, "body": json.dumps(response)}

    except Exception as e:
        return {"error": str(e)}

    finally:
        cursor.close()
        conn.close()


Step 4: Deploy the CDK Stack

  1. Bootstrap the CDK

    cdk bootstrap

     

  2. Deploy the stack:
    cdk deploy
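
Once the stack is deployed, a quick way to smoke-test the connector (before registering it in Athena) is to invoke the Lambda function directly; replace the function name with the one created by your stack:

aws lambda invoke \
  --function-name <your-AthenaSnowflakeConnector-function-name> \
  --cli-binary-format raw-in-base64-out \
  --payload '{"query": "SELECT CURRENT_VERSION()"}' \
  response.json

cat response.json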

CloudFormation YAML template to set up a serverless application that connects Amazon Athena to Snowflake.

Here’s the CloudFormation YAML template to set up a serverless application that connects Amazon Athena to Snowflake. This setup uses a Lambda layer for the Snowflake JDBC driver and a Lambda function to handle the connection and query execution.


Step 1: Store Snowflake Credentials in Secrets Manager

  1. Go to AWS Secrets Manager in the AWS Management Console.

  2. Choose Store a new secret.

  3. Select Other type of secret and input the following key-value pairs:

    Key                   Value
    SNOWFLAKE_ACCOUNT     your_snowflake_account
    SNOWFLAKE_USER        your_snowflake_user
    SNOWFLAKE_PASSWORD    your_snowflake_password
    SNOWFLAKE_DATABASE    your_snowflake_database
    SNOWFLAKE_SCHEMA      your_snowflake_schema
    SNOWFLAKE_WAREHOUSE   your_snowflake_warehouse
  4. Name the secret (e.g., snowflake/credentials) and complete the setup. Note down the Secret ARN
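
Alternatively, the same secret can be created from the AWS CLI (the values are placeholders; the name matches the example above):

aws secretsmanager create-secret \
  --name snowflake/credentials \
  --secret-string '{
    "SNOWFLAKE_ACCOUNT": "your_snowflake_account",
    "SNOWFLAKE_USER": "your_snowflake_user",
    "SNOWFLAKE_PASSWORD": "your_snowflake_password",
    "SNOWFLAKE_DATABASE": "your_snowflake_database",
    "SNOWFLAKE_SCHEMA": "your_snowflake_schema",
    "SNOWFLAKE_WAREHOUSE": "your_snowflake_warehouse"
  }'

The command's output includes the secret ARN that the template below expects as the SecretArn parameter.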


Step 2: Update the CloudFormation Template to Use Secrets Manager

Below is the modified CloudFormation template that retrieves the Snowflake credentials from Secrets Manager; the secret ARN is supplied as a parameter and passed to the Lambda function as an environment variable.

+++

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Parameters:
  SecretArn:
    Type: String
    Description: ARN of the Snowflake credentials stored in Secrets Manager

Resources:
  # Lambda Layer to hold the Snowflake JDBC driver
  SnowflakeJDBCLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      ContentUri: s3://my-s3-bucket/snowflake-jdbc-<version>.jar  # Replace with your S3 bucket and key
      CompatibleRuntimes:
        - python3.8
        - python3.9
      Description: 'Layer containing Snowflake JDBC driver for Lambda function'

  # IAM Role for Lambda function to access Athena, CloudWatch, and Secrets Manager
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: LambdaBasicExecution
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: '*'
        - PolicyName: AthenaAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - athena:StartQueryExecution
                  - athena:GetQueryExecution
                  - athena:GetQueryResults
                  - s3:PutObject
                  - s3:GetObject
                Resource: '*'
        - PolicyName: SecretsManagerAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - secretsmanager:GetSecretValue
                Resource: !Ref SecretArn

  # Lambda Function for Athena-Snowflake connection
  AthenaSnowflakeConnector:
    Type: AWS::Serverless::Function
    Properties:
      Handler: app.lambda_handler  # app.py sits directly inside CodeUri
      Runtime: python3.8
      CodeUri: ./src  # Ensure `src/app.py` is included in this path
      MemorySize: 1024
      Timeout: 300
      Role: !GetAtt LambdaExecutionRole.Arn
      Layers:
        - !Ref SnowflakeJDBCLayer
      Environment:
        Variables:
          SECRET_ARN: !Ref SecretArn

Outputs:
  LambdaFunctionName:
    Description: "Lambda function for Athena to Snowflake connection"
    Value: !Ref AthenaSnowflakeConnector


+++

Step 3: Update app.py to Fetch Secrets from Secrets Manager

Modify the Lambda function code in src/app.py to retrieve credentials from Secrets Manager using the boto3 SDK.

src/app.py

++++++++
import os
import json
import boto3
import snowflake.connector
from botocore.exceptions import ClientError

def get_snowflake_credentials(secret_arn):
    # Initialize Secrets Manager client
    client = boto3.client('secretsmanager')
    try:
        # Retrieve the secret value
        response = client.get_secret_value(SecretId=secret_arn)
        secret = json.loads(response['SecretString'])
        return secret
    except ClientError as e:
        print(f"Error retrieving secret: {e}")
        raise e

def lambda_handler(event, context):
    # Retrieve Snowflake credentials from Secrets Manager
    secret_arn = os.getenv('SECRET_ARN')
    credentials = get_snowflake_credentials(secret_arn)
    
    # Extract the SQL query from the event
    sql_query = event.get('query')
    if not sql_query:
        return {"error": "No query provided"}

    # Establish connection to Snowflake using retrieved credentials
    conn = snowflake.connector.connect(
        user=credentials['SNOWFLAKE_USER'],
        password=credentials['SNOWFLAKE_PASSWORD'],
        account=credentials['SNOWFLAKE_ACCOUNT'],
        warehouse=credentials['SNOWFLAKE_WAREHOUSE'],
        database=credentials['SNOWFLAKE_DATABASE'],
        schema=credentials['SNOWFLAKE_SCHEMA']
    )

    cursor = conn.cursor()
    try:
        # Execute the query
        cursor.execute(sql_query)
        # Fetch results
        results = cursor.fetchall()

        # Format the results for Athena: map column names to row values
        columns = [desc[0] for desc in cursor.description]
        response = [dict(zip(columns, row)) for row in results]
        return {"statusCode": 200, "body": json.dumps(response)}

    except Exception as e:
        return {"error": str(e)}

    finally:
        cursor.close()
        conn.close()

++++++++

Step 4: Deploy the CloudFormation Stack

  1. Package and Deploy the CloudFormation stack with the AWS CLI:

    aws cloudformation package --template-file template.yaml --s3-bucket my-s3-bucket --output-template-file packaged-template.yaml

    aws cloudformation deploy --template-file packaged-template.yaml --stack-name AthenaSnowflakeConnectorStack --capabilities CAPABILITY_IAM --parameter-overrides SecretArn="arn:aws:secretsmanager:your-region:123456789012:secret:snowflake/credentials"

  2. Replace arn:aws:secretsmanager:your-region:123456789012:secret:snowflake/credentials with the actual ARN of your Secrets Manager secret.
Explanation of the Template

    1. SnowflakeJDBCLayer: This Lambda layer is created from the JDBC driver JAR file in S3. Replace s3://my-s3-bucket/snowflake-jdbc-<version>.jar with the path to your JDBC JAR file in S3.

    2. LambdaExecutionRole: IAM role that gives the Lambda function permissions to log to CloudWatch and interact with Athena.

    3. AthenaSnowflakeConnector: The Lambda function that connects to Snowflake using the JDBC driver in the layer, executes the query, and formats the response for Athena.


  • After deployment, go to the Athena Console and set up a new data source:

    • Go to Data Sources and select Lambda.
    • Choose the AthenaSnowflakeConnector Lambda function from the list.
  • Run a Query in Athena:

    • Use a query in Athena to test the connection to Snowflake
    • SELECT * FROM snowflake_connector."your_database"."your_schema"."your_table" LIMIT 10;
AWS Serverless Application Model (SAM) and JDBC driver for Snowflake

Using the AWS Serverless Application Model (SAM) and the Snowflake JDBC driver, you can set up a Lambda function that serves as a federated query connector, enabling Amazon Athena to query Snowflake data. Here's a step-by-step guide, from setting up the environment to deploying the SAM template and querying Snowflake from Athena.


    Overview

    1. Download and Upload Snowflake JDBC Driver to S3.
    2. Create a Lambda Layer with the JDBC driver using SAM.
    3. Configure the Lambda Function in SAM for Athena to query Snowflake.
    4. Deploy the Application using SAM CLI.
    5. Register the Connector in Athena.
    6. Query Snowflake from Athena.

    Step 1: Download the Snowflake JDBC Driver and Upload to S3

    1. Download JDBC Driver:

      • Download the snowflake-jdbc-<version>.jar driver (published to Maven Central and available from Snowflake's client downloads).
    2. Upload to S3:

      • Choose an S3 bucket where you have permission to upload. For example, my-s3-bucket.
      • Upload the snowflake-jdbc-<version>.jar file to this bucket. Note the S3 URI (s3://my-s3-bucket/snowflake-jdbc-<version>.jar).
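
      For example, with the AWS CLI (bucket name and driver version are placeholders):

      aws s3 cp snowflake-jdbc-<version>.jar s3://my-s3-bucket/snowflake-jdbc-<version>.jar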

    Step 2: Create the SAM Application with the JDBC Layer and Lambda Function

    Directory Structure

    my-snowflake-athena-connector/
    ├── src/
    │   └── app.py
    └── template.yaml

    template.yaml (SAM Template)

    AWSTemplateFormatVersion: '2010-09-09'
    Transform: 'AWS::Serverless-2016-10-31'
    Resources:
      # Lambda Layer to hold the JDBC Driver
      SnowflakeJDBCLayer:
        Type: 'AWS::Serverless::LayerVersion'
        Properties:
          ContentUri: s3://my-s3-bucket/snowflake-jdbc-<version>.jar
          CompatibleRuntimes:
            - python3.8
            - python3.9
          Description: 'Layer containing Snowflake JDBC driver for Lambda function'

      # Lambda Function that queries Snowflake
      AthenaSnowflakeConnector:
        Type: 'AWS::Serverless::Function'
        Properties:
          Handler: src/app.lambda_handler
          Runtime: python3.8
          Layers:
            - !Ref SnowflakeJDBCLayer
          MemorySize: 1024
          Timeout: 300
          Environment:
            Variables:
              SNOWFLAKE_ACCOUNT: 'your_snowflake_account'
              SNOWFLAKE_USER: 'your_snowflake_user'
              SNOWFLAKE_PASSWORD: 'your_snowflake_password'
              SNOWFLAKE_DATABASE: 'your_snowflake_database'
              SNOWFLAKE_SCHEMA: 'your_snowflake_schema'
              SNOWFLAKE_WAREHOUSE: 'your_snowflake_warehouse'
          Policies:
            - AWSLambdaBasicExecutionRole
            - Statement:
                - Effect: Allow
                  Action:
                    - "athena:*"
                  Resource: "*"

    Replace:

    • s3://my-s3-bucket/snowflake-jdbc-<version>.jar with the actual path of your uploaded Snowflake JDBC driver.
    • your_snowflake_account, your_snowflake_user, etc., with your Snowflake connection details.

    Step 3: Write the Lambda Function Code

    Create the app.py file inside the src/ directory to handle connections and query execution.

    src/app.py

    import snowflake.connector
    import os
    import json

    def lambda_handler(event, context):
        # Extract SQL query from Athena event
        sql_query = event.get('query')
        if not sql_query:
            return {"error": "No query provided"}

        # Establish connection to Snowflake
        conn = snowflake.connector.connect(
            user=os.getenv('SNOWFLAKE_USER'),
            password=os.getenv('SNOWFLAKE_PASSWORD'),
            account=os.getenv('SNOWFLAKE_ACCOUNT'),
            warehouse=os.getenv('SNOWFLAKE_WAREHOUSE'),
            database=os.getenv('SNOWFLAKE_DATABASE'),
            schema=os.getenv('SNOWFLAKE_SCHEMA')
        )

        cursor = conn.cursor()
        try:
            # Execute the query
            cursor.execute(sql_query)
            # Fetch results
            results = cursor.fetchall()

            # Format the results in a way Athena can interpret: map column names to row values
            columns = [desc[0] for desc in cursor.description]
            response = [dict(zip(columns, row)) for row in results]
            return {"statusCode": 200, "body": json.dumps(response)}

        except Exception as e:
            return {"error": str(e)}

        finally:
            cursor.close()
            conn.close()

    Step 4: Deploy the SAM Application

    1. Build and Package:

      sam build

      sam package --s3-bucket my-s3-bucket --output-template-file packaged.yaml

    2. Deploy the SAM Application:
      sam deploy --template-file packaged.yaml --stack-name AthenaSnowflakeConnector --capabilities CAPABILITY_IAM
    The stack will create the Lambda function and layer, enabling it to connect to Snowflake with the JDBC driver.
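
    To confirm the deployment finished, you can check the stack status (using the stack name from the deploy command above):

      aws cloudformation describe-stacks --stack-name AthenaSnowflakeConnector --query "Stacks[0].StackStatus"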

    Step 5: Register the Connector in Athena

    1. Go to the Athena Console:

      • Open the Athena Console.
      • Go to Data Sources > Connect Data Source.
      • Select Lambda as the connection type.
    2. Choose the Lambda Function:

      • Select the AthenaSnowflakeConnector Lambda function from the list.
      • Name the data source (e.g., snowflake_connector).
    3. Test the Connection:

      • Run a test query in Athena, such as:
      • SELECT * FROM snowflake_connector."your_database"."your_schema"."your_table" LIMIT 10;

    4. Athena will invoke the Lambda function, which will execute the SQL query in Snowflake and return the results.



      Step 6: Query Snowflake Data in Athena

      Now, you can use Athena’s SQL interface to query Snowflake data using the snowflake_connector you configured.

      SELECT * FROM snowflake_connector."your_database"."your_schema"."your_table" LIMIT 10;


Sunday 27 October 2024

Python virtual environment in Windows and copying it to a server

1. Set Up a Python Virtual Environment on Windows (CMD Compatible)

  1. Install Python (if not already installed).
  2. Open Command Prompt and Navigate to Your Project Directory:

cmd

 cd path\to\your\project

  3. Create a Virtual Environment:

cmd

 python -m venv venv

  4. Activate the Virtual Environment:

In CMD, use:

cmd

 venv\Scripts\activate

If using PowerShell, the command is slightly different:

powershell

 .\venv\Scripts\Activate.ps1

  5. Install Dependencies:

cmd

 pip install -r requirements.txt

2. Copy the Virtual Environment to the Server

If tar and scp are not available in your Windows environment (recent Windows 10/11 builds ship with both, older systems may not), you'll need some workarounds:

  1. Compress the Virtual Environment Using a Tool Like 7-Zip:

    • Right-click on the venv folder and compress it into a .zip file using 7-Zip or a similar tool.
    • Name the file venv.zip.

  2. Transfer the Archive to the Server:

Use an FTP client (e.g., FileZilla) or, if you have installed the Windows Subsystem for Linux (WSL), you can use scp in a WSL terminal:

bash

 scp venv.zip user@server_ip:/path/to/server/directory

  3. Decompress on the Server:

Log into your server and navigate to the directory where you copied venv.zip, then unzip it:

bash

 unzip venv.zip

  4. Activate the Virtual Environment on the Server:

bash

 source /path/to/server/directory/venv/bin/activate

  5. Verify Dependencies:

Run pip freeze to confirm all required packages are present and install any missing ones if needed.
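
For example (requirements.txt here is just a convenient name for the exported package list):

 # On Windows, before copying, capture the installed packages
 pip freeze > requirements.txt

 # On the server, after activating the copied environment
 pip freeze                       # compare against requirements.txt
 pip install -r requirements.txt  # reinstall anything that is missing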