This guide provides AWS CDK code to set up an Athena-Snowflake connector using a Lambda function and a Lambda layer for the Snowflake JDBC driver. The solution also retrieves Snowflake credentials from AWS Secrets Manager.
To use this code, you'll need an existing AWS Secrets Manager secret that stores the Snowflake credentials. If you haven't created it yet, you can follow the previous steps to create a secret with the necessary key-value pairs.
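The Lambda function in Step 3 reads six keys from the secret, so it can help to check the secret's JSON before deploying. The following is a minimal sketch of such a check; the helper name missing_secret_keys is hypothetical and not part of any AWS or Snowflake library.

```python
import json

# Keys that the Lambda handler in Step 3 reads from the secret.
REQUIRED_KEYS = (
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_WAREHOUSE",
    "SNOWFLAKE_DATABASE",
    "SNOWFLAKE_SCHEMA",
)


def missing_secret_keys(secret_string):
    """Return the required keys that are absent or empty in the secret JSON."""
    secret = json.loads(secret_string)
    return [key for key in REQUIRED_KEYS if not secret.get(key)]
```

Passing the secret's SecretString to this helper should return an empty list when all six keys are present and non-empty.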
Step 1: Install AWS CDK and Dependencies
Make sure you have the AWS CDK installed. If not, install it first:
npm install -g aws-cdk
Step 2: Define the CDK Stack for the Athena-Snowflake Connector
Below is the CDK code in Python to create a Lambda function, a Lambda layer for the Snowflake JDBC driver, and IAM permissions to access Secrets Manager.
- Create a file named athena_snowflake_connector_stack.py in the lib folder (or modify lib/athena-snowflake-connector-stack.py if generated by CDK).
- Add the following code to the file.
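The stack code itself is not reproduced here, so the following is only a sketch of what such a stack might look like under AWS CDK v2, not the original implementation. The secret_arn constructor argument, the construct IDs, the "layer" asset folder, the Python 3.11 runtime, and the timeout and memory values are all assumptions; the handler name and the SECRET_ARN environment variable match the Lambda code in Step 3.

```python
from aws_cdk import Duration, Stack
from aws_cdk import aws_lambda as _lambda
from aws_cdk import aws_secretsmanager as secretsmanager
from constructs import Construct


class AthenaSnowflakeConnectorStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, *,
                 secret_arn: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Reference the existing secret (secret_arn is an assumed argument).
        secret = secretsmanager.Secret.from_secret_complete_arn(
            self, "SnowflakeSecret", secret_arn
        )

        # Layer with the Snowflake driver; "layer" is an assumed local folder.
        driver_layer = _lambda.LayerVersion(
            self, "SnowflakeDriverLayer",
            code=_lambda.Code.from_asset("layer"),
            compatible_runtimes=[_lambda.Runtime.PYTHON_3_11],
            description="Snowflake driver dependencies",
        )

        # Lambda function whose code lives in src/app.py (see Step 3).
        connector_fn = _lambda.Function(
            self, "AthenaSnowflakeConnector",
            runtime=_lambda.Runtime.PYTHON_3_11,
            handler="app.lambda_handler",
            code=_lambda.Code.from_asset("src"),
            layers=[driver_layer],
            timeout=Duration.seconds(60),   # assumed value
            memory_size=512,                # assumed value
            environment={"SECRET_ARN": secret.secret_arn},
        )

        # Grant the function permission to read the secret.
        secret.grant_read(connector_fn)
```

Passing the secret's full ARN in, rather than hard-coding it, keeps the stack reusable across accounts; grant_read attaches the IAM permissions needed to read the secret's value.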
Step 3: Create the Lambda Function Code (src/app.py)
In your CDK project directory, create a folder named src and add a file named app.py inside it with the following code. This Lambda function retrieves Snowflake credentials from Secrets Manager and establishes a connection to Snowflake.
src/app.py
import os
import json
import boto3
import snowflake.connector
from botocore.exceptions import ClientError


def get_snowflake_credentials(secret_arn):
    """Fetch the Snowflake credentials stored as JSON in Secrets Manager."""
    # Initialize Secrets Manager client
    client = boto3.client('secretsmanager')
    try:
        # Retrieve the secret value
        response = client.get_secret_value(SecretId=secret_arn)
        return json.loads(response['SecretString'])
    except ClientError as e:
        print(f"Error retrieving secret: {e}")
        raise


def lambda_handler(event, context):
    # Retrieve Snowflake credentials from Secrets Manager
    secret_arn = os.getenv('SECRET_ARN')
    credentials = get_snowflake_credentials(secret_arn)

    # Extract the SQL query from the event
    sql_query = event.get('query')
    if not sql_query:
        return {"error": "No query provided"}

    # Establish connection to Snowflake using retrieved credentials
    conn = snowflake.connector.connect(
        user=credentials['SNOWFLAKE_USER'],
        password=credentials['SNOWFLAKE_PASSWORD'],
        account=credentials['SNOWFLAKE_ACCOUNT'],
        warehouse=credentials['SNOWFLAKE_WAREHOUSE'],
        database=credentials['SNOWFLAKE_DATABASE'],
        schema=credentials['SNOWFLAKE_SCHEMA']
    )

    cursor = conn.cursor()
    try:
        # Execute the query
        cursor.execute(sql_query)
        # Fetch results
        results = cursor.fetchall()

        # Format the results for Athena: pair each value with its column name
        columns = [col[0] for col in cursor.description]
        response = [dict(zip(columns, row)) for row in results]
        # default=str covers values json cannot encode natively (dates, Decimal)
        return {"statusCode": 200, "body": json.dumps(response, default=str)}

    except Exception as e:
        return {"error": str(e)}

    finally:
        cursor.close()
        conn.close()
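The formatting step in the handler turns the tuples returned by fetchall() into JSON. One way to do this, sketched below with hypothetical sample data standing in for a live cursor, is to pair each value with its column name (available from cursor.description) and let json.dumps fall back to str for types it cannot encode. The helper name rows_to_json is an assumption made for illustration.

```python
import json
from datetime import date
from decimal import Decimal


def rows_to_json(column_names, rows):
    """Pair each row's values with the column names and serialize to JSON."""
    records = [dict(zip(column_names, row)) for row in rows]
    # default=str converts values json cannot encode natively (dates, Decimal).
    return json.dumps(records, default=str)


# Hypothetical sample data standing in for cursor.description and fetchall().
columns = ["ID", "AMOUNT", "ORDER_DATE"]
rows = [(1, Decimal("19.99"), date(2024, 1, 15))]
print(rows_to_json(columns, rows))
# prints [{"ID": 1, "AMOUNT": "19.99", "ORDER_DATE": "2024-01-15"}]
```

Using a single fixed key for every value would keep only the last column of each row, which is why the column names are taken from the cursor metadata instead.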
Step 4: Deploy the CDK Stack
Bootstrap the CDK environment (required once per account and region):
cdk bootstrap
Deploy the stack:
cdk deploy
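Once the stack is deployed, the function can be exercised by sending it an event with a query key, which is what the handler in Step 3 expects. The sketch below assumes a boto3 Lambda client is passed in; the helper name run_query and the function name in the usage comment are placeholders, not values produced by this guide.

```python
import json


def run_query(lambda_client, function_name, query):
    """Invoke the connector Lambda synchronously and return the parsed rows."""
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",
        Payload=json.dumps({"query": query}).encode("utf-8"),
    )
    # Payload is a stream; read and decode the handler's return value.
    result = json.loads(response["Payload"].read())
    # The handler reports failures under "error"; unhandled Lambda
    # exceptions appear under "errorMessage".
    error = result.get("error") or result.get("errorMessage")
    if error:
        raise RuntimeError(error)
    # On success the handler returns the rows as a JSON string in "body".
    return json.loads(result["body"])


# Example usage (requires AWS credentials; the function name is a placeholder):
#   import boto3
#   rows = run_query(boto3.client("lambda"), "AthenaSnowflakeConnector",
#                    "SELECT CURRENT_DATE")
```

Taking the client as a parameter keeps the helper easy to test with a stub object that mimics the invoke call.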