Tuesday 29 October 2024

AWS CDK code to set up an Athena-Snowflake connector using a Lambda function and Lambda layer for the Snowflake JDBC driver

This post walks through AWS CDK (Python) code that deploys a Lambda function and a Lambda layer for the Snowflake driver, and reads the Snowflake credentials from AWS Secrets Manager instead of hard-coding them.

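To use this code, you need an existing AWS Secrets Manager secret that stores the Snowflake credentials as JSON key-value pairs with the keys SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE, SNOWFLAKE_DATABASE and SNOWFLAKE_SCHEMA (these are the keys the Lambda function reads). If you haven't created the secret yet, follow the previous steps to create it.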
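If you prefer to create the secret from a script, the following is a minimal sketch using boto3's Secrets Manager `create_secret` call. The helper names `build_secret_string` and `create_snowflake_secret` are hypothetical; the key names are the ones the Lambda handler in Step 3 reads. The client is passed in as a parameter so the helpers can be tried without AWS access.

```python
import json

# Keys the Lambda handler in Step 3 reads from the secret
REQUIRED_KEYS = (
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_WAREHOUSE",
    "SNOWFLAKE_DATABASE",
    "SNOWFLAKE_SCHEMA",
)


def build_secret_string(credentials: dict) -> str:
    """Check that every required key has a value and serialize them as JSON."""
    missing = [key for key in REQUIRED_KEYS if not credentials.get(key)]
    if missing:
        raise ValueError(f"Missing Snowflake credential keys: {missing}")
    return json.dumps({key: credentials[key] for key in REQUIRED_KEYS})


def create_snowflake_secret(client, name: str, credentials: dict) -> str:
    """Create the secret and return its full ARN (including the random suffix)."""
    response = client.create_secret(
        Name=name,
        SecretString=build_secret_string(credentials),
    )
    return response["ARN"]
```

For example, `create_snowflake_secret(boto3.client("secretsmanager"), "snowflake/credentials", creds)` returns the full ARN, including the random six-character suffix Secrets Manager appends; that full ARN is the value the stack needs as `snowflake_secret_arn`.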


Step 1: Install AWS CDK and Dependencies

Make sure the AWS CDK CLI is installed (it requires Node.js). If it isn't, install it first:

npm install -g aws-cdk


Next, create a new CDK app:

mkdir athena-snowflake-connector
cd athena-snowflake-connector
cdk init app --language python

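Activate the virtual environment that cdk init created and install the dependencies listed in requirements.txt. With CDK v2 (which the code below uses), every AWS module ships in the single aws-cdk-lib package, so there is no need to install aws-cdk.aws_lambda and similar v1 packages separately:

source .venv/bin/activate      # on Windows: .venv\Scripts\activate.bat
pip install -r requirements.txt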

Step 2: Define the CDK Stack for the Athena-Snowflake Connector

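Below is the CDK code in Python that creates a Lambda function, a Lambda layer for the Snowflake driver, and an IAM role that can read the credentials from Secrets Manager. Because the function runs on a Python runtime and imports snowflake.connector, the layer has to package the Python connector (snowflake-connector-python); a Snowflake JDBC .jar can only be loaded by a Java runtime.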

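  1. Open athena_snowflake_connector/athena_snowflake_connector_stack.py, which cdk init generated for the Python app (the lib/ folder is only used by TypeScript projects). The generated app.py at the project root already instantiates AthenaSnowflakeConnectorStack.
  2. Replace the contents of that file with the following code.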

from aws_cdk import (
    Stack,
    Duration,
    CfnOutput,
    aws_lambda as _lambda,
    aws_iam as iam,
    aws_s3 as s3,
)
from constructs import Construct


class AthenaSnowflakeConnectorStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # S3 bucket and key of the layer archive. Lambda layers must be .zip files,
        # and because src/app.py imports snowflake.connector, the zip must contain
        # the snowflake-connector-python package under a top-level python/ folder.
        s3_bucket_name = "my-s3-bucket"  # Replace with your S3 bucket name
        s3_key = "layers/snowflake-connector-python.zip"  # Replace with the S3 key of the layer zip

        # Full Secrets Manager ARN, including the random 6-character suffix
        # that Secrets Manager appends to the secret name
        snowflake_secret_arn = "arn:aws:secretsmanager:your-region:123456789012:secret:snowflake/credentials-AbCdEf"  # Replace with your secret ARN

        # One runtime for both the layer and the function keeps them compatible
        # (Python 3.8 is deprecated on Lambda)
        runtime = _lambda.Runtime.PYTHON_3_12

        # Lambda layer that supplies the Snowflake driver to the function
        snowflake_jdbc_layer = _lambda.LayerVersion(
            self, "SnowflakeJDBCLayer",
            code=_lambda.Code.from_bucket(
                bucket=s3.Bucket.from_bucket_name(self, "JDBCBucket", s3_bucket_name),
                key=s3_key,
            ),
            compatible_runtimes=[runtime],
            description="Lambda layer for the Snowflake driver",
        )

        # IAM role for the Lambda function
        lambda_role = iam.Role(
            self, "LambdaExecutionRole",
            assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"),
            inline_policies={
                "LambdaBasicExecution": iam.PolicyDocument(statements=[
                    # Write function logs to CloudWatch Logs
                    iam.PolicyStatement(
                        actions=[
                            "logs:CreateLogGroup",
                            "logs:CreateLogStream",
                            "logs:PutLogEvents",
                        ],
                        resources=["*"],
                    ),
                    # resources=["*"] is broad; restrict to specific
                    # workgroups and buckets where possible
                    iam.PolicyStatement(
                        actions=[
                            "athena:StartQueryExecution",
                            "athena:GetQueryExecution",
                            "athena:GetQueryResults",
                            "s3:PutObject",
                            "s3:GetObject",
                        ],
                        resources=["*"],
                    ),
                    # Read only the Snowflake credentials secret
                    iam.PolicyStatement(
                        actions=["secretsmanager:GetSecretValue"],
                        resources=[snowflake_secret_arn],
                    ),
                ])
            },
        )

        # Lambda function that runs queries against Snowflake
        athena_snowflake_connector = _lambda.Function(
            self, "AthenaSnowflakeConnector",
            runtime=runtime,
            handler="app.lambda_handler",  # lambda_handler() in src/app.py
            code=_lambda.Code.from_asset("src"),  # Function code, relative to the project root
            memory_size=1024,
            timeout=Duration.seconds(300),
            role=lambda_role,
            layers=[snowflake_jdbc_layer],
            environment={
                "SECRET_ARN": snowflake_secret_arn,
            },
        )

        # Expose the function to other constructs and print its generated name after deploy
        self.athena_snowflake_connector = athena_snowflake_connector
        CfnOutput(self, "ConnectorFunctionName", value=athena_snowflake_connector.function_name)
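The layer archive has to exist in S3 before cdk deploy. Below is a minimal sketch of building and uploading it, assuming you have already installed the connector into a local folder, for example with pip install --target build/site-packages snowflake-connector-python. The connector has compiled dependencies, so install it on (or for) Linux with the same Python version and architecture as the Lambda runtime. `zip_python_layer` and `upload_layer` are hypothetical helper names; `upload_file` is the standard boto3 S3 client method.

```python
import os
import zipfile


def zip_python_layer(site_packages_dir: str, zip_path: str) -> int:
    """Zip an installed-packages folder so every file sits under python/.

    Lambda extracts layers into /opt, and /opt/python is on sys.path for
    Python runtimes, which is why the archive needs the python/ prefix.
    Returns the number of files written. zip_path must be outside the folder.
    """
    count = 0
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(site_packages_dir):
            for name in files:
                full_path = os.path.join(root, name)
                rel_path = os.path.relpath(full_path, site_packages_dir)
                # Zip entries always use forward slashes
                zf.write(full_path, arcname="python/" + rel_path.replace(os.sep, "/"))
                count += 1
    return count


def upload_layer(s3_client, zip_path: str, bucket: str, key: str) -> None:
    """Upload the layer zip to the bucket and key the CDK stack points at."""
    s3_client.upload_file(zip_path, bucket, key)
```

For example, zip_python_layer("build/site-packages", "build/layer.zip") followed by upload_layer(boto3.client("s3"), "build/layer.zip", "my-s3-bucket", "layers/snowflake-connector-python.zip"); the bucket and key must match the values used in the stack.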

Step 3: Create the Lambda Function Code (src/app.py)

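In your CDK project directory, create a folder named src and add a file named app.py inside it with the following code (this is the Lambda handler, separate from the CDK entry-point app.py at the project root). The function retrieves the Snowflake credentials from Secrets Manager, runs the SQL query passed in the event's query field against Snowflake, and returns the rows as JSON.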

src/app.py

import os
import json
import boto3
import snowflake.connector
from botocore.exceptions import ClientError

# Create the Secrets Manager client once per execution environment so
# warm invocations reuse it
secrets_client = boto3.client('secretsmanager')


def get_snowflake_credentials(secret_arn):
    """Return the Snowflake credentials stored as JSON in Secrets Manager."""
    try:
        response = secrets_client.get_secret_value(SecretId=secret_arn)
        return json.loads(response['SecretString'])
    except ClientError as e:
        print(f"Error retrieving secret: {e}")
        raise


def lambda_handler(event, context):
    # Validate the input before making any network calls
    sql_query = event.get('query')
    if not sql_query:
        return {"statusCode": 400, "body": json.dumps({"error": "No query provided"})}

    # Retrieve Snowflake credentials from Secrets Manager
    credentials = get_snowflake_credentials(os.environ['SECRET_ARN'])

    # Establish connection to Snowflake using retrieved credentials
    conn = snowflake.connector.connect(
        user=credentials['SNOWFLAKE_USER'],
        password=credentials['SNOWFLAKE_PASSWORD'],
        account=credentials['SNOWFLAKE_ACCOUNT'],
        warehouse=credentials['SNOWFLAKE_WAREHOUSE'],
        database=credentials['SNOWFLAKE_DATABASE'],
        schema=credentials['SNOWFLAKE_SCHEMA']
    )

    cursor = conn.cursor()
    try:
        # Execute the query and fetch all rows
        cursor.execute(sql_query)
        results = cursor.fetchall()

        # Map each row to {column_name: value}; each cursor.description
        # entry describes one column, and entry[0] is its name
        columns = [col[0] for col in cursor.description]
        response = [dict(zip(columns, row)) for row in results]

        # default=str serializes values json can't handle natively
        # (for example Decimal, date and datetime)
        return {"statusCode": 200, "body": json.dumps(response, default=str)}

    except Exception as e:
        return {"statusCode": 500, "body": json.dumps({"error": str(e)})}

    finally:
        cursor.close()
        conn.close()
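Turning rows into JSON records is the part most easily gotten wrong: a dict comprehension with a constant key, such as {"column_name": value for value in row}, keeps only the last value of each row. The sketch below isolates the row-to-record step so it can be checked locally without Snowflake. `rows_to_records` and `to_json_body` are hypothetical helper names; they assume a PEP 249 style cursor.description, where the first element of each column entry is the column name.

```python
import json
from datetime import date
from decimal import Decimal


def rows_to_records(description, rows):
    """Turn DB-API rows into a list of {column_name: value} dicts."""
    columns = [col[0] for col in description]
    return [dict(zip(columns, row)) for row in rows]


def to_json_body(records):
    """Serialize records; default=str covers Decimal, date, datetime, etc."""
    return json.dumps(records, default=str)


# Example with a fake description and one row, as a Snowflake cursor might return them
description = [("ID",), ("PRICE",), ("DAY",)]
rows = [(1, Decimal("9.50"), date(2024, 10, 29))]
print(to_json_body(rows_to_records(description, rows)))
# [{"ID": 1, "PRICE": "9.50", "DAY": "2024-10-29"}]
```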


Step 4: Deploy the CDK Stack

  1. Bootstrap the CDK (needed once per AWS account and region):

    cdk bootstrap

  2. Deploy the stack:

    cdk deploy
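After the deployment finishes, you can test the function by invoking it directly. Below is a minimal sketch using boto3's Lambda `invoke` call, assuming the event shape the handler in Step 3 expects ({"query": "..."}). `invoke_connector` is a hypothetical helper, and the function name is the physical name CloudFormation generated, which you can look up in the Lambda console or in the stack's resources.

```python
import json


def invoke_connector(lambda_client, function_name: str, query: str) -> dict:
    """Synchronously invoke the deployed function with {"query": ...}."""
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",
        Payload=json.dumps({"query": query}).encode("utf-8"),
    )
    # Payload is a streaming body; read() returns the handler's JSON result
    result = json.loads(response["Payload"].read())
    # FunctionError is present when the handler raised an unhandled exception
    if "FunctionError" in response:
        raise RuntimeError(f"Lambda function error: {result}")
    return result
```

For example, invoke_connector(boto3.client("lambda"), "<function-name>", "SELECT CURRENT_VERSION()") returns the handler's response dict, whose body field holds the rows as a JSON string.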
