Skip to content

Commit

Permalink
[INLONG-11711][SDK] SortSDK shares the same PulsarClient among differ…
Browse files Browse the repository at this point in the history
…ent SortTasks to avoid performance bottlenecks caused by too many PulsarClients (#11712)
  • Loading branch information
luchunliang authored Feb 10, 2025
1 parent 86c893c commit 22ee4c3
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class InlongSingleTopicManager extends TopicManager {
private static final Logger LOGGER = LoggerFactory.getLogger(InlongSingleTopicManager.class);

private final ConcurrentHashMap<String, TopicFetcher> fetchers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, PulsarClient> pulsarClients = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, PulsarClient> pulsarClients = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, TubeConsumerCreator> tubeFactories = new ConcurrentHashMap<>();

private final PeriodicTask updateMetaDataWorker;
Expand Down Expand Up @@ -199,7 +199,7 @@ public boolean clean() {
}

closeFetcher();
closePulsarClient();
// closePulsarClient();
closeTubeSessionFactory();
LOGGER.info("close finished {}", sortTaskId);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class InlongTopicManager extends TopicManager {

private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private final Map<String, TopicFetcher> fetchers = new ConcurrentHashMap<>();
private final Map<String, PulsarClient> pulsarClients = new ConcurrentHashMap<>();
private static final Map<String, PulsarClient> pulsarClients = new ConcurrentHashMap<>();
private final Map<String, TubeConsumerCreator> tubeFactories = new ConcurrentHashMap<>();

protected final ForkJoinPool pool;
Expand All @@ -88,7 +88,7 @@ public boolean clean() {
LOGGER.info("start to clean topic manager, sortTaskId={}", sortTaskId);
stopAssign = true;
closeAllFetchers();
closeAllPulsarClients();
// closeAllPulsarClients();
closeAllTubeFactories();
LOGGER.info("success to clean topic manager, sortTaskId={}", sortTaskId);
return true;
Expand Down Expand Up @@ -359,33 +359,38 @@ private void onlinePulsarClients() {

private void createPulsarClient(CacheZoneCluster cluster) {
LOGGER.info("start to init pulsar client for cluster={}", cluster);
if (cluster.getBootstraps() != null) {
try {
String token = cluster.getToken();
Authentication auth = null;
if (StringUtils.isNoneBlank(token)) {
auth = AuthenticationFactory.token(token);
}
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(cluster.getBootstraps())
.authentication(auth)
.build();
LOGGER.info("create pulsar client succ cluster:{}, token:{}",
cluster.getClusterId(),
cluster.getToken());
PulsarClient oldClient = pulsarClients.putIfAbsent(cluster.getClusterId(), pulsarClient);
if (oldClient != null && !oldClient.isClosed()) {
LOGGER.warn("close new pulsar client for cluster={}", cluster);
pulsarClient.close();
}
} catch (Exception e) {
LOGGER.error("create pulsar client error for cluster={}", cluster, e);
return;
}
LOGGER.info("success to init pulsar client for cluster={}", cluster);
} else {
String clientKey = cluster.getBootstraps();
if (clientKey == null) {
LOGGER.error("bootstrap is null for cluster={}", cluster);
return;
}
if (pulsarClients.containsKey(clientKey)) {
LOGGER.info("Repeat to init pulsar client for cluster={}", cluster);
return;
}
try {
String token = cluster.getToken();
Authentication auth = null;
if (StringUtils.isNoneBlank(token)) {
auth = AuthenticationFactory.token(token);
}
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(cluster.getBootstraps())
.authentication(auth)
.build();
LOGGER.info("create pulsar client succ cluster:{}, token:{}",
cluster.getClusterId(),
cluster.getToken());
PulsarClient oldClient = pulsarClients.putIfAbsent(cluster.getClusterId(), pulsarClient);
if (oldClient != null && !oldClient.isClosed()) {
LOGGER.warn("close new pulsar client for cluster={}", cluster);
pulsarClient.close();
}
} catch (Exception e) {
LOGGER.error("create pulsar client error for cluster={}", cluster, e);
return;
}
LOGGER.info("success to init pulsar client for cluster={}", cluster);
}

private List<CacheZoneCluster> getCacheZoneClusters(InlongTopicTypeEnum type) {
Expand Down

0 comments on commit 22ee4c3

Please sign in to comment.