Thursday, 1 August 2024

Java Kafka (Redpanda) consumer for Protobuf messages, built with Gradle

 

redpanda-protobuf-consumer/
├── build.gradle
├── gradle/
│   └── wrapper/
│       ├── gradle-wrapper.jar
│       └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── settings.gradle
└── src/
    ├── main/
    │   ├── java/
    │   │   └── com/
    │   │       └── example/
    │   │           └── redpanda/
    │   │               └── NYSE_BQTMessage_Consumer.java
    │   ├── proto/
    │   │   └── BQT_Cloud_Streaming.proto
    │   └── resources/
    │       └── application.properties
    └── test/
        └── java/

 

Protobuf File

Download the BQT_Cloud_Streaming.proto file and place it in the src/main/proto/ directory.

build.gradle

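plugins {

    id 'java'

    id 'application'

    id 'com.google.protobuf' version '0.9.4'

}

 

group = 'com.example.redpanda'

version = '1.0.0'

 

application {

    // Entry point used by ./gradlew run

    mainClass = 'com.example.redpanda.NYSE_BQTMessage_Consumer'

}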

 

repositories {

    mavenCentral()

}

 

dependencies {

    implementation 'org.apache.kafka:kafka-clients:3.5.1'

    implementation 'com.google.protobuf:protobuf-java:3.24.3'

}

 

protobuf {

    protoc {

        artifact = "com.google.protobuf:protoc:3.24.3"

    }

    generateProtoTasks {

        all().each { task ->

            task.builtins {

                java { }

            }

        }

    }

}

 

java {

    sourceCompatibility = JavaVersion.VERSION_17

    targetCompatibility = JavaVersion.VERSION_17

}

 

 

settings.gradle

rootProject.name = 'redpanda-protobuf-consumer'

 


NYSE_BQTMessage_Consumer.java

package com.example.redpanda;


import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;


import java.time.Duration;

import java.util.Collections;

import java.util.Properties;


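// Import the generated Protobuf class. The Java package ('pulse' here) and the outer class name

// are determined by the package, java_package and java_outer_classname settings in

// BQT_Cloud_Streaming.proto, so adjust this import to match your generated code.

import pulse.BQTMessageOuterClass.BQTMessage;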


public class NYSE_BQTMessage_Consumer {


    public static void main(String[] args) {

        // Kafka consumer configuration settings

        Properties properties = new Properties();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Change to your Kafka server

        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "your-group-id"); // Set your group ID

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Change as needed

       

        // SASL configuration

        properties.put("sasl.mechanism", "PLAIN"); // or another mechanism if required

        properties.put("security.protocol", "SASL_PLAINTEXT"); // Change to SASL_SSL if needed

        properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +

                "username='your-username' " + // Set your username

                "password='your-password';"); // Set your password


        // Create Kafka consumer

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(properties);


        // Subscribe to the topic

        consumer.subscribe(Collections.singletonList("bqt-topic"));


        System.out.println("Listening for messages on topic: bqt-topic");


        try {

            while (true) {

                // Poll for records

                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, byte[]> record : records) {

                    try {

                        // Parse the Protobuf message

                        BQTMessage message = BQTMessage.parseFrom(record.value());

                        System.out.println("Received message:");

                        System.out.println("Symbol: " + message.getSymbol());

                        System.out.println("Price: " + message.getPrice());

                        System.out.println("Timestamp: " + message.getTimestamp());

                    } catch (Exception e) {

                        System.err.println("Failed to parse message: " + e.getMessage());

                    }

                }

            }

        } finally {

            consumer.close();

        }

    }

}
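
The listing above hard-codes the broker address and the SASL credentials. A minimal sketch of reading them from the environment instead is shown below. The environment variable names (KAFKA_BOOTSTRAP_SERVERS, KAFKA_USERNAME, KAFKA_PASSWORD) and the ConsumerSettings class are assumptions made for illustration; the property keys are the plain string forms of the ConsumerConfig constants used above, so the sketch needs nothing beyond the JDK.

```java
import java.util.Map;
import java.util.Properties;

final class ConsumerSettings {

    // Hypothetical environment variable names; rename them to suit your deployment.
    static final String ENV_BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
    static final String ENV_USERNAME = "KAFKA_USERNAME";
    static final String ENV_PASSWORD = "KAFKA_PASSWORD";

    /** Builds the same consumer settings as the listing, taking secrets from the given map. */
    static Properties fromEnvironment(Map<String, String> env) {
        Properties p = new Properties();
        p.put("bootstrap.servers", env.getOrDefault(ENV_BOOTSTRAP, "localhost:9092"));
        p.put("group.id", "your-group-id");
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        p.put("auto.offset.reset", "earliest");
        p.put("security.protocol", "SASL_PLAINTEXT");
        p.put("sasl.mechanism", "PLAIN");
        p.put("sasl.jaas.config",
                jaasConfig(require(env, ENV_USERNAME), require(env, ENV_PASSWORD)));
        return p;
    }

    /**
     * Formats the JAAS line for the PLAIN mechanism.
     * Values containing a double quote or backslash would need escaping; not handled here.
     */
    static String jaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" "
                + "password=\"" + password + "\";";
    }

    /** Fails fast with a readable message when a required variable is missing. */
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        // Demo values only; a real application would pass System.getenv() instead.
        Properties p = fromEnvironment(Map.of(ENV_USERNAME, "demo-user", ENV_PASSWORD, "demo-pass"));
        System.out.println("bootstrap.servers = " + p.getProperty("bootstrap.servers"));
    }
}
```

In the consumer, the resulting object can be passed straight to new KafkaConsumer<>(ConsumerSettings.fromEnvironment(System.getenv())).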


Build the Project

./gradlew build

 

Run the Consumer

./gradlew run

 

Notes

  1. Protobuf Classes: When you run the Gradle build, the Protobuf plugin generates Java classes in build/generated/source/proto/main/java/. Use these classes (like BQTMessage above) in your code.
  2. Kafka Setup: Ensure Redpanda or Kafka is running and that the topic (bqt-topic) is available with Protobuf-encoded messages.
  3. Dependencies: Update dependency versions if needed.
  4. Testing: Use a tool such as kcat (formerly kafkacat) or a custom producer to send Protobuf-encoded test messages to your topic.
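
For note 2, a quick pre-flight check can save a confusing wait when nothing is listening on the broker port. The sketch below only attempts a TCP connection with the JDK's Socket class; the BrokerPreflight class name and the one-second timeout are assumptions. A successful connection shows that something accepts connections on that port, not that the broker is healthy or that bqt-topic exists.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class BrokerPreflight {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unknown hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Same address as the bootstrap server in the consumer listing.
        boolean up = canConnect("localhost", 9092, 1000);
        System.out.println(up
                ? "Something is listening on localhost:9092"
                : "Nothing reachable on localhost:9092; is Redpanda or Kafka running?");
    }
}
```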

 

 

Where should I copy the files created by the protoc compiler?

 

When using the Gradle Protobuf plugin, you don’t need to manually copy files generated by the Protobuf compiler. Gradle will handle the compilation and place the generated files in a designated directory.

Where Protobuf Files Are Generated

After running ./gradlew build, the Protobuf plugin generates Java files from the .proto definitions and places them in:

 

build/generated/source/proto/main/java/

 

These files are automatically included in the project's classpath by Gradle.

 

Using Gradle's Automatic Protobuf Integration

If you use the Gradle Protobuf plugin, ensure your .proto files are in the following directory:

src/main/proto/

 

When you build the project, Gradle automatically:

  1. Compiles the .proto files.
  2. Places the generated .java files in build/generated/source/proto/main/java/.
  3. Includes them in your project's classpath.
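
To confirm what the plugin actually produced, and which package to import from, the generated directory can be listed after a build. The following is a small sketch using only the JDK; the GeneratedSources class is hypothetical, and the path is the one given above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class GeneratedSources {

    /** Lists generated .java files relative to the plugin's output directory. */
    static List<String> list(Path projectDir) throws IOException {
        Path root = projectDir.resolve("build/generated/source/proto/main/java");
        if (!Files.isDirectory(root)) {
            return List.of(); // Nothing generated yet: run ./gradlew build first.
        }
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(".java"))
                    // Normalise separators so the output looks the same on Windows.
                    .map(p -> root.relativize(p).toString().replace('\\', '/'))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Pass the project directory as the first argument, or run from the project root.
        Path projectDir = Path.of(args.length > 0 ? args[0] : ".");
        list(projectDir).forEach(System.out::println);
    }
}
```

Each printed path maps directly to an import: a file listed as pulse/BQTMessageOuterClass.java corresponds to import pulse.BQTMessageOuterClass.BQTMessage, so a different path here means the import in the consumer needs adjusting.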


.bat file

@echo off
REM Set root directory
set ROOT=C:\redpanda-protobuf-consumer

REM Create folder structure
mkdir "%ROOT%\src\main\java\com\example\redpanda"
mkdir "%ROOT%\src\main\proto"

REM Create build.gradle file
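echo plugins {> "%ROOT%\build.gradle"
echo     id 'java'>> "%ROOT%\build.gradle"
echo     id 'application'>> "%ROOT%\build.gradle"
echo     id 'com.google.protobuf' version '0.9.4'>> "%ROOT%\build.gradle"
echo }>> "%ROOT%\build.gradle"
echo.>> "%ROOT%\build.gradle"
echo group = 'com.example.redpanda'>> "%ROOT%\build.gradle"
echo version = '1.0.0'>> "%ROOT%\build.gradle"
echo.>> "%ROOT%\build.gradle"
echo application {>> "%ROOT%\build.gradle"
echo     mainClass = 'com.example.redpanda.NYSE_BQTMessage_Consumer'>> "%ROOT%\build.gradle"
echo }>> "%ROOT%\build.gradle"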
echo.>> "%ROOT%\build.gradle"
echo repositories {>> "%ROOT%\build.gradle"
echo     mavenCentral()>> "%ROOT%\build.gradle"
echo }>> "%ROOT%\build.gradle"
echo.>> "%ROOT%\build.gradle"
echo dependencies {>> "%ROOT%\build.gradle"
echo     implementation 'org.apache.kafka:kafka-clients:3.5.1'>> "%ROOT%\build.gradle"
echo     implementation 'com.google.protobuf:protobuf-java:3.24.3'>> "%ROOT%\build.gradle"
echo }>> "%ROOT%\build.gradle"
echo.>> "%ROOT%\build.gradle"
echo protobuf {>> "%ROOT%\build.gradle"
echo     protoc {>> "%ROOT%\build.gradle"
echo         artifact = "com.google.protobuf:protoc:3.24.3">> "%ROOT%\build.gradle"
echo     }>> "%ROOT%\build.gradle"
echo     generateProtoTasks {>> "%ROOT%\build.gradle"
echo         all().each { task -^>>> "%ROOT%\build.gradle"
echo             task.builtins {>> "%ROOT%\build.gradle"
echo                 java { }>> "%ROOT%\build.gradle"
echo             }>> "%ROOT%\build.gradle"
echo         }>> "%ROOT%\build.gradle"
echo     }>> "%ROOT%\build.gradle"
echo }>> "%ROOT%\build.gradle"
echo.>> "%ROOT%\build.gradle"
echo java {>> "%ROOT%\build.gradle"
echo     sourceCompatibility = JavaVersion.VERSION_17>> "%ROOT%\build.gradle"
echo     targetCompatibility = JavaVersion.VERSION_17>> "%ROOT%\build.gradle"
echo }>> "%ROOT%\build.gradle"

REM Create settings.gradle file
echo rootProject.name = 'redpanda-protobuf-consumer'> "%ROOT%\settings.gradle"

REM Create NYSE_BQTMessage_Consumer.java file
echo package com.example.redpanda;> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo.>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo import org.apache.kafka.clients.consumer.ConsumerConfig;>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo import org.apache.kafka.clients.consumer.ConsumerRecords;>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo import org.apache.kafka.clients.consumer.KafkaConsumer;>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo import org.apache.kafka.clients.consumer.ConsumerRecord;>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo.>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo import java.time.Duration;>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo import java.util.Collections;>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo import java.util.Properties;>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo.>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo import pulse.BQTMessageOuterClass.BQTMessage;>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo.>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo public class NYSE_BQTMessage_Consumer {>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo     public static void main(String[] args) {>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo         Properties properties = new Properties();>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo         properties.put(ConsumerConfig.GROUP_ID_CONFIG, "bqt-consumer-group");>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo.>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo         KafkaConsumer^<String, byte[]^> consumer = new KafkaConsumer^<^>(properties);>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo         consumer.subscribe(Collections.singletonList("bqt-topic"));>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo         System.out.println("Listening for messages on topic: bqt-topic");>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo.>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo         try {>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo             while (true) {>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo                 ConsumerRecords^<String, byte[]^> records = consumer.poll(Duration.ofMillis(100));>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo                 for (ConsumerRecord^<String, byte[]^> record : records) {>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo                     try {>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo                         BQTMessage message = BQTMessage.parseFrom(record.value());>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo                         System.out.println("Received: " + message.getSymbol());>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo                     } catch (Exception e) {>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo                         System.err.println("Error: " + e.getMessage());>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo                     }>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo                 }>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo             }>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo         } finally {>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo             consumer.close();>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo         }>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo     }>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"
echo }>> "%ROOT%\src\main\java\com\example\redpanda\NYSE_BQTMessage_Consumer.java"

REM Create placeholder Protobuf file
echo // Placeholder for BQT_Cloud_Streaming.proto> "%ROOT%\src\main\proto\BQT_Cloud_Streaming.proto"

echo Done! The project structure has been created.
pause
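
In cmd, the characters < and > inside echoed text are treated as redirection unless escaped with ^, which makes generating Java source from a batch file fragile. As a portable alternative, the directory layout and the small text files can be created from Java itself, where nothing passes through a shell. This is only a sketch: the ProjectScaffold class is hypothetical, and it covers just the folders, settings.gradle and the placeholder .proto file, not build.gradle or the consumer source.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

final class ProjectScaffold {

    /** Creates the source folders, settings.gradle and a placeholder .proto under root. */
    static void create(Path root) throws IOException {
        // createDirectories also creates missing parents and is a no-op if they exist.
        Files.createDirectories(root.resolve("src/main/java/com/example/redpanda"));
        Files.createDirectories(root.resolve("src/main/proto"));

        Files.write(root.resolve("settings.gradle"),
                List.of("rootProject.name = 'redpanda-protobuf-consumer'"));

        // Never overwrite a real schema that has already been copied in.
        Path proto = root.resolve("src/main/proto/BQT_Cloud_Streaming.proto");
        if (Files.notExists(proto)) {
            Files.write(proto, List.of("// Placeholder for BQT_Cloud_Streaming.proto"));
        }
    }

    public static void main(String[] args) throws IOException {
        // Target directory: first argument, or a folder next to the working directory.
        Path root = Path.of(args.length > 0 ? args[0] : "redpanda-protobuf-consumer");
        create(root);
        System.out.println("Created project skeleton in " + root.toAbsolutePath());
    }
}
```

With Java 11 or later this can be run without a build step, for example java ProjectScaffold.java C:\redpanda-protobuf-consumer.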

README

# NYSE BQT Message Consumer

 

This project is a Java-based Kafka consumer that reads messages from a Kafka topic. The messages are serialized using Protocol Buffers (Protobuf) format. The consumer connects to a Kafka cluster using SASL authentication.

 

## Project Structure

 

 

## Required Versions

 

- **Java**: Version 17

- **Gradle**: Version 8.0 or higher

- **Kafka client (kafka-clients)**: Version 3.5.1

- **Protobuf**: Version 3.24.3

 

## Gradle Configuration

 

### build.gradle

 

This file contains the configuration needed to build the project:

 

```groovy

plugins {

    id 'java'

    id 'com.google.protobuf' version '0.9.4'

    id 'application'

}

 

application {

    mainClass = 'com.example.redpanda.NYSE_BQTMessage_Consumer'

}

 

repositories {

    mavenCentral()

}

 

dependencies {

    implementation 'org.apache.kafka:kafka-clients:3.5.1'

    implementation 'com.google.protobuf:protobuf-java:3.24.3'

}

 

protobuf {

    protoc {

        artifact = "com.google.protobuf:protoc:3.24.3"

    }

    generateProtoTasks {

        all().each { task ->

            task.builtins {

                java { }

            }

        }

    }

}

 

java {

    sourceCompatibility = JavaVersion.VERSION_17

    targetCompatibility = JavaVersion.VERSION_17

}

```

## Running the Consumer

1. Build the project.

   Linux/macOS:

       ./gradlew clean build

   Windows:

       gradlew clean build

2. Run the consumer.

   Linux/macOS:

       ./gradlew run

   Windows:

       gradlew run

 

