From 10c74aff465309299e402f0f18e91ba5d19ec4d1 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Sat, 11 Jan 2025 20:29:59 +0800 Subject: [PATCH] fixed --- .../alibaba/fluss/server/coordinator/CoordinatorService.java | 4 ++-- .../server/kv/mergeengine/DeduplicateRowMergeEngine.java | 1 + .../fluss/server/kv/mergeengine/FirstRowMergeEngine.java | 1 + .../alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java | 1 + .../fluss/server/kv/mergeengine/VersionRowMergeEngine.java | 1 + .../test/java/com/alibaba/fluss/server/kv/KvTabletTest.java | 1 + 6 files changed, 7 insertions(+), 2 deletions(-) diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java index a7025b28..9a269de6 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java @@ -203,14 +203,14 @@ private void checkMergeEngine(MergeEngine mergeEngine, Schema schema) { RowType rowType = schema.toRowType(); int fieldIndex = rowType.getFieldIndex(column); if (fieldIndex == -1) { - throw new IllegalArgumentException( + throw new InvalidTableException( String.format( "The version merge engine column %s does not exist.", column)); } DataType dataType = rowType.getTypeAt(fieldIndex); if (!VersionRowMergeEngine.VERSION_MERGE_ENGINE_SUPPORTED_DATA_TYPES.contains( dataType.getClass().getName())) { - throw new IllegalArgumentException( + throw new InvalidTableException( String.format( "The version merge engine column does not support type %s .", dataType)); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMergeEngine.java index b3abe51c..33848043 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/DeduplicateRowMergeEngine.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.alibaba.fluss.server.kv.mergeengine; import com.alibaba.fluss.row.BinaryRow; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java index e834f943..1f90e48a 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.alibaba.fluss.server.kv.mergeengine; import com.alibaba.fluss.row.BinaryRow; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java index 1ea0bda9..4eda7ca3 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.alibaba.fluss.server.kv.mergeengine; import com.alibaba.fluss.row.BinaryRow; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java index 54617cc8..5cdb8718 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.alibaba.fluss.server.kv.mergeengine; import com.alibaba.fluss.metadata.MergeEngine; diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java index 4bfcd2b5..57340f7b 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java @@ -658,6 +658,7 @@ void testVersionRowMergeEngine(@TempDir File tempLogDir, @TempDir File tmpKvDir) "k2".getBytes(), new Object[] {2, 1000L}), // not update kvRecordFactory.ofRecord( "k1".getBytes(), new Object[] {1, 1001L}), // -U , +U + kvRecordFactory.ofRecord("k1".getBytes(), null), // not update kvRecordFactory.ofRecord("k3".getBytes(), new Object[] {3, 1000L})); // +I KvRecordBatch kvRecordBatch2 = kvRecordBatchFactory.ofRecords(kvData2); kvTablet.putAsLeader(kvRecordBatch2, null, DATA3_SCHEMA_PK);