From fa28fbdbe55b6e34017c9b3eb8dc814f9458a3f1 Mon Sep 17 00:00:00 2001 From: Gary Li Date: Fri, 10 Feb 2023 11:40:47 +0800 Subject: [PATCH] [BitSail][Connector] cdc source add coordinator state (#380) --- .../cdc/error/BinlogReaderErrorCode.java | 4 +- .../cdc/option/BinlogReaderOptions.java | 9 +++ .../connector/cdc/source/BinlogSource.java | 10 +-- .../CDCSourceSplitCoordinator.java | 49 +++++++------- .../state/AssignmentStateSerializer.java | 26 ++++++++ .../state/BaseAssignmentState.java} | 23 +++---- .../state/BinlogAssignmentState.java | 37 +++++++++++ .../state/SnapshotAssignmentState.java | 32 ++++++++++ .../source/coordinator/state/SplitType.java | 22 +++++++ .../cdc/source/offset/BinlogOffset.java | 64 ++++++++++++++----- .../cdc/source/offset/BinlogOffsetType.java | 28 ++++++++ .../state/AssignmentStateSerializerTest.java | 43 +++++++++++++ .../cdc/source/offset/BinlogOffsetTests.java | 43 +++++++++++++ .../debezium/MysqlBinlogSplitReader.java | 19 ++++-- 14 files changed, 346 insertions(+), 63 deletions(-) create mode 100644 bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/AssignmentStateSerializer.java rename bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/{state/BinlogOffsetState.java => coordinator/state/BaseAssignmentState.java} (63%) create mode 100644 bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/BinlogAssignmentState.java create mode 100644 bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/SnapshotAssignmentState.java create mode 100644 bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/SplitType.java create mode 100644 
bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/offset/BinlogOffsetType.java create mode 100644 bitsail-connectors/connector-cdc/connector-cdc-base/src/test/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/AssignmentStateSerializerTest.java create mode 100644 bitsail-connectors/connector-cdc/connector-cdc-base/src/test/java/com/bytedance/bitsail/connector/cdc/source/offset/BinlogOffsetTests.java diff --git a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/error/BinlogReaderErrorCode.java b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/error/BinlogReaderErrorCode.java index 2252a88ea..9ebedf454 100644 --- a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/error/BinlogReaderErrorCode.java +++ b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/error/BinlogReaderErrorCode.java @@ -23,7 +23,9 @@ public enum BinlogReaderErrorCode implements ErrorCode { REQUIRED_VALUE("Cdc-00", "The configuration file is lack of necessary options"), CONFIG_ERROR("Cdc-01", "The configuration has wrong option"), CONVERT_ERROR("Cdc-02", "Failed to convert mysql cdc result to row"), - UNSUPPORTED_ERROR("Cdc-03", "Operation is not supported yet"); + UNSUPPORTED_ERROR("Cdc-03", "Operation is not supported yet"), + + OFFSET_VAL_ERROR("Cdc-04", "specified binlog offset require initial binlog offset, but binlog offset value is not set."); private final String code; diff --git a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/option/BinlogReaderOptions.java b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/option/BinlogReaderOptions.java index 1fc5e4ef4..3aa0e173b 100644 --- 
a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/option/BinlogReaderOptions.java +++ b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/option/BinlogReaderOptions.java @@ -19,6 +19,7 @@ import com.bytedance.bitsail.common.option.ConfigOption; import com.bytedance.bitsail.common.option.ReaderOptions; import com.bytedance.bitsail.connector.cdc.model.ClusterInfo; +import com.bytedance.bitsail.connector.cdc.source.offset.BinlogOffsetType; import com.alibaba.fastjson.TypeReference; @@ -51,4 +52,14 @@ public interface BinlogReaderOptions extends ReaderOptions.BaseReaderOptions { ConfigOption QUERY_RETRY_TIMES = key(READER_PREFIX + "query_retry_times") .defaultValue(3); + + /** Where reading starts: a BinlogOffsetType name, matched case-insensitively by BinlogOffset.createFromJobConf. */ + ConfigOption<String> INITIAL_OFFSET_TYPE = + key(READER_PREFIX + "initial_offset_type") + .defaultValue(BinlogOffsetType.EARLIEST.name().toLowerCase(java.util.Locale.ROOT)); + + /** Read only when the offset type is "specified"; parsed as "filename,offset". */ + ConfigOption<String> INITIAL_OFFSET_VALUE = + key(READER_PREFIX + "initial_offset_value") + .noDefaultValue(String.class); } diff --git a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/BinlogSource.java b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/BinlogSource.java index 58bbff762..6190c38ba 100644 --- a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/BinlogSource.java +++ b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/BinlogSource.java @@ -30,15 +30,15 @@ import com.bytedance.bitsail.common.type.BitSailTypeInfoConverter; import com.bytedance.bitsail.common.type.TypeInfoConverter; import com.bytedance.bitsail.connector.cdc.source.coordinator.CDCSourceSplitCoordinator; +import com.bytedance.bitsail.connector.cdc.source.coordinator.state.BaseAssignmentState; import
com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit; -import com.bytedance.bitsail.connector.cdc.source.state.BinlogOffsetState; import java.io.IOException; /** * WIP: Source to read mysql binlog. */ -public abstract class BinlogSource implements Source<Row, BinlogSplit, BinlogOffsetState>, ParallelismComputable { +public abstract class BinlogSource implements Source<Row, BinlogSplit, BaseAssignmentState>, ParallelismComputable { protected BitSailConfiguration jobConf; @@ -57,8 +57,8 @@ public Boundedness getSourceBoundedness() { public abstract SourceReader createReader(SourceReader.Context readerContext); @Override - public SourceSplitCoordinator<BinlogSplit, BinlogOffsetState> createSplitCoordinator( - SourceSplitCoordinator.Context<BinlogSplit, BinlogOffsetState> coordinatorContext) { + public SourceSplitCoordinator<BinlogSplit, BaseAssignmentState> createSplitCoordinator( + SourceSplitCoordinator.Context<BinlogSplit, BaseAssignmentState> coordinatorContext) { return new CDCSourceSplitCoordinator(coordinatorContext, jobConf); } @@ -68,7 +68,7 @@ public BinarySerializer getSplitSerializer() { } @Override - public BinarySerializer<BinlogOffsetState> getSplitCoordinatorCheckpointSerializer() { + public BinarySerializer<BaseAssignmentState> getSplitCoordinatorCheckpointSerializer() { return new SimpleVersionedBinarySerializer<>(); } diff --git a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/CDCSourceSplitCoordinator.java b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/CDCSourceSplitCoordinator.java index e441dc518..2f9a07a77 100644 --- a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/CDCSourceSplitCoordinator.java +++ b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/CDCSourceSplitCoordinator.java @@ -19,14 +19,13 @@ import com.bytedance.bitsail.base.connector.reader.v1.SourceEvent; import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator; import
com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.cdc.source.coordinator.state.BaseAssignmentState; +import com.bytedance.bitsail.connector.cdc.source.coordinator.state.BinlogAssignmentState; import com.bytedance.bitsail.connector.cdc.source.event.BinlogCompleteAckEvent; import com.bytedance.bitsail.connector.cdc.source.event.BinlogCompleteEvent; -import com.bytedance.bitsail.connector.cdc.source.event.BinlogStopReadEvent; import com.bytedance.bitsail.connector.cdc.source.offset.BinlogOffset; import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit; -import com.bytedance.bitsail.connector.cdc.source.state.BinlogOffsetState; -import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,45 +33,56 @@ import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Set; -public class CDCSourceSplitCoordinator implements SourceSplitCoordinator<BinlogSplit, BinlogOffsetState> { +public class CDCSourceSplitCoordinator implements SourceSplitCoordinator<BinlogSplit, BaseAssignmentState> { private static final Logger LOG = LoggerFactory.getLogger(CDCSourceSplitCoordinator.class); - private final SourceSplitCoordinator.Context<BinlogSplit, BinlogOffsetState> context; + private final SourceSplitCoordinator.Context<BinlogSplit, BaseAssignmentState> context; private final BitSailConfiguration jobConf; - private final Map<Integer, Set<BinlogSplit>> splitAssignmentPlan; + /** True once the binlog split has been handed to a reader; saved by snapshotState(). */ + private boolean isBinlogAssigned; - public CDCSourceSplitCoordinator(SourceSplitCoordinator.Context<BinlogSplit, BinlogOffsetState> context, + public CDCSourceSplitCoordinator(SourceSplitCoordinator.Context<BinlogSplit, BaseAssignmentState> context, BitSailConfiguration jobConf) { this.context = context; this.jobConf = jobConf; - this.splitAssignmentPlan = Maps.newConcurrentMap(); + if (context.isRestored()) { + BaseAssignmentState restoredState = context.getRestoreState(); + // Fail with a readable message instead of a bare ClassCastException/NullPointerException. + if (!(restoredState instanceof BinlogAssignmentState)) { + throw new IllegalStateException("Unexpected coordinator state restored: " + restoredState); + } + this.isBinlogAssigned = ((BinlogAssignmentState) restoredState).isAssigned(); + LOG.info("Restore coordinator state, state type is: {}, binlog is assigned: {}", + restoredState.getType(), this.isBinlogAssigned); +
} else { + this.isBinlogAssigned = false; + } } @Override public void start() { int totalReader = this.context.registeredReaders().size(); LOG.info("Total registered reader number: {}", totalReader); - // assign binlog split to reader - List<BinlogSplit> initialSplit = new ArrayList<>(); - initialSplit.add(createSplit()); - // test assign split to task0 - this.context.assignSplit(0, initialSplit); + if (!this.isBinlogAssigned) { + List<BinlogSplit> splitList = new ArrayList<>(); + BinlogSplit split = createSplit(this.jobConf); + splitList.add(split); + LOG.info("binlog is not assigned, assigning a new binlog split to reader: {}", split); + this.context.assignSplit(0, splitList); + this.isBinlogAssigned = true; + } } @Override public void addReader(int subtaskId) { // do not support add reader during the job is running - context.sendEventToSourceReader(subtaskId, new BinlogStopReadEvent()); } @Override public void addSplitsBack(List splits, int subtaskId) { - LOG.info("Add split back to assignment plan: {}", splits); - splitAssignmentPlan.get(subtaskId).addAll(splits); + // do nothing } @Override @@ -91,9 +101,9 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { } @Override - public BinlogOffsetState snapshotState() throws Exception { - // currently store nothing in state - return new BinlogOffsetState(); + public BaseAssignmentState snapshotState() { + // persist whether the binlog split has already been assigned + return new BinlogAssignmentState(this.isBinlogAssigned); } @Override @@ -106,8 +116,8 @@ public void close() { LOG.info("Closing MysqlSourceSplitCoordinator"); } - private BinlogSplit createSplit() { - BinlogOffset begin = BinlogOffset.earliest(); + private BinlogSplit createSplit(BitSailConfiguration jobConf) { + BinlogOffset begin = BinlogOffset.createFromJobConf(jobConf); BinlogOffset end = BinlogOffset.boundless(); diff --git a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/AssignmentStateSerializer.java
b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/AssignmentStateSerializer.java new file mode 100644 index 000000000..da5aa8866 --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/AssignmentStateSerializer.java @@ -0,0 +1,26 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.cdc.source.coordinator.state; + +import com.bytedance.bitsail.base.serializer.SimpleVersionedBinarySerializer; + +public class AssignmentStateSerializer extends SimpleVersionedBinarySerializer<BaseAssignmentState> { + + public AssignmentStateSerializer() { + // nothing to initialise: all (de)serialization behaviour is inherited + } +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/state/BinlogOffsetState.java b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/BaseAssignmentState.java similarity index 63% rename from bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/state/BinlogOffsetState.java rename to bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/BaseAssignmentState.java index c0768fe91..7bb93abc9 100644 --- a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/state/BinlogOffsetState.java +++ b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/BaseAssignmentState.java @@ -14,25 +14,18 @@ * limitations under the License. */ -package com.bytedance.bitsail.connector.cdc.source.state; +package com.bytedance.bitsail.connector.cdc.source.coordinator.state; + +import javax.annotation.Nullable; import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; -/** - * Store mysql offset in Flink State.
- */ -public class BinlogOffsetState implements Serializable { - private static final long serialVersionUID = 1L; +public abstract class BaseAssignmentState implements Serializable { - Map offsetStore; + private static final long serialVersionUID = 1L; - public BinlogOffsetState() { - this.offsetStore = new HashMap<>(); - } + @Nullable + transient byte[] serializedCache; - public BinlogOffsetState(Map offsetStore) { - this.offsetStore = offsetStore; - } + public abstract int getType(); } diff --git a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/BinlogAssignmentState.java b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/BinlogAssignmentState.java new file mode 100644 index 000000000..cd943fabc --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/BinlogAssignmentState.java @@ -0,0 +1,39 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.cdc.source.coordinator.state; + +/** + * State to store whether the binlog split was already assigned by coordinator.
+ */ +public class BinlogAssignmentState extends BaseAssignmentState { + private static final long serialVersionUID = 1L; + + private final boolean isAssigned; + + public BinlogAssignmentState(boolean isAssigned) { + this.isAssigned = isAssigned; + } + + @Override + public int getType() { + return SplitType.BINLOG_SPLIT_TYPE; + } + + public boolean isAssigned() { + return isAssigned; + } +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/SnapshotAssignmentState.java b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/SnapshotAssignmentState.java new file mode 100644 index 000000000..4720d26dd --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/SnapshotAssignmentState.java @@ -0,0 +1,32 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.cdc.source.coordinator.state; + +/** + * Store the snapshot assignment status.
+ */ +public class SnapshotAssignmentState extends BaseAssignmentState { + private static final long serialVersionUID = 1L; + + public SnapshotAssignmentState() { + } + + @Override + public int getType() { + return SplitType.SNAPSHOT_SPLIT_TYPE; + } +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/SplitType.java b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/SplitType.java new file mode 100644 index 000000000..fafd1376f --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/SplitType.java @@ -0,0 +1,22 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.bytedance.bitsail.connector.cdc.source.coordinator.state; + +public class SplitType { + public static final int BINLOG_SPLIT_TYPE = 1; + public static final int SNAPSHOT_SPLIT_TYPE = 2; +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/offset/BinlogOffset.java b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/offset/BinlogOffset.java index 5f2fc8aa1..4ec524b3a 100644 --- a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/offset/BinlogOffset.java +++ b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/offset/BinlogOffset.java @@ -16,6 +16,11 @@ package com.bytedance.bitsail.connector.cdc.source.offset; +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.cdc.error.BinlogReaderErrorCode; +import com.bytedance.bitsail.connector.cdc.option.BinlogReaderOptions; + import lombok.Data; import java.io.Serializable; @@ -25,32 +30,75 @@ public class BinlogOffset implements Serializable { private static final long serialVersionUID = 1L; - private final OffsetType offsetType; + private final BinlogOffsetType offsetType; private final Properties props; - public BinlogOffset(OffsetType offsetType, Properties props) { + public BinlogOffset(BinlogOffsetType offsetType, Properties props) { this.offsetType = offsetType; this.props = props; } public static BinlogOffset earliest() { - return new BinlogOffset(OffsetType.EARLIEST, new Properties()); + return new BinlogOffset(BinlogOffsetType.EARLIEST, new Properties()); + } + + public static BinlogOffset latest() { + return new BinlogOffset(BinlogOffsetType.LATEST, new Properties()); } public static BinlogOffset boundless() { - return new
BinlogOffset(OffsetType.BOUNDLESS, new Properties()); - } - - public enum OffsetType { - // earliest point in binlog - EARLIEST, - // latest point in binlog - LATEST, - // specified point in the binlog file - SPECIFIED, - // represent an endless point, could only be the end point - BOUNDLESS + return new BinlogOffset(BinlogOffsetType.BOUNDLESS, new Properties()); + } + + public static BinlogOffset specified() { + return new BinlogOffset(BinlogOffsetType.SPECIFIED, new Properties()); + } + + /** + * Build a binlog offset from the initial-offset options of the job configuration. + * + * @param jobConf job configuration holding INITIAL_OFFSET_TYPE and, for a specified offset, INITIAL_OFFSET_VALUE + * @return an offset of the configured type; a specified offset carries "filename" and "offset" props + * @throws BitSailException if the offset type is unknown or the specified value is not "filename,offset" + */ + public static BinlogOffset createFromJobConf(BitSailConfiguration jobConf) { + String rawOffsetType = jobConf.getNecessaryOption( + BinlogReaderOptions.INITIAL_OFFSET_TYPE, BinlogReaderErrorCode.REQUIRED_VALUE).trim().toUpperCase(java.util.Locale.ROOT); + BinlogOffsetType offsetType; + try { + offsetType = BinlogOffsetType.valueOf(rawOffsetType); + } catch (IllegalArgumentException e) { + throw new BitSailException(BinlogReaderErrorCode.UNSUPPORTED_ERROR, "Unsupported offset type: " + rawOffsetType); + } + switch (offsetType) { + case LATEST: + return latest(); + case EARLIEST: + return earliest(); + case BOUNDLESS: + return boundless(); + case SPECIFIED: + // currently only support mysql. Using format filename,offset + String offsetValue = jobConf.getNecessaryOption( + BinlogReaderOptions.INITIAL_OFFSET_VALUE, BinlogReaderErrorCode.OFFSET_VAL_ERROR); + String[] parts = offsetValue.split(",", -1); + if (parts.length != 2 || parts[0].trim().isEmpty() || parts[1].trim().isEmpty()) { + throw new BitSailException(BinlogReaderErrorCode.CONFIG_ERROR, + "Invalid initial offset value, expected filename,offset but got: " + offsetValue); + } + BinlogOffset result = specified(); + // TODO: move constant to common place + result.addProps("filename", parts[0].trim()); + result.addProps("offset", parts[1].trim()); + return result; + default: + throw new BitSailException(BinlogReaderErrorCode.UNSUPPORTED_ERROR, "Unsupported offset type: " + rawOffsetType); + } + } + + public void addProps(String key, String value) { + this.props.put(key, value); } @Override diff --git a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/offset/BinlogOffsetType.java b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/offset/BinlogOffsetType.java new file mode 100644
index 000000000..7bc2c267f --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/offset/BinlogOffsetType.java @@ -0,0 +1,28 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.cdc.source.offset; + +public enum BinlogOffsetType { + /** Earliest point in binlog. */ + EARLIEST, + /** Latest point in binlog. */ + LATEST, + /** Specified point in the binlog file. */ + SPECIFIED, + /** Represents an endless point, could only be the end point. */ + BOUNDLESS +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-base/src/test/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/AssignmentStateSerializerTest.java b/bitsail-connectors/connector-cdc/connector-cdc-base/src/test/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/AssignmentStateSerializerTest.java new file mode 100644 index 000000000..0ee005f9b --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-base/src/test/java/com/bytedance/bitsail/connector/cdc/source/coordinator/state/AssignmentStateSerializerTest.java @@ -0,0 +1,52 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.cdc.source.coordinator.state; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class AssignmentStateSerializerTest { + @Test + public void testSerializeBinlogState() throws IOException { + AssignmentStateSerializer serializer = new AssignmentStateSerializer(); + BinlogAssignmentState binlogAssignmentState = new BinlogAssignmentState(true); + byte[] obj = serializer.serialize(binlogAssignmentState); + BaseAssignmentState desObj = serializer.deserialize(serializer.getVersion(), obj); + Assert.assertEquals(SplitType.BINLOG_SPLIT_TYPE, desObj.getType()); + Assert.assertTrue(((BinlogAssignmentState) desObj).isAssigned()); + } + + @Test + public void testSerializeUnassignedBinlogState() throws IOException { + AssignmentStateSerializer serializer = new AssignmentStateSerializer(); + byte[] obj = serializer.serialize(new BinlogAssignmentState(false)); + BaseAssignmentState desObj = serializer.deserialize(serializer.getVersion(), obj); + Assert.assertEquals(SplitType.BINLOG_SPLIT_TYPE, desObj.getType()); + Assert.assertFalse(((BinlogAssignmentState) desObj).isAssigned()); + } + + @Test + public void testSerializeSnapshotState() throws IOException { + AssignmentStateSerializer serializer = new AssignmentStateSerializer(); + SnapshotAssignmentState snapshotAssignmentState = new SnapshotAssignmentState(); + byte[] obj = serializer.serialize(snapshotAssignmentState); + BaseAssignmentState desObj = serializer.deserialize(serializer.getVersion(), obj); + Assert.assertEquals(SplitType.SNAPSHOT_SPLIT_TYPE, desObj.getType()); + } +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-base/src/test/java/com/bytedance/bitsail/connector/cdc/source/offset/BinlogOffsetTests.java b/bitsail-connectors/connector-cdc/connector-cdc-base/src/test/java/com/bytedance/bitsail/connector/cdc/source/offset/BinlogOffsetTests.java new file mode 100644 index 000000000..ad013b326 --- /dev/null +++
b/bitsail-connectors/connector-cdc/connector-cdc-base/src/test/java/com/bytedance/bitsail/connector/cdc/source/offset/BinlogOffsetTests.java @@ -0,0 +1,52 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.cdc.source.offset; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.cdc.option.BinlogReaderOptions; + +import org.junit.Assert; +import org.junit.Test; + +public class BinlogOffsetTests { + @Test + public void testSpecifiedOffsetFromConfig() { + BitSailConfiguration jobConf = BitSailConfiguration.newDefault(); + jobConf.set(BinlogReaderOptions.INITIAL_OFFSET_TYPE, "specified"); + jobConf.set(BinlogReaderOptions.INITIAL_OFFSET_VALUE, "mysql.0001,1111"); + BinlogOffset result = BinlogOffset.createFromJobConf(jobConf); + Assert.assertEquals(BinlogOffsetType.SPECIFIED, result.getOffsetType()); + Assert.assertEquals("mysql.0001", result.getProps().getProperty("filename")); + Assert.assertEquals("1111", result.getProps().getProperty("offset")); + } + + @Test + public void testLatestOffsetFromConfig() { + BitSailConfiguration jobConf = BitSailConfiguration.newDefault(); + jobConf.set(BinlogReaderOptions.INITIAL_OFFSET_TYPE, "latest"); + BinlogOffset result = BinlogOffset.createFromJobConf(jobConf); + Assert.assertEquals(BinlogOffsetType.LATEST, result.getOffsetType()); + } + + @Test + public void testOffsetTypeIsCaseInsensitive() { + BitSailConfiguration jobConf = BitSailConfiguration.newDefault(); + jobConf.set(BinlogReaderOptions.INITIAL_OFFSET_TYPE, "Earliest"); + BinlogOffset result = BinlogOffset.createFromJobConf(jobConf); + Assert.assertEquals(BinlogOffsetType.EARLIEST, result.getOffsetType()); + } +} diff --git
a/bitsail-connectors/connector-cdc/connector-cdc-mysql/src/main/java/com/bytedance/bitsail/connector/cdc/mysql/source/debezium/MysqlBinlogSplitReader.java b/bitsail-connectors/connector-cdc/connector-cdc-mysql/src/main/java/com/bytedance/bitsail/connector/cdc/mysql/source/debezium/MysqlBinlogSplitReader.java index 006dfd770..cc45363a5 100644 --- a/bitsail-connectors/connector-cdc/connector-cdc-mysql/src/main/java/com/bytedance/bitsail/connector/cdc/mysql/source/debezium/MysqlBinlogSplitReader.java +++ b/bitsail-connectors/connector-cdc/connector-cdc-mysql/src/main/java/com/bytedance/bitsail/connector/cdc/mysql/source/debezium/MysqlBinlogSplitReader.java @@ -63,11 +63,14 @@ /** * Reader that actually execute the Debezium task. + * This reader is stateless and will read binlog starting from the given begin offset. */ public class MysqlBinlogSplitReader implements BinlogSplitReader { private static final Logger LOG = LoggerFactory.getLogger(MysqlBinlogSplitReader.class); + public static final int ROW_SIZE = 5; + private boolean isRunning; private final MysqlConfig mysqlConfig; @@ -183,7 +186,7 @@ public void readSplit(BinlogSplit split) { dispatcher, errorHandler, Clock.SYSTEM, - taskContext, // reuse binary log client + taskContext, new MySqlStreamingChangeEventSourceMetrics( taskContext, queue, metadataProvider) ); @@ -241,20 +244,27 @@ public boolean isRunning() { } } + @Override public boolean isCompleted() { return !isRunning; } + @Override public Row poll() { SourceRecord record = this.recordIterator.next(); this.offset = record.sourceOffset(); LOG.info("OFFSET:" + record.sourceOffset()); - LOG.info("poll one record {}", record.value()); + LOG.info("Record: {}", record); + + // field 0 carries the record timestamp, field 1 the raw Debezium record value + Row result = new Row(2); + result.setField(0, record.timestamp()); + result.setField(1, record.value()); // TODO: Build BitSail row and return - return null; - //return record; + return result; } + @Override public boolean
hasNext() throws InterruptedException { if (this.recordIterator.hasNext()) { return true; @@ -268,7 +278,6 @@ private boolean pollNextBatch() throws InterruptedException { if (isRunning) { List dbzRecords = queue.poll(); while (dbzRecords.isEmpty()) { - //sleep 10s LOG.info("No record found, sleep for 5s in reader"); TimeUnit.SECONDS.sleep(5); dbzRecords = queue.poll();