Friday, 9 August 2024

Flink with Protobuf

 

Complete Guide to Set Up Apache Flink with Protobuf, Redpanda, and SQL Queries on AWS EC2 (Offline Setup)

1. Java 17 Installation and Setup on AWS EC2 (Linux)

First, ensure that you have Java 17 installed on your AWS EC2 instance.

Steps to Install Java 17 on AWS EC2

  1. Update your package list:

    bash
    sudo yum update -y
  2. Install OpenJDK 17:

    bash
    sudo yum install java-17-openjdk-devel -y
  3. Verify Java Installation:

    bash
    java -version

    This should output the version of Java installed (e.g., openjdk version "17").


2. Set Up Apache Flink (Offline Setup)

Since your EC2 instance does not have internet access, you must download the necessary Apache Flink binaries manually and transfer them to your EC2 instance.

Download Apache Flink

  1. Download Flink from the official website on a machine with internet access:

    • Go to the Flink download page.
    • Choose the appropriate version (e.g., 1.16.x) and download the binary.
  2. Transfer Flink to EC2:

    • Use scp to transfer the Flink tarball (flink-1.x.x-bin-scala_2.x.tgz) to your EC2 instance.
      bash
      scp flink-1.x.x-bin-scala_2.x.tgz ec2-user@your-ec2-ip:/home/ec2-user
  3. Extract Flink:

    bash
    tar -xvzf flink-1.x.x-bin-scala_2.x.tgz
    cd flink-1.x.x
  4. Start Flink:

    • Start the Flink cluster:
      bash
      ./bin/start-cluster.sh

3. Set Up Redpanda (Kafka) on AWS EC2

Redpanda is a Kafka API-compatible streaming platform, so Flink's Kafka consumer can connect to it directly. Make sure Redpanda is already running on your EC2 instance.

Redpanda Configuration

You need to set the Redpanda broker URL and, if authentication is enabled, the username and password Flink will use to connect.

  1. Configure the Redpanda broker URL and authentication in the Kafka consumer properties of your Flink job, as sketched below.
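
A minimal sketch of those consumer properties, assuming SASL/PLAIN authentication (the broker address, username, password, and consumer group here are placeholders to replace with your own values); the complete job in section 5 uses the same settings:

java
import java.util.Properties;

public class RedpandaConnectionProperties {

    // Kafka client properties used by the Flink consumer to reach Redpanda
    public static Properties build() {
        Properties props = new Properties();

        // Redpanda broker address (Kafka API listener)
        props.setProperty("bootstrap.servers", "your-redpanda-broker-url:9092");

        // SASL/PLAIN authentication -- drop these three settings if the cluster is unauthenticated
        props.setProperty("security.protocol", "SASL_PLAINTEXT"); // or "SASL_SSL" when TLS is enabled
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"your-username\" password=\"your-password\";");

        // Consumer group used by the Flink job
        props.setProperty("group.id", "flink-group");
        return props;
    }
}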

4. Protobuf Deserialization in Flink

You need to create a custom deserialization schema to handle Protobuf messages in Flink.

Custom Protobuf Deserialization Schema

Create a ProtobufDeserializationSchema.java file:

java
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

import com.google.protobuf.InvalidProtocolBufferException;

import trust.SMSMessage; // Generated Protobuf class from your .proto file

public class ProtobufDeserializationSchema extends AbstractDeserializationSchema<SMSMessage> {

    @Override
    public SMSMessage deserialize(byte[] message) throws InvalidProtocolBufferException {
        return SMSMessage.parseFrom(message);
    }
}

Ensure that you have Protobuf generated classes (SMSMessage, Trade, StockSummary, etc.) from your .proto file.

Protobuf File (SMSMessage.proto)

Here is the .proto file, with Java code-generation options added so the generated classes can be imported as top-level classes (as the deserialization schema above does):

proto
syntax = "proto2"; package trust; message SMSMessage { required uint32 msg_type = 1; oneof payload { Trade trade = 2; StockSummary ssm = 3; } } message StockSummary { optional uint64 feedmsgseq = 1; optional uint64 sourcetime = 2; optional string symbol = 3; optional double hiprice = 4; optional double loprice = 5; optional double listingmktopenprice = 6; optional uint32 grpvol = 7; optional string mktofhiprice = 8; optional string mktofloprice = 9; optional string mktofopenprice = 10; optional uint32 numclsprice = 11; optional string mktofcloseprice = 12; optional double listingmktcloseprice = 13; optional double conshiprice = 14; optional double consloprice = 15; optional double consfirstprice = 16; optional double conslastprice = 17; optional string complete = 18; } message Trade { optional uint64 feedmsgseq = 1; optional uint64 sourcetime = 2; optional string symbol = 3; optional uint32 tradeid = 4; optional double price = 5; optional uint32 volume = 6; optional string tradecondition1 = 7; optional string tradecondition2 = 8; optional string tradecondition3 = 9; optional string tradecondition4 = 10; optional string marketid = 11; }

Generate Java classes from the .proto file using the Protobuf Compiler (protoc):

bash
protoc --java_out=. SMSMessage.proto
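
Because the .proto file above sets java_multiple_files, protoc emits SMSMessage, Trade, and StockSummary as top-level classes in the trust package. As a quick sanity check of the generated classes (the field values below are arbitrary examples), you can round-trip a message through the same wire format that is published to Redpanda:

java
import com.google.protobuf.InvalidProtocolBufferException;

import trust.SMSMessage;
import trust.Trade;

public class ProtobufRoundTripCheck {

    public static void main(String[] args) throws InvalidProtocolBufferException {
        // Build an SMSMessage carrying a Trade payload (values are arbitrary examples)
        SMSMessage original = SMSMessage.newBuilder()
                .setMsgType(2)
                .setTrade(Trade.newBuilder()
                        .setSymbol("ABC")
                        .setPrice(101.25)
                        .setVolume(100)
                        .build())
                .build();

        // Serialize to the Protobuf wire format, as a producer would before publishing to Redpanda
        byte[] bytes = original.toByteArray();

        // Parse it back, exactly as the Flink deserialization schema does
        SMSMessage parsed = SMSMessage.parseFrom(bytes);
        System.out.println(parsed.getTrade().getSymbol() + " @ " + parsed.getTrade().getPrice());
    }
}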

5. Flink Job with Protobuf Deserialization and SQL Query Execution

Complete Flink Job

Here’s how to set up your Flink job to read messages from Redpanda, deserialize them using Protobuf, and execute SQL queries:

java
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import trust.SMSMessage;

public class FlinkProtobufProcessingWithSQL {

    public static void main(String[] args) throws Exception {
        // Set up the Flink execution and table environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Kafka properties (Redpanda is Kafka API-compatible)
        Properties properties = new Properties();

        // Redpanda broker URL (replace with your Redpanda cluster address)
        properties.setProperty("bootstrap.servers", "your-redpanda-broker-url:9092");

        // Authentication (if applicable)
        properties.setProperty("security.protocol", "SASL_PLAINTEXT"); // or "SASL_SSL" if SSL is enabled
        properties.setProperty("sasl.mechanism", "PLAIN");
        properties.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"your-username\" password=\"your-password\";");

        // Optional: Consumer group ID
        properties.setProperty("group.id", "flink-group");

        // Create the Kafka consumer with the custom Protobuf deserialization schema
        FlinkKafkaConsumer<SMSMessage> consumer = new FlinkKafkaConsumer<>(
                "your_protobuf_topic",                // Redpanda topic name
                new ProtobufDeserializationSchema(),
                properties
        );

        // Read the stream of deserialized Protobuf messages
        DataStream<SMSMessage> messages = env.addSource(consumer);

        // Flatten the fields we want to query into (msg_type, symbol, price):
        // Trade messages contribute the trade price, StockSummary messages the last consolidated price
        DataStream<Tuple3<Integer, String, Double>> rows = messages
                .map(m -> m.hasTrade()
                        ? Tuple3.of(m.getMsgType(), m.getTrade().getSymbol(), m.getTrade().getPrice())
                        : Tuple3.of(m.getMsgType(), m.getSsm().getSymbol(), m.getSsm().getConslastprice()))
                .returns(Types.TUPLE(Types.INT, Types.STRING, Types.DOUBLE));

        // Register the stream as a table for Flink SQL, renaming the tuple fields
        Table inputTable = tableEnv.fromDataStream(rows).as("msg_type", "symbol", "price");
        tableEnv.createTemporaryView("protobuf_table", inputTable);

        // Execute a SQL query
        String sqlQuery = "SELECT msg_type, symbol, price FROM protobuf_table WHERE msg_type = 1";
        Table resultTable = tableEnv.sqlQuery(sqlQuery);

        // Convert the result table back to a stream and print the results
        DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
        resultStream.print();

        // Execute the Flink job
        env.execute("Flink Protobuf SQL Processing Job");
    }
}

SQL Queries in Flink

Here are some sample queries you can run once you have your Protobuf messages registered as tables:

  • Select specific fields:

    sql
    SELECT msg_type, symbol, price FROM protobuf_table WHERE msg_type = 2;
  • Count the number of messages:

    sql
    SELECT msg_type, COUNT(*) AS message_count FROM protobuf_table GROUP BY msg_type;
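
If you only need to inspect query results from the driver program rather than feed them back into a DataStream, you can also execute a query and print it directly. A minimal sketch, assuming the protobuf_table view has already been registered as in section 5:

java
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ProtobufTableQueries {

    // Runs the per-type count query and prints the result to the console.
    // Assumes "protobuf_table" was registered on this table environment, as in section 5.
    public static void printMessageCounts(StreamTableEnvironment tableEnv) {
        TableResult result = tableEnv.executeSql(
                "SELECT msg_type, COUNT(*) AS message_count FROM protobuf_table GROUP BY msg_type");
        result.print(); // blocks while streaming result rows to stdout
    }
}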

6. Conclusion

This document covers:

  1. Installing Java 17 on AWS EC2.
  2. Setting up Apache Flink in an offline environment.
  3. Connecting Flink to Redpanda using Kafka connectors.
  4. Deserializing Protobuf messages using a custom schema in Flink.
  5. Executing SQL queries on Protobuf data.

By following this guide, you can easily process Redpanda data in Apache Flink and execute SQL queries directly on the data in an offline environment.
