Here’s the CloudFormation YAML template to set up a serverless application that connects Amazon Athena to Snowflake. This setup uses a Lambda layer for the Snowflake JDBC driver and a Lambda function to handle the connection and query execution.
Step 1: Store Snowflake Credentials in Secrets Manager
Go to AWS Secrets Manager in the AWS Management Console.
Choose Store a new secret.
Select Other type of secret and input the following key-value pairs:
Key                    Value
SNOWFLAKE_ACCOUNT      your_snowflake_account
SNOWFLAKE_USER         your_snowflake_user
SNOWFLAKE_PASSWORD     your_snowflake_password
SNOWFLAKE_DATABASE     your_snowflake_database
SNOWFLAKE_SCHEMA       your_snowflake_schema
SNOWFLAKE_WAREHOUSE    your_snowflake_warehouse
Name the secret (e.g., snowflake/credentials) and complete the setup. Note down the Secret ARN.
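The same secret can also be created from a script instead of the console. The following is a minimal sketch, assuming boto3 is installed and AWS credentials are configured. The helper names build_secret_string and create_snowflake_secret are made up for this example; the six keys and the secret name snowflake/credentials come from the step above.

```python
import json

# The six keys the console step above asks for.
REQUIRED_KEYS = (
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_DATABASE",
    "SNOWFLAKE_SCHEMA",
    "SNOWFLAKE_WAREHOUSE",
)


def build_secret_string(values):
    """Return the JSON string to store, after checking every key is present."""
    missing = [key for key in REQUIRED_KEYS if not values.get(key)]
    if missing:
        raise ValueError("Missing secret keys: " + ", ".join(missing))
    return json.dumps({key: values[key] for key in REQUIRED_KEYS})


def create_snowflake_secret(values, name="snowflake/credentials"):
    """Create the secret and return its ARN (hypothetical helper, needs AWS access)."""
    import boto3  # imported lazily so build_secret_string works without boto3

    client = boto3.client("secretsmanager")
    response = client.create_secret(
        Name=name,
        SecretString=build_secret_string(values),
    )
    return response["ARN"]
```

The ARN returned by create_snowflake_secret is the value to pass as the SecretArn parameter later on.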
Step 2: Update the CloudFormation Template to Use Secrets Manager
Below is the modified CloudFormation template. It takes the secret's ARN as a parameter, grants the function permission to read that secret, and passes the ARN to the function in the SECRET_ARN environment variable.
+++
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Parameters:
  SecretArn:
    Type: String
    Description: ARN of the Snowflake credentials stored in Secrets Manager
Resources:
  # Lambda Layer to hold the Snowflake JDBC driver
  SnowflakeJDBCLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      ContentUri: s3://my-s3-bucket/snowflake-jdbc-<version>.jar # Replace with your S3 bucket and key
      CompatibleRuntimes:
        - python3.8
        - python3.9
      Description: 'Layer containing Snowflake JDBC driver for Lambda function'
  # IAM Role for Lambda function to access Athena, CloudWatch, and Secrets Manager
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: LambdaBasicExecution
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: '*'
        - PolicyName: AthenaAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - athena:StartQueryExecution
                  - athena:GetQueryExecution
                  - athena:GetQueryResults
                  - s3:PutObject
                  - s3:GetObject
                Resource: '*'
        - PolicyName: SecretsManagerAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - secretsmanager:GetSecretValue
                Resource: !Ref SecretArn
  # Lambda Function for Athena-Snowflake connection
  AthenaSnowflakeConnector:
    Type: AWS::Serverless::Function
    Properties:
      # The module path is relative to CodeUri, so src/app.py is addressed as "app"
      Handler: app.lambda_handler
      Runtime: python3.8
      CodeUri: ./src # Ensure `src/app.py` is included in this path
      MemorySize: 1024
      Timeout: 300
      Role: !GetAtt LambdaExecutionRole.Arn
      Layers:
        - !Ref SnowflakeJDBCLayer
      Environment:
        Variables:
          SECRET_ARN: !Ref SecretArn
Outputs:
  LambdaFunctionName:
    Description: "Lambda function for Athena to Snowflake connection"
    Value: !Ref AthenaSnowflakeConnector
+++
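One thing to check before deploying: Lambda layers are uploaded as .zip archives, and for Python runtimes the packages in a layer are picked up from a top-level python/ directory. Since the function code in this post imports snowflake.connector, a layer built from that Python package may be what the function actually needs. The following is a sketch of packaging a directory of installed packages into that layout, using only the standard library. The function name build_layer_zip and the paths in the usage comment are assumptions for this example.

```python
import os
import zipfile


def build_layer_zip(packages_dir, output_zip):
    """Zip everything under packages_dir beneath a top-level 'python/' folder.

    Returns the sorted list of archive member names that were written.
    Keep output_zip outside packages_dir so the archive does not include itself.
    """
    written = []
    with zipfile.ZipFile(output_zip, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(packages_dir):
            for filename in sorted(files):
                full_path = os.path.join(root, filename)
                relative = os.path.relpath(full_path, packages_dir)
                # Zip member names always use forward slashes
                arcname = "python/" + relative.replace(os.sep, "/")
                archive.write(full_path, arcname)
                written.append(arcname)
    return sorted(written)


# Example with hypothetical paths, after something like
#   pip install snowflake-connector-python -t build/packages
# build_layer_zip("build/packages", "snowflake-layer.zip")
```

The resulting archive would then be uploaded to S3 and referenced from the layer's ContentUri.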
Step 3: Update app.py to Fetch Secrets from Secrets Manager
Modify the Lambda function code in src/app.py to retrieve credentials from Secrets Manager using the boto3 SDK.
src/app.py
++++++++
import os
import json
import boto3
import snowflake.connector
from botocore.exceptions import ClientError


def get_snowflake_credentials(secret_arn):
    # Initialize Secrets Manager client
    client = boto3.client('secretsmanager')
    try:
        # Retrieve the secret value
        response = client.get_secret_value(SecretId=secret_arn)
        secret = json.loads(response['SecretString'])
        return secret
    except ClientError as e:
        print(f"Error retrieving secret: {e}")
        raise


def lambda_handler(event, context):
    # Retrieve Snowflake credentials from Secrets Manager
    secret_arn = os.getenv('SECRET_ARN')
    credentials = get_snowflake_credentials(secret_arn)

    # Extract the SQL query from the event
    sql_query = event.get('query')
    if not sql_query:
        return {"error": "No query provided"}

    # Establish connection to Snowflake using retrieved credentials
    conn = snowflake.connector.connect(
        user=credentials['SNOWFLAKE_USER'],
        password=credentials['SNOWFLAKE_PASSWORD'],
        account=credentials['SNOWFLAKE_ACCOUNT'],
        warehouse=credentials['SNOWFLAKE_WAREHOUSE'],
        database=credentials['SNOWFLAKE_DATABASE'],
        schema=credentials['SNOWFLAKE_SCHEMA']
    )

    cursor = conn.cursor()
    try:
        # Execute the query
        cursor.execute(sql_query)
        # Fetch results
        results = cursor.fetchall()

        # Format the results: one dict per row, keyed by the real column names
        columns = [col[0] for col in cursor.description]
        response = [dict(zip(columns, row)) for row in results]
        # default=str keeps dates and decimals from breaking json.dumps
        return {"statusCode": 200, "body": json.dumps(response, default=str)}
    except Exception as e:
        return {"error": str(e)}
    finally:
        cursor.close()
        conn.close()
++++++++
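The row-formatting step is the easiest part of the handler to check without a Snowflake account. One way to key each value by its column name is to read the names from cursor.description, where the first item of every entry is the column name in the DB-API convention. The sketch below does that on sample data; rows_to_records, to_response, and the sample values are made up for this illustration.

```python
import json
from datetime import date
from decimal import Decimal


def rows_to_records(description, rows):
    """Pair each row with the column names taken from a DB-API description."""
    columns = [entry[0] for entry in description]
    return [dict(zip(columns, row)) for row in rows]


def to_response(description, rows):
    """Build a statusCode/body response like the one the handler returns."""
    records = rows_to_records(description, rows)
    # default=str turns dates and decimals into strings instead of raising
    return {"statusCode": 200, "body": json.dumps(records, default=str)}


# Sample data standing in for cursor.description and cursor.fetchall()
sample_description = [("ID",), ("CREATED",), ("AMOUNT",)]
sample_rows = [(1, date(2024, 1, 2), Decimal("9.50"))]

print(to_response(sample_description, sample_rows)["body"])
```

Keeping the formatting in a small function like this makes it possible to unit test the response shape separately from the connection logic.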
Step 4: Deploy the CloudFormation Stack
Package and deploy the CloudFormation stack with the AWS CLI:
aws cloudformation package --template-file template.yaml --s3-bucket my-s3-bucket --output-template-file packaged-template.yaml
aws cloudformation deploy --template-file packaged-template.yaml --stack-name AthenaSnowflakeConnectorStack --capabilities CAPABILITY_IAM --parameter-overrides SecretArn="arn:aws:secretsmanager:your-region:123456789012:secret:snowflake/credentials"
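A quick format check on the ARN before passing it to --parameter-overrides can catch copy-and-paste mistakes early. This is a rough sketch: parse_secret_arn is a made-up helper, and the example ARN in the comment, including its region and suffix, is a placeholder.

```python
def parse_secret_arn(arn):
    """Split a Secrets Manager ARN into its parts, or raise ValueError."""
    parts = arn.split(":", 6)
    if len(parts) != 7:
        raise ValueError("Expected 7 colon-separated fields in: " + arn)
    prefix, partition, service, region, account_id, resource_type, secret_id = parts
    if prefix != "arn" or service != "secretsmanager" or resource_type != "secret":
        raise ValueError("Not a Secrets Manager secret ARN: " + arn)
    if not (account_id.isdigit() and len(account_id) == 12):
        raise ValueError("Account ID should be 12 digits: " + account_id)
    if not region or not secret_id:
        raise ValueError("Region and secret name must not be empty: " + arn)
    return {
        "partition": partition,
        "region": region,
        "account_id": account_id,
        "secret_id": secret_id,
    }


# Example with a placeholder ARN:
# parse_secret_arn(
#     "arn:aws:secretsmanager:us-east-1:123456789012:secret:snowflake/credentials-AbCdEf"
# )
```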
- Replace arn:aws:secretsmanager:your-region:123456789012:secret:snowflake/credentials with the actual ARN of your Secrets Manager secret.
Explanation of the Template
SnowflakeJDBCLayer: This Lambda layer is created from the JDBC driver JAR file in S3. Replace s3://my-s3-bucket/snowflake-jdbc-<version>.jar with the path to your JDBC JAR file in S3.
LambdaExecutionRole: IAM role that gives the Lambda function permissions to log to CloudWatch, interact with Athena, and read the Snowflake secret from Secrets Manager.
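AthenaSnowflakeConnector: The Lambda function that reads the credentials from Secrets Manager, connects to Snowflake, executes the query, and formats the response for Athena. Note that src/app.py connects through the Python snowflake.connector package rather than the JDBC driver, so that package has to be available to the function as well.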
After deployment, go to the Athena Console and set up a new data source:
- Go to Data Sources and select Lambda.
- Choose the AthenaSnowflakeConnector Lambda function from the list.
Run a Query in Athena:
- Use a query in Athena to test the connection to Snowflake:
SELECT * FROM snowflake_connector."your_database"."your_schema"."your_table" LIMIT 10;
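The handler in src/app.py reads its SQL from the query key of the event, so the function can also be exercised directly, independent of the Athena data source. Here is a minimal sketch, assuming boto3 is installed and the caller's AWS credentials are allowed to invoke the function. build_query_event and invoke_connector are hypothetical names, and the function name would come from the LambdaFunctionName stack output.

```python
import json


def build_query_event(sql):
    """Build the event the handler expects: the SQL text under the 'query' key."""
    if not sql or not sql.strip():
        raise ValueError("query must not be empty")
    return {"query": sql.strip()}


def invoke_connector(function_name, sql):
    """Invoke the deployed function synchronously and return its decoded result."""
    import boto3  # imported lazily so build_query_event works without boto3

    client = boto3.client("lambda")
    response = client.invoke(
        FunctionName=function_name,
        Payload=json.dumps(build_query_event(sql)).encode("utf-8"),
    )
    # The function's return value comes back as a JSON stream
    return json.loads(response["Payload"].read())


# Example (placeholder function name):
# result = invoke_connector("<LambdaFunctionName output>", "SELECT CURRENT_VERSION()")
# print(result)
```

A response containing an error key points at a problem with the credentials, the query, or the connection, which is easier to debug here than through Athena.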