https://github.com/polyzos/advent-of-code-flink-paimon.git
Stream Processing with Apache Flink and Paimon
This repository is a getting-started guide for the [25 Open Source Advent of Code]().
We have provided a docker-compose.yaml file that will spin up a Flink cluster.
We also have a Dockerfile that builds on the official Flink image and adds the required connector dependencies.
To spin up the Flink cluster, all you have to do is run:
docker-compose up
When the cluster is up, navigate to localhost:8081 and you should see the Flink Web UI.
Note: The Docker containers also mount your local directory, so you should see a logs/flink directory created.
All the data files will be created inside that directory.
Grab a terminal inside your JobManager container:
docker exec -it jobmanager bash
Then you can start the Flink SQL client by running:
./bin/sql-client.sh
At this point you are ready to start interacting with Flink.
By default the catalog is in-memory, which means everything gets lost when the session closes.
Run the following command:
Flink SQL> SHOW CATALOGS;
+-----------------+
| catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set
We can see we are in the default_catalog. If you then run:
Flink SQL> SHOW DATABASES;
+------------------+
| database name |
+------------------+
| default_database |
+------------------+
1 row in set
We can also see we have a default_database.
You can run more commands like SHOW FUNCTIONS and SHOW VIEWS to view the available functions and views.
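Since the default catalog is in-memory, any tables you create here will be gone after the session closes. To persist metadata you can create a Paimon catalog instead; here is a minimal sketch, assuming the Paimon dependencies are already bundled in the image (the warehouse path below is just illustrative):
CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'file:/opt/flink/paimon'  -- illustrative path; point it at a mounted directory
);
USE CATALOG paimon_catalog;
For this walkthrough we will stay in the default in-memory catalog; see the tutorial guide for the full Paimon setup.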
So let's go ahead and create a few tables and generate some data.
We will use mock data for sensor measurements and sensor information, generated with Flink's built-in datagen connector.
CREATE TABLE measurements (
sensor_id BIGINT,
reading DECIMAL(5, 1)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1000',
'fields.sensor_id.kind'='random',
'fields.sensor_id.min'='0',
'fields.sensor_id.max'='1000',
'fields.reading.kind'='random',
'fields.reading.min'='0.0',
'fields.reading.max'='45.0'
);
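To verify the table was registered with the expected schema, you can describe it:
Flink SQL> DESCRIBE measurements;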
Also run the following command; it changes how the client displays query results.
SET 'sql-client.execution.result-mode' = 'tableau';
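The tableau mode prints results directly in the terminal, which is convenient for a walkthrough. The client also supports the table (default) and changelog modes; for example, to see insertions and retractions explicitly you could run:
SET 'sql-client.execution.result-mode' = 'changelog';
For the rest of this guide we will stick with tableau.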
Then let's verify we can query our table and see some measurements data.
Flink SQL> SELECT * FROM measurements LIMIT 10;
+----+----------------------+---------+
| op | sensor_id | reading |
+----+----------------------+---------+
| +I | 68804 | 37.8 |
| +I | 62277 | 16.5 |
| +I | 70131 | 29.9 |
| +I | 74241 | 1.8 |
| +I | 59583 | 4.5 |
| +I | 10658 | 44.2 |
| +I | 3125 | 3.5 |
| +I | 22844 | 29.3 |
| +I | 63999 | 13.2 |
| +I | 31699 | 11.4 |
+----+----------------------+---------+
Received a total of 10 rows
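Since measurements is an unbounded stream, you can also run a continuous query over it. As an illustration, the following aggregation maintains a running average reading per sensor and keeps emitting updates as new rows arrive (cancel it with Ctrl-C when you are done):
Flink SQL> SELECT sensor_id, AVG(reading) AS avg_reading
FROM measurements
GROUP BY sensor_id;
In tableau mode you will see -U/+U rows as each sensor's average gets updated.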
We will create another table called sensor_info.
CREATE TABLE sensor_info (
sensor_id BIGINT,
latitude DOUBLE PRECISION,
longitude DOUBLE PRECISION,
generation INT,
updated_at TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'fields.sensor_id.kind'='sequence',
'fields.sensor_id.start'='1',
'fields.sensor_id.end'='1000',
'fields.latitude.kind'='random',
'fields.latitude.min'='-90.0',
'fields.latitude.max'='90.0',
'fields.longitude.kind'='random',
'fields.longitude.min'='-180.0',
'fields.longitude.max'='180.0',
'fields.generation.kind'='random',
'fields.generation.min'='0',
'fields.generation.max'='3',
'fields.updated_at.max-past'='0'
);
Notice how we specify a sequence for sensor_id from 1 to 1000.
This means the table is bounded, i.e. it is not a never-ending stream of events; the source finishes once the sequence is exhausted.
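Because the source is bounded, a query that aggregates over the whole table will finish on its own once all 1000 rows have been produced, for example:
Flink SQL> SELECT COUNT(*) AS total_sensors FROM sensor_info;
Now let's sample a few rows: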
Flink SQL> SELECT * FROM sensor_info LIMIT 10;
+----+----------------------+--------------------------------+--------------------------------+-------------+-------------------------+
| op | sensor_id | latitude | longitude | generation | updated_at |
+----+----------------------+--------------------------------+--------------------------------+-------------+-------------------------+
| +I | 80 | 81.86511315876018 | 54.55085194269276 | 2 | 2023-11-23 07:33:46.959 |
| +I | 5 | 85.25579038990347 | -141.729484364909 | 2 | 2023-11-23 07:33:46.960 |
| +I | 81 | 59.28885905269882 | 125.25733278264516 | 1 | 2023-11-23 07:33:46.960 |
| +I | 31 | -68.93407946310646 | -148.84245945377683 | 2 | 2023-11-23 07:33:46.960 |
| +I | 1 | -26.012451262689765 | 15.43214445471898 | 1 | 2023-11-23 07:33:46.960 |
| +I | 64 | 52.5999653172305 | 46.85505502069498 | 0 | 2023-11-23 07:33:46.960 |
| +I | 34 | -29.75182311185338 | -28.679515250258333 | 1 | 2023-11-23 07:33:46.960 |
| +I | 35 | 70.47215865601012 | -99.79991569639377 | 0 | 2023-11-23 07:33:46.960 |
| +I | 25 | 20.618020925638717 | -0.5002813485619697 | 3 | 2023-11-23 07:33:46.960 |
| +I | 71 | 47.85809156450427 | 110.42800034166112 | 0 | 2023-11-23 07:33:46.960 |
+----+----------------------+--------------------------------+--------------------------------+-------------+-------------------------+
Received a total of 10 rows
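With both tables in place, you could enrich the measurement stream with each sensor's location by joining the two. An illustrative regular streaming join:
Flink SQL> SELECT m.sensor_id, m.reading, s.latitude, s.longitude
FROM measurements AS m
JOIN sensor_info AS s ON m.sensor_id = s.sensor_id;
Keep in mind that a regular join keeps both inputs in Flink state, which is fine for a small demo like this one.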
With our tables containing some data, we are now ready to get started with Apache Paimon.
You can find the tutorial guide in the tutorial/guide.md file.