Sunday, 30 March 2025

Install nvme-cli for ARM based linux

 


to install the nvme-cli utility on your Graviton-based (ARM architecture) Red Hat Enterprise Linux (RHEL) EC2 instance without direct internet access, you can download the necessary RPM packages on your local Windows machine and then transfer them to your EC2 instance. Here's how you can proceed:

1. Determine Your RHEL Version and Architecture:

  • Connect to your EC2 instance and run the following commands to identify your RHEL version and system architecture:


    cat /etc/os-release uname -m
  • Note the VERSION_ID (e.g., 8.4) and architecture (aarch64 for ARM).

2. Download the nvme-cli RPM Package and Dependencies:

  • On your Windows machine, open a web browser and navigate to a reliable RPM repository such as RPM Find.

  • Search for the nvme-cli package that matches your RHEL version and ARM architecture (aarch64). For example, for RHEL 8 on ARM, you might find a package like nvme-cli-1.12-2.el8.aarch64.rpm.
    https://rpmfind.net/linux/RPM/centos-stream/10/baseos/aarch64/nvme-cli-2.11-5.el10.aarch64.html

  • Download the appropriate nvme-cli RPM package.

  • Identify and Download Dependencies:

    • nvme-cli may have dependencies that need to be installed alongside it. Common dependencies include libnvme and json-c.

    • Search for and download the RPM packages for these dependencies that match your RHEL version and ARM architecture.

3. Transfer the RPM Packages to Your EC2 Instance:

  • Use a secure method to transfer the downloaded RPM packages from your Windows machine to the EC2 instance. One common method is using scp (Secure Copy Protocol):

    • If you have an SSH client like PuTTY installed, you can use pscp (the SCP client from PuTTY) to transfer files. Open the Command Prompt on your Windows machine and run:


      pscp path\to\your\file username@your-ec2-instance-ip:/destination/path
    • Replace path\to\your\file with the path to your downloaded RPM package, username with your EC2 instance username (e.g., ec2-user), your-ec2-instance-ip with the public IP address of your EC2 instance, and /destination/path with the directory on the EC2 instance where you want to place the file.

4. Install the RPM Packages on Your EC2 Instance:

  • Connect to your EC2 instance and navigate to the directory containing the transferred RPM files.

  • Install the packages using the rpm command:


    sudo rpm -Uvh package-name.rpm

    Install each package, starting with the dependencies and finishing with nvme-cli.

  • If you encounter dependency issues, ensure that all required packages are present and installed in the correct order.

5. Verify the Installation:

  • After installation, verify that nvme-cli is correctly installed by running:


    nvme --version
  • This should display the installed version of nvme-cli, confirming a successful installation.

Additional Considerations:

  • Dependency Management: Manually managing dependencies can be complex. Ensure that all required libraries and packages are compatible with your system to prevent issues.

  • Security: Only download packages from official and trusted repositories to maintain the security and integrity of your system.

  • Alternative Methods: If manual installation becomes cumbersome, consider setting up a local repository or using AWS services like Systems Manager to manage packages on instances without direct internet access.




Sunday, 23 March 2025

CloudFormation Template (CFT) to host your FastAPI app behind an AWS ALB

 AWSTemplateFormatVersion: '2010-09-09'

Resources:

  FastAPILambdaFunction:

    Type: AWS::Lambda::Function

    Properties:

      FunctionName: FastAPITradeGraphQL

      Handler: lambda_function.lambda_handler

      Runtime: python3.11

      Timeout: 30

      MemorySize: 512

      Code:

        S3Bucket: !Sub "${AWS::AccountId}-fastapi-app"

        S3Key: "fastapi_trade_graphql.zip"

      Role: !GetAtt LambdaExecutionRole.Arn


  LambdaExecutionRole:

    Type: AWS::IAM::Role

    Properties:

      RoleName: FastAPILambdaExecutionRole

      AssumeRolePolicyDocument:

        Version: '2012-10-17'

        Statement:

          - Effect: Allow

            Principal:

              Service: lambda.amazonaws.com

            Action: sts:AssumeRole

      Policies:

        - PolicyName: LambdaBasicExecution

          PolicyDocument:

            Version: '2012-10-17'

            Statement:

              - Effect: Allow

                Action:

                  - logs:CreateLogGroup

                  - logs:CreateLogStream

                  - logs:PutLogEvents

                Resource: "*"


  FastAPILambdaPermission:

    Type: AWS::Lambda::Permission

    Properties:

      Action: lambda:InvokeFunction

      FunctionName: !GetAtt FastAPILambdaFunction.Arn

      Principal: apigateway.amazonaws.com


  FastAPIAPI:

    Type: AWS::ApiGatewayV2::Api

    Properties:

      Name: FastAPITradeGraphQLAPI

      ProtocolType: HTTP


  FastAPIIntegration:

    Type: AWS::ApiGatewayV2::Integration

    Properties:

      ApiId: !Ref FastAPIAPI

      IntegrationType: AWS_PROXY

      IntegrationUri: !GetAtt FastAPILambdaFunction.Arn


  FastAPIRoute:

    Type: AWS::ApiGatewayV2::Route

    Properties:

      ApiId: !Ref FastAPIAPI

      RouteKey: "POST /graphql"

      Target: !Sub "integrations/${FastAPIIntegration}"


  FastAPIStage:

    Type: AWS::ApiGatewayV2::Stage

    Properties:

      ApiId: !Ref FastAPIAPI

      StageName: "$default"

      AutoDeploy: true


  FastAPILoadBalancer:

    Type: AWS::ElasticLoadBalancingV2::LoadBalancer

    Properties:

      Name: FastAPITradeGraphQLLB

      Scheme: internet-facing

      Type: application

      Subnets:

        - subnet-xxxxxxxx

        - subnet-yyyyyyyy

      SecurityGroups:

        - !GetAtt FastAPILBSecurityGroup.GroupId


  FastAPILBSecurityGroup:

    Type: AWS::EC2::SecurityGroup

    Properties:

      GroupDescription: Allow HTTP

      SecurityGroupIngress:

        - IpProtocol: tcp

          FromPort: 80

          ToPort: 80

          CidrIp: 0.0.0.0/0


Outputs:

  FastAPIInvokeURL:

    Value: !Sub "https://${FastAPIAPI}.execute-api.${AWS::Region}.amazonaws.com/"


# Instructions to upload Lambda code:

# 1. Zip your FastAPI app code (including dependencies) into fastapi_trade_graphql.zip.

# 2. Upload the zip to an S3 bucket named '<account-id>-fastapi-app' (replace with your AWS Account ID).

#    aws s3 cp fastapi_trade_graphql.zip s3://<account-id>-fastapi-app/

# 3. Deploy the CloudFormation template:

#    aws cloudformation create-stack --stack-name FastAPITradeGraphQLStack --template-body file://fastapi_cft.yaml --capabilities CAPABILITY_IAM


# FastAPI Code:

from fastapi import FastAPI, HTTPException, Request, Depends

from google.protobuf.json_format import MessageToJson

from google.protobuf.message import DecodeError

from confluent_kafka import Consumer, KafkaException

import puls_pb2  # Import your compiled Protobuf file

from datetime import datetime, timedelta

import strawberry

from strawberry.fastapi import GraphQLRouter

import time


app = FastAPI()


def authenticate(request: Request):

    username = request.headers.get("username")

    password = request.headers.get("password")

    if username != "aaa" or password != "bbb":

        raise HTTPException(status_code=401, detail="Invalid credentials")


def get_redpanda_data(symbol: str, start_timestamp: int, end_timestamp: int):

    conf = {

        'bootstrap.servers': 'localhost:9092',  # Change to your Redpanda broker

        'group.id': 'redpanda-group',

        'auto.offset.reset': 'earliest'

    }

    consumer = Consumer(conf)

    consumer.subscribe(["trades"])


    trades = []

    try:

        while True:

            msg = consumer.poll(1.0)

            if msg is None:

                break

            if msg.error():

                raise KafkaException(msg.error())


            trade_msg = puls_pb2.Trade()

            try:

                trade_msg.ParseFromString(msg.value())

                if (trade_msg.symbol == symbol and

                        start_timestamp <= trade_msg.sourcetime <= end_timestamp):

                    trades.append(MessageToJson(trade_msg))

            except DecodeError:

                continue

    finally:

        consumer.close()

    

    return trades if trades else None


@strawberry.type

class Trade:

    feedmsgseq: int

    sourcetime: int

    symbol: str

    tradeid: int

    price: float

    volume: int

    tradecondition1: str

    tradecondition2: str

    marketid: str


@strawberry.type

class Query:

    @strawberry.field

    def trade(

        self, 

        symbol: str, 

        last_hours: int = None, 

        info: strawberry.types.Info

    ) -> list[Trade]:

        authenticate(info.context['request'])


        end_timestamp = int(time.time())

        start_timestamp = end_timestamp - (last_hours * 3600)

        

        data_json_list = get_redpanda_data(symbol, start_timestamp, end_timestamp)

        if not data_json_list:

            raise HTTPException(status_code=404, detail="No trade data found")


        trades = []

        for data_json in data_json_list:

            data = puls_pb2.Trade()

            data.ParseFromString(data_json.encode())

            trades.append(Trade(

                feedmsgseq=data.feedmsgseq,

                sourcetime=data.sourcetime,

                symbol=data.symbol,

                tradeid=data.tradeid,

                price=data.price,

                volume=data.volume,

                tradecondition1=data.tradecondition1,

                tradecondition2=data.tradecondition2,

                marketid=data.marketid

            ))

        return trades


def get_context():

    return {"request": Request}


schema = strawberry.Schema(query=Query)

graphql_app = GraphQLRouter(schema, context_getter=get_context)


app.include_router(graphql_app, prefix="/graphql")


if __name__ == "__main__":

    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)


FastAPI Lambda with GraphQL & Kafka Integration:

 

FastAPI Lambda with GraphQL & KafkaIntegration: Complete Solution

Step 1: Lambda Code (app.py)

from fastapi import FastAPI, HTTPException, Request
from google.protobuf.json_format import MessageToJson
import puls_pb2  # Your compiled Protobuf file
from datetime import datetime
import strawberry
from strawberry.fastapi import GraphQLRouter
from mangum import Mangum  # Lambda integration package

app = FastAPI()

def authenticate(request: Request):
    username = request.headers.get("username")
    password = request.headers.get("password")
    if username != "aaa" or password != "bbb":
        raise HTTPException(status_code=401, detail="Invalid credentials")

@strawberry.type
class Trade:
    feedmsgseq: int
    sourcetime: int
    symbol: str
    tradeid: int
    price: float
    volume: int
    tradecondition1: str
    tradecondition2: str
    marketid: str

@strawberry.type
class Query:
    @strawberry.field
    def trade(self, symbol: str, start_date: str, end_date: str, info: strawberry.types.Info) -> Trade:
        authenticate(info.context['request'])
        try:
            # Validate date format
            datetime.strptime(start_date, "%Y-%m-%d")
            datetime.strptime(end_date, "%Y-%m-%d")
        except ValueError:
            raise HTTPException(status_code=400, detail="Invalid date format")

        data = puls_pb2.Trade(
            feedmsgseq=123456789,
            sourcetime=1631023200,
            symbol=symbol,
            tradeid=1001,
            price=150.5,
            volume=200,
            tradecondition1="A",
            tradecondition2="B",
            marketid="NASDAQ"
        )
        return Trade(
            feedmsgseq=data.feedmsgseq,
            sourcetime=data.sourcetime,
            symbol=data.symbol,
            tradeid=data.tradeid,
            price=data.price,
            volume=data.volume,
            tradecondition1=data.tradecondition1,
            tradecondition2=data.tradecondition2,
            marketid=data.marketid
        )

# Setup the GraphQL router
schema = strawberry.Schema(query=Query)
graphql_app = GraphQLRouter(schema)

# Include the GraphQL app
app.include_router(graphql_app, prefix="/graphql")

# Use Mangum to integrate with Lambda
handler = Mangum(app)

Step 2: Prepare Lambda Package

  1. Create a requirements.txt file:


fastapi==0.95.0
mangum==0.10.0
strawberry-graphql==0.19.1
google==3.0.3
protobuf==3.20.1


Step 3 Install dependencies:
pip install -r requirements.txt -t lambda_package/


Step 4 Package Lambda Code:
cd lambda_package
zip -r ../your-code.zip .

This will create the your-code.zip file containing all the necessary code and dependencies.

Step 5: Upload Lambda Code to S3

Go to the AWS S3 console.
Upload the your-code.zip file to an S3 bucket.

Step 6: CloudFormation Template (CFT)

Here is the CloudFormation Template (CFT) for deploying the Lambda, API Gateway, and Load Balancer.

AWSTemplateFormatVersion: '2010-09-09'

Resources:

  

  # Lambda Function

  FastAPILambdaFunction:

    Type: 'AWS::Lambda::Function'

    Properties:

      Handler: 'app.handler'

      Runtime: 'python3.8'

      Code:

        S3Bucket: 'your-bucket-name'  # Replace with your S3 bucket name

        S3Key: 'your-code.zip'        # Replace with the path to your zip file in S3

      MemorySize: 128

      Timeout: 30

      Environment:

        Variables:

          ENVIRONMENT: 'production'


  # API Gateway (HTTP API)

  ApiGateway:

    Type: 'AWS::ApiGatewayV2::Api'

    Properties:

      ProtocolType: 'HTTP'

      Target:

        Fn::GetAtt:

          - 'FastAPILambdaFunction'

          - 'Arn'


  # Load Balancer

  LoadBalancer:

    Type: 'AWS::ElasticLoadBalancingV2::LoadBalancer'

    Properties:

      Subnets:

        - Ref: 'SubnetId'

      SecurityGroups:

        - Ref: 'SecurityGroupId'


Outputs:

  FastAPIAppURL:

    Description: 'URL of the FastAPI application'

    Value: !Sub 'http://${LoadBalancer.DNSName}/graphql'


Key Parts of the CloudFormation Template:

  • Lambda Function (FastAPILambdaFunction): Defines the Lambda function that runs your FastAPI code. The .zip file from S3 is used as the code source.

  • API Gateway (ApiGateway): The HTTP API Gateway that triggers the Lambda function for HTTP requests.

  • Load Balancer (LoadBalancer): An Application Load Balancer to distribute traffic.


Step 7: Deploying CloudFormation

  1. Upload the CloudFormation Template to AWS CloudFormation via the AWS Console or AWS CLI.

  2. Update the S3 bucket and .zip file paths in the template (your-bucket-name and your-code.zip).

  3. Deploy the CloudFormation stack. This will automatically create:

    • The Lambda function with your FastAPI code.

    • The API Gateway to handle incoming HTTP requests.

    • The Load Balancer to distribute traffic.

Step 8: Example GraphQL Query

Once the stack is deployed, you can query your FastAPI app using GraphQL. The app is exposed through the Load Balancer's DNS name.

GraphQL Query Example (POST request to http://<your-load-balancer-dns-name>/graphql):


{

  "query": "query { trade(symbol: \"AAPL\", start_date: \"2024-03-01\", end_date: \"2024-03-10\") { feedmsgseq sourcetime symbol tradeid price volume tradecondition1 tradecondition2 marketid }}"

}

Summary

This document guides you through:

  1. Lambda Code: Writing a FastAPI application integrated with GraphQL.

  2. Packaging the Lambda: Installing dependencies and creating a deployable .zip file.

  3. CloudFormation Template: Automating the deployment of the Lambda function, API Gateway, and Load Balancer.

  4. Deployment: Instructions on uploading the Lambda code to S3 and deploying with CloudFormation.

  5. Querying: How to query the deployed FastAPI app via GraphQL.

Python that create API for Redpanda using Graphql

 from fastapi import FastAPI, HTTPException, Request, Depends

from google.protobuf.json_format import MessageToJson

import puls_pb2  # Import your compiled Protobuf file

from datetime import datetime

import strawberry

from strawberry.fastapi import GraphQLRouter


app = FastAPI()


def authenticate(request: Request):

    username = request.headers.get("username")

    password = request.headers.get("password")

    if username != "aaa" or password != "bbb":

        raise HTTPException(status_code=401, detail="Invalid credentials")


def get_redpanda_data(symbol: str, start_date: str, end_date: str):

    # Simulated data retrieval logic (replace with actual Redpanda query)

    trade = puls_pb2.Trade(

        feedmsgseq=123456789,

        sourcetime=1631023200,

        symbol=symbol,

        tradeid=1001,

        price=150.5,

        volume=200,

        tradecondition1="A",

        tradecondition2="B",

        marketid="NASDAQ"

    )

    return MessageToJson(trade)


@strawberry.type

class Trade:

    feedmsgseq: int

    sourcetime: int

    symbol: str

    tradeid: int

    price: float

    volume: int

    tradecondition1: str

    tradecondition2: str

    marketid: str


@strawberry.type

class Query:

    @strawberry.field

    def trade(

        self, 

        symbol: str, 

        start_date: str, 

        end_date: str, 

        info: strawberry.types.Info

    ) -> Trade:

        authenticate(info.context['request'])

        try:

            # Validate date format

            datetime.strptime(start_date, "%Y-%m-%d")

            datetime.strptime(end_date, "%Y-%m-%d")

        except ValueError:

            raise HTTPException(status_code=400, detail="Invalid date format")

        

        data = puls_pb2.Trade(

            feedmsgseq=123456789,

            sourcetime=1631023200,

            symbol=symbol,

            tradeid=1001,

            price=150.5,

            volume=200,

            tradecondition1="A",

            tradecondition2="B",

            marketid="NASDAQ"

        )

        return Trade(

            feedmsgseq=data.feedmsgseq,

            sourcetime=data.sourcetime,

            symbol=data.symbol,

            tradeid=data.tradeid,

            price=data.price,

            volume=data.volume,

            tradecondition1=data.tradecondition1,

            tradecondition2=data.tradecondition2,

            marketid=data.marketid

        )


def get_context():

    return {"request": Request}


schema = strawberry.Schema(query=Query)

graphql_app = GraphQLRouter(schema, context_getter=get_context)


app.include_router(graphql_app, prefix="/graphql")


# CloudFormation Template for FastAPI with Load Balancer and Lambda

cft_template = {

    "AWSTemplateFormatVersion": "2010-09-09",

    "Resources": {

        "FastAPILambdaFunction": {

            "Type": "AWS::Lambda::Function",

            "Properties": {

                "Handler": "app.lambda_handler",

                "Runtime": "python3.8",

                "Code": {

                    "S3Bucket": "your-bucket-name",

                    "S3Key": "your-code.zip"

                },

                "MemorySize": 128,

                "Timeout": 30

            }

        },

        "ApiGateway": {

            "Type": "AWS::ApiGatewayV2::Api",

            "Properties": {

                "ProtocolType": "HTTP",

                "Target": {"Fn::GetAtt": ["FastAPILambdaFunction", "Arn"]}

            }

        },

        "LoadBalancer": {

            "Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",

            "Properties": {

                "Subnets": [{"Ref": "SubnetId"}],

                "SecurityGroups": [{"Ref": "SecurityGroupId"}]

            }

        }

    }

}


if __name__ == "__main__":

    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)


# Example of GraphQL Query

# Send a POST request to http://<your-load-balancer-dns-name>/graphql with headers:

# "username": "aaa"

# "password": "bbb"

# Body:

# {

#   "query": "query { trade(symbol: \"AAPL\", start_date: \"2024-03-01\", end_date: \"2024-03-10\") { feedmsgseq sourcetime symbol tradeid price volume tradecondition1 tradecondition2 marketid }}"

# }


Python that create API for Redpanda

from fastapi import FastAPI, HTTPException, Query, Depends

from google.protobuf.json_format import MessageToJson

import puls_pb2  # Import your compiled Protobuf file

from datetime import datetime


app = FastAPI()


def authenticate(username: str, password: str):

    # Simulated authentication (replace with actual logic)

    if username != "aaa" or password != "bbb":

        raise HTTPException(status_code=401, detail="Invalid credentials")


def get_redpanda_data(symbol: str, start_date: str, end_date: str):

    # Simulated data retrieval logic (replace with actual Redpanda query)

    trade = puls_pb2.Trade(

        feedmsgseq=123456789,

        sourcetime=1631023200,

        symbol=symbol,

        tradeid=1001,

        price=150.5,

        volume=200,

        tradecondition1="A",

        tradecondition2="B",

        marketid="NASDAQ"

    )

    return MessageToJson(trade)


@app.get("/trade/{symbol}")

def get_trade(

    symbol: str,

    start_date: str = Query(..., description="Start date in YYYY-MM-DD format"),

    end_date: str = Query(..., description="End date in YYYY-MM-DD format"),

    username: str = Query(..., description="Redpanda username"),

    password: str = Query(..., description="Redpanda password")

):

    authenticate(username, password)

    try:

        # Validate date format

        datetime.strptime(start_date, "%Y-%m-%d")

        datetime.strptime(end_date, "%Y-%m-%d")

    except ValueError:

        raise HTTPException(status_code=400, detail="Invalid date format")

    

    return get_redpanda_data(symbol, start_date, end_date)


# CloudFormation Template for FastAPI with Load Balancer and Lambda

cft_template = {

    "AWSTemplateFormatVersion": "2010-09-09",

    "Resources": {

        "FastAPILambdaFunction": {

            "Type": "AWS::Lambda::Function",

            "Properties": {

                "Handler": "app.lambda_handler",

                "Runtime": "python3.8",

                "Code": {

                    "S3Bucket": "your-bucket-name",

                    "S3Key": "your-code.zip"

                },

                "MemorySize": 128,

                "Timeout": 30

            }

        },

        "ApiGateway": {

            "Type": "AWS::ApiGatewayV2::Api",

            "Properties": {

                "ProtocolType": "HTTP",

                "Target": {"Fn::GetAtt": ["FastAPILambdaFunction", "Arn"]}

            }

        },

        "LoadBalancer": {

            "Type": "AWS::ElasticLoadBalancingV2::LoadBalancer",

            "Properties": {

                "Subnets": [{"Ref": "SubnetId"}],

                "SecurityGroups": [{"Ref": "SecurityGroupId"}]

            }

        }

    }

}


if __name__ == "__main__":

    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)


Sunday, 16 March 2025

Installing Java, Setting up Environment Variables, and Kafka Protobuf Consumer Program

 Complete Guide: Installing Java, Setting up Environment Variables, and Redpanda Protobuf Consumer Program 

1. Installing Java 

To install the latest version of Java (Java 17 or later), follow these steps: 

1. **Download the Latest JDK**: Visit the AdoptOpenJDK or OpenJDK website and download the latest version (Java 17 or later). 

2. **Install Java**: Follow installation steps based on your OS (Windows, Linux, etc.). 

3. **Verify Installation**: Run `java -version` in the terminal or Command Prompt to check the Java version. 

2. Setting Up Environment Variables 

After installing Java, you need to set up the environment variables for easy access to Java commands. 

### For Windows 
1. Right-click "This PC" > "Properties" > "Advanced system settings" > "Environment Variables". 
2. Under "System Variables", click "New" to create a new variable:  
   - **Variable Name**: `JAVA_HOME` 
   - **Variable Value**: `C:\Program Files\AdoptOpenJDK\jdk-17.0.1` (or your installation path) 
3. Find "Path" variable, click "Edit" and add the following path: 
   - `C:\Program Files\AdoptOpenJDK\jdk-17.0.1\bin` (or your installation path) 
4. Click "OK" to save the changes. 
 

### For Linux 
1. Open `.bashrc` or `.bash_profile` in a text editor:  
   ```bash 
   nano ~/.bashrc 
   ``` 
2. Add the following lines to the file: 
   ```bash 
   export JAVA_HOME=/usr/lib/jvm/java-17-openjdk 
   export PATH=$JAVA_HOME/bin:$PATH 
   ``` 
3. Save the file and run `source ~/.bashrc` to reload the file. 
4. Verify by running `echo $JAVA_HOME` and `java -version`. 
 

3. Gradle Setup and Dependencies 

Now let's set up your project using Gradle and add the necessary dependencies. 

1. **Create Project Structure**: 
   Your project structure should look like this: 
   ``` 
   <project-root> 
   ├── build.gradle 
   └── src 
       └── main 
           └── java 
               └── com 
                   └── nnnn 
                       └── pp 
                           └── RedpandaProtobufConsumer.java 
   ``` 

2. **Add Dependencies in build.gradle**: 
Below is the Gradle configuration file to include Kafka and Protobuf dependencies. 
```gradle 
plugins { 
    id 'java' 
} 
 
group = 'com.nnnn' 
version = '1.0-SNAPSHOT' 
sourceCompatibility = '17' 
 
repositories { 
    mavenCentral() 
} 
 
dependencies { 
    implementation 'org.apache.kafka:kafka-clients:3.0.0' 
    implementation 'com.google.protobuf:protobuf-java:3.19.1' 
    implementation 'org.apache.kafka:kafka-protobuf-serializer:3.0.0' 
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0' 
    testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.7.0' 
} 
 
test { 
    useJUnitPlatform() 
} 
``` 

3. **Install Gradle**: 
Follow the [Gradle Installation Guide](https://gradle.org/install/) to set up Gradle on your system. 

4. **Build the Project**: After adding dependencies, run the following command to build your project: 
```bash 
gradle build 
``` 

4. Protobuf Model Setup 

Create a `.proto` file for your Protobuf model, such as `Ticket.proto`: 
```protobuf 
syntax = "proto3"; 
package com.nnnn.pp; 
 
message Ticket { 
  string ticketId = 1; 
  string ticketType = 2; 
  int32 amount = 3; 
} 
``` 

After generating the Java classes or writing them manually, the `Ticket` class might look like this: 
```java 
package com.nnnn.pp; 
 
public class Ticket { 
    private String ticketId; 
    private String ticketType; 
    private int amount; 
 
    // Getters and setters 
} 
``` 

5. Redpanda Protobuf Consumer Program 

Here is the implementation of the consumer program that connects to Redpanda and fetches Protobuf messages: 
 

```java 
package com.nnnn.pp; 
 
import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.common.serialization.ByteArrayDeserializer; 
import com.google.protobuf.InvalidProtocolBufferException; 
import org.apache.kafka.common.serialization.Deserializer; 
 
public class RedpandaProtobufConsumer { 
    public static void main(String[] args) { 
        String bootstrapServers = "localhost:9092"; 
        String groupId = "example-group"; 
        String topic = "your-topic"; 
 
        Properties properties = new Properties(); 
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); 
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProtobufDeserializer.class); 
 
        KafkaConsumer<byte[], Ticket> consumer = new KafkaConsumer<>(properties); 
        consumer.subscribe(Collections.singletonList(topic)); 
 
        while (true) { 
            var records = consumer.poll(1000); 
            records.forEach(record -> { 
                try { 
                    Ticket ticket = Ticket.parseFrom(record.value()); 
                    System.out.println("Ticket ID: " + ticket.getTicketId()); 
                    System.out.println("Ticket Type: " + ticket.getTicketType()); 
                    System.out.println("Ticket Amount: " + ticket.getAmount()); 
                } catch (InvalidProtocolBufferException e) { 
                    e.printStackTrace(); 
                } 
            }); 
        } 
    } 
 
    static class ProtobufDeserializer implements Deserializer<byte[]> { 
        @Override 
        public byte[] deserialize(String topic, byte[] data) { 
            return data; 
        } 
    } 
} 
``` 

6. Running the Program 

1. **Build the Project**: Run `gradle build` to compile the project. 
2. **Run the Program**: Run `gradle run` to start consuming messages.