Sunday, 23 March 2025

Python code that creates an API for Redpanda using GraphQL

import secrets
from datetime import datetime

import strawberry
from fastapi import FastAPI, HTTPException, Request, Depends
from google.protobuf.json_format import MessageToJson
from strawberry.fastapi import GraphQLRouter

import puls_pb2  # Import your compiled Protobuf file


# FastAPI application instance; the GraphQL router is attached to it further down.
app = FastAPI()


def authenticate(request: Request):
    """Reject the request unless its headers carry the expected credentials.

    Reads the ``username`` and ``password`` request headers and compares them
    with the expected values.

    Args:
        request: Incoming request; only ``request.headers.get`` is used.

    Raises:
        HTTPException: Status 401 when either header is missing or does not
            match the expected value.
    """
    # NOTE(review): credentials are hard-coded and sent as plain headers;
    # they should come from configuration / a secret store instead.
    expected_username = "aaa"
    expected_password = "bbb"

    # A missing header is treated as an empty value, which never matches.
    supplied_username = request.headers.get("username") or ""
    supplied_password = request.headers.get("password") or ""

    # compare_digest avoids leaking, through timing, how much of a value
    # matched. Compare bytes because it rejects non-ASCII str arguments.
    # Both comparisons are evaluated before combining so the second one is
    # not skipped when the first fails.
    username_ok = secrets.compare_digest(
        supplied_username.encode("utf-8"), expected_username.encode("utf-8")
    )
    password_ok = secrets.compare_digest(
        supplied_password.encode("utf-8"), expected_password.encode("utf-8")
    )

    if not (username_ok and password_ok):
        raise HTTPException(status_code=401, detail="Invalid credentials")


def get_redpanda_data(symbol: str, start_date: str, end_date: str):
    """Return a simulated trade for ``symbol`` serialized to JSON.

    ``start_date`` and ``end_date`` are accepted but not used by the current
    placeholder implementation.

    Returns:
        The result of ``MessageToJson`` applied to a ``puls_pb2.Trade``
        message built from fixed sample values plus ``symbol``.
    """
    # Placeholder payload (replace with actual Redpanda query)
    sample_fields = {
        "feedmsgseq": 123456789,
        "sourcetime": 1631023200,
        "symbol": symbol,
        "tradeid": 1001,
        "price": 150.5,
        "volume": 200,
        "tradecondition1": "A",
        "tradecondition2": "B",
        "marketid": "NASDAQ",
    }
    simulated_trade = puls_pb2.Trade(**sample_fields)
    return MessageToJson(simulated_trade)


# GraphQL object type returned by Query.trade. Each field is filled from the
# same-named field of a puls_pb2.Trade message.
# NOTE(review): field meanings (units of sourcetime/price, allowed values of
# the trade conditions) are not visible here -- confirm against the .proto.
@strawberry.type

class Trade:

    feedmsgseq: int

    sourcetime: int

    symbol: str

    tradeid: int

    price: float

    volume: int

    tradecondition1: str

    tradecondition2: str

    marketid: str


def _parse_date_range(start_date: str, end_date: str):
    """Parse both bounds as ``YYYY-MM-DD`` dates and check their order.

    Args:
        start_date: Lower bound of the range, formatted ``YYYY-MM-DD``.
        end_date: Upper bound of the range, formatted ``YYYY-MM-DD``.

    Returns:
        A ``(start, end)`` tuple of ``datetime.date`` objects.

    Raises:
        HTTPException: Status 400 when either value does not parse, or when
            the start date is after the end date.
    """
    try:
        start = datetime.strptime(start_date, "%Y-%m-%d").date()
        end = datetime.strptime(end_date, "%Y-%m-%d").date()
    except ValueError as err:
        # Keep the parsing error attached as the cause for easier debugging.
        raise HTTPException(status_code=400, detail="Invalid date format") from err

    if start > end:
        raise HTTPException(
            status_code=400,
            detail="Start date must not be after end date",
        )
    return start, end


# Root GraphQL query type. Comments are used instead of docstrings inside the
# Strawberry-decorated class so nothing new can leak into the schema.
@strawberry.type
class Query:

    # Resolver for the `trade` field.
    #
    # Authenticates the caller from the request stored in the GraphQL context,
    # validates the requested date range, then returns a Trade built from a
    # simulated puls_pb2.Trade message (only `symbol` comes from the caller).
    #
    # Raises HTTPException 401 (via authenticate) for bad credentials and
    # HTTPException 400 for a malformed or inverted date range.
    @strawberry.field
    def trade(
        self,
        symbol: str,
        start_date: str,
        end_date: str,
        info: strawberry.types.Info
    ) -> Trade:
        authenticate(info.context['request'])

        # The parsed dates are not used yet because the data below is
        # simulated; the call is kept for its validation.
        _parse_date_range(start_date, end_date)

        # Simulated message (replace with actual Redpanda query).
        data = puls_pb2.Trade(
            feedmsgseq=123456789,
            sourcetime=1631023200,
            symbol=symbol,
            tradeid=1001,
            price=150.5,
            volume=200,
            tradecondition1="A",
            tradecondition2="B",
            marketid="NASDAQ"
        )

        # Copy the protobuf fields one-to-one onto the GraphQL type.
        return Trade(
            feedmsgseq=data.feedmsgseq,
            sourcetime=data.sourcetime,
            symbol=data.symbol,
            tradeid=data.tradeid,
            price=data.price,
            volume=data.volume,
            tradecondition1=data.tradecondition1,
            tradecondition2=data.tradecondition2,
            marketid=data.marketid
        )


def get_context():
    """Return extra entries to merge into the GraphQL context.

    Strawberry's FastAPI router already places the live request in the
    context under ``"request"`` and merges any dict returned by the context
    getter on top of those defaults. Returning ``{"request": Request}`` here
    therefore replaced the real request object with the ``Request`` *class*,
    so header lookups in resolvers failed. Nothing extra is needed, so an
    empty dict is returned and the default ``"request"`` entry is left intact.

    Returns:
        An empty dict (no custom context entries).
    """
    return {}


# Build the GraphQL schema with Query as the root query type.
schema = strawberry.Schema(query=Query)

# Router serving the schema; get_context is registered as its context getter.
graphql_app = GraphQLRouter(schema, context_getter=get_context)


# Attach the GraphQL router to the application under the /graphql prefix.
app.include_router(graphql_app, prefix="/graphql")


# CloudFormation template (as a Python dict) declaring a Lambda function, an
# HTTP API Gateway targeting it, and a load balancer.
#
# Every name used with "Ref" is declared under "Parameters" so the template
# has no unresolved references, and the Lambda function carries the "Role"
# property that CloudFormation requires.
#
# NOTE(review): the handler "app.lambda_handler" is not defined in the visible
# source -- confirm it exists in the deployed package.
# NOTE(review): confirm the "python3.8" runtime is still accepted by Lambda.
# NOTE(review): the load balancer has no listener/target group here and
# receives a single subnet -- confirm this matches the intended deployment.
cft_template = {
    "AWSTemplateFormatVersion": "2010-09-09",
    "Parameters": {
        "SubnetId": {
            "Type": "AWS::EC2::Subnet::Id",
            "Description": "Subnet for the load balancer",
        },
        "SecurityGroupId": {
            "Type": "AWS::EC2::SecurityGroup::Id",
            "Description": "Security group for the load balancer",
        },
        "LambdaExecutionRoleArn": {
            "Type": "String",
            "Description": "ARN of the IAM execution role for the Lambda function",
        },
    },
    "Resources": {
        "FastAPILambdaFunction": {
            "Type": "AWS::Lambda::Function",
            "Properties": {
                "Handler": "app.lambda_handler",
                "Runtime": "python3.8",
                "Role": {"Ref": "LambdaExecutionRoleArn"},
                "Code": {
                    "S3Bucket": "your-bucket-name",
                    "S3Key": "your-code.zip",
                },
                "MemorySize": 128,
                "Timeout": 30,
            },
        },
        "ApiGateway": {
            "Type": "AWS::ApiGatewayV2::Api",
            "Properties": {
                "ProtocolType": "HTTP",
                "Target": {"Fn::GetAtt": ["FastAPILambdaFunction", "Arn"]},
            },
        },
        "LoadBalancer": {
            "Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",
            "Properties": {
                "Subnets": [{"Ref": "SubnetId"}],
                "SecurityGroups": [{"Ref": "SecurityGroupId"}],
            },
        },
    },
}


if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 8000. uvicorn is imported here so it is only required when the
    # module is run directly.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)


# Example of GraphQL Query

# Send a POST request to http://<your-load-balancer-dns-name>/graphql with headers:

# "username": "aaa"

# "password": "bbb"

# Body:

# {

#   "query": "query { trade(symbol: \"AAPL\", startDate: \"2024-03-01\", endDate: \"2024-03-10\") { feedmsgseq sourcetime symbol tradeid price volume tradecondition1 tradecondition2 marketid }}"

# }


No comments:

Post a Comment