From cc7bbd907c05118b160f67cfa2dc02c127418e8b Mon Sep 17 00:00:00 2001 From: congyi wang <58715567+wcy-fdu@users.noreply.github.com> Date: Sun, 26 Jan 2025 18:40:16 +0800 Subject: [PATCH] cherrypick 20156 to release 2.1 --- e2e_test/s3/fs_sink.py | 224 ++++++++++++++++-- src/connector/src/parser/parquet_parser.rs | 5 + .../source/iceberg/parquet_file_handler.rs | 108 +++++---- 3 files changed, 268 insertions(+), 69 deletions(-) diff --git a/e2e_test/s3/fs_sink.py b/e2e_test/s3/fs_sink.py index ac99f9b121195..1979b7f6606ab 100644 --- a/e2e_test/s3/fs_sink.py +++ b/e2e_test/s3/fs_sink.py @@ -17,6 +17,12 @@ def gen_data(file_num, item_num_per_file): assert item_num_per_file % 2 == 0, \ f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}' + + struct_type = pa.struct([ + ('field1', pa.int32()), + ('field2', pa.string()) + ]) + return [ [{ 'id': file_id * item_num_per_file + item_id, @@ -44,6 +50,7 @@ def gen_data(file_num, item_num_per_file): 'test_timestamptz_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms', tz='+00:00')), 'test_timestamptz_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us', tz='+00:00')), 'test_timestamptz_ns': pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns', tz='+00:00')), + 'nested_struct': pa.scalar((item_id, f'struct_value_{item_id}'), type=struct_type), } for item_id in range(item_num_per_file)] for file_id in range(file_num) ] @@ -62,6 +69,50 @@ def do_test(config, file_num, item_num_per_file, prefix): def _table(): return 's3_test_parquet' + print("test table function file scan") + cur.execute(f''' + SELECT + id, + name, + sex, + mark, + test_int, + test_int8, + test_uint8, + test_uint16, + test_uint32, + test_uint64, + test_float_16, + test_real, + test_double_precision, + test_varchar, + test_bytea, + test_date, + test_time, + test_timestamp_s, + test_timestamp_ms, + test_timestamp_us, + test_timestamp_ns, + test_timestamptz_s, + test_timestamptz_ms, + test_timestamptz_us, + test_timestamptz_ns, + nested_struct + FROM file_scan( + 'parquet', + 's3', + 'http://127.0.0.1:9301', + 'hummockadmin', + 'hummockadmin', + 's3://hummock001/test_file_scan/test_file_scan.parquet' + );''') + try: + result = cur.fetchone() + assert result[0] == 0, f'file scan assertion failed: the first column is {result[0]}, expect 0.' 
+ except ValueError as e: + print(f"cur.fetchone() got ValueError: {e}") + + print("file scan test pass") # Execute a SELECT statement cur.execute(f'''CREATE TABLE {_table()}( id bigint primary key, @@ -88,8 +139,8 @@ def _table(): test_timestamptz_s timestamptz, test_timestamptz_ms timestamptz, test_timestamptz_us timestamptz, - test_timestamptz_ns timestamptz - + test_timestamptz_ns timestamptz, + nested_struct STRUCT<"field1" int, "field2" varchar> ) WITH ( connector = 's3', match_pattern = '*.parquet', @@ -169,7 +220,8 @@ def _table(): test_timestamptz_s, test_timestamptz_ms, test_timestamptz_us, - test_timestamptz_ns + test_timestamptz_ns, + nested_struct from {_table()} WITH ( connector = 's3', match_pattern = '*.parquet', @@ -179,7 +231,6 @@ def _table(): s3.credentials.secret = 'hummockadmin', s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', s3.path = 'test_parquet_sink/', - s3.file_type = 'parquet', type = 'append-only', force_append_only='true' ) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''') @@ -187,7 +238,7 @@ def _table(): print('Sink into s3 in parquet encode...') # Execute a SELECT statement cur.execute(f'''CREATE TABLE test_parquet_sink_table( - id bigint primary key,\ + id bigint primary key, name TEXT, sex bigint, mark bigint, @@ -211,7 +262,8 @@ def _table(): test_timestamptz_s timestamptz, test_timestamptz_ms timestamptz, test_timestamptz_us timestamptz, - test_timestamptz_ns timestamptz + test_timestamptz_ns timestamptz, + nested_struct STRUCT<"field1" int, "field2" varchar>, ) WITH ( connector = 's3', match_pattern = 'test_parquet_sink/*.parquet', @@ -220,8 +272,8 @@ def _table(): s3.credentials.access = 'hummockadmin', s3.credentials.secret = 'hummockadmin', s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', + refresh.interval.sec = 1, ) FORMAT PLAIN ENCODE PARQUET;''') - total_rows = file_num * item_num_per_file MAX_RETRIES = 40 for retry_no in range(MAX_RETRIES): @@ -262,19 +314,20 @@ def _table(): test_timestamptz_s, test_timestamptz_ms, test_timestamptz_us, - test_timestamptz_ns + test_timestamptz_ns, + nested_struct from {_table()} WITH ( - connector = 's3', + connector = 'snowflake', match_pattern = '*.parquet', - s3.region_name = 'custom', - s3.bucket_name = 'hummock001', - s3.credentials.access = 'hummockadmin', - s3.credentials.secret = 'hummockadmin', + snowflake.aws_region = 'custom', + snowflake.s3_bucket = 'hummock001', + snowflake.aws_access_key_id = 'hummockadmin', + snowflake.aws_secret_access_key = 'hummockadmin', s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', s3.path = 'test_json_sink/', - s3.file_type = 'json', type = 'append-only', - force_append_only='true' + force_append_only='true', + refresh.interval.sec = 1, ) FORMAT PLAIN ENCODE JSON(force_append_only='true');''') print('Sink into s3 in json encode...') @@ -304,7 +357,8 @@ def _table(): test_timestamptz_s timestamptz, test_timestamptz_ms timestamptz, test_timestamptz_us timestamptz, - test_timestamptz_ns timestamptz + test_timestamptz_ns timestamptz, + nested_struct STRUCT<"field1" int, "field2" varchar> ) WITH ( connector = 's3', match_pattern = 'test_json_sink/*.json', @@ -343,8 +397,127 @@ def _assert_eq(field, got, expect): cur.execute(f'drop table test_parquet_sink_table') cur.execute(f'drop sink test_file_sink_json') cur.execute(f'drop table test_json_sink_table') + cur.execute(f'drop table s3_test_parquet') + cur.close() + conn.close() + +def test_file_sink_batching(): + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + 
database="dev" + ) + + # Open a cursor to execute SQL statements + cur = conn.cursor() + + + # Execute a SELECT statement + cur.execute(f'''CREATE TABLE t (v1 int, v2 int);''') + + print('test file sink batching...\n') + cur.execute(f'''CREATE sink test_file_sink_batching as select + v1, v2 from t WITH ( + connector = 's3', + s3.region_name = 'custom', + s3.bucket_name = 'hummock001', + s3.credentials.access = 'hummockadmin', + s3.credentials.secret = 'hummockadmin', + s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', + s3.path = 'test_file_sink_batching/', + type = 'append-only', + rollover_seconds = 5, + max_row_count = 5, + force_append_only='true' + ) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''') + + cur.execute(f'''CREATE TABLE test_file_sink_batching_table( + v1 int, + v2 int, + ) WITH ( + connector = 's3', + match_pattern = 'test_file_sink_batching/*.parquet', + refresh.interval.sec = 1, + s3.region_name = 'custom', + s3.bucket_name = 'hummock001', + s3.credentials.access = 'hummockadmin', + s3.credentials.secret = 'hummockadmin', + s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', + ) FORMAT PLAIN ENCODE PARQUET;''') + + cur.execute(f'''ALTER SINK test_file_sink_batching SET PARALLELISM = 2;''') + + cur.execute(f'''INSERT INTO t VALUES (10, 10);''') + + + cur.execute(f'select count(*) from test_file_sink_batching_table') + # no item will be selectedpsq + result = cur.fetchone() + + def _assert_eq(field, got, expect): + assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' + def _assert_greater(field, got, expect): + assert got > expect, f'{field} assertion failed: got {got}, expect {expect}.' + + _assert_eq('count(*)', result[0], 0) + print('the rollover_seconds has not reached, count(*) = 0') + + + time.sleep(11) + + cur.execute(f'select count(*) from test_file_sink_batching_table') + result = cur.fetchone() + _assert_eq('count(*)', result[0], 1) + print('the rollover_seconds has reached, count(*) = ', result[0]) + + cur.execute(f''' + INSERT INTO t VALUES (20, 20); + INSERT INTO t VALUES (30, 30); + INSERT INTO t VALUES (40, 40); + INSERT INTO t VALUES (50, 10); + ''') + + cur.execute(f'select count(*) from test_file_sink_batching_table') + # count(*) = 1 + result = cur.fetchone() + _assert_eq('count(*)', result[0], 1) + print('the max row count has not reached, count(*) = ', result[0]) + + cur.execute(f''' + INSERT INTO t VALUES (60, 20); + INSERT INTO t VALUES (70, 30); + INSERT INTO t VALUES (80, 10); + INSERT INTO t VALUES (90, 20); + INSERT INTO t VALUES (100, 30); + INSERT INTO t VALUES (100, 10); + ''') + + time.sleep(10) + + cur.execute(f'select count(*) from test_file_sink_batching_table') + result = cur.fetchone() + _assert_greater('count(*)', result[0], 1) + print('the rollover_seconds has reached, count(*) = ', result[0]) + + cur.execute(f'drop sink test_file_sink_batching;') + cur.execute(f'drop table t;') + cur.execute(f'drop table test_file_sink_batching_table;') cur.close() conn.close() + # delete objects + + client = Minio( + "127.0.0.1:9301", + "hummockadmin", + "hummockadmin", + secure=False, + ) + objects = client.list_objects("hummock001", prefix="test_file_sink_batching/", recursive=True) + + for obj in objects: + client.remove_object("hummock001", obj.object_name) + print(f"Deleted: {obj.object_name}") @@ -374,6 +547,21 @@ def _assert_eq(field, got, expect): _s3(idx), _local(idx) ) + # put parquet file to test table function file scan + if data: + first_file_data = data[0] + first_table = 
pa.Table.from_pandas(pd.DataFrame(first_file_data)) + + first_file_name = f"test_file_scan.parquet" + first_file_path = f"test_file_scan/{first_file_name}" + + pq.write_table(first_table, "data_0.parquet") + + client.fput_object( + "hummock001", + first_file_path, + "data_0.parquet" + ) # do test do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id) @@ -384,7 +572,9 @@ def _assert_eq(field, got, expect): do_sink(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id) - # clean up s3 files + # clean up s3 files for idx, _ in enumerate(data): client.remove_object("hummock001", _s3(idx)) + # test file sink batching + test_file_sink_batching() \ No newline at end of file diff --git a/src/connector/src/parser/parquet_parser.rs b/src/connector/src/parser/parquet_parser.rs index f656555d7c119..d87481ef64546 100644 --- a/src/connector/src/parser/parquet_parser.rs +++ b/src/connector/src/parser/parquet_parser.rs @@ -26,6 +26,7 @@ use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt; use crate::parser::ConnectorResult; use crate::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator; use crate::source::filesystem::opendal_source::{OpendalGcs, OpendalPosixFs, OpendalS3}; +use crate::source::iceberg::is_parquet_schema_match_source_schema; use crate::source::reader::desc::SourceDesc; use crate::source::{ConnectorProperties, SourceColumnDesc}; /// `ParquetParser` is responsible for converting the incoming `record_batch_stream` @@ -110,6 +111,10 @@ impl ParquetParser { if let Some(parquet_column) = record_batch.column_by_name(rw_column_name) + && is_parquet_schema_match_source_schema( + parquet_column.data_type(), + rw_data_type, + ) { let arrow_field = IcebergArrowConvert .to_arrow_field(rw_column_name, rw_data_type)?; diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs index 8425a907f8d05..c745839aa95a5 100644 --- a/src/connector/src/source/iceberg/parquet_file_handler.rs +++ b/src/connector/src/source/iceberg/parquet_file_handler.rs @@ -36,7 +36,7 @@ use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataRead use risingwave_common::array::arrow::arrow_schema_udf::{DataType as ArrowDateType, IntervalUnit}; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::array::StreamChunk; -use risingwave_common::catalog::ColumnId; +use risingwave_common::catalog::{ColumnDesc, ColumnId}; use risingwave_common::types::DataType as RwDataType; use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt; use url::Url; @@ -183,55 +183,64 @@ pub async fn list_s3_directory( } } -/// Extracts valid column indices from a Parquet file schema based on the user's requested schema. +/// Extracts a suitable `ProjectionMask` from a Parquet file schema based on the user's requested schema. /// -/// This function is used for column pruning of Parquet files. It calculates the intersection -/// between the columns in the currently read Parquet file and the schema provided by the user. -/// This is useful for reading a `RecordBatch` with the appropriate `ProjectionMask`, ensuring that -/// only the necessary columns are read. +/// This function is utilized for column pruning of Parquet files. It checks the user's requested schema +/// against the schema of the currently read Parquet file. If the provided `columns` are `None` +/// or if the Parquet file contains nested data types, it returns `ProjectionMask::all()`. 
Otherwise, +/// it returns only the columns where both the data type and column name match the requested schema, +/// facilitating efficient reading of the `RecordBatch`. /// /// # Parameters -/// - `columns`: A vector of `Column` representing the user's requested schema. +/// - `columns`: An optional vector of `Column` representing the user's requested schema. /// - `metadata`: A reference to `FileMetaData` containing the schema and metadata of the Parquet file. /// /// # Returns -/// - A `ConnectorResult>`, which contains the indices of the valid columns in the -/// Parquet file schema that match the requested schema. If an error occurs during processing, -/// it returns an appropriate error. -pub fn extract_valid_column_indices( - rw_columns: Vec, +/// - A `ConnectorResult`, which represents the valid columns in the Parquet file schema +/// that correspond to the requested schema. If an error occurs during processing, it returns an +/// appropriate error. +pub fn get_project_mask( + columns: Option>, metadata: &FileMetaData, -) -> ConnectorResult> { - let parquet_column_names = metadata - .schema_descr() - .columns() - .iter() - .map(|c| c.name()) - .collect_vec(); +) -> ConnectorResult { + match columns { + Some(rw_columns) => { + let root_column_names = metadata + .schema_descr() + .root_schema() + .get_fields() + .iter() + .map(|field| field.name()) + .collect_vec(); - let converted_arrow_schema = - parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata()) - .map_err(anyhow::Error::from)?; + let converted_arrow_schema = + parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata()) + .map_err(anyhow::Error::from)?; + let valid_column_indices: Vec = rw_columns + .iter() + .filter_map(|column| { + root_column_names + .iter() + .position(|&name| name == column.name) + .and_then(|pos| { + let arrow_data_type: &risingwave_common::array::arrow::arrow_schema_udf::DataType = converted_arrow_schema.field_with_name(&column.name).ok()?.data_type(); + let rw_data_type: &risingwave_common::types::DataType = &column.data_type; + if is_parquet_schema_match_source_schema(arrow_data_type, rw_data_type) { + Some(pos) + } else { + None + } + }) + }) + .collect(); - let valid_column_indices: Vec = rw_columns - .iter() - .filter_map(|column| { - parquet_column_names - .iter() - .position(|&name| name == column.name) - .and_then(|pos| { - let arrow_data_type: &risingwave_common::array::arrow::arrow_schema_udf::DataType = converted_arrow_schema.field(pos).data_type(); - let rw_data_type: &risingwave_common::types::DataType = &column.data_type; - - if is_parquet_schema_match_source_schema(arrow_data_type, rw_data_type) { - Some(pos) - } else { - None - } - }) - }) - .collect(); - Ok(valid_column_indices) + Ok(ProjectionMask::roots( + metadata.schema_descr(), + valid_column_indices, + )) + } + None => Ok(ProjectionMask::all()), + } } /// Reads a specified Parquet file and converts its content into a stream of chunks. 
@@ -255,13 +264,7 @@ pub async fn read_parquet_file( let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?; let file_metadata = parquet_metadata.file_metadata(); - let projection_mask = match rw_columns { - Some(columns) => { - let column_indices = extract_valid_column_indices(columns, file_metadata)?; - ProjectionMask::leaves(file_metadata.schema_descr(), column_indices) - } - None => ProjectionMask::all(), - }; + let projection_mask = get_project_mask(rw_columns, file_metadata)?; // For the Parquet format, we directly convert from a record batch to a stream chunk. // Therefore, the offset of the Parquet file represents the current position in terms of the number of rows read from the file. @@ -284,11 +287,12 @@ pub async fn read_parquet_file( .enumerate() .map(|(index, field_ref)| { let data_type = IcebergArrowConvert.type_from_field(field_ref).unwrap(); - SourceColumnDesc::simple( + let column_desc = ColumnDesc::named( field_ref.name().clone(), - data_type, ColumnId::new(index as i32), - ) + data_type, + ); + SourceColumnDesc::from(&column_desc) }) .collect(), }; @@ -333,7 +337,7 @@ pub async fn get_parquet_fields( /// - Arrow's `UInt32` matches with RisingWave's `Int64`. /// - Arrow's `UInt64` matches with RisingWave's `Decimal`. /// - Arrow's `Float16` matches with RisingWave's `Float32`. -fn is_parquet_schema_match_source_schema( +pub fn is_parquet_schema_match_source_schema( arrow_data_type: &ArrowDateType, rw_data_type: &RwDataType, ) -> bool {
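
A minimal standalone sketch of the projection change above, assuming only the `parquet` crate and a hypothetical `demo` schema (it is not part of the patch): `get_project_mask` now matches the requested columns against `root_schema().get_fields()` and builds a `ProjectionMask::roots`, instead of collecting positions in the flattened leaf list for `ProjectionMask::leaves`, because the two index spaces diverge as soon as a nested struct such as `nested_struct` appears.

use std::sync::Arc;

use parquet::arrow::ProjectionMask;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical schema mirroring the e2e test table: a struct column next to flat columns.
    let message_type = "
        message demo {
            required int64 id;
            required group nested_struct {
                optional int32 field1;
                optional binary field2 (UTF8);
            }
            optional binary name (UTF8);
        }
    ";
    let descr = SchemaDescriptor::new(Arc::new(parse_message_type(message_type)?));

    // Three root columns (id, nested_struct, name) flatten into four leaf columns
    // (id, nested_struct.field1, nested_struct.field2, name), so a top-level position
    // is no longer a valid leaf index once a struct is present.
    assert_eq!(descr.root_schema().get_fields().len(), 3);
    assert_eq!(descr.num_columns(), 4);

    // Root-based mask: select `id` and `name` by their top-level positions; a selected
    // struct column would keep all of its leaves automatically.
    let _by_root = ProjectionMask::roots(&descr, [0, 2]);

    // A leaf-based mask needs flattened indices instead; here `name` is leaf 3.
    let _by_leaf = ProjectionMask::leaves(&descr, [0, 3]);

    Ok(())
}

Selecting by root position keeps a struct column together with every leaf under it, and columns whose Arrow type does not line up with the RisingWave type are dropped by the same `is_parquet_schema_match_source_schema` check that the parser now applies per column as well.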