---
sidebar_label: "Getting Started"
title: "Getting Started with Flink"
sidebar_position: 1
---

# Getting Started with Flink Engine
## Quick Start
For a quick introduction to running Flink, refer to the [Quick Start](quickstart/flink.md) guide.


## Supported Flink Versions
| Fluss Connector Versions | Supported Flink Versions |
|--------------------------|--------------------------| 
| $FLUSS_VERSION_SHORT$    | 1.18, 1.19, 1.20         |


## Feature Support
Fluss supports both Apache Flink's Table API and DataStream API.

For Flink's Table API, Fluss supports the following features:

| Feature Support                                                 | Flink | Notes                                    |
|-----------------------------------------------------------------|-------|------------------------------------------|
| [SQL Create Catalog](ddl.md#create-catalog)                     | ✔️    |                                          |
| [SQL Create Database](ddl.md#create-database)                   | ✔️    |                                          |
| [SQL Drop Database](ddl.md#drop-database)                       | ✔️    |                                          |
| [SQL Create Table](ddl.md#create-table)                         | ✔️    |                                          |
| [SQL Create Table Like](ddl.md#create-table-like)               | ✔️    |                                          |
| [SQL Drop Table](ddl.md#drop-table)                             | ✔️    |                                          |
| [SQL Alter Table](ddl.md#alter-table)                           | ✔️    | SET/RESET properties                     |
| [SQL Create Materialized Table](ddl.md#materialized-table)      | ✔️    | Continuous refresh mode only             |
| [SQL Alter Materialized Table](ddl.md#alter-materialized-table) | ✔️    | Suspend/Resume support                   |
| [SQL Drop Materialized Table](ddl.md#drop-materialized-table)   | ✔️    |                                          |
| [SQL Show Partitions](ddl.md#show-partitions)                   | ✔️    |                                          |
| [SQL Add Partition](ddl.md#add-partition)                       | ✔️    |                                          |
| [SQL Drop Partition](ddl.md#drop-partition)                     | ✔️    |                                          |
| [Procedures](ddl.md#procedures)                                 | ✔️    | ACL management and cluster configuration |
| [SQL Select](reads.md)                                          | ✔️    | Supports both streaming and batch mode.  |
| [SQL Limit](reads.md#limit-read)                                | ✔️    | Only for Log Table                       |
| [SQL Insert Into](writes.md)                                    | ✔️    | Supports both streaming and batch mode.  |
| [SQL Delete From](writes.md#delete-from)                        | ✔️    | Only in batch mode.                      |
| [SQL Update](writes.md#update)                                  | ✔️    | Only in batch mode.                      |
| [SQL Lookup Join](lookups.md)                                   | ✔️    |                                          |

For Flink's DataStream API, see [DataStream API](docs/engine-flink/datastream.mdx) for more details.

## Preparation when using Flink SQL Client
- **Download Flink**

Flink runs on all UNIX-like environments, e.g., Linux, macOS, and Cygwin (for Windows).
If you haven't downloaded Flink yet, download [the binary release](https://flink.apache.org/downloads.html) of Flink, then extract the archive with the following command.
```shell
tar -xzf flink-1.20.3-bin-scala_2.12.tgz
```
- **Copy Fluss Flink Bundled Jar**

Download [Fluss Flink Bundled jar](/downloads) and copy to the `lib` directory of your Flink home.

```shell
cp fluss-flink-$FLUSS_VERSION$.jar <FLINK_HOME>/lib/
```
:::note
If you use [Amazon S3](http://aws.amazon.com/s3/), [Aliyun OSS](https://www.aliyun.com/product/oss) or [HDFS(Hadoop Distributed File System)](https://hadoop.apache.org/docs/stable/) as Fluss's [remote storage](maintenance/tiered-storage/remote-storage.md),
you should download the corresponding [Fluss filesystem jar](/downloads#filesystem-jars) and also copy it to the `lib` directory of your Flink home.
:::

- **Start a local cluster**

To start a local cluster, run the bash script that comes with Flink:
```shell
<FLINK_HOME>/bin/start-cluster.sh
```
You should be able to navigate to the web UI at [localhost:8081](http://localhost:8081/) to view the Flink dashboard and see that the cluster is up and running. You can also check its status with the following command:
```shell
ps aux | grep flink
```
- **Start a SQL Client**

To interact with Flink using SQL, start the SQL Client with the provided script:
```shell
<FLINK_HOME>/bin/sql-client.sh
```
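
Once the SQL Client is running, you can issue a simple statement to confirm the session works. `SHOW CATALOGS` is standard Flink SQL; a fresh session lists only Flink's built-in `default_catalog`:
```sql title="Flink SQL"
-- list the catalogs visible to this session
SHOW CATALOGS;
```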


## Creating a Catalog
You can use the following SQL statement to create a catalog.
```sql title="Flink SQL"
CREATE CATALOG fluss_catalog WITH (
  'type' = 'fluss',
  'bootstrap.servers' = 'localhost:9123'
);
```

:::note
1. The `bootstrap.servers` option specifies the Fluss server addresses. Before configuring `bootstrap.servers`,
   you should start a Fluss cluster first. See [Deploying Fluss](install-deploy/overview.md#how-to-deploy-fluss)
   for how to build a Fluss cluster.
   Here, it is assumed that a Fluss cluster is running on your local machine and the CoordinatorServer listens on port 9123.
2. The `bootstrap.servers` configuration is used to discover all nodes within the Fluss cluster. It can be set to one or more (up to three) Fluss server addresses (either CoordinatorServer or TabletServer) separated by commas.
:::
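
For example, assuming additional servers listening on ports 9124 and 9125 (hypothetical addresses for illustration), a catalog with multiple bootstrap addresses would look like this:
```sql title="Flink SQL"
-- multiple bootstrap addresses, separated by commas
CREATE CATALOG fluss_catalog_multi WITH (
  'type' = 'fluss',
  'bootstrap.servers' = 'localhost:9123,localhost:9124,localhost:9125'
);
```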

## Creating a Table
```sql title="Flink SQL"
USE CATALOG fluss_catalog;
```
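
With the catalog selected, a quick sanity check is to list its databases (a newly deployed Fluss cluster should contain a default database named `fluss`):
```sql title="Flink SQL"
-- verify the connection by listing the databases in the catalog
SHOW DATABASES;
```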

```sql title="Flink SQL"
CREATE TABLE pk_table (
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT,
  PRIMARY KEY (shop_id, user_id) NOT ENFORCED
) WITH (
  'bucket.num' = '4'
);
```
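
`pk_table` above is a Primary-Key Table because it declares a `PRIMARY KEY`. Omitting the primary key clause creates an append-only Log Table instead; a minimal sketch:
```sql title="Flink SQL"
-- no PRIMARY KEY clause: this creates an append-only Log Table
CREATE TABLE log_table (
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT
) WITH (
  'bucket.num' = '4'
);
```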

## Data Writing
To append new data to a table, you can use `INSERT INTO` in batch mode or streaming mode:
```sql title="Flink SQL"
-- Execute the Flink job in batch mode for the current session context
SET 'execution.runtime-mode' = 'batch';
```

```sql title="Flink SQL"
-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';
```

```sql title="Flink SQL"
INSERT INTO pk_table VALUES
  (1234, 1234, 1, 1),
  (12345, 12345, 2, 2),
  (123456, 123456, 3, 3);
```
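
Because `pk_table` declares a primary key, writing a row whose key already exists replaces the existing record instead of appending a duplicate (upsert semantics). For example, re-inserting key `(1234, 1234)` with a new `total_amount` overwrites the earlier row:
```sql title="Flink SQL"
-- upsert: the existing row with key (1234, 1234) is replaced
INSERT INTO pk_table VALUES (1234, 1234, 1, 10);
```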

To update the data record with the primary key `(1234, 1234)`, use the `UPDATE` statement in a Flink batch job as follows:

```sql title="Flink SQL"
-- should run in batch mode
UPDATE pk_table SET total_amount = 4 WHERE shop_id = 1234 AND user_id = 1234;
```

To delete the data record with primary key `(12345, 12345)`, use `DELETE FROM`:

```sql title="Flink SQL"
-- should run in batch mode
DELETE FROM pk_table WHERE shop_id = 12345 AND user_id = 12345;
```

## Data Reading

To retrieve data with the primary key `(1234, 1234)`, you can perform a point query by applying a filter on the primary key:
```sql title="Flink SQL"
-- should run in batch mode
SELECT * FROM pk_table WHERE shop_id = 1234 AND user_id = 1234;
```

To preview a subset of the data in a table, you can use a `LIMIT` clause.
```sql title="Flink SQL"
-- should run in batch mode
SELECT * FROM pk_table LIMIT 10;
```

Fluss supports incremental data reading in Flink streaming jobs:
```sql title="Flink SQL"
-- Execute the Flink job in streaming mode for the current session.
SET 'execution.runtime-mode' = 'streaming';
```

```sql title="Flink SQL"
-- read changelogs of the primary-key table from the beginning.
SELECT * FROM pk_table /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
```
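
Besides `earliest`, the `scan.startup.mode` hint accepts other startup modes, e.g. `latest`, which only reads changes produced after the job starts (see [reads](reads.md) for the full list of modes):
```sql title="Flink SQL"
-- only read changes that arrive after the job starts
SELECT * FROM pk_table /*+ OPTIONS('scan.startup.mode' = 'latest') */;
```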

## Type Conversion
Fluss's integration for Flink automatically converts between Flink and Fluss types.

### Fluss -> Apache Flink

| Fluss         | Flink         |
|---------------|---------------|
| BOOLEAN       | BOOLEAN       |
| TINYINT       | TINYINT       |
| SMALLINT      | SMALLINT      |
| INT           | INT           |
| BIGINT        | BIGINT        |
| FLOAT         | FLOAT         |
| DOUBLE        | DOUBLE        |
| CHAR          | CHAR          |
| STRING        | STRING        |
| DECIMAL       | DECIMAL       |
| DATE          | DATE          |
| TIME          | TIME          |
| TIMESTAMP     | TIMESTAMP     |
| TIMESTAMP_LTZ | TIMESTAMP_LTZ |
| BYTES         | BYTES         |

### Apache Flink -> Fluss

| Flink         | Fluss                                         | 
|---------------|-----------------------------------------------|
| BOOLEAN       | BOOLEAN                                       |
| TINYINT       | TINYINT                                       |
| SMALLINT      | SMALLINT                                      |
| INT           | INT                                           |
| BIGINT        | BIGINT                                        |
| FLOAT         | FLOAT                                         |
| DOUBLE        | DOUBLE                                        |
| CHAR          | CHAR                                          |
| STRING        | STRING                                        |
| DECIMAL       | DECIMAL                                       |
| DATE          | DATE                                          |
| TIME          | TIME                                          |
| TIMESTAMP     | TIMESTAMP                                     |
| TIMESTAMP_LTZ | TIMESTAMP_LTZ                                 |
| BYTES         | BYTES                                         |
| VARCHAR       | Not supported; use STRING instead.            |
| VARBINARY     | Not supported; use BYTES instead.             |
| INTERVAL      | Not supported                                 |
| ARRAY         | Not supported                                 |
| MAP           | Not supported                                 |
| MULTISET      | Not supported                                 |
| ROW           | Not supported                                 |
| RAW           | Not supported                                 |
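
In practice this means your DDL should stick to the supported types: declare string and binary columns as `STRING` and `BYTES` rather than `VARCHAR` or `VARBINARY`. A minimal sketch:
```sql title="Flink SQL"
CREATE TABLE typed_table (
  id BIGINT,
  name STRING,            -- use STRING instead of VARCHAR(n)
  payload BYTES,          -- use BYTES instead of VARBINARY(n)
  amount DECIMAL(10, 2),
  created_at TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'bucket.num' = '4'
);
```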