[FLINK-33265] Support source parallelism setting for Kafka connector
X-czh committed Nov 16, 2024
1 parent 727327d commit 6ffb8cc
Showing 10 changed files with 166 additions and 20 deletions.
8 changes: 8 additions & 0 deletions docs/content.zh/docs/connectors/table/kafka.md
@@ -342,6 +342,14 @@ CREATE TABLE KafkaTable (
<td>Duration</td>
<td>Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。需要显式地设置'scan.topic-partition-discovery.interval'为0才能关闭此功能</td>
</tr>
<tr>
<td><h5>scan.parallelism</h5></td>
<td>可选</td>
<td>否</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>定义 Kafka source 算子的并行度。默认情况下使用全局默认并行度。</td>
</tr>
<tr>
<td><h5>sink.partitioner</h5></td>
<td>可选</td>
8 changes: 8 additions & 0 deletions docs/content.zh/docs/connectors/table/upsert-kafka.md
@@ -180,6 +180,14 @@ of all available metadata fields.
</ul>
</td>
</tr>
<tr>
<td><h5>scan.parallelism</h5></td>
<td>可选</td>
<td>否</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>定义 upsert-kafka source 算子的并行度。默认情况下使用全局默认并行度。</td>
</tr>
<tr>
<td><h5>sink.parallelism</h5></td>
<td>可选</td>
8 changes: 8 additions & 0 deletions docs/content/docs/connectors/table/kafka.md
@@ -369,6 +369,14 @@ Connector Options
<td>Duration</td>
<td>Interval for consumer to discover dynamically created Kafka topics and partitions periodically. To disable this feature, you need to explicitly set the 'scan.topic-partition-discovery.interval' value to 0.</td>
</tr>
<tr>
<td><h5>scan.parallelism</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Defines the parallelism of the Kafka source operator. If not set, the global default parallelism is used.</td>
</tr>
<tr>
<td><h5>sink.partitioner</h5></td>
<td>optional</td>
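A minimal usage sketch of the new option: the DDL below registers a Kafka-backed table whose source operator runs with parallelism 4 instead of the global default. The table name, topic, broker address, and schema are illustrative placeholders, not part of this commit.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaScanParallelismExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 'scan.parallelism' only affects the Kafka source operator of queries on this table.
        tableEnv.executeSql(
                "CREATE TABLE KafkaTable (\n"
                        + "  `user_id` BIGINT,\n"
                        + "  `item_id` BIGINT,\n"
                        + "  `behavior` STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'user_behavior',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'properties.group.id' = 'testGroup',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'json',\n"
                        + "  'scan.parallelism' = '4'\n"
                        + ")");

        tableEnv.executeSql("SELECT * FROM KafkaTable").print();
    }
}
```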
8 changes: 8 additions & 0 deletions docs/content/docs/connectors/table/upsert-kafka.md
@@ -192,6 +192,14 @@ Connector Options
format which means that key columns appear in the data type for both the key and value format.
</td>
</tr>
<tr>
<td><h5>scan.parallelism</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Defines the parallelism of the upsert-kafka source operator. If not set, the global default parallelism is used.</td>
</tr>
<tr>
<td><h5>sink.parallelism</h5></td>
<td>optional</td>
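The same option on an upsert-kafka table, as a hedged sketch (topic, schema, and the value 2 are illustrative); unlike the plain kafka connector, upsert-kafka additionally requires a PRIMARY KEY plus explicit key and value formats.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaScanParallelismExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // scan.parallelism controls the upsert-kafka source operator;
        // the pre-existing sink.parallelism option covers the sink side.
        tableEnv.executeSql(
                "CREATE TABLE pageviews_per_region (\n"
                        + "  region STRING,\n"
                        + "  view_count BIGINT,\n"
                        + "  PRIMARY KEY (region) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'upsert-kafka',\n"
                        + "  'topic' = 'pageviews_per_region',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'key.format' = 'json',\n"
                        + "  'value.format' = 'json',\n"
                        + "  'scan.parallelism' = '2'\n"
                        + ")");
    }
}
```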
@@ -106,6 +106,7 @@ public class KafkaConnectorOptions {
ValueFieldsStrategy.EXCEPT_KEY))
.build());

public static final ConfigOption<Integer> SCAN_PARALLELISM = FactoryUtil.SOURCE_PARALLELISM;
public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;

// --------------------------------------------------------------------------------------------
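SCAN_PARALLELISM is an alias for Flink's shared source-parallelism option rather than a connector-local definition, which keeps the key and type identical across connectors. Below is a sketch of the assumed shape of that shared option, inferred from the (none) default and Integer type in the docs tables above rather than copied from Flink source.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public final class SourceParallelismOptionSketch {

    // Assumed shape of FactoryUtil.SOURCE_PARALLELISM, which SCAN_PARALLELISM aliases:
    // an optional Integer keyed "scan.parallelism" with no default value, so leaving it
    // unset falls back to the global default parallelism.
    public static final ConfigOption<Integer> SOURCE_PARALLELISM =
            ConfigOptions.key("scan.parallelism")
                    .intType()
                    .noDefaultValue()
                    .withDescription("Defines a custom parallelism for the source operator.");

    private SourceParallelismOptionSketch() {}
}
```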
@@ -71,6 +71,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -171,6 +172,9 @@ public class KafkaDynamicSource

protected final String tableIdentifier;

/** Parallelism of the physical Kafka consumer. * */
protected final @Nullable Integer parallelism;

public KafkaDynamicSource(
DataType physicalDataType,
@Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
@@ -188,7 +192,8 @@ public KafkaDynamicSource(
Map<KafkaTopicPartition, Long> specificBoundedOffsets,
long boundedTimestampMillis,
boolean upsertMode,
String tableIdentifier) {
String tableIdentifier,
@Nullable Integer parallelism) {
// Format attributes
this.physicalDataType =
Preconditions.checkNotNull(
@@ -228,6 +233,7 @@ public KafkaDynamicSource(
this.boundedTimestampMillis = boundedTimestampMillis;
this.upsertMode = upsertMode;
this.tableIdentifier = tableIdentifier;
this.parallelism = parallelism;
}

@Override
@@ -267,6 +273,11 @@ public DataStream<RowData> produceDataStream(
public boolean isBounded() {
return kafkaSource.getBoundedness() == Boundedness.BOUNDED;
}

@Override
public Optional<Integer> getParallelism() {
return Optional.ofNullable(parallelism);
}
};
}

@@ -344,7 +355,8 @@ public DynamicTableSource copy() {
specificBoundedOffsets,
boundedTimestampMillis,
upsertMode,
tableIdentifier);
tableIdentifier,
parallelism);
copy.producedDataType = producedDataType;
copy.metadataKeys = metadataKeys;
copy.watermarkStrategy = watermarkStrategy;
@@ -384,7 +396,8 @@ public boolean equals(Object o) {
&& boundedTimestampMillis == that.boundedTimestampMillis
&& Objects.equals(upsertMode, that.upsertMode)
&& Objects.equals(tableIdentifier, that.tableIdentifier)
&& Objects.equals(watermarkStrategy, that.watermarkStrategy);
&& Objects.equals(watermarkStrategy, that.watermarkStrategy)
&& Objects.equals(parallelism, that.parallelism);
}

@Override
@@ -409,7 +422,8 @@ public int hashCode() {
boundedTimestampMillis,
upsertMode,
tableIdentifier,
watermarkStrategy);
watermarkStrategy,
parallelism);
}

// --------------------------------------------------------------------------------------------
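The ScanRuntimeProvider above reports the configured value through getParallelism(), and the planner applies it when translating the source. A rough DataStream-level equivalent of the end result (broker, topic, and the value 4 are placeholders; this is not code from the commit):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setTopics("user_behavior")
                        .setGroupId("testGroup")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        // A non-null getParallelism() roughly corresponds to giving the source operator
        // its own parallelism instead of inheriting the global default.
        DataStreamSource<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "KafkaSource");
        stream.setParallelism(4);

        stream.print();
        env.execute("kafka-source-parallelism-sketch");
    }
}
```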
@@ -74,6 +74,7 @@
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
@@ -152,6 +153,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SCAN_BOUNDED_MODE);
options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS);
options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
options.add(SCAN_PARALLELISM);
return options;
}

@@ -166,6 +168,7 @@ public Set<ConfigOption<?>> forwardOptions() {
SCAN_STARTUP_SPECIFIC_OFFSETS,
SCAN_TOPIC_PARTITION_DISCOVERY,
SCAN_STARTUP_TIMESTAMP_MILLIS,
SCAN_PARALLELISM,
SINK_PARTITIONER,
SINK_PARALLELISM,
TRANSACTIONAL_ID_PREFIX)
@@ -215,6 +218,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {

final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);

final Integer parallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null);

return createKafkaTableSource(
physicalDataType,
keyDecodingFormat.orElse(null),
@@ -231,7 +236,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
boundedOptions.boundedMode,
boundedOptions.specificOffsets,
boundedOptions.boundedTimestampMillis,
context.getObjectIdentifier().asSummaryString());
context.getObjectIdentifier().asSummaryString(),
parallelism);
}

@Override
@@ -396,7 +402,8 @@ protected KafkaDynamicSource createKafkaTableSource(
BoundedMode boundedMode,
Map<KafkaTopicPartition, Long> specificEndOffsets,
long endTimestampMillis,
String tableIdentifier) {
String tableIdentifier,
Integer parallelism) {
return new KafkaDynamicSource(
physicalDataType,
keyDecodingFormat,
@@ -414,7 +421,8 @@ protected KafkaDynamicSource createKafkaTableSource(
specificEndOffsets,
endTimestampMillis,
false,
tableIdentifier);
tableIdentifier,
parallelism);
}

protected KafkaDynamicSink createKafkaTableSink(
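Because the factory reads the value from the resolved table options, it can also be supplied per query rather than in the catalog definition. A hedged example using a dynamic table options hint (this assumes table.dynamic-table-options.enabled, which defaults to true in recent Flink releases, and reuses the illustrative KafkaTable from the earlier sketch):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ScanParallelismHintExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Assumes a Kafka-backed table named KafkaTable is already registered
        // (see the CREATE TABLE sketch above). The OPTIONS hint overrides
        // 'scan.parallelism' for this statement only, leaving the catalog entry untouched.
        tableEnv.executeSql(
                        "SELECT * FROM KafkaTable /*+ OPTIONS('scan.parallelism'='8') */")
                .print();
    }
}
```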
@@ -62,6 +62,7 @@
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM;
@@ -115,6 +116,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
options.add(DELIVERY_GUARANTEE);
options.add(TRANSACTIONAL_ID_PREFIX);
options.add(SCAN_PARALLELISM);
return options;
}

@@ -150,6 +152,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {

final BoundedOptions boundedOptions = getBoundedOptions(tableOptions);

Integer parallelism = tableOptions.get(SCAN_PARALLELISM);

return new KafkaDynamicSource(
context.getPhysicalRowDataType(),
keyDecodingFormat,
@@ -167,7 +171,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
boundedOptions.specificOffsets,
boundedOptions.boundedTimestampMillis,
true,
context.getObjectIdentifier().asSummaryString());
context.getObjectIdentifier().asSummaryString(),
parallelism);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,56 @@ public void testTableSource() {
KAFKA_SOURCE_PROPERTIES,
StartupMode.SPECIFIC_OFFSETS,
specificOffsets,
0);
0,
null);
assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);

ScanTableSource.ScanRuntimeProvider provider =
actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
assertKafkaSource(provider);
}

@Test
public void testTableSourceWithParallelism() {
final Map<String, String> modifiedOptions =
getModifiedOptions(
getBasicSourceOptions(), options -> options.put("scan.parallelism", "100"));
final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions);
final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource;

final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);

final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
new DecodingFormatMock(",", true);

// Test scan source equals
final KafkaDynamicSource expectedKafkaSource =
createExpectedScanSource(
SCHEMA_DATA_TYPE,
null,
valueDecodingFormat,
new int[0],
new int[] {0, 1, 2},
null,
Collections.singletonList(TOPIC),
null,
KAFKA_SOURCE_PROPERTIES,
StartupMode.SPECIFIC_OFFSETS,
specificOffsets,
0,
100);
assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);

ScanTableSource.ScanRuntimeProvider provider =
actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
assertThat(provider).isInstanceOf(DataStreamScanProvider.class);
final DataStreamScanProvider sourceProvider = (DataStreamScanProvider) provider;
assertThat(sourceProvider.getParallelism().isPresent()).isTrue();
assertThat(sourceProvider.getParallelism().get()).isEqualTo(100);
}

@Test
public void testTableSourceWithPattern() {
final Map<String, String> modifiedOptions =
@@ -254,7 +296,8 @@ public void testTableSourceWithPattern() {
KAFKA_SOURCE_PROPERTIES,
StartupMode.EARLIEST,
specificOffsets,
0);
0,
null);
final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource;
assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);

@@ -295,7 +338,8 @@ public void testTableSourceWithKeyValue() {
KAFKA_FINAL_SOURCE_PROPERTIES,
StartupMode.GROUP_OFFSETS,
Collections.emptyMap(),
0);
0,
null);

assertThat(actualSource).isEqualTo(expectedKafkaSource);
}
@@ -346,7 +390,8 @@ public void testTableSourceWithKeyValueAndMetadata() {
KAFKA_FINAL_SOURCE_PROPERTIES,
StartupMode.GROUP_OFFSETS,
Collections.emptyMap(),
0);
0,
null);
expectedKafkaSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedKafkaSource.metadataKeys = Collections.singletonList("timestamp");

@@ -1188,7 +1233,8 @@ public void testDiscoverPartitionByDefault() {
props,
StartupMode.SPECIFIC_OFFSETS,
specificOffsets,
0);
0,
null);
assertThat(actualSource).isEqualTo(expectedKafkaSource);
ScanTableSource.ScanRuntimeProvider provider =
actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
@@ -1226,7 +1272,8 @@ public void testDisableDiscoverPartition() {
props,
StartupMode.SPECIFIC_OFFSETS,
specificOffsets,
0);
0,
null);
assertThat(actualSource).isEqualTo(expectedKafkaSource);
ScanTableSource.ScanRuntimeProvider provider =
actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
@@ -1249,7 +1296,8 @@ private static KafkaDynamicSource createExpectedScanSource(
Properties properties,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis) {
long startupTimestampMillis,
@Nullable Integer parallelism) {
return new KafkaDynamicSource(
physicalDataType,
keyDecodingFormat,
@@ -1267,7 +1315,8 @@ private static KafkaDynamicSource createExpectedScanSource(
Collections.emptyMap(),
0,
false,
FactoryMocks.IDENTIFIER.asSummaryString());
FactoryMocks.IDENTIFIER.asSummaryString(),
parallelism);
}

private static KafkaDynamicSink createExpectedSink(