Running Flink SQL on Redpanda with Protobuf using Podman
1. Set Up Flink and Redpanda with Podman
Create a Podman Compose File
Save the following as podman-compose.yml:
version: '3.7'
services:
  redpanda:
    image: docker.redpanda.com/vectorized/redpanda:v23.2.1
    container_name: redpanda
    command:
      - redpanda start
      - --smp 1
      - --memory 1G
      - --overprovisioned
      - --node-id 0
      - --check=false
      - --kafka-addr PLAINTEXT://0.0.0.0:9092
      - --advertise-kafka-addr PLAINTEXT://redpanda:9092
      - --rpc-addr 0.0.0.0:33145
      - --advertise-rpc-addr redpanda:33145
      - --schema-registry-addr 0.0.0.0:8081
    ports:
      - "9092:9092"
      # Redpanda's built-in schema registry; host port 18081 keeps 8081 free for the Flink web UI.
      - "18081:8081"
    networks:
      - flink-net
  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    container_name: schema-registry
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "redpanda:9092"
      SCHEMA_REGISTRY_HOST_NAME: "schema-registry"
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
    ports:
      # Three services cannot all publish host port 8081, so this one is mapped to 8082.
      - "8082:8081"
    depends_on:
      - redpanda
    networks:
      - flink-net
  flink-jobmanager:
    image: flink:1.16-scala_2.12
    container_name: flink-jobmanager
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
    ports:
      # Flink web UI
      - "8081:8081"
    networks:
      - flink-net
  flink-taskmanager:
    image: flink:1.16-scala_2.12
    container_name: flink-taskmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
    depends_on:
      - flink-jobmanager
    networks:
      - flink-net
networks:
  flink-net:
    driver: bridge
Run Podman Compose
podman-compose up -d
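Once the containers are up, it can help to confirm that the broker answers and to create the topic the Flink table will read. The following is a minimal sketch, assuming the container name redpanda from the compose file and the placeholder topic your_protobuf_topic used later in this post; it skips the checks when podman is not installed.

```shell
#!/bin/sh
# Placeholder topic name; it must match the 'topic' option of the Flink table.
TOPIC="your_protobuf_topic"

if command -v podman >/dev/null 2>&1; then
  # List running containers; redpanda and both Flink containers should appear.
  podman ps --format '{{.Names}} {{.Status}}'
  # Ask the broker for cluster metadata, then create the topic.
  podman exec redpanda rpk cluster info
  podman exec redpanda rpk topic create "$TOPIC"
else
  echo "podman not found; skipping checks for topic $TOPIC"
fi
```

If the cluster info call fails right after startup, the broker may simply need a few more seconds before it accepts connections.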
2. Generate Protobuf Message Class
Install protoc
On Windows:
choco install protoc
On Linux (Fedora/RHEL):
sudo dnf install protobuf-compiler
Generate Java Class from .proto
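- Create your Protobuf file (your_message.proto):
syntax = "proto3";
package com.yourcompany;
// Emit YourProtobufClass as a top-level Java class so that
// com.yourcompany.YourProtobufClass resolves in the Flink table definition.
option java_multiple_files = true;
message YourProtobufClass {
  int64 id = 1;
  string name = 2;
  map<string, string> data = 3;
}
- Generate the Java class (protoc requires the output directory to exist):
mkdir -p generated
protoc --java_out=./generated your_message.proto
- Transfer the generated/ folder to your Flink instance:
scp -r generated user@your-ec2:/home/user/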
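Flink loads compiled classes, not .java sources, so the generated code still has to be compiled and placed on Flink's classpath. The sketch below shows one way to do that for the Podman setup in this post. The jar name your-message.jar and the PROTOBUF_JAR path are assumptions: point PROTOBUF_JAR at a protobuf-java runtime jar whose version matches your protoc.

```shell
#!/bin/sh
# Assumption: path to a protobuf-java runtime jar matching your protoc version.
PROTOBUF_JAR="${PROTOBUF_JAR:-protobuf-java.jar}"
# Hypothetical name for the jar that will hold the compiled message classes.
JAR_NAME="your-message.jar"

if command -v javac >/dev/null 2>&1 && [ -d generated ] && [ -f "$PROTOBUF_JAR" ]; then
  mkdir -p classes
  # Compile every generated source file against the Protobuf runtime.
  find generated -name '*.java' > sources.txt
  javac -cp "$PROTOBUF_JAR" -d classes @sources.txt
  # Package the compiled classes into a single jar.
  jar cf "$JAR_NAME" -C classes .
  if command -v podman >/dev/null 2>&1; then
    # Copy the jar into both Flink containers and restart them so it is picked up.
    podman cp "$JAR_NAME" flink-jobmanager:/opt/flink/lib/
    podman cp "$JAR_NAME" flink-taskmanager:/opt/flink/lib/
    podman restart flink-jobmanager flink-taskmanager
  fi
else
  echo "javac, generated/ or $PROTOBUF_JAR missing; nothing compiled"
fi
```

The flink:1.16 image does not ship the Kafka SQL connector or the Protobuf format, so the jars for those, matching your Flink version, would need to be copied into /opt/flink/lib in the same way.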
3. Define Flink SQL Table for Redpanda
Open Flink SQL Client
- Get the container name:
podman ps
- Enter the Flink JobManager container:
podman exec -it flink-jobmanager /bin/sh
- Start Flink SQL Client:
cd /opt/flink && ./bin/sql-client.sh
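Create Table for Redpanda
Flink 1.16's Protobuf format identifies the schema through 'protobuf.message-class-name', the generated Java class, which must be on Flink's classpath. It has no descriptor-file option, so none is set here. 'scan.startup.mode' = 'earliest-offset' is added so the query also returns records that are already in the topic; remove it to use the connector default.
CREATE TABLE redpanda_table (
  id BIGINT,
  name STRING,
  `data` MAP<STRING, STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_protobuf_topic',
  'properties.bootstrap.servers' = 'redpanda:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.yourcompany.YourProtobufClass'
);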
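The table definition can also be kept in a file and run non-interactively, which makes it easier to repeat. The following is a sketch under a few assumptions: the file name create_table.sql is hypothetical, the container is named flink-jobmanager as in the compose file, and the Protobuf option is written as 'protobuf.message-class-name', the name documented for Flink 1.16's Protobuf format. A DESCRIBE statement is appended so the run prints the resulting schema.

```shell
#!/bin/sh
# Write the DDL to a local file (hypothetical name create_table.sql).
cat > create_table.sql <<'EOF'
CREATE TABLE redpanda_table (
  id BIGINT,
  name STRING,
  `data` MAP<STRING, STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_protobuf_topic',
  'properties.bootstrap.servers' = 'redpanda:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.yourcompany.YourProtobufClass'
);
DESCRIBE redpanda_table;
EOF

if command -v podman >/dev/null 2>&1; then
  # Copy the file into the JobManager container and execute it with the SQL client.
  podman cp create_table.sql flink-jobmanager:/tmp/create_table.sql
  podman exec flink-jobmanager /opt/flink/bin/sql-client.sh -f /tmp/create_table.sql
else
  echo "podman not found; wrote create_table.sql only"
fi
```

Because the SQL client's default catalog is in-memory, a table created this way exists only for that client run; any query against it would need to go into the same file or the same interactive session.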
4. Query Data from Redpanda
SELECT * FROM redpanda_table LIMIT 10;
5. Generate Protobuf Descriptor File
- Generate it using protoc:
protoc --descriptor_set_out=your_descriptor.desc --proto_path=./ your_message.proto
- Transfer your_descriptor.desc to EC2:
scp your_descriptor.desc user@your-ec2:/home/user/
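A quick way to check that the descriptor file is usable is to round-trip a sample message through protoc using only the descriptor. This is a sketch, assuming a protoc release that supports --descriptor_set_in together with --encode and --decode; the sample field values are made up for illustration.

```shell
#!/bin/sh
# Fully qualified message name from your_message.proto.
MSG_TYPE="com.yourcompany.YourProtobufClass"
DESC="your_descriptor.desc"

if command -v protoc >/dev/null 2>&1 && [ -f "$DESC" ]; then
  # Encode a text-format sample to binary, then decode it again,
  # using only the descriptor set (no .proto file on the command line).
  printf 'id: 1 name: "example" data { key: "k" value: "v" }' \
    | protoc --descriptor_set_in="$DESC" --encode="$MSG_TYPE" \
    | protoc --descriptor_set_in="$DESC" --decode="$MSG_TYPE"
else
  echo "protoc or $DESC missing; skipping round trip"
fi
```

If the decode step prints the same fields that went in, the descriptor contains the message type under the expected name.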
✅ Final Steps
- ✅ Start services:
podman-compose up -d
- ✅ Generate Protobuf class:
protoc --java_out=./generated your_message.proto
- ✅ Generate Protobuf descriptor:
protoc --descriptor_set_out=your_descriptor.desc your_message.proto
- ✅ Define Flink SQL table
- ✅ Query Redpanda with
SELECT * FROM redpanda_table
________________________________________________
Running Flink SQL on Redpanda with Protobuf using Podman (Without Schema Registry)
1. Set Up Flink and Redpanda with Podman
Create a Podman Compose File
Save the following as podman-compose.yml:
version: '3.7'
services:
  redpanda:
    image: docker.redpanda.com/vectorized/redpanda:v23.2.1
    container_name: redpanda
    command:
      - redpanda start
      - --smp 1
      - --memory 1G
      - --overprovisioned
      - --node-id 0
      - --check=false
      - --kafka-addr PLAINTEXT://0.0.0.0:9092
      - --advertise-kafka-addr PLAINTEXT://redpanda:9092
      - --rpc-addr 0.0.0.0:33145
      - --advertise-rpc-addr redpanda:33145
    ports:
      - "9092:9092"
    networks:
      - flink-net
  flink-jobmanager:
    image: flink:1.16-scala_2.12
    container_name: flink-jobmanager
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
    ports:
      - "8081:8081"
    networks:
      - flink-net
  flink-taskmanager:
    image: flink:1.16-scala_2.12
    container_name: flink-taskmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
    depends_on:
      - flink-jobmanager
    networks:
      - flink-net
networks:
  flink-net:
    driver: bridge
Run Podman Compose
podman-compose up -d
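The broker can take a few seconds to accept connections after the containers start, so scripted setups often poll before moving on. Below is a small retry helper as a sketch; the function name retry and its argument order are choices made for this example, not part of podman or rpk.

```shell
#!/bin/sh
# retry <attempts> <delay_seconds> <command...>
# Runs the command until it succeeds or the attempts are used up.
# Returns 0 on the first success, 1 if every attempt failed.
retry() {
  attempts=$1
  delay=$2
  shift 2
  i=1
  while [ "$i" -le "$attempts" ]; do
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}
```

With the container name from the compose file, a call such as `retry 15 2 podman exec redpanda rpk cluster info` would wait up to roughly 30 seconds for Redpanda to respond.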
2. Generate Protobuf Message Class
Install protoc
On Windows:
choco install protoc
On Linux (Fedora/RHEL):
sudo dnf install protobuf-compiler
Generate Java Class from .proto
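- Create your Protobuf file (your_message.proto):
syntax = "proto3";
package com.yourcompany;
// Emit YourProtobufClass as a top-level Java class so that
// com.yourcompany.YourProtobufClass resolves in the Flink table definition.
option java_multiple_files = true;
message YourProtobufClass {
  int64 id = 1;
  string name = 2;
  map<string, string> data = 3;
}
- Generate the Java class (protoc requires the output directory to exist):
mkdir -p generated
protoc --java_out=./generated your_message.proto
- Transfer the generated/ folder to your Flink instance:
scp -r generated user@your-ec2:/home/user/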
3. Define Flink SQL Table for Redpanda
Open Flink SQL Client
- Get the container name:
podman ps
- Enter the Flink JobManager container:
podman exec -it flink-jobmanager /bin/sh
- Start Flink SQL Client:
cd /opt/flink && ./bin/sql-client.sh
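Create Table for Redpanda (Without Schema Registry)
Flink 1.16's Protobuf format identifies the schema through 'protobuf.message-class-name', the generated Java class, which must be on Flink's classpath. It has no descriptor-file option, so none is set here. 'scan.startup.mode' = 'earliest-offset' is added so the query also returns records that are already in the topic; remove it to use the connector default.
CREATE TABLE redpanda_table (
  id BIGINT,
  name STRING,
  `data` MAP<STRING, STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_protobuf_topic',
  'properties.bootstrap.servers' = 'redpanda:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.yourcompany.YourProtobufClass'
);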
4. Query Data from Redpanda
SELECT * FROM redpanda_table LIMIT 10;
5. Generate Protobuf Descriptor File (From .proto File)
Since this setup does not expose a schema registry, generate a Protobuf descriptor file from your .proto file manually:
- Generate it using protoc:
protoc --descriptor_set_out=your_descriptor.desc --proto_path=./ your_message.proto
- Transfer your_descriptor.desc to EC2:
scp your_descriptor.desc user@your-ec2:/home/user/
✅ Final Steps
- ✅ Start services:
podman-compose up -d
- ✅ Generate Protobuf class:
protoc --java_out=./generated your_message.proto
- ✅ Generate Protobuf descriptor:
protoc --descriptor_set_out=your_descriptor.desc your_message.proto
- ✅ Define Flink SQL table without schema registry
- ✅ Query Redpanda with
SELECT * FROM redpanda_table