Monday, 1 January 2024

Transfer files from EC2 to local using SSM

import boto3

import base64

import os

import time


def get_instance_id_by_private_ip(private_ip, region="us-east-1", aws_profile=None):

    # Set the AWS profile, if specified

    if aws_profile:

        boto3.setup_default_session(profile_name=aws_profile)


    # Initialize EC2 client

    ec2_client = boto3.client("ec2", region_name=region)

    

    # Describe instances filtered by private IP

    response = ec2_client.describe_instances(

        Filters=[{"Name": "private-ip-address", "Values": [private_ip]}]

    )

    

    # Extract the instance ID from the response

    for reservation in response["Reservations"]:

        for instance in reservation["Instances"]:

            return instance["InstanceId"]

    

    # If no instance found, return None

    return None


def copy_files_from_ec2(file_paths_on_ec2, local_save_dir, instance_id, region="us-east-1", aws_profile=None):

    # Set the AWS profile, if specified

    if aws_profile:

        boto3.setup_default_session(profile_name=aws_profile)


    # Initialize SSM client

    ssm_client = boto3.client("ssm", region_name=region)


    for file_path_on_ec2 in file_paths_on_ec2:

        # Extract filename to save locally

        filename = os.path.basename(file_path_on_ec2)

        local_save_path = os.path.join(local_save_dir, filename)


        # Base64 encode the file on the EC2 instance using an SSM command

        commands = [f'base64 {file_path_on_ec2}']

        

        # Send the command to EC2 via SSM

        response = ssm_client.send_command(

            DocumentName="AWS-RunShellScript",

            Parameters={"commands": commands},

            InstanceIds=[instance_id],

        )

        

        # Fetch the command ID to track the output

        command_id = response["Command"]["CommandId"]


        # Wait for the command to complete

        time.sleep(2)  # Initial wait before polling for command status

        while True:

            result = ssm_client.get_command_invocation(

                CommandId=command_id,

                InstanceId=instance_id,

            )

            if result["Status"] == "Success":

                # Retrieve the Base64 output

                encoded_content = result["StandardOutputContent"]

                break

            elif result["Status"] in ["Failed", "Cancelled", "TimedOut"]:

                raise Exception(f"SSM command failed for '{file_path_on_ec2}' with status: {result['Status']}")

            time.sleep(1)  # Poll every second


        # Decode the Base64 content and write it to a local file

        decoded_content = base64.b64decode(encoded_content)

        with open(local_save_path, "wb") as file:

            file.write(decoded_content)


        print(f"File '{file_path_on_ec2}' copied from instance '{instance_id}' to local path '{local_save_path}'.")


# Example usage


# Retrieve the instance ID using a private IP

private_ip = "10.0.0.1"  # Replace with the private IP address you're looking for

region = "us-east-1"  # Replace with your AWS region if different

aws_profile = "your-profile-name"  # Replace with your AWS CLI profile name


instance_id = get_instance_id_by_private_ip(private_ip, region, aws_profile)

if instance_id:

    print(f"The instance ID for private IP {private_ip} is {instance_id}.")


    # Specify files to copy from EC2 and the local save directory

    file_paths_on_ec2 = ["/home/ec2-user/test1.py", "/home/ec2-user/test2.py"]  # Paths to files on EC2 instance

    local_save_dir = "./"  # Local directory to save the files


    # Copy files from EC2 to local machine

    copy_files_from_ec2(file_paths_on_ec2, local_save_dir, instance_id, region, aws_profile)

else:

    print(f"No instance found with private IP {private_ip}.")


Conversion from nanoseconds to milliseconds

from datetime import datetime


# Given data (replace these values with your actual data)

sourcetime = 1731614518958014585  # Source time in nanoseconds

fh_sourcetime = 1731614518        # FH time in Linux seconds (Unix time in seconds since epoch)

fh_sourcetimeNS = 989645647       # Remaining nanoseconds for FH time

pps_sourcetime = 1731614518       # PPS time in Linux seconds (Unix time in seconds since epoch)

pps_sourcetimeNS = 964054168      # Remaining nanoseconds for PPS time


# Get the current time in milliseconds since epoch

current_time_ms = int(datetime.now().timestamp() * 1000)


# 1. Convert `sourcetime` from nanoseconds to milliseconds

# - `sourcetime` is in nanoseconds, so divide by 1,000,000 to convert to milliseconds.

source_time_ms = sourcetime / 1_000_000


# Calculate delta from `sourcetime` to `current_time_ms`

delta_source_to_current = current_time_ms - source_time_ms


# 2. Convert `fh_sourcetime` and `fh_sourcetimeNS` to milliseconds

# - `fh_sourcetime` is in seconds, so multiply by 1,000 to get milliseconds.

# - `fh_sourcetimeNS` is in nanoseconds, so divide by 1,000,000 to convert to milliseconds.

# - Add both results to get the full `fh_time_ms` in milliseconds.

fh_time_ms = (fh_sourcetime * 1_000) + (fh_sourcetimeNS / 1_000_000)


# Calculate delta from `source_time_ms` to `fh_time_ms`

delta_source_to_fh = fh_time_ms - source_time_ms


# 3. Convert `pps_sourcetime` and `pps_sourcetimeNS` to milliseconds

# - `pps_sourcetime` is in seconds, so multiply by 1,000 to get milliseconds.

# - `pps_sourcetimeNS` is in nanoseconds, so divide by 1,000,000 to convert to milliseconds.

# - Add both results to get the full `pps_time_ms` in milliseconds.

pps_time_ms = (pps_sourcetime * 1_000) + (pps_sourcetimeNS / 1_000_000)


# Calculate delta from `fh_time_ms` to `pps_time_ms`

delta_fh_to_pps = pps_time_ms - fh_time_ms


# 4. Calculate delta from `pps_time_ms` to `current_time_ms`

delta_pps_to_current = current_time_ms - pps_time_ms


# Display the results with explanations

print("Delta (Source Time to Current Time):", delta_source_to_current, "ms")

print("Delta (Source Time to FH Time):", delta_source_to_fh, "ms")

print("Delta (FH Time to PPS Time):", delta_fh_to_pps, "ms")

print("Delta (PPS Time to Current Time):", delta_pps_to_current, "ms")


Java and proto

 

1. Prerequisites

Install Java

Make sure you have the Java Development Kit (JDK) installed (version 8 or higher). Verify installation:

bash

 

java -version

javac -version

If not installed, download it from the Oracle JDK website or use an open-source version like OpenJDK.

Install Apache Maven

Maven is required to manage dependencies in the Java project. Verify installation:

bash

 

mvn -version

If not installed, download it from the Maven website.


2. Create and Compile the .proto File

Step 1: Write the .proto File

Let’s assume the file TTTMessage.proto contains:

proto

 

syntax = "proto3";

 

message TTTMessage {

  string id = 1;

  string symbol = 2;

  int64 timestamp = 3;

  float price = 4;

  int32 volume = 5;

}

Step 2: Compile the .proto File

Use protoc to generate Java classes from the .proto file.

Install Protoc

If not installed, download protoc from the Protocol Buffers GitHub releases.

Generate Java Classes

Run the following command to generate the Java class:

bash

 

protoc --java_out=./src/main/java TTTMessage.proto

This will generate a TTTMessageProtos.java file under ./src/main/java/com/example/protobuf (as set by the java_package and java_outer_classname options above), containing the nested TTTMessage class.


3. Create a Maven Project

  1. Create a new Maven project:

bash

 

mvn archetype:generate -DgroupId=com.example.kafka -DartifactId=kafka-consumer -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

cd kafka-consumer

  2. Add dependencies to the pom.xml file:

xml

 

<dependencies>

    <!-- Kafka Client -->

    <dependency>

        <groupId>org.apache.kafka</groupId>

        <artifactId>kafka-clients</artifactId>

        <version>3.5.1</version>

    </dependency>

 

    <!-- Protobuf -->

    <dependency>

        <groupId>com.google.protobuf</groupId>

        <artifactId>protobuf-java</artifactId>

        <version>3.24.0</version>

    </dependency>

</dependencies>

  3. Add the generated TTTMessageProtos.java file to the src/main/java/com/example/protobuf folder in your Maven project.

4. Implement the Kafka Consumer

Here’s the complete Java implementation:

java

 

package com.example.kafka;

 

import com.google.protobuf.InvalidProtocolBufferException;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.common.serialization.StringDeserializer;

 

import java.time.Duration;

import java.util.Collections;

import java.util.Properties;

 

// Import the generated Protobuf class

import com.example.protobuf.TTTMessageProtos.TTTMessage;

 

public class Consumer {

    public static void main(String[] args) {

        // Kafka topic

        String topic = "TTT_trd_str_1";

 

        // Kafka consumer properties

        Properties props = new Properties();

        props.put("bootstrap.servers", "broker.TTT.pulse.ABCD:9092");

        props.put("group.id", "group_id"); // Replace with your consumer group

        props.put("key.deserializer", StringDeserializer.class.getName());

        props.put("value.deserializer", org.apache.kafka.common.serialization.ByteArrayDeserializer.class.getName());

        props.put("auto.offset.reset", "latest");

        props.put("enable.auto.commit", "true");

 

        // Create Kafka consumer

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);

 

        // Subscribe to the topic

        consumer.subscribe(Collections.singletonList(topic));

        System.out.println("Consumer started for topic: " + topic);

 

        try {

            // Poll messages

            while (true) {

                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));

 

                for (ConsumerRecord<String, byte[]> record : records) {

                    System.out.println("Raw Kafka Message Key: " + record.key());

                    System.out.println("Raw Kafka Message Value: " + new String(record.value()));

 

                    // Deserialize the message using Protobuf

                    try {

                        TTTMessage message = TTTMessage.parseFrom(record.value());

                        System.out.println("Deserialized Protobuf Message: ");

                        System.out.println("ID: " + message.getId());

                        System.out.println("Symbol: " + message.getSymbol());

                        System.out.println("Timestamp: " + message.getTimestamp());

                        System.out.println("Price: " + message.getPrice());

                        System.out.println("Volume: " + message.getVolume());

                    } catch (InvalidProtocolBufferException e) {

                        System.err.println("Failed to deserialize Protobuf message: " + e.getMessage());

                    }

                }

            }

        } catch (Exception e) {

            e.printStackTrace();

        } finally {

            consumer.close();

        }

    }

}


5. Project Directory Structure

The project directory should look like this:


 

/kafka-consumer

|-- pom.xml

|-- src

    |-- main

        |-- java

            |-- com

                |-- example

                    |-- kafka

                        |-- Consumer.java

                    |-- protobuf

                        |-- TTTMessageProtos.java


6. Run the Kafka Consumer

  1. Compile the project:

bash

 

mvn clean install

  2. Run the consumer:

bash

 

java -cp target/kafka-consumer-1.0-SNAPSHOT.jar com.example.kafka.Consumer
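
Note: the jar produced by mvn clean install contains only your own classes, so the plain java -cp command above also needs the Kafka and Protobuf jars on the classpath. A simpler option during development (an assumption, not part of the original notes) is to run the class through Maven, which puts the project dependencies on the classpath for you:

bash

 

mvn compile exec:java -Dexec.mainClass="com.example.kafka.Consumer"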


7. Explanation of Code

  1. Kafka Consumer Setup:
    • The KafkaConsumer connects to the Kafka broker and subscribes to the specified topic.
    • Messages are deserialized as byte arrays since Protobuf encodes data in binary format.
  2. Protobuf Message Parsing:
    • The generated TTTMessage class is used to parse the byte array.
    • parseFrom(record.value()) converts the binary message into a Protobuf object.
  3. Deserialized Data:
    • The Protobuf object allows access to fields like id, symbol, timestamp, etc.

8. Debugging Tips

  1. Common Errors:
    • Broker connection issues: Verify bootstrap.servers and the Kafka broker are running.
    • Protobuf parsing failure: Ensure the producer and consumer use the same .proto schema.
  2. Testing with Mock Data:
    • Send test messages to the Kafka topic using a producer (e.g., a Python or Node.js script; see the Python sketch below).
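
For quick testing, here is a minimal Python producer sketch (an illustration, not part of the original notes). It assumes kafka-python and protobuf are installed (pip install kafka-python protobuf) and that Python bindings were generated with protoc --python_out=. TTTMessage.proto; the broker address and topic are the same placeholders used above.

python

 

import time
from kafka import KafkaProducer
import TTTMessage_pb2  # Generated by protoc --python_out=.

# Connect to the same (placeholder) broker used by the consumer
producer = KafkaProducer(bootstrap_servers="broker.TTT.pulse.ABCD:9092")

# Build a sample TTTMessage matching the .proto schema above
msg = TTTMessage_pb2.TTTMessage(
    id="test-1",
    symbol="ABC",
    timestamp=int(time.time() * 1_000_000_000),  # nanoseconds since epoch
    price=101.25,
    volume=500,
)

# Send the serialized Protobuf bytes so the consumer can parse them
producer.send("TTT_trd_str_1", value=msg.SerializeToString())
producer.flush()
producer.close()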

 

Node.js and proto file

 

1. Prerequisites

Before starting, ensure you have the following installed:

  1. Node.js: Install Node.js from nodejs.org and verify:

bash

node -v

npm -v

  2. Kafka: The Kafka server must be running, and you should know the bootstrap_servers, topic, and other required connection details.
  3. Required Libraries: Install the following Node.js libraries:

bash

npm install kafka-node google-protobuf


2. Create and Compile the .proto File

Step 1: Write the .proto File

Assume your .proto file (TTTMessage.proto) defines the structure like this:

proto

syntax = "proto3";

 

message TTTMessage {

  string id = 1;

  string symbol = 2;

  int64 timestamp = 3;

  float price = 4;

  int32 volume = 5;

}

Save this as TTTMessage.proto.


Step 2: Generate the Proto Class for Node.js

Run the following command to generate the JavaScript class:

bash

protoc --js_out=import_style=commonjs,binary:. TTTMessage.proto

This will create a file called TTTMessage_pb.js in the same directory as your .proto file. Note that recent protoc releases no longer bundle the JavaScript generator, so you may also need to install the separate protoc-gen-js plugin (from the protobuf-javascript project) for the --js_out flag to work.


3. Implement the Kafka Consumer in Node.js

Below is the complete Node.js implementation of the Kafka consumer script with Proto message deserialization:

javascript

// Import required libraries

const kafka = require('kafka-node');

const TTTMessage = require('./TTTMessage_pb'); // Generated from .proto

 

// Kafka Consumer Class

class Consumer {

  constructor(topic, durationMinutes = 5) {

    this.topic = topic;

    this.durationMinutes = durationMinutes;

    this.consumer = null;

  }

 

  // Start the Kafka consumer

  start() {

    const client = new kafka.KafkaClient({ kafkaHost: 'broker.bqt.pulse.ABCD:9092' });

    this.consumer = new kafka.Consumer(

      client,

      [{ topic: this.topic, partition: 0 }],

      {

        groupId: 'group_id', // Replace with your consumer group

        autoCommit: true,

        encoding: 'buffer', // Deliver message values as Buffers so the binary Protobuf payload is not mangled

        fromOffset: false, // kafka-node's plain Consumer expects a boolean; 'latest'/'earliest' apply to ConsumerGroup

      }

    );

 

    console.log(`Consumer started for topic: ${this.topic}`);

 

    // Listen to incoming messages

    this.consumer.on('message', (message) => {

      console.log('Raw Kafka Message:', message);

 

      // Deserialize the message using the Proto class

      const buffer = Buffer.isBuffer(message.value) ? message.value : Buffer.from(message.value, 'binary'); // With encoding: 'buffer', the value is already raw bytes

      const decoded = TTTMessage.TTTMessage.deserializeBinary(buffer); // Renamed from TTTMessage to avoid shadowing the imported module

 

      // Print deserialized Proto message as an object

      console.log('Deserialized Message:', decoded.toObject());

    });

 

    // Handle errors

    this.consumer.on('error', (err) => {

      console.error('Kafka Consumer Error:', err);

    });

 

    // Stop the consumer after the specified duration

    setTimeout(() => this.stop(), this.durationMinutes * 60 * 1000);

  }

 

  // Stop the Kafka consumer

  stop() {

    if (this.consumer) {

      console.log('Stopping Kafka Consumer...');

      this.consumer.close(true, () => {

        console.log('Kafka Consumer stopped.');

      });

    }

  }

}

 

// Main function

const main = () => {

  const topic = 'bqt_trd_str_1'; // Replace with your Kafka topic

  const durationMinutes = 1; // Duration to consume messages

  const consumer = new Consumer(topic, durationMinutes);

  consumer.start();

};

 

// Run the main function

main();


4. Explanation of the Code

  1. Kafka Consumer Setup:
    • The kafka-node library is used to connect to the Kafka broker.
    • The KafkaClient object specifies the broker address.
    • The Consumer object listens for messages on a specific topic.
  2. Deserializing Proto Messages:
    • The google-protobuf library is used to deserialize the message.
    • deserializeBinary(buffer) converts the binary Kafka message into a Proto object.
    • .toObject() converts the Proto object to a plain JavaScript object for easier processing.
  3. Stopping the Consumer:
    • The setTimeout function stops the consumer after the specified duration.

5. Directory Structure

Your project should have the following structure:

bash


/your-project

|-- TTTMessage.proto         # The Protocol Buffers schema

|-- TTTMessage_pb.js         # Generated JavaScript class from .proto

|-- consumer.js              # The Kafka consumer implementation

|-- package.json             # Your Node.js package file


6. Dependencies in package.json

Run npm init -y to create a package.json file. Then, install the required dependencies:

bash


npm install kafka-node google-protobuf

Your package.json will look like this:

json


{

  "name": "kafka-consumer-nodejs",

  "version": "1.0.0",

  "main": "consumer.js",

  "dependencies": {

    "google-protobuf": "^3.21.12",

    "kafka-node": "^5.0.0"

  }

}


7. Testing the Consumer

  1. Ensure the Kafka broker is running.
  2. Place the TTTMessage.proto file in the same directory and generate the TTTMessage_pb.js file.
  3. Start the consumer:

bash


node consumer.js

  4. Send a test message to the Kafka topic in binary format using a producer (the Python producer sketch from the Java section above can be reused with this broker and topic).

8. Debugging Tips

  1. Kafka Broker Connection Errors:
    • Ensure the kafkaHost matches the correct broker address and port.
    • Verify that the Kafka topic exists.
  2. Proto Message Deserialization Issues:
    • Ensure the producer is sending messages in the format defined by the .proto file.
    • Use a debugger to inspect the raw binary message before deserialization.