Skip to content

Commit

Permalink
[BitSail][Connector] cdc source add coordinator state (#380)
Browse files Browse the repository at this point in the history
  • Loading branch information
garyli1019 authored Feb 10, 2023
1 parent df25e0d commit fa28fbd
Show file tree
Hide file tree
Showing 14 changed files with 346 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ public enum BinlogReaderErrorCode implements ErrorCode {
REQUIRED_VALUE("Cdc-00", "The configuration file is lack of necessary options"),
CONFIG_ERROR("Cdc-01", "The configuration has wrong option"),
CONVERT_ERROR("Cdc-02", "Failed to convert mysql cdc result to row"),
UNSUPPORTED_ERROR("Cdc-03", "Operation is not supported yet");
UNSUPPORTED_ERROR("Cdc-03", "Operation is not supported yet"),

OFFSET_VAL_ERROR("Cdc-04", "specified binlog offset require initial binlog offset, but binlog offset value is not set.");

private final String code;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.bytedance.bitsail.common.option.ConfigOption;
import com.bytedance.bitsail.common.option.ReaderOptions;
import com.bytedance.bitsail.connector.cdc.model.ClusterInfo;
import com.bytedance.bitsail.connector.cdc.source.offset.BinlogOffsetType;

import com.alibaba.fastjson.TypeReference;

Expand Down Expand Up @@ -51,4 +52,12 @@ public interface BinlogReaderOptions extends ReaderOptions.BaseReaderOptions {
ConfigOption<Integer> QUERY_RETRY_TIMES =
key(READER_PREFIX + "query_retry_times")
.defaultValue(3);

ConfigOption<String> INITIAL_OFFSET_TYPE =
key(READER_PREFIX + "initial_offset_type")
.defaultValue(String.valueOf(BinlogOffsetType.EARLIEST).toLowerCase());

ConfigOption<String> INITIAL_OFFSET_VALUE =
key(READER_PREFIX + "initial_offset_value")
.noDefaultValue(String.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@
import com.bytedance.bitsail.common.type.BitSailTypeInfoConverter;
import com.bytedance.bitsail.common.type.TypeInfoConverter;
import com.bytedance.bitsail.connector.cdc.source.coordinator.CDCSourceSplitCoordinator;
import com.bytedance.bitsail.connector.cdc.source.coordinator.state.BaseAssignmentState;
import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit;
import com.bytedance.bitsail.connector.cdc.source.state.BinlogOffsetState;

import java.io.IOException;

/**
* WIP: Source to read mysql binlog.
*/
public abstract class BinlogSource implements Source<Row, BinlogSplit, BinlogOffsetState>, ParallelismComputable {
public abstract class BinlogSource implements Source<Row, BinlogSplit, BaseAssignmentState>, ParallelismComputable {

protected BitSailConfiguration jobConf;

Expand All @@ -57,8 +57,8 @@ public Boundedness getSourceBoundedness() {
public abstract SourceReader<Row, BinlogSplit> createReader(SourceReader.Context readerContext);

@Override
public SourceSplitCoordinator<BinlogSplit, BinlogOffsetState> createSplitCoordinator(
SourceSplitCoordinator.Context<BinlogSplit, BinlogOffsetState> coordinatorContext) {
public SourceSplitCoordinator<BinlogSplit, BaseAssignmentState> createSplitCoordinator(
SourceSplitCoordinator.Context<BinlogSplit, BaseAssignmentState> coordinatorContext) {
return new CDCSourceSplitCoordinator(coordinatorContext, jobConf);
}

Expand All @@ -68,7 +68,7 @@ public BinarySerializer<BinlogSplit> getSplitSerializer() {
}

@Override
public BinarySerializer<BinlogOffsetState> getSplitCoordinatorCheckpointSerializer() {
public BinarySerializer<BaseAssignmentState> getSplitCoordinatorCheckpointSerializer() {
return new SimpleVersionedBinarySerializer<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,60 +19,65 @@
import com.bytedance.bitsail.base.connector.reader.v1.SourceEvent;
import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.cdc.source.coordinator.state.BaseAssignmentState;
import com.bytedance.bitsail.connector.cdc.source.coordinator.state.BinlogAssignmentState;
import com.bytedance.bitsail.connector.cdc.source.event.BinlogCompleteAckEvent;
import com.bytedance.bitsail.connector.cdc.source.event.BinlogCompleteEvent;
import com.bytedance.bitsail.connector.cdc.source.event.BinlogStopReadEvent;
import com.bytedance.bitsail.connector.cdc.source.offset.BinlogOffset;
import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit;
import com.bytedance.bitsail.connector.cdc.source.state.BinlogOffsetState;

import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CDCSourceSplitCoordinator implements SourceSplitCoordinator<BinlogSplit, BinlogOffsetState> {
public class CDCSourceSplitCoordinator implements SourceSplitCoordinator<BinlogSplit, BaseAssignmentState> {

private static final Logger LOG = LoggerFactory.getLogger(CDCSourceSplitCoordinator.class);

private final SourceSplitCoordinator.Context<BinlogSplit, BinlogOffsetState> context;
private final SourceSplitCoordinator.Context<BinlogSplit, BaseAssignmentState> context;
private final BitSailConfiguration jobConf;
private final Map<Integer, Set<BinlogSplit>> splitAssignmentPlan;
private boolean isBinlogAssigned;

public CDCSourceSplitCoordinator(SourceSplitCoordinator.Context<BinlogSplit, BinlogOffsetState> context,
public CDCSourceSplitCoordinator(SourceSplitCoordinator.Context<BinlogSplit, BaseAssignmentState> context,
BitSailConfiguration jobConf) {
this.context = context;
this.jobConf = jobConf;
this.splitAssignmentPlan = Maps.newConcurrentMap();
if (context.isRestored()) {
BaseAssignmentState restoredState = context.getRestoreState();
this.isBinlogAssigned = ((BinlogAssignmentState) restoredState).isAssigned();
LOG.info(String.format("Restore coordinator state, state type is: %s, binlog is assigned: %s",
restoredState.getType(), this.isBinlogAssigned));
} else {
this.isBinlogAssigned = false;
}
}

@Override
public void start() {
int totalReader = this.context.registeredReaders().size();
LOG.info("Total registered reader number: {}", totalReader);
// assign binlog split to reader
List<BinlogSplit> initialSplit = new ArrayList<>();
initialSplit.add(createSplit());
// test assign split to task0
this.context.assignSplit(0, initialSplit);
if (!this.isBinlogAssigned) {
List<BinlogSplit> splitList = new ArrayList<>();
BinlogSplit split = createSplit(this.jobConf);
splitList.add(split);
LOG.info("binlog is not assigned, assigning a new binlog split to reader: " + split.toString());
this.context.assignSplit(0, splitList);
this.isBinlogAssigned = true;
}
}

@Override
public void addReader(int subtaskId) {
// do not support add reader during the job is running
context.sendEventToSourceReader(subtaskId, new BinlogStopReadEvent());
}

@Override
public void addSplitsBack(List<BinlogSplit> splits, int subtaskId) {
LOG.info("Add split back to assignment plan: {}", splits);
splitAssignmentPlan.get(subtaskId).addAll(splits);
// do nothing
}

@Override
Expand All @@ -91,9 +96,9 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
}

@Override
public BinlogOffsetState snapshotState() throws Exception {
public BaseAssignmentState snapshotState() {
// currently store nothing in state
return new BinlogOffsetState();
return new BinlogAssignmentState(this.isBinlogAssigned);
}

@Override
Expand All @@ -106,8 +111,8 @@ public void close() {
LOG.info("Closing MysqlSourceSplitCoordinator");
}

private BinlogSplit createSplit() {
BinlogOffset begin = BinlogOffset.earliest();
private BinlogSplit createSplit(BitSailConfiguration jobConf) {
BinlogOffset begin = BinlogOffset.createFromJobConf(jobConf);

BinlogOffset end = BinlogOffset.boundless();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.cdc.source.coordinator.state;

import com.bytedance.bitsail.base.serializer.SimpleVersionedBinarySerializer;

public class AssignmentStateSerializer extends SimpleVersionedBinarySerializer<BaseAssignmentState> {

public AssignmentStateSerializer() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,18 @@
* limitations under the License.
*/

package com.bytedance.bitsail.connector.cdc.source.state;
package com.bytedance.bitsail.connector.cdc.source.coordinator.state;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/**
* Store mysql offset in Flink State.
*/
public class BinlogOffsetState implements Serializable {
private static final long serialVersionUID = 1L;
public abstract class BaseAssignmentState implements Serializable {

Map<String, String> offsetStore;
private static final long serialVersionUID = 1L;

public BinlogOffsetState() {
this.offsetStore = new HashMap<>();
}
@Nullable
transient byte[] serializedCache;

public BinlogOffsetState(Map<String, String> offsetStore) {
this.offsetStore = offsetStore;
}
public abstract int getType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.cdc.source.coordinator.state;

/**
* State to store whether the binlog split was already assigned by coordinator.
*/
public class BinlogAssignmentState extends BaseAssignmentState {
private final boolean isAssigned;

@Override
public int getType() {
return SplitType.BINLOG_SPLIT_TYPE;
}

public BinlogAssignmentState(boolean isAssigned) {
this.isAssigned = isAssigned;
}

public boolean isAssigned() {
return isAssigned;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.cdc.source.coordinator.state;

/**
* Store the snapshot assignment status.
*/
public class SnapshotAssignmentState extends BaseAssignmentState {

public SnapshotAssignmentState() {

}

@Override
public int getType() {
return SplitType.SNAPSHOT_SPLIT_TYPE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.cdc.source.coordinator.state;

public class SplitType {
public static final int BINLOG_SPLIT_TYPE = 1;
public static final int SNAPSHOT_SPLIT_TYPE = 2;
}
Loading

0 comments on commit fa28fbd

Please sign in to comment.