Sunday, 23 March 2025

CloudFormation Template (CFT) to host your FastAPI app behind an AWS ALB

 AWSTemplateFormatVersion: '2010-09-09'

Resources:

  FastAPILambdaFunction:

    Type: AWS::Lambda::Function

    Properties:

      FunctionName: FastAPITradeGraphQL

      Handler: lambda_function.lambda_handler

      Runtime: python3.11

      Timeout: 30

      MemorySize: 512

      Code:

        S3Bucket: !Sub "${AWS::AccountId}-fastapi-app"

        S3Key: "fastapi_trade_graphql.zip"

      Role: !GetAtt LambdaExecutionRole.Arn


  LambdaExecutionRole:

    Type: AWS::IAM::Role

    Properties:

      RoleName: FastAPILambdaExecutionRole

      AssumeRolePolicyDocument:

        Version: '2012-10-17'

        Statement:

          - Effect: Allow

            Principal:

              Service: lambda.amazonaws.com

            Action: sts:AssumeRole

      Policies:

        - PolicyName: LambdaBasicExecution

          PolicyDocument:

            Version: '2012-10-17'

            Statement:

              - Effect: Allow

                Action:

                  - logs:CreateLogGroup

                  - logs:CreateLogStream

                  - logs:PutLogEvents

                Resource: "*"


  FastAPILambdaPermission:

    Type: AWS::Lambda::Permission

    Properties:

      Action: lambda:InvokeFunction

      FunctionName: !GetAtt FastAPILambdaFunction.Arn

      Principal: apigateway.amazonaws.com


  FastAPIAPI:

    Type: AWS::ApiGatewayV2::Api

    Properties:

      Name: FastAPITradeGraphQLAPI

      ProtocolType: HTTP


  FastAPIIntegration:

    Type: AWS::ApiGatewayV2::Integration

    Properties:

      ApiId: !Ref FastAPIAPI

      IntegrationType: AWS_PROXY

      IntegrationUri: !GetAtt FastAPILambdaFunction.Arn


  FastAPIRoute:

    Type: AWS::ApiGatewayV2::Route

    Properties:

      ApiId: !Ref FastAPIAPI

      RouteKey: "POST /graphql"

      Target: !Sub "integrations/${FastAPIIntegration}"


  FastAPIStage:

    Type: AWS::ApiGatewayV2::Stage

    Properties:

      ApiId: !Ref FastAPIAPI

      StageName: "$default"

      AutoDeploy: true


  FastAPILoadBalancer:

    Type: AWS::ElasticLoadBalancingV2::LoadBalancer

    Properties:

      Name: FastAPITradeGraphQLLB

      Scheme: internet-facing

      Type: application

      Subnets:

        - subnet-xxxxxxxx

        - subnet-yyyyyyyy

      SecurityGroups:

        - !GetAtt FastAPILBSecurityGroup.GroupId


  FastAPILBSecurityGroup:

    Type: AWS::EC2::SecurityGroup

    Properties:

      GroupDescription: Allow HTTP

      SecurityGroupIngress:

        - IpProtocol: tcp

          FromPort: 80

          ToPort: 80

          CidrIp: 0.0.0.0/0


Outputs:

  FastAPIInvokeURL:

    Value: !Sub "https://${FastAPIAPI}.execute-api.${AWS::Region}.amazonaws.com/"


# Instructions to upload Lambda code:

# 1. Zip your FastAPI app code (including dependencies) into fastapi_trade_graphql.zip.

# 2. Upload the zip to an S3 bucket named '<account-id>-fastapi-app' (replace with your AWS Account ID).

#    aws s3 cp fastapi_trade_graphql.zip s3://<account-id>-fastapi-app/

# 3. Deploy the CloudFormation template:

#    aws cloudformation create-stack --stack-name FastAPITradeGraphQLStack --template-body file://fastapi_cft.yaml --capabilities CAPABILITY_IAM


# FastAPI Code:

from fastapi import FastAPI, HTTPException, Request, Depends

from google.protobuf.json_format import MessageToJson

from google.protobuf.message import DecodeError

from confluent_kafka import Consumer, KafkaException

import puls_pb2  # Import your compiled Protobuf file

from datetime import datetime, timedelta

import strawberry

from strawberry.fastapi import GraphQLRouter

import time


app = FastAPI()


def authenticate(request: Request):

    username = request.headers.get("username")

    password = request.headers.get("password")

    if username != "aaa" or password != "bbb":

        raise HTTPException(status_code=401, detail="Invalid credentials")


def get_redpanda_data(symbol: str, start_timestamp: int, end_timestamp: int):

    conf = {

        'bootstrap.servers': 'localhost:9092',  # Change to your Redpanda broker

        'group.id': 'redpanda-group',

        'auto.offset.reset': 'earliest'

    }

    consumer = Consumer(conf)

    consumer.subscribe(["trades"])


    trades = []

    try:

        while True:

            msg = consumer.poll(1.0)

            if msg is None:

                break

            if msg.error():

                raise KafkaException(msg.error())


            trade_msg = puls_pb2.Trade()

            try:

                trade_msg.ParseFromString(msg.value())

                if (trade_msg.symbol == symbol and

                        start_timestamp <= trade_msg.sourcetime <= end_timestamp):

                    trades.append(MessageToJson(trade_msg))

            except DecodeError:

                continue

    finally:

        consumer.close()

    

    return trades if trades else None


@strawberry.type

class Trade:

    feedmsgseq: int

    sourcetime: int

    symbol: str

    tradeid: int

    price: float

    volume: int

    tradecondition1: str

    tradecondition2: str

    marketid: str


@strawberry.type

class Query:

    @strawberry.field

    def trade(

        self, 

        symbol: str, 

        last_hours: int = None, 

        info: strawberry.types.Info

    ) -> list[Trade]:

        authenticate(info.context['request'])


        end_timestamp = int(time.time())

        start_timestamp = end_timestamp - (last_hours * 3600)

        

        data_json_list = get_redpanda_data(symbol, start_timestamp, end_timestamp)

        if not data_json_list:

            raise HTTPException(status_code=404, detail="No trade data found")


        trades = []

        for data_json in data_json_list:

            data = puls_pb2.Trade()

            data.ParseFromString(data_json.encode())

            trades.append(Trade(

                feedmsgseq=data.feedmsgseq,

                sourcetime=data.sourcetime,

                symbol=data.symbol,

                tradeid=data.tradeid,

                price=data.price,

                volume=data.volume,

                tradecondition1=data.tradecondition1,

                tradecondition2=data.tradecondition2,

                marketid=data.marketid

            ))

        return trades


def get_context():

    return {"request": Request}


schema = strawberry.Schema(query=Query)

graphql_app = GraphQLRouter(schema, context_getter=get_context)


app.include_router(graphql_app, prefix="/graphql")


if __name__ == "__main__":

    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)


No comments:

Post a Comment