Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cdc): fix wrong default column matching #20348

Merged
merged 7 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions e2e_test/source_legacy/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ CREATE TABLE test_my_default_value (
PRIMARY KEY (id)
) FROM mysql_mytest TABLE 'mytest.test_my_default_value';

statement ok
CREATE TABLE test_my_default_value_disorderd (
city varchar,
id int,
PRIMARY KEY (id)
) FROM mysql_mytest TABLE 'mytest.test_my_default_value';

statement ok
SET RW_IMPLICIT_FLUSH=true;

Expand All @@ -200,6 +207,13 @@ select * from test_my_default_value;
----
2 jack Shanghai

statement ok
INSERT INTO test_my_default_value_disorderd (id) VALUES (2);

query II
select * from test_my_default_value_disorderd;
----
Shanghai 2

statement ok
create table kt1 (*) from mysql_source table 'kdb.kt1';
Expand Down Expand Up @@ -650,6 +664,13 @@ CREATE TABLE test_pg_default_value (
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.test_default_value';

statement ok
CREATE TABLE test_pg_default_value_disorderd (
city varchar,
id int,
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.test_default_value';

statement ok
SET RW_IMPLICIT_FLUSH=true;

Expand All @@ -661,6 +682,14 @@ select * from test_pg_default_value;
----
1 noris Shanghai

statement ok
INSERT INTO test_pg_default_value_disorderd (id) VALUES (1);

query II
select * from test_pg_default_value_disorderd;
----
Shanghai 1

### BEGIN reset the password to the original one
onlyif can-use-recover
statement ok
Expand Down
26 changes: 15 additions & 11 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use risingwave_common::license::Feature;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_common::{bail, bail_not_implemented};
Expand Down Expand Up @@ -1182,20 +1181,25 @@ pub(super) async fn handle_create_table_plan(
let table: ExternalTableImpl = ExternalTableImpl::connect(config)
.await
.context("failed to auto derive table schema")?;
let external_columns: Vec<_> = table
let external_columns: HashMap<&str, ColumnCatalog> = table
.column_descs()
.iter()
.cloned()
.map(|column_desc| ColumnCatalog {
column_desc,
is_hidden: false,
.map(|column_desc| {
(
column_desc.name(),
ColumnCatalog {
column_desc: column_desc.clone(),
is_hidden: false,
},
)
})
.collect();
for (col, external_col) in
columns.iter_mut().zip_eq_fast(external_columns.into_iter())
{
col.column_desc.generated_or_default_column =
external_col.column_desc.generated_or_default_column;

for col in &mut columns {
if let Some(external_col) = external_columns.get(col.name()) {
col.column_desc.generated_or_default_column =
external_col.column_desc.generated_or_default_column.clone();
}
KeXiangWang marked this conversation as resolved.
Show resolved Hide resolved
}
(columns, pk_names)
}
Expand Down
Loading