1. Prerequisites
Before starting, ensure you have the following installed:
- Node.js:
Install Node.js from nodejs.org
and verify:
bash
node -v
npm -v
- Kafka:
The Kafka server must be running, and you should know the bootstrap_servers,
topic, and other required connection details.
- Required
Libraries: Install the following Node.js libraries:
bash
npm install kafka-node google-protobuf
2. Create and Compile the .proto File
Step 1: Write the .proto File
Assume your .proto file (TTTMessage.proto) defines the
structure like this:
proto
syntax = "proto3";
message TTTMessage {
string id = 1;
string symbol = 2;
int64 timestamp = 3;
float price = 4;
int32 volume = 5;
}
Save this as TTTMessage.proto.
Step 2: Generate the Proto Class for Node.js
Run the following command to generate the JavaScript class:
bash
protoc --js_out=import_style=commonjs,binary:. TTTMessage.proto
This will create a file called TTTMessage_pb.js in the same
directory as your .proto file.
3. Implement the Kafka Consumer in Node.js
Below is the complete Node.js implementation of the Kafka
consumer script with Proto message deserialization:
javascript
// Import required libraries
const kafka = require('kafka-node');
const TTTMessage = require('./TTTMessage_pb'); // Generated
from .proto
// Kafka Consumer Class
class Consumer {
constructor(topic,
durationMinutes = 5) {
this.topic =
topic;
this.durationMinutes
= durationMinutes;
this.consumer = null;
}
// Start the Kafka
consumer
start() {
const client = new
kafka.KafkaClient({ kafkaHost: 'broker.bqt.pulse.ABCD:9092' });
this.consumer = new
kafka.Consumer(
client,
[{ topic: this.topic,
partition: 0 }],
{
groupId: 'group_id',
// Replace with your consumer group
autoCommit: true,
fromOffset: 'latest',
}
);
console.log(`Consumer
started for topic: ${this.topic}`);
// Listen to
incoming messages
this.consumer.on('message',
(message) => {
console.log('Raw
Kafka Message:', message);
// Deserialize
the message using the Proto class
const buffer = Buffer.from(message.value,
'binary'); // Convert to buffer
const TTTMessage
= TTTMessage.TTTMessage.deserializeBinary(buffer);
// Print
deserialized Proto message as an object
console.log('Deserialized
Message:', TTTMessage.toObject());
});
// Handle errors
this.consumer.on('error',
(err) => {
console.error('Kafka
Consumer Error:', err);
});
// Stop the
consumer after the specified duration
setTimeout(()
=> this.stop(), this.durationMinutes * 60 * 1000);
}
// Stop the Kafka
consumer
stop() {
if (this.consumer)
{
console.log('Stopping
Kafka Consumer...');
this.consumer.close(true,
() => {
console.log('Kafka
Consumer stopped.');
});
}
}
}
// Main function
const main = () => {
const topic = 'bqt_trd_str_1';
// Replace with your Kafka topic
const
durationMinutes = 1; // Duration to consume messages
const consumer = new
Consumer(topic, durationMinutes);
consumer.start();
};
// Run the main function
main();
4. Explanation of the Code
- Kafka
Consumer Setup:
- The kafka-node
library is used to connect to the Kafka broker.
- The KafkaClient
object specifies the broker address.
- The Consumer
object listens for messages on a specific topic.
- Deserializing
Proto Messages:
- The google-protobuf
library is used to deserialize the message.
- deserializeBinary(buffer)
converts the binary Kafka message into a Proto object.
- .toObject()
converts the Proto object to a plain JavaScript object for easier
processing.
- Stopping
the Consumer:
- The setTimeout
function stops the consumer after the specified duration.
5. Directory Structure
Your project should have the following structure:
bash
Copy code
/your-project
|-- TTTMessage.proto
# The Protocol Buffers schema
|-- TTTMessage_pb.js
# Generated JavaScript class from .proto
|-- consumer.js
# The Kafka consumer implementation
|-- package.json
# Your Node.js package file
6. Dependencies in package.json
Run npm init -y to create a package.json file. Then, install
the required dependencies:
bash
Copy code
npm install kafka-node google-protobuf
Your package.json will look like this:
json
Copy code
{
"name": "kafka-consumer-nodejs",
"version":
"1.0.0",
"main": "consumer.js",
"dependencies":
{
"google-protobuf":
"^3.21.12",
"kafka-node":
"^5.0.0"
}
}
7. Testing the Consumer
- Ensure
the Kafka broker is running.
- Place
the TTTMessage.proto file in the same directory and generate the TTTMessage_pb.js
file.
- Start
the consumer:
bash
Copy code
node consumer.js
- Send a
test message to the Kafka topic in the binary format using a producer.
8. Debugging Tips
- Kafka
Broker Connection Errors:
- Ensure
the kafkaHost matches the correct broker address and port.
- Verify
that the Kafka topic exists.
- Proto
Message Deserialization Issues:
- Ensure
the producer is sending messages in the format defined by the .proto
file.
- Use
a debugger to inspect the raw binary message before deserialization.
No comments:
Post a Comment