[docs] Split up multi-command Flink SQL code listings into separate code listings #256

Open: wants to merge 1 commit into base `main`
28 changes: 19 additions & 9 deletions website/docs/engine-flink/ddl.md
@@ -7,12 +7,14 @@ sidebar_position: 2

## Create Catalog
Fluss supports creating and managing tables through the Fluss Catalog.
```sql
```sql title="Flink SQL"
CREATE CATALOG fluss_catalog WITH (
'type' = 'fluss',
'bootstrap.servers' = 'fluss-server-1:9123'
);
```

```sql title="Flink SQL"
USE CATALOG fluss_catalog;
```
The following properties can be set if using the Fluss catalog:
@@ -29,18 +31,24 @@ The following statements assume the current catalog is switched

By default, FlussCatalog uses the `fluss` database in Flink. Use the following example to create a separate database and avoid creating tables under the default `fluss` database:

```sql
```sql title="Flink SQL"
CREATE DATABASE my_db;
```

```sql title="Flink SQL"
USE my_db;
```

## Drop Database

To delete a database, use `DROP DATABASE`. This drops all the tables in the database as well:

```sql
```sql title="Flink SQL"
-- Flink doesn't allow drop current database, switch to Fluss default database
USE fluss;
```

```sql title="Flink SQL"
-- drop the database
DROP DATABASE my_db;
```
@@ -50,7 +58,7 @@ DROP DATABASE my_db;
### PrimaryKey Table

The following SQL statement will create a [PrimaryKey Table](table-design/table-types/pk-table.md) with a primary key consisting of `shop_id` and `user_id`.
```sql
```sql title="Flink SQL"
CREATE TABLE my_pk_table (
shop_id BIGINT,
user_id BIGINT,
@@ -66,7 +74,7 @@ CREATE TABLE my_pk_table (

The following SQL statement creates a [Log Table](table-design/table-types/log-table.md) by omitting the primary key clause.

```sql
```sql title="Flink SQL"
CREATE TABLE my_log_table (
order_id BIGINT,
item_id BIGINT,
@@ -86,7 +94,7 @@ Currently, Fluss only supports one partitioned field with `STRING` type.
Currently, a partitioned table must enable auto partition and set the auto partition time unit.
:::

```sql
```sql title="Flink SQL"
CREATE TABLE my_part_pk_table (
dt STRING,
shop_id BIGINT,
@@ -103,7 +111,7 @@

The following SQL statement creates a Partitioned Log Table in Fluss.

```sql
```sql title="Flink SQL"
CREATE TABLE my_part_log_table (
order_id BIGINT,
item_id BIGINT,
@@ -131,7 +139,7 @@ The supported options in the "with" parameters when creating a table are as follows:

To create a table with the same schema, partitioning, and table properties as another table, use `CREATE TABLE LIKE`.

```sql
```sql title="Flink SQL"
-- there is a temporary datagen table
CREATE TEMPORARY TABLE datagen (
user_id BIGINT,
@@ -143,7 +151,9 @@ CREATE TEMPORARY TABLE datagen (
'connector' = 'datagen',
'rows-per-second' = '10'
);
```

```sql title="Flink SQL"
-- create a Fluss table that derives its metadata from the temporary table, excluding options
CREATE TABLE my_table LIKE datagen (EXCLUDING OPTIONS);
```
@@ -154,7 +164,7 @@ For more details, refer to the [Flink CREATE TABLE](https://nightlies.apache.org

To delete a table, run:

```sql
```sql title="Flink SQL"
DROP TABLE my_table;
```

26 changes: 18 additions & 8 deletions website/docs/engine-flink/getting-started.md
@@ -70,7 +70,7 @@ To quickly stop the cluster and all running components, you can use the provided

## Creating a Catalog
You can use the following SQL statement to create a catalog.
```sql title="Flink SQL Client"
```sql title="Flink SQL"
CREATE CATALOG fluss_catalog WITH (
'type'='fluss',
'bootstrap.servers' = 'localhost:9123'
@@ -86,9 +86,11 @@ CREATE CATALOG fluss_catalog WITH (
:::

## Creating a Table
```sql title="Flink SQL Client"
```sql title="Flink SQL"
USE CATALOG `fluss_catalog`;
```

```sql title="Flink SQL"
CREATE TABLE pk_table (
shop_id BIGINT,
user_id BIGINT,
@@ -102,49 +104,57 @@ CREATE TABLE pk_table (

## Data Writing
To append new data to a table, you can use `INSERT INTO` in batch mode or streaming mode:
```sql title="Flink SQL Client"
```sql title="Flink SQL"
-- Execute the Flink job in batch mode for the current session context
SET 'execution.runtime-mode' = 'batch';
```

```sql title="Flink SQL"
-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';
```

```sql title="Flink SQL"
INSERT INTO pk_table VALUES
(1234, 1234, 1, 1),
(12345, 12345, 2, 2),
(123456, 123456, 3, 3);
```
To update the data record with primary key `(1234, 1234)`, use the `UPDATE` statement as follows:

```sql title="Flink SQL Client"
```sql title="Flink SQL"
-- should run in batch mode
UPDATE pk_table SET total_amount = 4 WHERE shop_id = 1234 and user_id = 1234;
```

To delete the data record with primary key `(12345, 12345)`, use `DELETE FROM`:

```sql title="Flink SQL Client"
```sql title="Flink SQL"
-- should run in batch mode
DELETE FROM pk_table WHERE shop_id = 12345 and user_id = 12345;
```

## Data Reading

To retrieve data with the primary key `(1234, 1234)`, you can perform a point query by applying a filter on the primary key:
```sql title="Flink SQL Client"
```sql title="Flink SQL"
-- should run in batch mode
SELECT * FROM pk_table WHERE shop_id = 1234 and user_id = 1234;
```

To preview a subset of the data in a table, you can use a `LIMIT` clause.
```sql title="Flink SQL Client"
```sql title="Flink SQL"
-- should run in batch mode
SELECT * FROM pk_table LIMIT 10;
```

Fluss supports incremental data reading in Flink streaming jobs:
```sql title="Flink SQL Client"
```sql title="Flink SQL"
-- Submit the Flink job in streaming mode for the current session.
SET 'execution.runtime-mode' = 'streaming';
```

```sql title="Flink SQL"
-- read changelogs from the primary-key table from the beginning.
SELECT * FROM pk_table /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
```
18 changes: 13 additions & 5 deletions website/docs/engine-flink/lookups.md
@@ -13,7 +13,7 @@ Flink lookup joins are important because they enable efficient, real-time enrich

## Examples
1. Create two tables.
```sql
```sql title="Flink SQL"
CREATE TABLE `fluss_catalog`.`my_db`.`orders` (
`o_orderkey` INT NOT NULL,
`o_custkey` INT NOT NULL,
@@ -26,8 +26,9 @@ CREATE TABLE `fluss_catalog`.`my_db`.`orders` (
`o_comment` STRING NOT NULL,
PRIMARY KEY (o_orderkey) NOT ENFORCED
);
```


```sql title="Flink SQL"
CREATE TABLE `fluss_catalog`.`my_db`.`customer` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
@@ -42,18 +43,25 @@ CREATE TABLE `fluss_catalog`.`my_db`.`customer` (
```

2. Perform lookup join.
```sql
```sql title="Flink SQL"
USE CATALOG `fluss_catalog`;
```

```sql title="Flink SQL"
USE my_db;
```

```sql title="Flink SQL"
CREATE TEMPORARY TABLE lookup_join_sink
(
order_key INT NOT NULL,
order_totalprice DECIMAL(15, 2) NOT NULL,
customer_name STRING NOT NULL,
customer_address STRING NOT NULL
) WITH ('connector' = 'blackhole');
```

```sql title="Flink SQL"
-- lookup join in asynchronous mode.
INSERT INTO lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
@@ -62,8 +70,9 @@ FROM
LEFT JOIN `customer`
FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey`;
```


```sql title="Flink SQL"
-- lookup join in synchronous mode.
INSERT INTO lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
@@ -72,7 +81,6 @@ FROM
LEFT JOIN `customer` /*+ OPTIONS('lookup.async' = 'false') */
FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey`;

```

## Lookup Options