1. Prerequisites
Install Java
Make sure you have the Java Development Kit (JDK) installed (version 8 or higher). Verify installation:
bash
java -version
javac -version
If not installed, download it from the Oracle JDK website or use an open-source distribution such as OpenJDK.
Install Apache Maven
Maven is required to manage dependencies in the Java project. Verify installation:
bash
mvn -version
If not installed, download it from the Maven website.
2. Create and Compile the .proto File
Step 1: Write the .proto File
Let’s assume the file TTTMessage.proto contains:
proto
syntax = "proto3";

// Emit TTTMessage.java into the package the consumer imports below
option java_package = "com.example.protobuf";
option java_multiple_files = true;

message TTTMessage {
  string id = 1;
  string symbol = 2;
  int64 timestamp = 3;
  float price = 4;
  int32 volume = 5;
}
Step 2: Compile the .proto File
Use protoc to generate Java classes from the .proto file.
Install Protoc
If not installed, download protoc from the Protocol Buffers GitHub releases.
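Verify installation:
bash
protoc --version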
Generate Java Classes
Run the following command to generate the Java class:
bash
protoc --java_out=./src/main/java TTTMessage.proto
This generates the TTTMessage.java file under ./src/main/java/com/example/protobuf/ (protoc appends the java_package path to the output directory).
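Before wiring up Kafka, it is worth sanity-checking the generated class: protoc emits a builder for constructing messages and a parseFrom method for decoding them. A minimal round-trip sketch (the class name RoundTripCheck is made up here; the import assumes the java_package option above):
java
import com.google.protobuf.InvalidProtocolBufferException;
import com.example.protobuf.TTTMessage;

public class RoundTripCheck {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        // Build a message with the generated builder API
        TTTMessage original = TTTMessage.newBuilder()
                .setId("msg-1")
                .setSymbol("ABC")
                .setTimestamp(System.currentTimeMillis())
                .setPrice(101.25f)
                .setVolume(500)
                .build();

        // Serialize to the binary wire format, then parse it back
        byte[] bytes = original.toByteArray();
        TTTMessage parsed = TTTMessage.parseFrom(bytes);
        System.out.println(parsed.getSymbol()); // prints: ABC
    }
}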
3. Create a Maven Project
- Create a new Maven project:
bash
mvn archetype:generate -DgroupId=com.example.kafka \
  -DartifactId=kafka-consumer \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false
cd kafka-consumer
- Add dependencies to the pom.xml file:
xml
<dependencies>
  <!-- Kafka client -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>
  </dependency>
  <!-- Protobuf runtime -->
  <dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.24.0</version>
  </dependency>
</dependencies>
- Add the generated TTTMessage.java file to the src/main/java/com/example/protobuf folder in your Maven project (or let Maven generate it at build time, as sketched below).
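Alternatively, the manual protoc step can be folded into the build: the open-source protobuf-maven-plugin (org.xolstice) compiles any .proto files placed under src/main/proto and adds the generated sources to the build automatically. A minimal sketch for the pom.xml (the plugin and protoc versions here are assumptions; check the current releases):
xml
<build>
  <extensions>
    <!-- Detects the OS so the matching protoc binary can be downloaded -->
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.7.1</version>
    </extension>
  </extensions>
  <plugins>
    <plugin>
      <groupId>org.xolstice.maven.plugins</groupId>
      <artifactId>protobuf-maven-plugin</artifactId>
      <version>0.6.1</version>
      <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.24.0:exe:${os.detected.classifier}</protocArtifact>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>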
4. Implement the Kafka Consumer
Here’s the complete Java implementation:
java
package com.example.kafka;

import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Import the generated Protobuf class
import com.example.protobuf.TTTMessage;

public class Consumer {
    public static void main(String[] args) {
        // Kafka topic
        String topic = "TTT_trd_str_1";

        // Kafka consumer properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker.TTT.pulse.ABCD:9092");
        props.put("group.id", "group_id"); // Replace with your consumer group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        props.put("auto.offset.reset", "latest");
        props.put("enable.auto.commit", "true");

        // Create Kafka consumer
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);

        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList(topic));
        System.out.println("Consumer started for topic: " + topic);

        try {
            // Poll for messages
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, byte[]> record : records) {
                    System.out.println("Raw Kafka Message Key: " + record.key());
                    // Note: the raw value is binary Protobuf, so printing it as a
                    // String shows unreadable bytes; useful only to confirm arrival.
                    System.out.println("Raw Kafka Message Value: " + new String(record.value()));

                    // Deserialize the message using Protobuf
                    try {
                        TTTMessage message = TTTMessage.parseFrom(record.value());
                        System.out.println("Deserialized Protobuf Message:");
                        System.out.println("ID: " + message.getId());
                        System.out.println("Symbol: " + message.getSymbol());
                        System.out.println("Timestamp: " + message.getTimestamp());
                        System.out.println("Price: " + message.getPrice());
                        System.out.println("Volume: " + message.getVolume());
                    } catch (InvalidProtocolBufferException e) {
                        System.err.println("Failed to deserialize Protobuf message: " + e.getMessage());
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
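One caveat about the loop above: while (true) only ends when the process is killed, which can skip the finally block's consumer.close(). A common pattern (a sketch, not part of the code above) is a JVM shutdown hook that calls consumer.wakeup(), which makes the blocked poll() throw WakeupException so the loop exits cleanly:
java
// Register in main() before the poll loop; 'consumer' is the instance above.
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup(); // causes poll() to throw WakeupException
    try {
        mainThread.join(); // wait until the consumer has closed
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

// Then catch the exception around the existing loop:
try {
    while (true) {
        // poll and process records as above
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // Expected during shutdown; nothing to do.
} finally {
    consumer.close();
}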
5. Project Directory Structure
The project directory should look like this:
text
/kafka-consumer
|-- pom.xml
|-- src
    |-- main
        |-- java
            |-- com
                |-- example
                    |-- kafka
                    |   |-- Consumer.java
                    |-- protobuf
                        |-- TTTMessage.java
6. Run the Kafka Consumer
- Compile the project:
bash
mvn clean install
- Run the consumer:
bash
mvn exec:java -Dexec.mainClass="com.example.kafka.Consumer"
Note: running java -cp target/kafka-consumer-1.0-SNAPSHOT.jar com.example.kafka.Consumer directly fails with a ClassNotFoundException, because the quickstart jar does not bundle the Kafka and Protobuf dependencies.
7. Explanation of Code
- Kafka Consumer Setup:
  - The KafkaConsumer connects to the Kafka broker and subscribes to the specified topic.
  - Messages are deserialized as byte arrays since Protobuf encodes data in binary format.
- Protobuf Message Parsing:
  - The generated TTTMessage class is used to parse the byte array.
  - parseFrom(record.value()) converts the binary message into a Protobuf object.
- Deserialized Data:
  - The Protobuf object allows access to fields like id, symbol, timestamp, etc.
8. Debugging Tips
- Common Errors:
  - Broker connection issues: Verify bootstrap.servers and that the Kafka broker is running.
  - Protobuf parsing failure: Ensure the producer and consumer use the same .proto schema.
- Testing with Mock Data:
  - Send test messages to the Kafka topic using a producer (e.g., a Python or Node.js script, or the Java sketch below).
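For a quick test inside the same project, a minimal Java producer can build a TTTMessage and publish its binary encoding to the topic. A sketch (the class name TestProducer is made up; the broker address and topic are taken from the consumer above):
java
package com.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

import com.example.protobuf.TTTMessage;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker.TTT.pulse.ABCD:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            // Build a mock message and send its binary Protobuf encoding
            TTTMessage message = TTTMessage.newBuilder()
                    .setId("test-1")
                    .setSymbol("ABC")
                    .setTimestamp(System.currentTimeMillis())
                    .setPrice(99.5f)
                    .setVolume(100)
                    .build();

            producer.send(new ProducerRecord<>("TTT_trd_str_1", message.getId(), message.toByteArray()));
            producer.flush();
            System.out.println("Sent test message to TTT_trd_str_1");
        }
    }
}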