Java and Protobuf: Consuming Protobuf-Encoded Kafka Messages

1. Prerequisites

Install Java

Make sure you have the Java Development Kit (JDK) installed (version 8 or higher). Verify installation:

bash

java -version
javac -version

If not installed, download it from the Oracle JDK website or use an open-source version like OpenJDK.

Install Apache Maven

Maven is required to manage dependencies in the Java project. Verify installation:

bash

mvn -version

If not installed, download it from the Maven website.


2. Create and Compile the .proto File

Step 1: Write the .proto File

Let’s assume the file TTTMessage.proto contains:

proto

syntax = "proto3";

// Java options so the generated class matches the import used in the consumer below
option java_package = "com.example.protobuf";
option java_outer_classname = "TTTMessageProtos";

message TTTMessage {
  string id = 1;
  string symbol = 2;
  int64 timestamp = 3;
  float price = 4;
  int32 volume = 5;
}

Step 2: Compile the .proto File

Use protoc to generate Java classes from the .proto file.

Install Protoc

If not installed, download protoc from the Protocol Buffers GitHub releases. Ideally, pick a protoc version that matches the protobuf-java dependency declared later in the pom.xml (3.24.0 here), so the generated code and the runtime library stay compatible.

Generate Java Classes

Run the following command to generate the Java class:

bash

protoc --java_out=./src/main/java TTTMessage.proto

This generates TTTMessageProtos.java (the outer class that wraps TTTMessage) under ./src/main/java/com/example/protobuf/; protoc creates the package directories automatically from the java_package option.
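
Before involving Kafka, a quick serialize/parse round trip can sanity-check the generated class. This is a minimal sketch: the RoundTrip class name is illustrative, it assumes the Java options shown in the .proto above, and it needs the protobuf-java dependency from section 3 on the classpath:

java

import com.google.protobuf.InvalidProtocolBufferException;
import com.example.protobuf.TTTMessageProtos.TTTMessage;

public class RoundTrip {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        // Build a message, serialize it to bytes, then parse it back
        TTTMessage original = TTTMessage.newBuilder()
                .setId("1")
                .setSymbol("ABC")
                .setTimestamp(System.currentTimeMillis())
                .setPrice(101.25f)
                .setVolume(500)
                .build();
        byte[] bytes = original.toByteArray();
        TTTMessage parsed = TTTMessage.parseFrom(bytes);
        System.out.println("Round trip OK: " + parsed.getSymbol());
    }
}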


3. Create a Maven Project

  1. Create a new Maven project:

bash

mvn archetype:generate -DgroupId=com.example.kafka -DartifactId=kafka-consumer -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
cd kafka-consumer

  2. Add dependencies to the pom.xml file:

xml

<dependencies>
    <!-- Kafka Client -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.5.1</version>
    </dependency>

    <!-- Protobuf -->
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.24.0</version>
    </dependency>
</dependencies>

  3. Make sure the generated com/example/protobuf/TTTMessageProtos.java from step 2 ends up under the src/main/java folder of this Maven project (move it there if you ran protoc elsewhere).

4. Implement the Kafka Consumer

Here’s the complete Java implementation:

java

package com.example.kafka;

import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Import the generated Protobuf class
import com.example.protobuf.TTTMessageProtos.TTTMessage;

public class Consumer {
    public static void main(String[] args) {
        // Kafka topic
        String topic = "TTT_trd_str_1";

        // Kafka consumer properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker.TTT.pulse.ABCD:9092");
        props.put("group.id", "group_id"); // Replace with your consumer group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", org.apache.kafka.common.serialization.ByteArrayDeserializer.class.getName());
        props.put("auto.offset.reset", "latest");
        props.put("enable.auto.commit", "true");

        // Create Kafka consumer
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);

        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList(topic));
        System.out.println("Consumer started for topic: " + topic);

        try {
            // Poll messages
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, byte[]> record : records) {
                    System.out.println("Raw Kafka Message Key: " + record.key());
                    // Note: the value is raw Protobuf binary, so this prints mostly unreadable text
                    System.out.println("Raw Kafka Message Value: " + new String(record.value()));

                    // Deserialize the message using Protobuf
                    try {
                        TTTMessage message = TTTMessage.parseFrom(record.value());
                        System.out.println("Deserialized Protobuf Message: ");
                        System.out.println("ID: " + message.getId());
                        System.out.println("Symbol: " + message.getSymbol());
                        System.out.println("Timestamp: " + message.getTimestamp());
                        System.out.println("Price: " + message.getPrice());
                        System.out.println("Volume: " + message.getVolume());
                    } catch (InvalidProtocolBufferException e) {
                        System.err.println("Failed to deserialize Protobuf message: " + e.getMessage());
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
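
One note on shutdown: the poll loop above runs until the process is killed. A common pattern (sketched here as an addition, not part of the original code) is to register a JVM shutdown hook that calls consumer.wakeup(); this makes the blocked poll() throw a WakeupException, which the catch block absorbs so the finally block can close the consumer cleanly:

java

// Illustrative sketch: register near the top of main(), after creating the consumer
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();     // causes the blocked poll() to throw WakeupException
    try {
        mainThread.join(); // wait for the poll loop to exit and the consumer to close
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));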


5. Project Directory Structure

The project directory should look like this:

text

/kafka-consumer
|-- pom.xml
|-- src
    |-- main
        |-- java
            |-- com
                |-- example
                    |-- kafka
                        |-- Consumer.java
                    |-- protobuf
                        |-- TTTMessageProtos.java

6. Run the Kafka Consumer

  1. Compile the project:

bash

mvn clean install

  2. Run the consumer. The jar produced by the quickstart archetype does not bundle kafka-clients or protobuf-java, so running it with java -cp alone would fail with class-not-found errors; the simplest option is to run through Maven, which puts the full dependency set on the classpath:

bash

mvn exec:java -Dexec.mainClass="com.example.kafka.Consumer"


7. Explanation of Code

  1. Kafka Consumer Setup:
    • The KafkaConsumer connects to the Kafka broker and subscribes to the specified topic.
    • Message values arrive as raw byte arrays (via the ByteArrayDeserializer), since Protobuf encodes data in a binary format; the actual decoding happens in the Protobuf parsing step.
  2. Protobuf Message Parsing:
    • The generated TTTMessage class is used to parse the byte array.
    • parseFrom(record.value()) converts the binary message into a Protobuf object.
  3. Deserialized Data:
    • The Protobuf object allows access to fields like id, symbol, timestamp, etc.

8. Debugging Tips

  1. Common Errors:
    • Broker connection issues: Verify bootstrap.servers and the Kafka broker are running.
    • Protobuf parsing failure: Ensure the producer and consumer use the same .proto schema.
  2. Testing with Mock Data:
    • Send test messages to the Kafka topic using a producer (e.g., a Python or Node.js script, or the minimal Java sketch below).
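
For the mock-data option, here is a minimal Java producer sketch. It reuses the broker address, topic, and generated TTTMessage class from above; the TestProducer class name and the field values are illustrative:

java

package com.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

import com.example.protobuf.TTTMessageProtos.TTTMessage;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker.TTT.pulse.ABCD:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        // try-with-resources closes (and flushes) the producer on exit
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            // Build a sample message and send it as raw Protobuf bytes
            TTTMessage message = TTTMessage.newBuilder()
                    .setId("test-1")
                    .setSymbol("TEST")
                    .setTimestamp(System.currentTimeMillis())
                    .setPrice(100.5f)
                    .setVolume(10)
                    .build();
            producer.send(new ProducerRecord<>("TTT_trd_str_1", message.getId(), message.toByteArray()));
            System.out.println("Test message sent.");
        }
    }
}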

 
