Saturday, 9 December 2023

Java code for Kafka

 

Step 1: Install Protocol Buffers (protoc)

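  1. Download Protocol Buffers: get the protoc release archive for your operating system from the official Protocol Buffers releases page.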
  2. Install Protocol Buffers:
    • Extract the downloaded file and move the protoc binary to a directory included in your PATH.

Example for Linux/Mac:

bash

sudo mv protoc /usr/local/bin/

Example for Windows:

    • Add the protoc binary path to your system environment variables.
  3. Verify Installation: Run the following command to confirm that protoc is on your PATH:

bash

protoc --version
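
If a build script or startup check needs to confirm the installation programmatically, the same command can be run from Java. The sketch below is one way to do it, assuming protoc is on the PATH and prints a single line of the form "libprotoc <version>". The class and method names are made up for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class ProtocVersionCheck {

    // Extracts the version from a line such as "libprotoc 24.0".
    // Returns empty if the line does not have the expected prefix.
    static Optional<String> parseVersion(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String trimmed = line.trim();
        String prefix = "libprotoc ";
        if (!trimmed.startsWith(prefix)) {
            return Optional.empty();
        }
        return Optional.of(trimmed.substring(prefix.length()).trim());
    }

    // Runs "protoc --version" and returns the reported version, if any.
    static Optional<String> installedVersion() {
        try {
            Process process = new ProcessBuilder("protoc", "--version")
                    .redirectErrorStream(true)
                    .start();
            String firstLine;
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                firstLine = reader.readLine();
            }
            process.waitFor();
            return parseVersion(firstLine);
        } catch (IOException e) {
            // protoc is not installed or not on the PATH
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(installedVersion()
                .map(v -> "Found protoc " + v)
                .orElse("protoc not found on PATH"));
    }
}
```

Knowing the installed version is useful later, because the protobuf-java dependency should match the protoc release that generated the classes.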


Step 2: Generate Java Classes from Proto File

  1. Locate your .proto file (e.g., PPP_Cloud_Streaming.proto).
  2. Run protoc to generate Java classes:

bash

protoc --java_out=PATH_TO_OUTPUT_DIRECTORY PATH_TO_PROTO_FILE

Example:

bash

protoc --java_out=./src/main/java PPP_Cloud_Streaming.proto

  3. Include the generated Java file in your Java project.
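
The import in the consumer code depends on the name protoc gives the generated outer class. Unless the .proto file sets the java_outer_classname option, protoc derives that name from the file name by camel-casing it. The sketch below approximates the rule to show why PPP_Cloud_Streaming.proto is expected to yield PPPCloudStreaming. It is an illustration rather than protoc's actual code, and it ignores the case where protoc appends "OuterClass" to avoid a clash with a message name.

```java
public class OuterClassName {

    // Approximates protoc's default Java outer class name for a .proto file:
    // drop the directory and the ".proto" suffix, remove separators such as '_',
    // and upper-case the first letter and each letter that follows a
    // separator or a digit. Letters that are already upper case are kept.
    static String fromProtoFile(String protoPath) {
        String name = protoPath.substring(protoPath.lastIndexOf('/') + 1);
        if (name.endsWith(".proto")) {
            name = name.substring(0, name.length() - ".proto".length());
        }
        StringBuilder out = new StringBuilder();
        boolean capitalizeNext = true;
        for (char c : name.toCharArray()) {
            if (Character.isLetter(c)) {
                out.append(capitalizeNext ? Character.toUpperCase(c) : c);
                capitalizeNext = false;
            } else if (Character.isDigit(c)) {
                out.append(c);
                capitalizeNext = true;
            } else {
                capitalizeNext = true; // separator: skipped
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(fromProtoFile("PPP_Cloud_Streaming.proto"));
    }
}
```

If the generated file in your output directory has a different name, trust the file: the .proto may set java_outer_classname or java_multiple_files.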

Step 3: Set Up Java Project with Kafka Dependencies

  1. Create a Maven or Gradle project.
  2. Add the required dependencies to pom.xml (for Maven) or build.gradle (for Gradle).

Maven:

Add the following dependencies for Kafka and Protocol Buffers:

xml

<dependencies>
    <!-- Kafka Client -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.5.1</version> <!-- Use the appropriate Kafka version -->
    </dependency>
    <!-- Protocol Buffers -->
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.24.0</version> <!-- Use the version matching your protoc -->
    </dependency>
</dependencies>

Gradle:

Add the dependencies to build.gradle:

gradle

dependencies {
    // Kafka Client
    implementation 'org.apache.kafka:kafka-clients:3.5.1'
    // Protocol Buffers
    implementation 'com.google.protobuf:protobuf-java:3.24.0'
}


Step 4: Java Code Implementation

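The class below is a Java equivalent of the Python Kafka consumer. It reads each record value as raw bytes and parses it into the generated Protobuf message type: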

java

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Import generated Protobuf classes (your.package is a placeholder; see Key Points)
import your.package.PPPCloudStreaming;

public class KafkaProtobufConsumer {
    private final KafkaConsumer<String, byte[]> consumer;
    private final String topicName;

    public KafkaProtobufConsumer(String topicName) {
        this.topicName = topicName;

        // Kafka consumer configuration
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker.lt.use1. :9094"); // Adjust as needed
        props.setProperty("group.id", "my-group");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Values are read as raw bytes and parsed with Protobuf below
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        // PlainLoginModule needs the PLAIN mechanism; Kafka's default is GSSAPI
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"SCRAM\" password=\"password\";");

        this.consumer = new KafkaConsumer<>(props);
    }

    public void start() {
        consumer.subscribe(Collections.singletonList(topicName));
        try {
            while (true) {
                // Wait up to 100 ms for new records
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, byte[]> record : records) {
                    // Deserialize Protobuf message
                    PPPCloudStreaming.PPPMessage message = PPPCloudStreaming.PPPMessage.parseFrom(record.value());

                    // Process the message
                    System.out.println("Received message: " + message);
                }
            }
        } catch (Exception e) {
            // Any failure, including a message that cannot be parsed, ends the loop
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        KafkaProtobufConsumer consumer = new KafkaProtobufConsumer("PPP_trd_str_1");
        consumer.start();
    }
}
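
Hard-coding the broker address and credentials in the constructor is convenient for a first test but awkward to deploy. One alternative is to build the Properties object from environment variables. The sketch below assumes hypothetical variable names (KAFKA_BOOTSTRAP_SERVERS, KAFKA_GROUP_ID, KAFKA_USERNAME, KAFKA_PASSWORD) and the same SASL/PLAIN setup as the example. The configuration keys themselves are standard Kafka consumer settings.

```java
import java.util.Map;
import java.util.Properties;

public class ConsumerConfigFromEnv {

    // Builds Kafka consumer properties from a map of environment variables.
    // The variable names are illustrative assumptions, not a Kafka convention.
    static Properties build(Map<String, String> env) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", require(env, "KAFKA_BOOTSTRAP_SERVERS"));
        props.setProperty("group.id", env.getOrDefault("KAFKA_GROUP_ID", "my-group"));
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        // PlainLoginModule needs the PLAIN mechanism; Kafka's default is GSSAPI
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"%s\" password=\"%s\";",
                require(env, "KAFKA_USERNAME"), require(env, "KAFKA_PASSWORD")));
        return props;
    }

    // Fails fast with a clear message when a required variable is missing.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        // A real deployment would call build(System.getenv()); sample values are used here.
        Properties props = build(Map.of(
                "KAFKA_BOOTSTRAP_SERVERS", "localhost:9092",
                "KAFKA_USERNAME", "demo-user",
                "KAFKA_PASSWORD", "demo-password"));
        System.out.println("Connecting to " + props.getProperty("bootstrap.servers"));
    }
}
```

The resulting object can be passed straight to new KafkaConsumer<>(props), which keeps secrets out of source control.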


Key Points:

  1. Replace your.package in the Java code with the package where the generated Protobuf classes are located.
  2. Keep the .proto file in sync with the message structure the producer publishes, and regenerate the Java classes whenever it changes.
  3. Test the application by consuming messages from your Kafka topic and checking that the printed messages match what the producer sent.
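
When the .proto definition and the producer drift apart, or a corrupt payload lands on the topic, parseFrom throws InvalidProtocolBufferException, and in the consumer above that ends the poll loop. One option is to parse each record defensively and skip what cannot be decoded. The sketch below shows the pattern with standard-library types only: Parser is a hypothetical stand-in for a generated parseFrom method reference, and the strict UTF-8 decoder merely simulates a parser that can fail.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TolerantDecoder {

    // Hypothetical stand-in for a generated parseFrom(byte[]) method,
    // which throws a checked exception on malformed input.
    @FunctionalInterface
    interface Parser<T> {
        T parse(byte[] payload) throws Exception;
    }

    // Parses every payload, keeps the ones that decode, and reports the rest
    // instead of letting one bad record abort the whole batch.
    static <T> List<T> decodeAll(List<byte[]> payloads, Parser<T> parser) {
        List<T> decoded = new ArrayList<>();
        for (byte[] payload : payloads) {
            try {
                decoded.add(parser.parse(payload));
            } catch (Exception e) {
                System.err.println("Skipping undecodable record: " + e);
            }
        }
        return decoded;
    }

    // Strict UTF-8 decoding, used here only to simulate a parser that can fail.
    static String strictUtf8(byte[] bytes) throws CharacterCodingException {
        return StandardCharsets.UTF_8.newDecoder()
                .decode(ByteBuffer.wrap(bytes))
                .toString();
    }

    public static void main(String[] args) {
        List<byte[]> payloads = Arrays.asList(
                "ok-1".getBytes(StandardCharsets.UTF_8),
                new byte[] {(byte) 0xFF}, // not valid UTF-8
                "ok-2".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeAll(payloads, TolerantDecoder::strictUtf8));
    }
}
```

With the generated classes on the classpath, and assuming the message type is named as in the example, the parser argument would be PPPCloudStreaming.PPPMessage::parseFrom. Whether to skip, log, or forward bad records to a separate topic is a design choice that depends on how much data loss the application can tolerate.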

 
