Step 1: Install Protocol Buffers (protoc)
- Download Protocol Buffers:
  - Visit the Protocol Buffers GitHub releases page.
  - Download the appropriate version for your system.
- Install Protocol Buffers:
  - Extract the downloaded archive and move the protoc binary to a directory included in your PATH.

  Example for Linux/Mac:

  ```bash
  sudo mv protoc /usr/local/bin/
  ```

  Example for Windows: add the path to the protoc binary to your system environment variables.
- Verify installation: run the following command to confirm the install worked:

  ```bash
  protoc --version
  ```
Step 2: Generate Java Classes from the Proto File
- Locate your .proto file (e.g., PPP_Cloud_Streaming.proto).
- Run protoc to generate Java classes:

  ```bash
  protoc --java_out=PATH_TO_OUTPUT_DIRECTORY PATH_TO_PROTO_FILE
  ```

  Example:

  ```bash
  protoc --java_out=./src/main/java PPP_Cloud_Streaming.proto
  ```

- Include the generated Java file in your Java project.
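The names of the generated classes depend on options declared inside the .proto file. The actual PPP_Cloud_Streaming.proto is not shown here, so the following is only a hypothetical sketch of the kind of options and message it might contain (the java_package, java_outer_classname, and field names are all assumptions for illustration):

```proto
syntax = "proto3";

// Hypothetical sketch -- the real PPP_Cloud_Streaming.proto will differ.
option java_package = "your.package";              // package of the generated code
option java_outer_classname = "PPPCloudStreaming"; // outer wrapper class name

message PPPMessage {
  string symbol = 1;     // illustrative fields only
  double price = 2;
  int64 timestamp = 3;
}
```

With options like these, protoc would emit a `your.package.PPPCloudStreaming` outer class containing a nested `PPPMessage` class, which is the naming the consumer code below relies on.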
Step 3: Set Up Java Project with Kafka Dependencies
- Create a Maven or Gradle project.
- Add the required dependencies to pom.xml (for Maven) or build.gradle (for Gradle).

Maven: add the following dependencies for Kafka and Protocol Buffers:

```xml
<dependencies>
    <!-- Kafka client -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.5.1</version> <!-- Use the appropriate Kafka version -->
    </dependency>
    <!-- Protocol Buffers runtime -->
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.24.0</version> <!-- Use the version matching your protoc -->
    </dependency>
</dependencies>
```

Gradle: add the dependencies to build.gradle:

```gradle
dependencies {
    // Kafka client
    implementation 'org.apache.kafka:kafka-clients:3.5.1'
    // Protocol Buffers runtime
    implementation 'com.google.protobuf:protobuf-java:3.24.0'
}
```
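As an alternative to running protoc by hand (Step 2), Maven can also generate the classes at build time via the third-party protobuf-maven-plugin. The snippet below is a sketch of the common setup, not part of the original instructions; verify current plugin versions before use:

```xml
<build>
    <extensions>
        <!-- Detects the OS so the matching protoc binary is downloaded -->
        <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.7.1</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            <configuration>
                <!-- Download a protoc matching the protobuf-java runtime version -->
                <protocArtifact>com.google.protobuf:protoc:3.24.0:exe:${os.detected.classifier}</protocArtifact>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

Gradle users can achieve the same with the com.google.protobuf Gradle plugin. Either way, .proto files placed under src/main/proto are compiled automatically on each build, so the generated sources never drift out of date.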
Step 4: Java Code Implementation
Here's a Java equivalent of your Python Kafka consumer:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Import the generated Protobuf classes
import your.package.PPPCloudStreaming;

public class KafkaProtobufConsumer {

    private final KafkaConsumer<String, byte[]> consumer;
    private final String topicName;

    public KafkaProtobufConsumer(String topicName) {
        this.topicName = topicName;

        // Kafka consumer configuration
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker.lt.use1.:9094"); // Adjust as needed
        props.setProperty("group.id", "my-group");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"SCRAM\" password=\"password\";");

        this.consumer = new KafkaConsumer<>(props);
    }

    public void start() {
        consumer.subscribe(Collections.singletonList(topicName));
        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, byte[]> record : records) {
                    // Deserialize the Protobuf message
                    PPPCloudStreaming.PPPMessage message =
                            PPPCloudStreaming.PPPMessage.parseFrom(record.value());
                    // Process the message
                    System.out.println("Received message: " + message);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        KafkaProtobufConsumer consumer = new KafkaProtobufConsumer("PPP_trd_str_1");
        consumer.start();
    }
}
```
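For background: the ByteArrayDeserializer hands the consumer the raw Protobuf wire bytes, and parseFrom does all of the decoding for you. If you are curious what that wire format looks like, Protobuf encodes integers as base-128 varints. The following is a self-contained sketch of just that one sub-step, for illustration only; in practice the generated classes handle this and you should never hand-roll it:

```java
import java.io.ByteArrayOutputStream;

// Minimal sketch of Protobuf's base-128 varint encoding (illustration only).
public class Varint {

    // Encode a non-negative int: low 7 bits per byte, high bit = "more bytes follow".
    public static byte[] encode(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80); // set the continuation bit
            value >>>= 7;
        }
        out.write(value); // last byte has the continuation bit clear
        return out.toByteArray();
    }

    // Decode a varint back to an int, accumulating 7 bits per byte.
    public static int decode(byte[] bytes) {
        int result = 0, shift = 0;
        for (byte b : bytes) {
            result |= (b & 0x7F) << shift;
            shift += 7;
            if ((b & 0x80) == 0) break; // continuation bit clear: done
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] enc = encode(300);
        // 300 encodes to the two bytes AC 02 -- the classic example from the Protobuf docs
        System.out.printf("300 -> %02X %02X%n", enc[0] & 0xFF, enc[1] & 0xFF);
        System.out.println(decode(enc)); // prints 300
    }
}
```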
Key Points:
- Replace your.package in the Java code with the package where the generated Protobuf classes are located.
- Make sure the Protobuf .proto file is up to date with the message structure.
- Test the application by consuming messages from your Kafka topic.