Monday, 1 January 2024

Node.js and Proto Files: Consuming Protobuf Messages from Kafka

 

1. Prerequisites

Before starting, ensure you have the following installed:

  1. Node.js: Install Node.js from nodejs.org and verify:

bash

node -v

npm -v

  2. Kafka: A Kafka broker must be running, and you should know the broker address (host:port), topic name, and any other required connection details.
  3. Required Libraries: Install the following Node.js libraries:

bash

npm install kafka-node google-protobuf


2. Create and Compile the .proto File

Step 1: Write the .proto File

Assume your .proto file (TTTMessage.proto) defines the structure like this:

proto

syntax = "proto3";

 

message TTTMessage {

  string id = 1;

  string symbol = 2;

  int64 timestamp = 3;

  float price = 4;

  int32 volume = 5;

}

Save this as TTTMessage.proto.


Step 2: Generate the Proto Class for Node.js

Run the following command to generate the JavaScript class:

bash

protoc --js_out=import_style=commonjs,binary:. TTTMessage.proto

This will create a file called TTTMessage_pb.js in the same directory as your .proto file.
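Before wiring up Kafka, it can help to see what serialized TTTMessage bytes actually look like on the wire. The sketch below hand-encodes two string fields and two varint fields of the schema above per the proto3 wire format, using only Node's built-in Buffer. This is for illustration only: in practice you would call `serializeBinary()` on the generated class rather than encoding by hand, and the field values here are made up.

```javascript
// Illustration only: hand-encode fields of TTTMessage using the proto3
// wire format. Tag byte = (field_number << 3) | wire_type, where wire
// type 2 = length-delimited (strings) and wire type 0 = varint (ints).

function encodeStringField(fieldNumber, value) {
  const bytes = Buffer.from(value, 'utf8');
  // Length prefix fits in one byte here because the strings are short
  return Buffer.concat([Buffer.from([(fieldNumber << 3) | 2, bytes.length]), bytes]);
}

function encodeVarintField(fieldNumber, value) {
  const out = [(fieldNumber << 3) | 0];
  do {
    let b = value & 0x7f;
    value = Math.floor(value / 128);
    if (value > 0) b |= 0x80; // continuation bit: more bytes follow
    out.push(b);
  } while (value > 0);
  return Buffer.from(out);
}

const payload = Buffer.concat([
  encodeStringField(1, 't1'),   // id = "t1"
  encodeStringField(2, 'AAPL'), // symbol = "AAPL"
  encodeVarintField(3, 100),    // timestamp = 100
  encodeVarintField(5, 42),     // volume = 42
]);

console.log(payload.toString('hex')); // 0a02743112044141504c1864282a
```

These are the kinds of bytes the consumer in the next section receives from Kafka and passes to `deserializeBinary`.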


3. Implement the Kafka Consumer in Node.js

Below is the complete Node.js implementation of the Kafka consumer script with Proto message deserialization:

javascript

// Import required libraries

const kafka = require('kafka-node');

const TTTMessage = require('./TTTMessage_pb'); // Generated from .proto

 

// Kafka Consumer Class

class Consumer {

  constructor(topic, durationMinutes = 5) {

    this.topic = topic;

    this.durationMinutes = durationMinutes;

    this.consumer = null;

  }

 

  // Start the Kafka consumer

  start() {

    const client = new kafka.KafkaClient({ kafkaHost: 'broker.bqt.pulse.ABCD:9092' });

    this.consumer = new kafka.Consumer(

      client,

      [{ topic: this.topic, partition: 0 }],

      {

        groupId: 'group_id', // Replace with your consumer group

        autoCommit: true,

        fromOffset: false, // kafka-node's Consumer takes a boolean here, not 'latest'

        encoding: 'buffer', // Deliver message.value as a Buffer for binary Protobuf payloads

      }

    );

 

    console.log(`Consumer started for topic: ${this.topic}`);

 

    // Listen to incoming messages

    this.consumer.on('message', (message) => {

      console.log('Raw Kafka Message:', message);

 

      // Deserialize the message using the generated Proto class

      const buffer = Buffer.isBuffer(message.value) ? message.value : Buffer.from(message.value, 'binary');

      const protoMessage = TTTMessage.TTTMessage.deserializeBinary(buffer);

 

      // Print the deserialized Proto message as a plain JavaScript object

      console.log('Deserialized Message:', protoMessage.toObject());

    });

 

    // Handle errors

    this.consumer.on('error', (err) => {

      console.error('Kafka Consumer Error:', err);

    });

 

    // Stop the consumer after the specified duration

    setTimeout(() => this.stop(), this.durationMinutes * 60 * 1000);

  }

 

  // Stop the Kafka consumer

  stop() {

    if (this.consumer) {

      console.log('Stopping Kafka Consumer...');

      this.consumer.close(true, () => {

        console.log('Kafka Consumer stopped.');

      });

    }

  }

}

 

// Main function

const main = () => {

  const topic = 'bqt_trd_str_1'; // Replace with your Kafka topic

  const durationMinutes = 1; // Duration to consume messages

  const consumer = new Consumer(topic, durationMinutes);

  consumer.start();

};

 

// Run the main function

main();


4. Explanation of the Code

  1. Kafka Consumer Setup:
    • The kafka-node library is used to connect to the Kafka broker.
    • The KafkaClient object specifies the broker address.
    • The Consumer object listens for messages on a specific topic.
  2. Deserializing Proto Messages:
    • The google-protobuf library is used to deserialize the message.
    • deserializeBinary(buffer) converts the binary Kafka message into a Proto object.
    • .toObject() converts the Proto object to a plain JavaScript object for easier processing.
  3. Stopping the Consumer:
    • The setTimeout function stops the consumer after the specified duration.
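The Buffer handling in step 2 is worth spelling out: if a binary payload is ever decoded as a UTF-8 string (kafka-node's default without `encoding: 'buffer'`), bytes can be silently corrupted before `deserializeBinary` sees them. A self-contained sketch using only Node's Buffer, no Kafka or protobuf required:

```javascript
// Why binary payloads must not pass through a UTF-8 string: decoding
// arbitrary bytes as UTF-8 is lossy, so a protobuf payload can be
// corrupted before deserialization ever runs.

const raw = Buffer.from([0x0a, 0x03, 0x61, 0x62, 0x63, 0xff]); // sample bytes incl. one invalid UTF-8 byte

// Lossy: 0xff is not valid UTF-8 and becomes the replacement character
const viaUtf8 = Buffer.from(raw.toString('utf8'), 'utf8');
console.log(raw.equals(viaUtf8)); // false

// Lossless: 'binary' (latin1) maps each byte to one character and back
const viaBinary = Buffer.from(raw.toString('binary'), 'binary');
console.log(raw.equals(viaBinary)); // true
```

This is why the consumer options set `encoding: 'buffer'`, with the `'binary'` (latin1) conversion only as a fallback when the value arrives as a string.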

5. Directory Structure

Your project should have the following structure:

bash


/your-project

|-- TTTMessage.proto         # The Protocol Buffers schema

|-- TTTMessage_pb.js         # Generated JavaScript class from .proto

|-- consumer.js              # The Kafka consumer implementation

|-- package.json             # Your Node.js package file


6. Dependencies in package.json

Run npm init -y to create a package.json file. Then, install the required dependencies:

bash


npm install kafka-node google-protobuf

Your package.json will look like this:

json


{

  "name": "kafka-consumer-nodejs",

  "version": "1.0.0",

  "main": "consumer.js",

  "dependencies": {

    "google-protobuf": "^3.21.12",

    "kafka-node": "^5.0.0"

  }

}


7. Testing the Consumer

  1. Ensure the Kafka broker is running.
  2. Place the TTTMessage.proto file in the same directory and generate the TTTMessage_pb.js file.
  3. Start the consumer:

bash


node consumer.js

  4. Send a test message to the Kafka topic in binary Protobuf format using a producer.

8. Debugging Tips

  1. Kafka Broker Connection Errors:
    • Ensure the kafkaHost matches the correct broker address and port.
    • Verify that the Kafka topic exists.
  2. Proto Message Deserialization Issues:
    • Ensure the producer is sending messages in the format defined by the .proto file.
    • Use a debugger to inspect the raw binary message before deserialization.
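For the second tip, a tiny hex-preview helper makes it easy to log the leading bytes of a payload before handing it to `deserializeBinary` (the name `hexPreview` is illustrative, not from any library):

```javascript
// Dump the first bytes of a raw payload in hex, so malformed or
// string-mangled messages are easy to spot in the console.
function hexPreview(buffer, maxBytes = 16) {
  return [...buffer.slice(0, maxBytes)]
    .map((b) => b.toString(16).padStart(2, '0'))
    .join(' ');
}

console.log(hexPreview(Buffer.from([0x0a, 0x02, 0x74, 0x31]))); // "0a 02 74 31"
```

Call it inside the `message` handler, right before deserialization, and compare the output against the bytes your producer claims to have sent.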

 
