Sunday, 23 March 2025

Python code that creates an API for Redpanda

from fastapi import FastAPI, HTTPException, Query, Depends

from google.protobuf.json_format import MessageToJson

import puls_pb2  # Import your compiled Protobuf file

from datetime import datetime


# Application object: the /trade route is registered on it and the
# __main__ entry point hands it to uvicorn.
app = FastAPI()


def authenticate(username: str, password: str) -> None:
    """Reject the caller unless the supplied credentials match.

    This is still a placeholder check against hard-coded values and must be
    replaced with a real credential store before production use.

    Args:
        username: User name supplied by the caller.
        password: Password supplied by the caller.

    Raises:
        HTTPException: With status 401 when either value does not match.
    """
    from secrets import compare_digest

    # Compare as bytes: compare_digest raises TypeError for non-ASCII str,
    # which would turn a bad login into a 500 instead of a 401.
    user_ok = compare_digest(username.encode("utf-8", "surrogatepass"), b"aaa")
    password_ok = compare_digest(password.encode("utf-8", "surrogatepass"), b"bbb")

    # Both comparisons are always evaluated (no short-circuit) so response
    # timing does not reveal which of the two fields was wrong.
    if not (user_ok and password_ok):
        raise HTTPException(status_code=401, detail="Invalid credentials")


def get_redpanda_data(symbol: str, start_date: str, end_date: str):
    """Return a JSON string describing a placeholder trade for *symbol*.

    Stand-in for the real Redpanda query: ``start_date`` and ``end_date`` are
    accepted but not consulted, and every field other than the symbol is a
    fixed sample value.
    """
    # Simulated record (replace with an actual Redpanda lookup).
    sample_fields = {
        "feedmsgseq": 123456789,
        "sourcetime": 1631023200,
        "symbol": symbol,
        "tradeid": 1001,
        "price": 150.5,
        "volume": 200,
        "tradecondition1": "A",
        "tradecondition2": "B",
        "marketid": "NASDAQ",
    }
    sample_trade = puls_pb2.Trade(**sample_fields)
    return MessageToJson(sample_trade)


@app.get("/trade/{symbol}")
def get_trade(
    symbol: str,
    start_date: str = Query(..., description="Start date in YYYY-MM-DD format"),
    end_date: str = Query(..., description="End date in YYYY-MM-DD format"),
    username: str = Query(..., description="Redpanda username"),
    password: str = Query(..., description="Redpanda password"),
):
    """Return trade data for *symbol* as a JSON object.

    Args:
        symbol: Ticker symbol taken from the URL path.
        start_date: Start of the requested range, ``YYYY-MM-DD``.
        end_date: End of the requested range, ``YYYY-MM-DD``.
        username: Caller's user name, checked by ``authenticate``.
        password: Caller's password, checked by ``authenticate``.

    Raises:
        HTTPException: 401 from ``authenticate`` on bad credentials, or 400
            when either date is not in ``YYYY-MM-DD`` format.
    """
    import json

    # NOTE(review): credentials arrive in the query string, so they will show
    # up in access logs and browser history. Moving them to an Authorization
    # header would change the public interface, so it is only flagged here.
    authenticate(username, password)

    try:
        # Validate date format; the parsed values themselves are not needed.
        datetime.strptime(start_date, "%Y-%m-%d")
        datetime.strptime(end_date, "%Y-%m-%d")
    except ValueError:
        # The parsing traceback adds nothing for the client-facing 400.
        raise HTTPException(status_code=400, detail="Invalid date format") from None

    # get_redpanda_data returns an already-serialized JSON string. Returning
    # that string directly makes FastAPI JSON-encode it a second time, so the
    # client receives a quoted string instead of an object. Parse it first.
    return json.loads(get_redpanda_data(symbol, start_date, end_date))


# CloudFormation template for FastAPI with Load Balancer and Lambda.
#
# "Parameters" declares the two logical names the load balancer refers to with
# {"Ref": ...}; a Ref must point at a declared parameter or resource.
#
# NOTE(review): things that still look incomplete and need real values:
#   - FastAPILambdaFunction has no "Role" (execution role ARN).
#   - Handler "app.lambda_handler" has no matching function in the visible code.
#   - Runtime "python3.8" is an old Lambda runtime; confirm it is still accepted.
#   - The load balancer lists one subnet; confirm whether two are required.
cft_template = {
    "AWSTemplateFormatVersion": "2010-09-09",
    "Parameters": {
        "SubnetId": {
            "Type": "AWS::EC2::Subnet::Id",
            "Description": "Subnet the load balancer is attached to",
        },
        "SecurityGroupId": {
            "Type": "AWS::EC2::SecurityGroup::Id",
            "Description": "Security group applied to the load balancer",
        },
    },
    "Resources": {
        "FastAPILambdaFunction": {
            "Type": "AWS::Lambda::Function",
            "Properties": {
                "Handler": "app.lambda_handler",
                "Runtime": "python3.8",
                "Code": {
                    "S3Bucket": "your-bucket-name",
                    "S3Key": "your-code.zip",
                },
                "MemorySize": 128,
                "Timeout": 30,
            },
        },
        "ApiGateway": {
            "Type": "AWS::ApiGatewayV2::Api",
            "Properties": {
                "ProtocolType": "HTTP",
                "Target": {"Fn::GetAtt": ["FastAPILambdaFunction", "Arn"]},
            },
        },
        "LoadBalancer": {
            "Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",
            "Properties": {
                "Subnets": [{"Ref": "SubnetId"}],
                "SecurityGroups": [{"Ref": "SecurityGroupId"}],
            },
        },
    },
}


if __name__ == "__main__":
    # Executed as a script: serve the app on every interface, port 8000.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)


No comments:

Post a Comment