AWSTemplateFormatVersion: '2010-09-09'
Resources:
FastAPILambdaFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: FastAPITradeGraphQL
Handler: lambda_function.lambda_handler
Runtime: python3.11
Timeout: 30
MemorySize: 512
Code:
S3Bucket: !Sub "${AWS::AccountId}-fastapi-app"
S3Key: "fastapi_trade_graphql.zip"
Role: !GetAtt LambdaExecutionRole.Arn
LambdaExecutionRole:
Type: AWS::IAM::Role
Properties:
RoleName: FastAPILambdaExecutionRole
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: LambdaBasicExecution
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: "*"
FastAPILambdaPermission:
Type: AWS::Lambda::Permission
Properties:
Action: lambda:InvokeFunction
FunctionName: !GetAtt FastAPILambdaFunction.Arn
Principal: apigateway.amazonaws.com
FastAPIAPI:
Type: AWS::ApiGatewayV2::Api
Properties:
Name: FastAPITradeGraphQLAPI
ProtocolType: HTTP
FastAPIIntegration:
Type: AWS::ApiGatewayV2::Integration
Properties:
ApiId: !Ref FastAPIAPI
IntegrationType: AWS_PROXY
IntegrationUri: !GetAtt FastAPILambdaFunction.Arn
FastAPIRoute:
Type: AWS::ApiGatewayV2::Route
Properties:
ApiId: !Ref FastAPIAPI
RouteKey: "POST /graphql"
Target: !Sub "integrations/${FastAPIIntegration}"
FastAPIStage:
Type: AWS::ApiGatewayV2::Stage
Properties:
ApiId: !Ref FastAPIAPI
StageName: "$default"
AutoDeploy: true
FastAPILoadBalancer:
Type: AWS::ElasticLoadBalancingV2::LoadBalancer
Properties:
Name: FastAPITradeGraphQLLB
Scheme: internet-facing
Type: application
Subnets:
- subnet-xxxxxxxx
- subnet-yyyyyyyy
SecurityGroups:
- !GetAtt FastAPILBSecurityGroup.GroupId
FastAPILBSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: Allow HTTP
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 80
ToPort: 80
CidrIp: 0.0.0.0/0
Outputs:
FastAPIInvokeURL:
Value: !Sub "https://${FastAPIAPI}.execute-api.${AWS::Region}.amazonaws.com/"
# Instructions to upload Lambda code:
# 1. Zip your FastAPI app code (including dependencies) into fastapi_trade_graphql.zip.
# 2. Upload the zip to an S3 bucket named '<account-id>-fastapi-app' (replace with your AWS Account ID).
# aws s3 cp fastapi_trade_graphql.zip s3://<account-id>-fastapi-app/
# 3. Deploy the CloudFormation template:
# aws cloudformation create-stack --stack-name FastAPITradeGraphQLStack --template-body file://fastapi_cft.yaml --capabilities CAPABILITY_IAM
# FastAPI Code:
from fastapi import FastAPI, HTTPException, Request, Depends
from google.protobuf.json_format import MessageToJson
from google.protobuf.message import DecodeError
from confluent_kafka import Consumer, KafkaException
import puls_pb2 # Import your compiled Protobuf file
from datetime import datetime, timedelta
import strawberry
from strawberry.fastapi import GraphQLRouter
import time
app = FastAPI()
def authenticate(request: Request):
username = request.headers.get("username")
password = request.headers.get("password")
if username != "aaa" or password != "bbb":
raise HTTPException(status_code=401, detail="Invalid credentials")
def get_redpanda_data(symbol: str, start_timestamp: int, end_timestamp: int):
conf = {
'bootstrap.servers': 'localhost:9092', # Change to your Redpanda broker
'group.id': 'redpanda-group',
'auto.offset.reset': 'earliest'
}
consumer = Consumer(conf)
consumer.subscribe(["trades"])
trades = []
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
break
if msg.error():
raise KafkaException(msg.error())
trade_msg = puls_pb2.Trade()
try:
trade_msg.ParseFromString(msg.value())
if (trade_msg.symbol == symbol and
start_timestamp <= trade_msg.sourcetime <= end_timestamp):
trades.append(MessageToJson(trade_msg))
except DecodeError:
continue
finally:
consumer.close()
return trades if trades else None
@strawberry.type
class Trade:
feedmsgseq: int
sourcetime: int
symbol: str
tradeid: int
price: float
volume: int
tradecondition1: str
tradecondition2: str
marketid: str
@strawberry.type
class Query:
@strawberry.field
def trade(
self,
symbol: str,
last_hours: int = None,
info: strawberry.types.Info
) -> list[Trade]:
authenticate(info.context['request'])
end_timestamp = int(time.time())
start_timestamp = end_timestamp - (last_hours * 3600)
data_json_list = get_redpanda_data(symbol, start_timestamp, end_timestamp)
if not data_json_list:
raise HTTPException(status_code=404, detail="No trade data found")
trades = []
for data_json in data_json_list:
data = puls_pb2.Trade()
data.ParseFromString(data_json.encode())
trades.append(Trade(
feedmsgseq=data.feedmsgseq,
sourcetime=data.sourcetime,
symbol=data.symbol,
tradeid=data.tradeid,
price=data.price,
volume=data.volume,
tradecondition1=data.tradecondition1,
tradecondition2=data.tradecondition2,
marketid=data.marketid
))
return trades
def get_context():
return {"request": Request}
schema = strawberry.Schema(query=Query)
graphql_app = GraphQLRouter(schema, context_getter=get_context)
app.include_router(graphql_app, prefix="/graphql")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
No comments:
Post a Comment