Stream Processing with Apache Flink and Paimon
----------------------------------------------

<p align="center">
    <img src="assets/cover.png" width="600" height="250">
</p>

This repository is a getting-started guide for the **[25 Open Source Advent of Code]()**.


### Table of Contents
1. [Environment Setup](#environment-setup)
2. [Flink SQL Client](#flink-sql-client)
3. [Flink SQL Components](#flink-sql-components)
4. [Table Setup](#table-setup)


### Environment Setup
To run the tutorial you will need a Flink cluster and the Apache Paimon dependencies.

We provide a `docker-compose.yaml` file that spins up a Flink cluster, and a `Dockerfile` that builds on the official Flink image and adds the required connector dependencies.

To spin up the Flink cluster, all you have to do is run:
```shell
docker-compose up
```

When the cluster is up, navigate to [localhost:8081](http://localhost:8081) and you should see the Flink Web UI.

**Note:** The Docker containers mount your local directory, so you should see a `logs/flink` directory created.

All data files will be written inside that directory.

### Flink SQL Client
To execute Flink SQL jobs, we will use the Flink SQL client.

Grab a terminal inside the JobManager container:
```shell
docker exec -it jobmanager bash
```

then start a Flink SQL client by running:
```shell
./bin/sql-client.sh
```

At this point you are ready to start interacting with Flink.
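
As a quick sanity check (a purely illustrative query, not part of the tutorial), you can run a simple statement to confirm the client is working:
```sql
-- returns a single constant row if the client is wired up correctly
SELECT 'Hello, Flink SQL' AS greeting;
```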

### Flink SQL Components
The highest-level component of Flink SQL is the catalog. It keeps metadata about databases, tables, functions, and more.

By default the catalog is in-memory, which means all metadata is lost when the session closes.

Run the following command:
```shell
Flink SQL> SHOW CATALOGS;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set
```
We can see we are in the `default_catalog`. If you then run:

```shell
Flink SQL> SHOW DATABASES;
+------------------+
|    database name |
+------------------+
| default_database |
+------------------+
1 row in set
```
We can also see we have a `default_database`.

You can run more commands like `SHOW FUNCTIONS` and `SHOW VIEWS` to view the available functions and views.
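
As a preview of where the tutorial is heading: a persistent catalog keeps this metadata across sessions. Below is a minimal sketch of creating a Paimon catalog; the catalog name and warehouse path are placeholder assumptions, and the actual setup is covered in the tutorial guide.
```sql
-- sketch only: a file-based Paimon catalog, so tables survive client restarts
-- (catalog name and warehouse path are placeholders)
CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'file:/tmp/paimon'
);

USE CATALOG paimon_catalog;
```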

### Table Setup
Throughout the tutorial we will need some tables and data to play with.

So let's create a few tables and generate some data.

We will use mock data for sensor measurements and sensor information, generated with
Flink's built-in [datagen connector](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/datagen/).

```sql
CREATE TABLE measurements (
    sensor_id BIGINT,
    reading DECIMAL(5, 1)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1000',
    'fields.sensor_id.kind'='random',
    'fields.sensor_id.min'='0',
    'fields.sensor_id.max'='1000',
    'fields.reading.kind'='random',
    'fields.reading.min'='0.0',
    'fields.reading.max'='45.0'
);
```

Also run the following command, which changes how the client displays query results:
```shell
SET 'sql-client.execution.result-mode' = 'tableau';
```
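
The client also supports the `table` (paginated, interactive) and `changelog` (raw change stream) result modes; `tableau` simply prints rows in the terminal. If you want to inspect the client configuration, you can list it:
```sql
-- running SET without arguments lists the current SQL client configuration,
-- including the result mode we just changed
SET;
```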

Then let's verify we can query our table and see some `measurements` data.
```sql
Flink SQL> SELECT * FROM measurements LIMIT 10;
+----+----------------------+---------+
| op |            sensor_id | reading |
+----+----------------------+---------+
| +I |                68804 |    37.8 |
| +I |                62277 |    16.5 |
| +I |                70131 |    29.9 |
| +I |                74241 |     1.8 |
| +I |                59583 |     4.5 |
| +I |                10658 |    44.2 |
| +I |                 3125 |     3.5 |
| +I |                22844 |    29.3 |
| +I |                63999 |    13.2 |
| +I |                31699 |    11.4 |
+----+----------------------+---------+
Received a total of 10 rows
```
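
Since `measurements` is an unbounded stream, queries over it keep running and their results keep updating. For example (purely illustrative), a continuous aggregation emits an updating changelog instead of a final answer:
```sql
-- a continuously updating aggregation: for each incoming row Flink emits
-- retract (-U) and update (+U) records for the affected sensor_id
SELECT sensor_id, AVG(reading) AS avg_reading
FROM measurements
GROUP BY sensor_id;
```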

We will create another table called `sensor_info`.
```sql
CREATE TABLE sensor_info (
    sensor_id BIGINT,
    latitude DOUBLE PRECISION,
    longitude DOUBLE PRECISION,
    generation INT,
    updated_at TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'fields.sensor_id.kind'='sequence',
    'fields.sensor_id.start'='1',
    'fields.sensor_id.end'='1000',
    'fields.latitude.kind'='random',
    'fields.latitude.min'='-90.0',
    'fields.latitude.max'='90.0',
    'fields.longitude.kind'='random',
    'fields.longitude.min'='-180.0',
    'fields.longitude.max'='180.0',
    'fields.generation.kind'='random',
    'fields.generation.min'='0',
    'fields.generation.max'='3',
    'fields.updated_at.max-past'='0'
);
```
**Notice** how we specify a sequence of `1000` records for `sensor_id`.

This means the table is `bounded`, i.e. it's not a never-ending stream of events.

```sql
Flink SQL> SELECT * FROM sensor_info LIMIT 10;
+----+----------------------+--------------------------------+--------------------------------+-------------+-------------------------+
| op |            sensor_id |                       latitude |                      longitude |  generation |              updated_at |
+----+----------------------+--------------------------------+--------------------------------+-------------+-------------------------+
| +I |                   80 |              81.86511315876018 |              54.55085194269276 |           2 | 2023-11-23 07:33:46.959 |
| +I |                    5 |              85.25579038990347 |              -141.729484364909 |           2 | 2023-11-23 07:33:46.960 |
| +I |                   81 |              59.28885905269882 |             125.25733278264516 |           1 | 2023-11-23 07:33:46.960 |
| +I |                   31 |             -68.93407946310646 |            -148.84245945377683 |           2 | 2023-11-23 07:33:46.960 |
| +I |                    1 |            -26.012451262689765 |              15.43214445471898 |           1 | 2023-11-23 07:33:46.960 |
| +I |                   64 |               52.5999653172305 |              46.85505502069498 |           0 | 2023-11-23 07:33:46.960 |
| +I |                   34 |             -29.75182311185338 |            -28.679515250258333 |           1 | 2023-11-23 07:33:46.960 |
| +I |                   35 |              70.47215865601012 |             -99.79991569639377 |           0 | 2023-11-23 07:33:46.960 |
| +I |                   25 |             20.618020925638717 |            -0.5002813485619697 |           3 | 2023-11-23 07:33:46.960 |
| +I |                   71 |              47.85809156450427 |             110.42800034166112 |           0 | 2023-11-23 07:33:46.960 |
+----+----------------------+--------------------------------+--------------------------------+-------------+-------------------------+
Received a total of 10 rows
```
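
Because `sensor_info` is bounded, a scan without a `LIMIT` also terminates on its own once all 1000 generated rows have been emitted, unlike queries against `measurements` (shown here only to illustrate the difference):
```sql
-- finishes by itself after emitting all 1000 rows
SELECT * FROM sensor_info;
```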

With our tables populated with data, we are now ready to get started with **Apache Paimon**.

You can find the tutorial guide in the `tutorial/guide.md` file.