from confluent_kafka import Producer
import socket
import netifaces
# Client settings handed to the confluent_kafka Producer below.
conf = {
    # Address of the Redpanda broker to bootstrap from; replace with your own.
    'bootstrap.servers': '192.168.1.100:9092',
    # Identify this client to the broker by the local machine's hostname.
    'client.id': socket.gethostname(),
}
# Module-level producer shared by the rest of the script.
producer = Producer(conf)
def get_network_interface(ip_address: str) -> str:
    """Return the name of the local network interface that owns ``ip_address``.

    Walks every interface reported by ``netifaces.interfaces()`` and compares
    each of its IPv4 (``AF_INET``) addresses with ``ip_address``.

    Args:
        ip_address: IPv4 address to look up, as a string.

    Returns:
        The name of the first interface with a matching IPv4 address, or the
        string ``"Unknown Interface"`` when no interface matches.
    """
    for iface in netifaces.interfaces():
        # Interfaces without an IPv4 configuration have no AF_INET entry, so
        # fall back to an empty sequence instead of testing membership first.
        ipv4_entries = netifaces.ifaddresses(iface).get(netifaces.AF_INET, ())
        if any(entry.get('addr') == ip_address for entry in ipv4_entries):
            return iface
    return "Unknown Interface"
def log_network_info(host='192.168.1.100', port=9092, timeout=5.0):
    """Print the local interface, IP and port used to reach the broker.

    Opens a short-lived IPv4 TCP probe connection to ``host:port``, reads the
    local end of that socket, maps the local IP to an interface name with
    ``get_network_interface`` and prints the result.

    Note that the reported local port belongs to this probe socket; the
    producer's own connection is opened separately by the client library, so
    only the interface and local IP are expected to carry over to it.

    Args:
        host: Broker IPv4 address or hostname to probe. The default mirrors
            the broker configured in ``conf['bootstrap.servers']``; replace
            with your broker's address.
        port: Broker TCP port to probe.
        timeout: Seconds to wait for the probe connection. ``None`` disables
            the timeout and waits as long as the operating system allows.

    Raises:
        OSError: If the probe connection cannot be established, including
            when it times out.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Bound the wait so an unreachable broker cannot stall the script.
        sock.settimeout(timeout)
        sock.connect((host, port))
        local_ip, local_port = sock.getsockname()
    # The socket is already closed here; only the captured address is needed.
    interface = get_network_interface(local_ip)
    print(f"Data sent using Interface: {interface} | Local IP: {local_ip} | Local Port: {local_port}")
def delivery_report(err, msg):
    """Print the outcome of one produced message.

    Used as the ``callback=`` argument of ``producer.produce`` in this script.

    Args:
        err: ``None`` when delivery succeeded, otherwise an object describing
            the failure (it is formatted into the message as-is).
        msg: The message the report refers to. Its ``topic()``,
            ``partition()`` and ``offset()`` accessors are read only on
            success.
    """
    if err is not None:
        print(f"Message delivery failed: {err}")
        return
    print(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
# Send a single test message and wait for its delivery report.
topic = "test_topic"  # Replace with your topic
key = "key1"
value = "Hello, Redpanda!"
print("Starting Redpanda producer...")
# The interface report is purely diagnostic, so a failed probe connection
# must not prevent the message from being sent.
try:
    log_network_info()  # Log network details before sending data
except OSError as exc:
    print(f"Could not determine network interface: {exc}")
producer.produce(topic, key=key, value=value, callback=delivery_report)
# flush() serves the delivery callbacks and returns how many messages are
# still queued; bound the wait so an unreachable broker cannot hang the script.
undelivered = producer.flush(30)
if undelivered:
    print(f"{undelivered} message(s) were not delivered before the flush timeout")
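# NOTE: web-page footer text captured along with the script; kept as a comment
# so the file remains valid Python: "No comments: / Post a Comment"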