Thursday, 27 February 2025

How to Get and Configure the Flink JDBC Driver for DBeaver

Where to Get the Flink JDBC Driver

DBeaver does not ship with a Flink driver, so you need to obtain the Flink SQL Gateway JDBC driver yourself. Here are a few ways to get it:


Option 1: Download from Maven Repository (Recommended)

You can download the Flink JDBC driver, published as flink-sql-jdbc-driver-bundle, from Maven Central.

Steps to Download from Maven

  1. Visit Maven Central (or the Apache Flink downloads page)
  2. Search for flink-sql-jdbc-driver-bundle
  3. Download the JAR matching your Flink version (e.g., flink-sql-jdbc-driver-bundle-1.17.0.jar)
  4. Place the downloaded .jar file in a location where DBeaver can access it.

Option 2: Extract from a Flink Distribution (If Available)

If you already installed Flink manually, you can check if the driver is included.

Steps to Locate the Driver in Your Flink Installation

  1. If you have a Flink installation on your local system, check:
    ls $FLINK_HOME/lib/
    
  2. Look for a file like:
    flink-sql-jdbc-driver-*.jar
    
  3. If found, copy it to your local system for DBeaver.

Option 3: Build the Driver from Source (Advanced Users)

If the pre-built JAR is not available, you can build it manually.

Steps to Build from Source

  1. Clone the Flink repository:
    git clone https://github.com/apache/flink.git
    cd flink
    
  2. Build the JDBC driver module:
    mvn clean package -DskipTests -pl flink-table/flink-sql-jdbc-driver-bundle -am
    
  3. After the build completes, the JAR will be located in:
    flink-table/flink-sql-jdbc-driver-bundle/target/flink-sql-jdbc-driver-bundle-*.jar
    
  4. Copy the JAR to your local system.

Adding the JDBC Driver to DBeaver

Once you have the JAR file, follow these steps:

  1. Open DBeaver
  2. Go to Database → Driver Manager
  3. Click New → Enter:
    • Driver Name: Flink JDBC
    • Class Name: org.apache.flink.table.jdbc.FlinkDriver
    • URL Template: jdbc:flink://<jobmanager-host>:8083
  4. Click Add File, then select the downloaded JDBC JAR file
  5. Click OK to save

Now, when you create a new connection in DBeaver, you should be able to select Flink JDBC and connect.
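
You can also exercise the same driver and URL from a short Java program, which helps rule out DBeaver-specific issues. Below is a minimal sketch, assuming the driver JAR from above is on the classpath and the SQL Gateway is reachable on port 8083; the class name FlinkJdbcSmokeTest is just an illustrative placeholder:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class FlinkJdbcSmokeTest {
    public static void main(String[] args) throws Exception {
        // Same URL as configured in DBeaver; replace localhost with your gateway host if needed.
        String url = "jdbc:flink://localhost:8083";

        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW TABLES")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}

Compile it with javac and run it with the driver JAR on the classpath, for example: java -cp flink-sql-jdbc-driver-bundle-1.17.0.jar:. FlinkJdbcSmokeTest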


Step-by-Step Guide to Access Flink SQL via JDBC in DBeaver

Step 1: Ensure Flink SQL Gateway is Running

Since you're manually starting the SQL Gateway inside the JobManager, verify it's running:

podman exec -it flink-jobmanager ps aux | grep sql-gateway

If it’s not running, start it inside the JobManager container:

podman exec -it flink-jobmanager /opt/flink/bin/sql-gateway.sh start

Check the gateway's own log inside the container to confirm it started successfully (sql-gateway.sh writes to /opt/flink/log rather than to the container's stdout):

podman exec -it flink-jobmanager sh -c 'tail -n 50 /opt/flink/log/*sql-gateway*.log'

By default, the Flink SQL Gateway listens on port 8083.


Step 2: Expose Port 8083 (If Not Already Exposed)

Since your SQL Gateway runs inside the JobManager, ensure port 8083 is exposed for external access:

  1. Check running ports:

    podman port flink-jobmanager
    

    If 8083 is not listed, restart the container with port mapping:

    podman stop flink-jobmanager
    podman rm flink-jobmanager
    podman run -d --name flink-jobmanager -p 8081:8081 -p 8083:8083 flink:latest jobmanager
    
  2. Verify port accessibility from your machine:

    curl http://<jobmanager-host>:8083/v1/info
    

    Replace <jobmanager-host> with the actual hostname or IP.


Step 3: Create a New Connection in DBeaver

  1. Open DBeaver → Click New Database Connection
  2. Choose Flink JDBC (the one you created above)
  3. Enter:
    • URL: jdbc:flink://<jobmanager-host>:8083
    • Username: (leave empty)
    • Password: (leave empty)
  4. Click Test Connection
  5. If successful, click Finish

Step 4: Execute Flink SQL Queries in DBeaver

Now, you can execute SQL queries in DBeaver’s SQL Editor, like:

SHOW TABLES;
SELECT * FROM my_kafka_table LIMIT 10;

Troubleshooting

If you get a driver error:

  1. Check SQL Gateway logs
    podman logs flink-jobmanager | grep "SQL Gateway"
    
  2. Ensure JDBC driver is added in DBeaver
  3. Verify Flink SQL Gateway is reachable
    curl http://<jobmanager-host>:8083/v1/info
    
  4. Restart SQL Gateway if needed
    podman exec -it flink-jobmanager /opt/flink/bin/sql-gateway.sh stop
    podman exec -it flink-jobmanager /opt/flink/bin/sql-gateway.sh start
    

Now your Flink SQL Gateway should be accessible via JDBC in DBeaver.

Wednesday, 26 February 2025

Setting Up Aqua Studio with Apache Flink SQL Gateway

This document outlines the steps to connect Aqua Studio to Apache Flink using the JDBC URL configured for the SQL Gateway.

Prerequisites

  • Podman installed on your machine.
  • Apache Flink image available from Docker Hub.

Step 1: Start Apache Flink with SQL Gateway

Start the JobManager Container

Run the Flink JobManager container and enable the SQL Gateway by passing environment variables:


# 8081: Flink Web UI, 8082: SQL Gateway
podman run -d \
  --name flink-jobmanager \
  -p 8081:8081 \
  -p 8082:8082 \
  -e FLINK_SQL_GATEWAY_ENABLED=true \
  -e FLINK_SQL_GATEWAY_PORT=8082 \
  apache/flink:latest jobmanager

Start the TaskManager Container

Run the Flink TaskManager container, linking it to the JobManager:


# Link the TaskManager to the JobManager and point it at the JobManager host and port
podman run -d \
  --name flink-taskmanager \
  --link flink-jobmanager:jobmanager \
  -e JOBMANAGER_HOST=jobmanager \
  -e JOBMANAGER_PORT=8081 \
  apache/flink:latest taskmanager

Verify the SQL Gateway

  1. Open a terminal or web browser.
  2. Request http://localhost:8082/v1/info to confirm the SQL Gateway is responding (it exposes a REST API rather than a web page).

Step 2: Download the Flink JDBC Driver

  1. Go to the Apache Flink download page.
  2. Choose the desired version of Flink and download the binary package (e.g., flink-<version>-bin-scala_<scala_version>.tgz).

Extract the Flink Package

  1. Once the download is complete, extract the package using:

    tar -xzf flink-<version>-bin-scala_<scala_version>.tgz
  2. Change into the extracted directory:

    cd flink-<version>

Locate the JDBC Driver JAR

  1. Inside the extracted Flink directory, navigate to the lib folder:

    cd lib
  2. Look for the Flink JDBC driver JAR, typically named flink-sql-jdbc-driver-bundle-<version>.jar. If your distribution does not include it, download it from Maven Central instead.

Step 3: Set Up Aqua Studio

Open Aqua Studio

  1. Launch Aqua Studio on your computer.

Create a New Connection

  1. In Aqua Studio, navigate to the Connections section.
  2. Click on Add Connection or a similar option to create a new database connection.

Configure the Connection Settings

  1. Connection Type: Select JDBC as the connection type.
  2. JDBC URL: Enter the JDBC URL for the Flink SQL Gateway:

    jdbc:flink://localhost:8082
  3. Driver Class: Specify the JDBC driver class:

    org.apache.flink.table.jdbc.FlinkDriver
  4. Name: Give a meaningful name to your connection (e.g., "Flink SQL Gateway").

Configure the Driver

  1. Add the Flink JDBC driver JAR file (flink-sql-jdbc-driver-bundle-<version>.jar) to the connection configuration if prompted.

Test the Connection

  1. Use the Test Connection feature in Aqua Studio to verify the connection.
  2. Ensure you receive a confirmation message indicating success.

Step 4: Execute SQL Queries

Once connected, you can start writing and executing SQL queries against your Flink tables and data streams.

Example Query

You can execute a simple SQL query like:


SELECT * FROM your_table_name;

Troubleshooting

If you encounter any issues:

  • Ensure the Flink SQL Gateway is running and accessible.
  • Check Aqua Studio logs for any connection errors.
  • Verify network settings and firewall rules that might block the connection.

Setting Up Apache Flink with SQL Gateway Using Podman

This document outlines the steps to configure Apache Flink with SQL Gateway running in Podman containers.

Prerequisites

  • Podman installed on your machine.
  • Apache Flink image available from the Docker Hub.

Step 1: Start the JobManager Container

Run the Flink JobManager container and enable the SQL Gateway by passing environment variables:


# 8081: Flink Web UI, 8082: SQL Gateway
podman run -d \
  --name flink-jobmanager \
  -p 8081:8081 \
  -p 8082:8082 \
  -e FLINK_SQL_GATEWAY_ENABLED=true \
  -e FLINK_SQL_GATEWAY_PORT=8082 \
  apache/flink:latest jobmanager

Step 2: Start the TaskManager Container

Run the Flink TaskManager container, linking it to the JobManager:


# Link the TaskManager to the JobManager and point it at the JobManager host and port
podman run -d \
  --name flink-taskmanager \
  --link flink-jobmanager:jobmanager \
  -e JOBMANAGER_HOST=jobmanager \
  -e JOBMANAGER_PORT=8081 \
  apache/flink:latest taskmanager

Step 3: Verify the SQL Gateway

  1. Open a terminal or web browser.
  2. Request http://localhost:8082/v1/info to confirm the SQL Gateway is responding (it exposes a REST API rather than a web page).

Step 4: Connect to the SQL Gateway

You can connect to the Flink SQL Gateway using a SQL client (e.g., Aqua Studio) that supports JDBC.

  • JDBC URL:

    jdbc:flink://localhost:8082
  • Username and Password: Leave these fields blank if you haven't set authentication. (A short JDBC snippet for checking connectivity outside a GUI client is sketched below.)
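
Before wiring up a GUI client, you can confirm the gateway accepts JDBC connections with a few lines of Java. This is only a sketch, assuming the Flink JDBC driver JAR is on the classpath and the gateway is listening on the URL above with no authentication configured:

import java.sql.Connection;
import java.sql.DriverManager;

public class GatewayConnectionCheck {
    public static void main(String[] args) throws Exception {
        // Same URL as above; username and password are omitted because authentication is not set up.
        try (Connection conn = DriverManager.getConnection("jdbc:flink://localhost:8082")) {
            System.out.println("Opened a JDBC connection to the Flink SQL Gateway.");
        }
    }
}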

Step 5: Write and Execute SQL Queries

Once connected, you can write and execute SQL queries against your Flink tables and data streams.

Example Query

You can start with a simple query like:


SELECT * FROM your_table_name;

Additional Configuration (Optional)

  • Persistent Storage: To persist data, consider mounting volumes for the Flink containers.
  • Environment Variables: You can set additional environment variables for specific configurations as needed.

Sunday, 23 February 2025

Redpanda with Flink and Podman compose

 

Running Flink SQL on Redpanda with Protobuf using Podman

1. Set Up Flink and Redpanda with Podman

Create a Podman Compose File

Save the following as podman-compose.yml:

version: '3.7'
services:
  redpanda:
    image: docker.redpanda.com/vectorized/redpanda:v23.2.1
    container_name: redpanda
    command:
      - redpanda start
      - --smp 1
      - --memory 1G
      - --overprovisioned
      - --node-id 0
      - --check=false
      - --kafka-addr PLAINTEXT://0.0.0.0:9092
      - --advertise-kafka-addr PLAINTEXT://redpanda:9092
      - --rpc-addr 0.0.0.0:33145
      - --advertise-rpc-addr redpanda:33145
      - --schema-registry-addr 0.0.0.0:8081
    ports:
      - "9092:9092"
      - "8081:8081"
    networks:
      - flink-net

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    container_name: schema-registry
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "redpanda:9092"
      SCHEMA_REGISTRY_HOST_NAME: "schema-registry"
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
    ports:
      - "8084:8081"   # host 8084 -> Schema Registry (host 8081 is already published by Redpanda)
    depends_on:
      - redpanda
    networks:
      - flink-net

  flink-jobmanager:
    image: flink:1.16-scala_2.12
    container_name: flink-jobmanager
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
    ports:
      - "8083:8081"   # host 8083 -> Flink Web UI (host 8081 is already published by Redpanda)
    networks:
      - flink-net

  flink-taskmanager:
    image: flink:1.16-scala_2.12
    container_name: flink-taskmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
    depends_on:
      - flink-jobmanager
    networks:
      - flink-net

networks:
  flink-net:
    driver: bridge

Run Podman Compose

podman-compose up -d

2. Generate Protobuf Message Class

Install protoc

On Windows:

choco install protoc

On Linux:

sudo dnf install protobuf-compiler

Generate Java Class from .proto

  1. Create your Protobuf file (your_message.proto):

    syntax = "proto3";
    package com.yourcompany;
    
    message YourProtobufClass {
        int64 id = 1;
        string name = 2;
        map<string, string> data = 3;
    }
    
  2. Generate Java class:

    protoc --java_out=./generated your_message.proto
    
  3. Transfer the generated/ folder to your Flink instance:

    scp -r generated user@your-ec2:/home/user/
    

3. Define Flink SQL Table for Redpanda

Open Flink SQL Client

  1. Get the container name:
    podman ps
    
  2. Enter the Flink JobManager container:
    podman exec -it flink-jobmanager /bin/sh
    
  3. Start Flink SQL Client:
    cd /opt/flink
    ./bin/sql-client.sh
    

Create Table for Redpanda

CREATE TABLE redpanda_table (
    id BIGINT,
    name STRING,
    data MAP<STRING, STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'your_protobuf_topic',
    'properties.bootstrap.servers' = 'redpanda:9092',
    'format' = 'protobuf',
    'protobuf.message-class-name' = 'com.yourcompany.YourProtobufClass',
    'protobuf.descriptor-file' = 'file:///path/to/your_descriptor.desc'
);
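
The table above only reads what is already in the topic. To push a few Protobuf-encoded test records into Redpanda, a small Java producer is enough. The sketch below assumes the generated com.yourcompany.YourProtobufClass from step 2, plus the kafka-clients and protobuf-java libraries, are on the classpath; adjust the import to whatever outer class name protoc actually generated:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import com.yourcompany.YourMessage.YourProtobufClass;  // adjust to the outer class name protoc generated

public class RedpandaProtobufProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Use localhost:9092 when running outside the compose network; redpanda:9092 inside it.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            YourProtobufClass msg = YourProtobufClass.newBuilder()
                    .setId(1L)
                    .setName("test")
                    .putData("source", "producer-sketch")
                    .build();
            // Plain Protobuf bytes, matching the 'protobuf' format of the Flink table.
            producer.send(new ProducerRecord<>("your_protobuf_topic", "key-1", msg.toByteArray())).get();
        }
    }
}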

4. Query Data from Redpanda

SELECT * FROM redpanda_table LIMIT 10;

5. Generate Protobuf Descriptor File

  1. Generate it using protoc:
    protoc --descriptor_set_out=your_descriptor.desc --proto_path=./ your_message.proto
    
  2. Transfer your_descriptor.desc to EC2:
    scp your_descriptor.desc user@your-ec2:/home/user/
    

✅ Final Steps

  • ✅ Start services: podman-compose up -d
  • ✅ Generate Protobuf class: protoc --java_out=./generated your_message.proto
  • ✅ Generate Protobuf descriptor: protoc --descriptor_set_out=your_descriptor.desc your_message.proto
  • ✅ Define Flink SQL table
  • ✅ Query Redpanda with SELECT * FROM redpanda_table



    ________________________________________________


Without Schema Registry



Running Flink SQL on Redpanda with Protobuf using Podman

1. Set Up Flink and Redpanda with Podman

Create a Podman Compose File

Save the following as podman-compose.yml:

version: '3.7'
services:
  redpanda:
    image: docker.redpanda.com/vectorized/redpanda:v23.2.1
    container_name: redpanda
    command:
      - redpanda start
      - --smp 1
      - --memory 1G
      - --overprovisioned
      - --node-id 0
      - --check=false
      - --kafka-addr PLAINTEXT://0.0.0.0:9092
      - --advertise-kafka-addr PLAINTEXT://redpanda:9092
      - --rpc-addr 0.0.0.0:33145
      - --advertise-rpc-addr redpanda:33145
    ports:
      - "9092:9092"
    networks:
      - flink-net

  flink-jobmanager:
    image: flink:1.16-scala_2.12
    container_name: flink-jobmanager
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
    ports:
      - "8081:8081"
    networks:
      - flink-net

  flink-taskmanager:
    image: flink:1.16-scala_2.12
    container_name: flink-taskmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
    depends_on:
      - flink-jobmanager
    networks:
      - flink-net

networks:
  flink-net:
    driver: bridge

Run Podman Compose

podman-compose up -d

2. Generate Protobuf Message Class

Install protoc

On Windows:

choco install protoc

On Linux:

sudo dnf install protobuf-compiler

Generate Java Class from .proto

  1. Create your Protobuf file (your_message.proto):

    syntax = "proto3";
    package com.yourcompany;
    
    message YourProtobufClass {
        int64 id = 1;
        string name = 2;
        map<string, string> data = 3;
    }
    
  2. Generate Java class:

    protoc --java_out=./generated your_message.proto
    
  3. Transfer the generated/ folder to your Flink instance:

    scp -r generated user@your-ec2:/home/user/
    

3. Define Flink SQL Table for Redpanda

Open Flink SQL Client

  1. Get the container name:
    podman ps
    
  2. Enter the Flink JobManager container:
    podman exec -it flink-jobmanager /bin/sh
    
  3. Start Flink SQL Client:
    cd /opt/flink
    ./bin/sql-client.sh
    

Create Table for Redpanda (Without Schema Registry)

CREATE TABLE redpanda_table (
    id BIGINT,
    name STRING,
    data MAP<STRING, STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'your_protobuf_topic',
    'properties.bootstrap.servers' = 'redpanda:9092',
    'format' = 'protobuf',
    'protobuf.message-class-name' = 'com.yourcompany.YourProtobufClass',
    'protobuf.descriptor-file' = 'file:///path/to/your_descriptor.desc'
);

4. Query Data from Redpanda

SELECT * FROM redpanda_table LIMIT 10;

5. Generate Protobuf Descriptor File (From .proto File)

Since Redpanda is not exposing a schema registry in this setup, generate a Protobuf descriptor file from your .proto file manually:

  1. Generate it using protoc:
    protoc --descriptor_set_out=your_descriptor.desc --proto_path=./ your_message.proto
    
  2. Transfer your_descriptor.desc to EC2:
    scp your_descriptor.desc user@your-ec2:/home/user/
    

✅ Final Steps

  • ✅ Start services: podman-compose up -d
  • ✅ Generate Protobuf class: protoc --java_out=./generated your_message.proto
  • ✅ Generate Protobuf descriptor: protoc --descriptor_set_out=your_descriptor.desc your_message.proto
  • ✅ Define Flink SQL table without schema registry
  • ✅ Query Redpanda with SELECT * FROM redpanda_table

Monday, 17 February 2025

Trino with Redpanda

 Step 1: Prepare Installation Files on a Machine with Internet Access

  1. Download Trino Server

    • Visit Trino Releases and download the latest Trino server .tar.gz file.
    • Example (for version 433):
      wget https://repo1.maven.org/maven2/io/trino/trino-server/433/trino-server-433.tar.gz
      
  2. Download Trino CLI

    • Example:
      wget https://repo1.maven.org/maven2/io/trino/trino-cli/433/trino-cli-433-executable.jar -O trino
      chmod +x trino
      
  3. Connectors

    • The Kafka connector (including its Protobuf decoder) is bundled inside the Trino server tarball under the plugin/ directory, so no separate connector download is needed.

  4. Transfer the Files to AWS EC2 Instance

    • Use scp or SFTP to copy the downloaded files to the EC2 instance.

Step 2: Install and Configure Trino on AWS EC2

  1. Extract Trino

    tar -xvzf trino-server-433.tar.gz
    mv trino-server-433 /opt/trino
    
  2. Create Trino Configuration Directories

    mkdir -p /opt/trino/etc
    
  3. Create config.properties

    coordinator=true
    node-scheduler.include-coordinator=true
    http-server.http.port=8080
    discovery-server.enabled=true
    discovery.uri=http://localhost:8080
    
  4. Create node.properties

    node.environment=production
    node.id=trino-node-1
    node.data-dir=/var/trino/data
    
  5. Create jvm.config

    -server
    -Xmx4G
    -XX:+UseG1GC
    -XX:+ExplicitGCInvokesConcurrent
    
  6. Configure Kafka Connector for Redpanda

    mkdir -p /opt/trino/etc/catalog
    
    • Create /opt/trino/etc/catalog/kafka.properties:
      connector.name=kafka
      kafka.nodes=<Redpanda_Broker>:9092
      kafka.table-names=your_protobuf_topic
      kafka.default-schema=default
      
  7. Verify the Bundled Kafka Plugin

    ls /opt/trino/plugin/kafka
    

Step 3: Set Up Local Schema Registry

  1. Download and Transfer Schema Registry

    wget https://packages.confluent.io/archive/7.5/confluent-community-7.5.0.tar.gz
    
    • Transfer this file to the EC2 instance.
  2. Install Schema Registry on EC2

    tar -xvzf confluent-community-7.5.0.tar.gz
    mv confluent-7.5.0 /opt/schema-registry
    
  3. Configure Schema Registry

    • Create /opt/schema-registry/etc/schema-registry/schema-registry.properties:
      listeners=http://0.0.0.0:8081
      kafkastore.bootstrap.servers=<Redpanda_Broker>:9092
      kafkastore.topic=_schemas
      debug=false
      
  4. Start Schema Registry

    /opt/schema-registry/bin/schema-registry-start /opt/schema-registry/etc/schema-registry/schema-registry.properties &
    
  5. Verify Schema Registry

    curl http://localhost:8081/subjects
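
The Confluent table description supplier used in the next step reads table layouts from this registry, so your topic's Protobuf schema must be registered under the subject <topic>-value. If the curl output above does not list it, you can register a schema over the registry's REST API; here is a sketch using the Java 11 HttpClient, with the subject name and message definition as illustrative placeholders:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegisterProtobufSchema {
    public static void main(String[] args) throws Exception {
        // Subject name follows the default TopicNameStrategy: <topic>-value
        String subject = "your_protobuf_topic-value";
        String proto = "syntax = \"proto3\"; package com.yourcompany; "
                + "message YourProtobufClass { int64 id = 1; string name = 2; map<string, string> data = 3; }";
        // Confluent Schema Registry expects the schema wrapped in a JSON payload
        String payload = "{\"schemaType\":\"PROTOBUF\",\"schema\":\"" + proto.replace("\"", "\\\"") + "\"}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/subjects/" + subject + "/versions"))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}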
    

Step 4: Configure Trino to Use Local Schema Registry

  1. Update /opt/trino/etc/catalog/kafka.properties

    connector.name=kafka
    kafka.nodes=<Redpanda_Broker>:9092
    kafka.default-schema=default
    kafka.table-description-supplier=confluent
    kafka.confluent-schema-registry-url=http://localhost:8081
    
  2. Restart Trino

    /opt/trino/bin/launcher restart
    
  3. Test Query on Redpanda

    ./trino --server http://localhost:8080 --catalog kafka --schema default
    SELECT * FROM your_protobuf_topic LIMIT 10;
    

Now, Trino should be connected to Redpanda, allowing SQL queries on your Protobuf topic.
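
Besides the CLI, the same queries can be issued from Java through Trino's JDBC driver. A minimal sketch, assuming you additionally download the io.trino:trino-jdbc JAR (same version as the server) and place it on the classpath; catalog and schema match the CLI invocation above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class TrinoKafkaQuery {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("user", "trino");  // Trino requires a user name even without authentication

        String url = "jdbc:trino://localhost:8080/kafka/default";
        try (Connection conn = DriverManager.getConnection(url, props);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT * FROM your_protobuf_topic LIMIT 10")) {
            int columns = rs.getMetaData().getColumnCount();
            while (rs.next()) {
                StringBuilder row = new StringBuilder();
                for (int i = 1; i <= columns; i++) {
                    row.append(rs.getObject(i)).append(i < columns ? ", " : "");
                }
                System.out.println(row);
            }
        }
    }
}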

Thursday, 13 February 2025

CDK for EMR

 

Step 1: Install Required Software

  1. Install Node.js

    • Download and install Node.js.
    • Verify installation:
      node -v
      
  2. Install AWS CDK
    Open PowerShell as Administrator and run:

    npm install -g aws-cdk
    
    • Verify installation:
      cdk --version
      
  3. Install AWS CLI

    • Download and install the AWS CLI, then configure your credentials:
      aws configure


Step 2: Set Up a CDK Project

  1. Create a new directory and navigate into it

    mkdir my-emr-cdk
    cd my-emr-cdk
    
  2. Initialize a CDK project (TypeScript)

    cdk init app --language=typescript
    
  3. Install dependencies for EMR and IAM (both are part of aws-cdk-lib, which cdk init already adds, so run this only if it is missing)

    npm install aws-cdk-lib constructs
    

Step 3: Define the EMR Cluster

  • Open lib/my-emr-cdk-stack.ts using a code editor like VS Code or Notepad++.
  • Replace the contents with:
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as emr from 'aws-cdk-lib/aws-emr';
import * as iam from 'aws-cdk-lib/aws-iam';

export class MyEmrCdkStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // IAM service role that EMR itself assumes
    const emrServiceRole = new iam.Role(this, 'EMRServiceRole', {
      assumedBy: new iam.ServicePrincipal('elasticmapreduce.amazonaws.com'),
      managedPolicies: [
        iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AmazonElasticMapReduceRole'),
      ],
    });

    // IAM role and instance profile for the EC2 instances in the cluster
    const emrEc2Role = new iam.Role(this, 'EMREC2Role', {
      assumedBy: new iam.ServicePrincipal('ec2.amazonaws.com'),
      managedPolicies: [
        iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AmazonElasticMapReduceforEC2Role'),
      ],
    });
    const emrInstanceProfile = new iam.CfnInstanceProfile(this, 'EMRInstanceProfile', {
      roles: [emrEc2Role.roleName],
    });

    // Define EMR Cluster
    const emrCluster = new emr.CfnCluster(this, 'MyEMRCluster', {
      name: 'MyCDKEMRCluster',
      releaseLabel: 'emr-6.9.0',
      applications: [{ name: 'Hadoop' }, { name: 'Spark' }],
      instances: {
        masterInstanceGroup: {
          instanceType: 'm5.xlarge',
          instanceCount: 1,
        },
        coreInstanceGroup: {
          instanceType: 'm5.xlarge',
          instanceCount: 2,
        },
      },
      jobFlowRole: emrInstanceProfile.ref, // instance profile for the cluster's EC2 instances
      serviceRole: emrServiceRole.roleArn,
      visibleToAllUsers: true,
    });
  }
}

Step 4: Deploy the CDK Stack

  1. Bootstrap AWS CDK

    cdk bootstrap
    
  2. Synthesize CloudFormation template

    cdk synth
    
  3. Deploy the EMR cluster

    cdk deploy
    

Your EMR cluster should now be created!