[FLINK-33265] Support source parallelism setting for Kafka connector #134

Open. Wants to merge 2 commits into main.
8 changes: 8 additions & 0 deletions docs/content.zh/docs/connectors/table/kafka.md
@@ -342,6 +342,14 @@ CREATE TABLE KafkaTable (
<td>Duration</td>
<td>Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。需要显式地设置'scan.topic-partition-discovery.interval'为0才能关闭此功能</td>
</tr>
<tr>
<td><h5>scan.parallelism</h5></td>
<td>可选</td>
<td>否</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>定义 Kafka source 算子的并行度。默认情况下会使用全局默认并行度。</td>
</tr>
<tr>
<td><h5>sink.partitioner</h5></td>
<td>可选</td>
8 changes: 8 additions & 0 deletions docs/content.zh/docs/connectors/table/upsert-kafka.md
@@ -180,6 +180,14 @@ of all available metadata fields.
</ul>
</td>
</tr>
<tr>
<td><h5>scan.parallelism</h5></td>
<td>可选</td>
<td>否</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>定义 upsert-kafka source 算子的并行度。默认情况下会使用全局默认并行度。</td>
</tr>
<tr>
<td><h5>sink.parallelism</h5></td>
<td>可选</td>
8 changes: 8 additions & 0 deletions docs/content/docs/connectors/table/kafka.md
@@ -369,6 +369,14 @@ Connector Options
<td>Duration</td>
<td>Interval for consumer to discover dynamically created Kafka topics and partitions periodically. To disable this feature, you need to explicitly set the 'scan.topic-partition-discovery.interval' value to 0.</td>
</tr>
<tr>
<td><h5>scan.parallelism</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Defines the parallelism of the Kafka source operator. If not set, the global default parallelism is used.</td>
</tr>
<tr>
<td><h5>sink.partitioner</h5></td>
<td>optional</td>
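
For context, a minimal sketch of how the 'scan.parallelism' option documented above could be used from the Table API. The table name, topic, schema, and parallelism value are illustrative only and not taken from this PR:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaScanParallelismExample {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // 'scan.parallelism' pins the Kafka source operator to 4 subtasks,
        // overriding the pipeline's global default parallelism for this source only.
        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  order_id STRING,\n"
                        + "  amount DOUBLE,\n"
                        + "  ts TIMESTAMP(3)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'orders',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'properties.group.id' = 'orders-consumer',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'json',\n"
                        + "  'scan.parallelism' = '4'\n"
                        + ")");

        tEnv.executeSql("SELECT * FROM orders").print();
    }
}
```
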
8 changes: 8 additions & 0 deletions docs/content/docs/connectors/table/upsert-kafka.md
@@ -192,6 +192,14 @@ Connector Options
format which means that key columns appear in the data type for both the key and value format.
</td>
</tr>
<tr>
<td><h5>scan.parallelism</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Defines the parallelism of the upsert-kafka source operator. If not set, the global default parallelism is used.</td>
</tr>
<tr>
<td><h5>sink.parallelism</h5></td>
<td>optional</td>
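
The upsert-kafka table documented above gets the same knob. A hedged sketch of its use follows (table, topic, and parallelism values are hypothetical); note that upsert-kafka additionally requires a PRIMARY KEY and key/value formats, and the write side is still governed by 'sink.parallelism':

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaScanParallelismExample {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // 'scan.parallelism' only affects the source operator reading the changelog;
        // the sink side remains controlled by the existing 'sink.parallelism' option.
        tEnv.executeSql(
                "CREATE TABLE user_balances (\n"
                        + "  user_id STRING,\n"
                        + "  balance DOUBLE,\n"
                        + "  PRIMARY KEY (user_id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'upsert-kafka',\n"
                        + "  'topic' = 'balances',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'key.format' = 'json',\n"
                        + "  'value.format' = 'json',\n"
                        + "  'scan.parallelism' = '2'\n"
                        + ")");
    }
}
```
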
@@ -106,6 +106,7 @@ public class KafkaConnectorOptions {
ValueFieldsStrategy.EXCEPT_KEY))
.build());

public static final ConfigOption<Integer> SCAN_PARALLELISM = FactoryUtil.SOURCE_PARALLELISM;
public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;

// --------------------------------------------------------------------------------------------
@@ -71,6 +71,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -171,6 +172,9 @@ public class KafkaDynamicSource

protected final String tableIdentifier;

/** Parallelism of the physical Kafka consumer. */
protected final @Nullable Integer parallelism;

public KafkaDynamicSource(
DataType physicalDataType,
@Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
@@ -188,7 +192,8 @@ public KafkaDynamicSource(
Map<KafkaTopicPartition, Long> specificBoundedOffsets,
long boundedTimestampMillis,
boolean upsertMode,
String tableIdentifier) {
String tableIdentifier,
@Nullable Integer parallelism) {
// Format attributes
this.physicalDataType =
Preconditions.checkNotNull(
@@ -228,6 +233,7 @@ public KafkaDynamicSource(
this.boundedTimestampMillis = boundedTimestampMillis;
this.upsertMode = upsertMode;
this.tableIdentifier = tableIdentifier;
this.parallelism = parallelism;
}

@Override
@@ -267,6 +273,11 @@ public DataStream<RowData> produceDataStream(
public boolean isBounded() {
return kafkaSource.getBoundedness() == Boundedness.BOUNDED;
}

@Override
public Optional<Integer> getParallelism() {
return Optional.ofNullable(parallelism);
}
};
}

@@ -344,7 +355,8 @@ public DynamicTableSource copy() {
specificBoundedOffsets,
boundedTimestampMillis,
upsertMode,
tableIdentifier);
tableIdentifier,
parallelism);
copy.producedDataType = producedDataType;
copy.metadataKeys = metadataKeys;
copy.watermarkStrategy = watermarkStrategy;
@@ -384,7 +396,8 @@ public boolean equals(Object o) {
&& boundedTimestampMillis == that.boundedTimestampMillis
&& Objects.equals(upsertMode, that.upsertMode)
&& Objects.equals(tableIdentifier, that.tableIdentifier)
&& Objects.equals(watermarkStrategy, that.watermarkStrategy);
&& Objects.equals(watermarkStrategy, that.watermarkStrategy)
&& Objects.equals(parallelism, that.parallelism);
}

@Override
@@ -409,7 +422,8 @@ public int hashCode() {
boundedTimestampMillis,
upsertMode,
tableIdentifier,
watermarkStrategy);
watermarkStrategy,
parallelism);
}

// --------------------------------------------------------------------------------------------
@@ -74,6 +74,7 @@
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
@@ -152,6 +153,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SCAN_BOUNDED_MODE);
options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS);
options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
options.add(SCAN_PARALLELISM);
return options;
}

@@ -166,6 +168,7 @@ public Set<ConfigOption<?>> forwardOptions() {
SCAN_STARTUP_SPECIFIC_OFFSETS,
SCAN_TOPIC_PARTITION_DISCOVERY,
SCAN_STARTUP_TIMESTAMP_MILLIS,
SCAN_PARALLELISM,
SINK_PARTITIONER,
SINK_PARALLELISM,
TRANSACTIONAL_ID_PREFIX)
@@ -215,6 +218,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {

final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);

final Integer parallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null);

return createKafkaTableSource(
physicalDataType,
keyDecodingFormat.orElse(null),
@@ -231,7 +236,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
boundedOptions.boundedMode,
boundedOptions.specificOffsets,
boundedOptions.boundedTimestampMillis,
context.getObjectIdentifier().asSummaryString());
context.getObjectIdentifier().asSummaryString(),
parallelism);
}

@Override
@@ -396,7 +402,8 @@ protected KafkaDynamicSource createKafkaTableSource(
BoundedMode boundedMode,
Map<KafkaTopicPartition, Long> specificEndOffsets,
long endTimestampMillis,
String tableIdentifier) {
String tableIdentifier,
Integer parallelism) {
return new KafkaDynamicSource(
physicalDataType,
keyDecodingFormat,
@@ -414,7 +421,8 @@
specificEndOffsets,
endTimestampMillis,
false,
tableIdentifier);
tableIdentifier,
parallelism);
}

protected KafkaDynamicSink createKafkaTableSink(
@@ -62,6 +62,7 @@
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM;
@@ -115,6 +116,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
options.add(DELIVERY_GUARANTEE);
options.add(TRANSACTIONAL_ID_PREFIX);
options.add(SCAN_PARALLELISM);
return options;
}

@@ -150,6 +152,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {

final BoundedOptions boundedOptions = getBoundedOptions(tableOptions);

Integer parallelism = tableOptions.get(SCAN_PARALLELISM);

return new KafkaDynamicSource(
context.getPhysicalRowDataType(),
keyDecodingFormat,
@@ -167,7 +171,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
boundedOptions.specificOffsets,
boundedOptions.boundedTimestampMillis,
true,
context.getObjectIdentifier().asSummaryString());
context.getObjectIdentifier().asSummaryString(),
parallelism);
}

@Override
@@ -101,6 +101,7 @@
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.AVRO_CONFLUENT;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.DEBEZIUM_AVRO_CONFLUENT;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
import static org.apache.flink.table.factories.FactoryUtil.SOURCE_PARALLELISM;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
import static org.assertj.core.api.Assertions.assertThat;
@@ -212,14 +213,57 @@ public void testTableSource() {
KAFKA_SOURCE_PROPERTIES,
StartupMode.SPECIFIC_OFFSETS,
specificOffsets,
0);
0,
null);
assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);

ScanTableSource.ScanRuntimeProvider provider =
actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
assertKafkaSource(provider);
}

@Test
public void testTableSourceWithParallelism() {
final Map<String, String> modifiedOptions =
getModifiedOptions(
getBasicSourceOptions(),
options -> options.put(SOURCE_PARALLELISM.key(), "100"));
final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions);
final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource;

final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);

final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
new DecodingFormatMock(",", true);

// Test scan source equals
final KafkaDynamicSource expectedKafkaSource =
createExpectedScanSource(
SCHEMA_DATA_TYPE,
null,
valueDecodingFormat,
new int[0],
new int[] {0, 1, 2},
null,
Collections.singletonList(TOPIC),
null,
KAFKA_SOURCE_PROPERTIES,
StartupMode.SPECIFIC_OFFSETS,
specificOffsets,
0,
100);
assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);

ScanTableSource.ScanRuntimeProvider provider =
actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
assertThat(provider).isInstanceOf(DataStreamScanProvider.class);
final DataStreamScanProvider sourceProvider = (DataStreamScanProvider) provider;
assertThat(sourceProvider.getParallelism()).isPresent();
assertThat(sourceProvider.getParallelism()).hasValue(100);
}

@Test
public void testTableSourceWithPattern() {
final Map<String, String> modifiedOptions =
@@ -254,7 +298,8 @@ public void testTableSourceWithPattern() {
KAFKA_SOURCE_PROPERTIES,
StartupMode.EARLIEST,
specificOffsets,
0);
0,
null);
final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource;
assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);

@@ -295,7 +340,8 @@ public void testTableSourceWithKeyValue() {
KAFKA_FINAL_SOURCE_PROPERTIES,
StartupMode.GROUP_OFFSETS,
Collections.emptyMap(),
0);
0,
null);

assertThat(actualSource).isEqualTo(expectedKafkaSource);
}
@@ -346,7 +392,8 @@ public void testTableSourceWithKeyValueAndMetadata() {
KAFKA_FINAL_SOURCE_PROPERTIES,
StartupMode.GROUP_OFFSETS,
Collections.emptyMap(),
0);
0,
null);
expectedKafkaSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedKafkaSource.metadataKeys = Collections.singletonList("timestamp");

@@ -1188,7 +1235,8 @@ public void testDiscoverPartitionByDefault() {
props,
StartupMode.SPECIFIC_OFFSETS,
specificOffsets,
0);
0,
null);
assertThat(actualSource).isEqualTo(expectedKafkaSource);
ScanTableSource.ScanRuntimeProvider provider =
actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
@@ -1226,7 +1274,8 @@ public void testDisableDiscoverPartition() {
props,
StartupMode.SPECIFIC_OFFSETS,
specificOffsets,
0);
0,
null);
assertThat(actualSource).isEqualTo(expectedKafkaSource);
ScanTableSource.ScanRuntimeProvider provider =
actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
@@ -1249,7 +1298,8 @@ private static KafkaDynamicSource createExpectedScanSource(
Properties properties,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis) {
long startupTimestampMillis,
@Nullable Integer parallelism) {
return new KafkaDynamicSource(
physicalDataType,
keyDecodingFormat,
Expand All @@ -1267,7 +1317,8 @@ private static KafkaDynamicSource createExpectedScanSource(
Collections.emptyMap(),
0,
false,
FactoryMocks.IDENTIFIER.asSummaryString());
FactoryMocks.IDENTIFIER.asSummaryString(),
parallelism);
}

private static KafkaDynamicSink createExpectedSink(