Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cdc): fix wrong default column matching #20348

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 48 additions & 16 deletions e2e_test/source_legacy/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ create table non_exist ( id INT,
PRIMARY KEY (id)
) from mysql_mytest table 'mytest.non_exist';


# FIXME(kexiang): Currently, generated columns (next_order_id in this case) must be placed at the end of the schema; otherwise, the frontend panics.
statement ok
create table orders_test (
order_id int,
Expand All @@ -170,6 +170,7 @@ create table orders_test (
price decimal,
product_id int,
order_status smallint,
next_order_id int as order_id + 1,
PRIMARY KEY (order_id)
) from mysql_mytest table 'mytest.orders';

Expand All @@ -189,6 +190,13 @@ CREATE TABLE test_my_default_value (
PRIMARY KEY (id)
) FROM mysql_mytest TABLE 'mytest.test_my_default_value';

statement ok
CREATE TABLE test_my_default_value_disorderd (
city varchar,
id int,
PRIMARY KEY (id)
) FROM mysql_mytest TABLE 'mytest.test_my_default_value';

statement ok
SET RW_IMPLICIT_FLUSH=true;

Expand All @@ -200,6 +208,13 @@ select * from test_my_default_value;
----
2 jack Shanghai

statement ok
INSERT INTO test_my_default_value_disorderd (id) VALUES (2);

query II
select * from test_my_default_value_disorderd;
----
Shanghai 2

statement ok
create table kt1 (*) from mysql_source table 'kdb.kt1';
Expand Down Expand Up @@ -291,11 +306,11 @@ Milk Milk is a white liquid food
Juice 100ml Juice

query ITTT
SELECT order_id,order_date,customer_name,product_id FROM orders_test order by order_id limit 3
SELECT order_id,order_date,customer_name,product_id,next_order_id FROM orders_test order by order_id limit 3
----
10001 2020-07-30 10:08:22 Jark 102
10002 2020-07-30 10:11:09 Sally 105
10003 2020-07-30 12:00:30 Edward 106
10001 2020-07-30 10:08:22 Jark 102 10002
10002 2020-07-30 10:11:09 Sally 105 10003
10003 2020-07-30 12:00:30 Edward 106 10004

query IIIIITTTTTTTTT
SELECT c_boolean, c_bit, c_tinyint, c_smallint, c_mediumint, c_integer, c_bigint, c_decimal, c_float, c_double, c_char_255, c_varchar_10000, c_date, c_time, c_datetime, c_timestamp FROM mysql_all_types order by c_bigint;
Expand Down Expand Up @@ -379,13 +394,15 @@ CREATE TABLE upper_orders_shared (
name varchar
) FROM pg_source TABLE 'public.Orders';

# FIXME(kexiang): Currently, generated columns (next_id in this case) must be placed at the end of the schema; otherwise, the frontend panics.
statement ok
CREATE TABLE person_new (
id int,
name varchar,
email_address varchar,
credit_card varchar,
city varchar,
next_id int as id + 1,
PRIMARY KEY (id)
) INCLUDE TIMESTAMP AS commit_ts
INCLUDE DATABASE_NAME as database_name
Expand Down Expand Up @@ -445,22 +462,22 @@ cdc_test public person


query ITTTT
SELECT id,name,email_address,credit_card,city from person_new order by id;
SELECT id,name,email_address,credit_card,city,next_id from person_new order by id;
----
1000 vicky noris [email protected] 7878 5821 1864 2539 cheyenne
1001 peter white [email protected] 1781 2313 8157 6974 boise
1002 sarah spencer [email protected] 3453 4987 9481 6270 los angeles
1100 noris [email protected] 1864 2539 enne
1101 white [email protected] 8157 6974 se
1102 spencer [email protected] 9481 6270 angeles
1000 vicky noris [email protected] 7878 5821 1864 2539 cheyenne 1001
1001 peter white [email protected] 1781 2313 8157 6974 boise 1002
1002 sarah spencer [email protected] 3453 4987 9481 6270 los angeles 1003
1100 noris [email protected] 1864 2539 enne 1101
1101 white [email protected] 8157 6974 se 1102
1102 spencer [email protected] 9481 6270 angeles 1103

# historical data
query ITTTT
SELECT id,name,email_address,credit_card,city from person_new where commit_ts = '1970-01-01 00:00:00+00:00' order by id;
SELECT id,name,email_address,credit_card,city,next_id from person_new where commit_ts = '1970-01-01 00:00:00+00:00' order by id;
----
1000 vicky noris [email protected] 7878 5821 1864 2539 cheyenne
1001 peter white [email protected] 1781 2313 8157 6974 boise
1002 sarah spencer [email protected] 3453 4987 9481 6270 los angeles
1000 vicky noris [email protected] 7878 5821 1864 2539 cheyenne 1001
1001 peter white [email protected] 1781 2313 8157 6974 boise 1002
1002 sarah spencer [email protected] 3453 4987 9481 6270 los angeles 1003

# incremental data
query ITTTT
Expand Down Expand Up @@ -650,6 +667,13 @@ CREATE TABLE test_pg_default_value (
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.test_default_value';

statement ok
CREATE TABLE test_pg_default_value_disorderd (
city varchar,
id int,
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.test_default_value';

statement ok
SET RW_IMPLICIT_FLUSH=true;

Expand All @@ -661,6 +685,14 @@ select * from test_pg_default_value;
----
1 noris Shanghai

statement ok
INSERT INTO test_pg_default_value_disorderd (id) VALUES (1);

query II
select * from test_pg_default_value_disorderd;
----
Shanghai 1

### BEGIN reset the password to the original one
onlyif can-use-recover
statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ CREATE TABLE shared_orders (
) from mssql_source table 'mydb.dbo.orders';

# column name mismatch
statement error INVALID_ARGUMENT: Column 'wrong_order_date' not found in the upstream database
statement error Column 'wrong_order_date' not found in the upstream database
CREATE TABLE shared_orders (
order_id INT,
wrong_order_date BIGINT,
Expand Down
20 changes: 8 additions & 12 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use risingwave_common::license::Feature;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
Expand Down Expand Up @@ -1182,20 +1181,17 @@ pub(super) async fn handle_create_table_plan(
let table: ExternalTableImpl = ExternalTableImpl::connect(config)
.await
.context("failed to auto derive table schema")?;
let external_columns: Vec<_> = table
let external_columns: HashMap<&str, &ColumnDesc> = table
.column_descs()
.iter()
.cloned()
.map(|column_desc| ColumnCatalog {
column_desc,
is_hidden: false,
})
.map(|column_desc| (column_desc.name.as_str(), column_desc))
.collect();
for (col, external_col) in
columns.iter_mut().zip_eq_fast(external_columns.into_iter())
{
col.column_desc.generated_or_default_column =
external_col.column_desc.generated_or_default_column;

for col in &mut columns {
if let Some(external_col) = external_columns.get(col.name()) {
col.column_desc.generated_or_default_column =
external_col.generated_or_default_column.clone();
}
}
(columns, pk_names)
}
Expand Down
Loading