Spektor.dev

How To Rank Data From Kafka Using Apache Flink

December 10, 2021

Apache Flink

Sometimes data needs to be ranked to find the top-n entities. For example, a company may want to find its top 5 products per country by the number of units sold. If all the product data resides in a single SQL table and the dataset is small, a single groupwise query can be run against the table. However, the dataset can be large enough that the cost of running such a query becomes prohibitive in a production environment. In addition, the source of the data may be something other than a database, e.g. a message queue or files. In such cases a processing engine like Apache Flink can be used. In this post I will show how to set up such a processing pipeline with the help of only 3 (🔥) Flink SQL commands.
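For reference, if the data lived in one database table, such a groupwise query might look like the following sketch (assuming a hypothetical sales table with country and product columns; the 5 is the n in top-n):

SELECT country, product, purchases
FROM (
  SELECT country, product, COUNT(*) AS purchases,
    -- rank products within each country by purchase count
    ROW_NUMBER() OVER (PARTITION BY country ORDER BY COUNT(*) DESC) AS row_num
  FROM sales
  GROUP BY country, product
) ranked
WHERE row_num <= 5;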

The pipeline will consume from a Kafka topic and output the result to a Postgres table. Each Kafka message is sent after a product is bought and contains the country and product information. The results will be summarized in a table with 3 columns: country, product and purchases, holding the top 3 products per country by the number of purchases.

Flink has several API layers, the highest level of which uses familiar SQL syntax. The SQL commands are run via Flink's embedded SQL client, so there's no need for even a line of Java/Scala code.

First we need to set up a Flink application cluster. By default the official Flink Docker image comes with a limited set of connectors. For the demo we'll need the Kafka and JDBC connectors as well as the Postgres driver, therefore we'll extend the official Docker image with the connectors' jar files. The Kafka connector can be downloaded here, while the JDBC connector and the Postgres driver can be downloaded from here. Place the downloaded files in the same folder as the Dockerfile. In the Dockerfile the jar files are copied to the /opt/flink/lib folder because the Flink classpath is set to this folder:

FROM flink:1.14.0
COPY flink-connector-jdbc_2.12-1.14.0.jar /opt/flink/lib/flink-connector-jdbc_2.12-1.14.0.jar
COPY flink-sql-connector-kafka_2.11-1.14.0.jar /opt/flink/lib/flink-sql-connector-kafka_2.11-1.14.0.jar
COPY postgresql-42.3.1.jar /opt/flink/lib/postgresql-42.3.1.jar

Place the docker-compose.yml in the same folder as the Dockerfile:

version: "2.1"
services:
  jobmanager:
    build:
      context: .
      dockerfile: ./Dockerfile
    ports:
      - "8081:8081"
    command: jobmanager
    container_name: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    build:
      context: .
      dockerfile: ./Dockerfile
    depends_on:
      - jobmanager
    command: taskmanager
    container_name: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  kafka:
    image: obsidiandynamics/kafka
    restart: "no"
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  postgres:
    image: postgres:12.7
    container_name: postgres
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: postgres

  adminer:
    image: adminer
    ports:
      - 8080:8080

The docker-compose.yml contains all the required services, as well as the adminer service which provides a web GUI client for Postgres.
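With the Dockerfile and docker-compose.yml in place, the whole stack can be built and started with a single command (after which the Flink web UI should be reachable at localhost:8081):

# build the extended Flink image and start all services in the background
docker-compose up -d --build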

  1. We’ll create the aggregations table in Postgres using adminer. Go to localhost:8080 and log in (the username, password and database are all postgres, per the environment variables in docker-compose.yml):

[Screenshot: adminer login page]

  2. Create the table:
CREATE TABLE IF NOT EXISTS aggregations
(
  country VARCHAR ( 50 ) NOT NULL,
  product VARCHAR ( 50 ) NOT NULL,
  purchases BIGINT NOT NULL,
  PRIMARY KEY (country, product)
);
  3. Create a Kafka topic called purchases and populate it with messages whose values are JSON of the following format (messages can be produced using the kcat CLI utility; see the example below):
{
  "country": "some country",
  "product": "some product"
}
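As an illustration, a single test message can be produced with kcat as follows (a minimal sketch assuming kcat is installed on the host and using the EXTERNAL listener exposed on localhost:9092 in docker-compose.yml; the country and product values are made up):

# produce one JSON purchase event to the purchases topic
echo '{"country": "US", "product": "laptop"}' | kcat -P -b localhost:9092 -t purchases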

Now we can enter the Flink jobmanager Docker container in order to interact with its SQL API. Run docker exec -it jobmanager ./bin/sql-client.sh. Once inside the Flink SQL shell we need to create a purchases table which uses the Kafka source connector:

CREATE TABLE purchases (
  country STRING,
  product STRING
) WITH (
   'connector' = 'kafka',
   'topic' = 'purchases',
   'properties.bootstrap.servers' = 'kafka:29092',
   'value.format' = 'json',
   'properties.group.id' = '1',
   'scan.startup.mode' = 'earliest-offset'
);
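At this point the source table can be sanity-checked from the SQL shell. Since purchases is backed by an unbounded Kafka topic, the query below keeps running and displays new rows as messages arrive:

SELECT * FROM purchases;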

Next we create an aggregations table backed by the JDBC sink connector, which will write the top 3 products by purchase volume to the corresponding Postgres table:

CREATE TABLE aggregations (
  `country` STRING,
  `product` STRING,
  `purchases` BIGINT NOT NULL,
  PRIMARY KEY (`country`, `product`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres',
  'table-name' = 'aggregations'
);

Lastly, we start the processing by running the following query:

INSERT INTO aggregations
SELECT `country`, `product`, `purchases`
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY country ORDER BY `purchases` DESC) AS row_num
  FROM (SELECT country, product, COUNT(*) AS `purchases` FROM purchases GROUP BY country, product))
WHERE row_num <= 3;
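The submitted job appears in the Flink web UI at localhost:8081 and runs continuously. Besides adminer, the results can also be inspected from the command line, e.g. via psql inside the postgres container defined in docker-compose.yml above:

# query the sink table directly inside the postgres container
docker exec -it postgres psql -U postgres -c 'SELECT * FROM aggregations;'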

The Flink Top-N query used above is result-updating, which means that selecting all rows from the aggregations table in Postgres always returns the most up-to-date result of the processing (so if product x previously had a top amount of purchases but 3 other products now have more purchases than x, it will disappear from the aggregations table). It's worth noting that the query above uses the No Ranking Output Optimization: by omitting row_num from the outer SELECT, less data is sent to the sink, and while the table indeed contains the top 3 products per country by purchase volume, the rows are not sorted by purchases. Since the aggregations table should generally be small, it can easily be sorted by the consuming application:

SELECT * FROM aggregations
ORDER BY country, purchases DESC;

In one of my next posts I will describe how to run a similar aggregation using a Flink Window Top-N query, which is even cooler!