diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java index d137a903d2a..68d83c0286d 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java @@ -66,7 +66,7 @@ public class InlongSingleTopicManager extends TopicManager { private static final Logger LOGGER = LoggerFactory.getLogger(InlongSingleTopicManager.class); private final ConcurrentHashMap fetchers = new ConcurrentHashMap<>(); - private final ConcurrentHashMap pulsarClients = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap pulsarClients = new ConcurrentHashMap<>(); private final ConcurrentHashMap tubeFactories = new ConcurrentHashMap<>(); private final PeriodicTask updateMetaDataWorker; @@ -199,7 +199,7 @@ public boolean clean() { } closeFetcher(); - closePulsarClient(); + // closePulsarClient(); closeTubeSessionFactory(); LOGGER.info("close finished {}", sortTaskId); return true; diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java index 6d3343293b2..0d7086de819 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java @@ -65,7 +65,7 @@ public class InlongTopicManager extends TopicManager { private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); private final Map fetchers = new ConcurrentHashMap<>(); - private final Map pulsarClients = new ConcurrentHashMap<>(); + private static final Map pulsarClients = new ConcurrentHashMap<>(); private final Map tubeFactories = new ConcurrentHashMap<>(); protected final ForkJoinPool pool; @@ -88,7 +88,7 @@ public boolean clean() { LOGGER.info("start to clean topic manager, sortTaskId={}", sortTaskId); stopAssign = true; closeAllFetchers(); - closeAllPulsarClients(); + // closeAllPulsarClients(); closeAllTubeFactories(); LOGGER.info("success to clean topic manager, sortTaskId={}", sortTaskId); return true; @@ -359,33 +359,38 @@ private void onlinePulsarClients() { private void createPulsarClient(CacheZoneCluster cluster) { LOGGER.info("start to init pulsar client for cluster={}", cluster); - if (cluster.getBootstraps() != null) { - try { - String token = cluster.getToken(); - Authentication auth = null; - if (StringUtils.isNoneBlank(token)) { - auth = AuthenticationFactory.token(token); - } - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(cluster.getBootstraps()) - .authentication(auth) - .build(); - LOGGER.info("create pulsar client succ cluster:{}, token:{}", - cluster.getClusterId(), - cluster.getToken()); - PulsarClient oldClient = pulsarClients.putIfAbsent(cluster.getClusterId(), pulsarClient); - if (oldClient != null && !oldClient.isClosed()) { - LOGGER.warn("close new pulsar client for cluster={}", cluster); - pulsarClient.close(); - } - } catch (Exception e) { - LOGGER.error("create pulsar client error for cluster={}", cluster, e); - return; - } - LOGGER.info("success to init pulsar client for cluster={}", cluster); - } else { + String clientKey = cluster.getBootstraps(); + if (clientKey == null) { LOGGER.error("bootstrap is null for cluster={}", cluster); + return; + } + if (pulsarClients.containsKey(clientKey)) { + LOGGER.info("Repeat to init pulsar client for cluster={}", cluster); + return; + } + try { + String token = cluster.getToken(); + Authentication auth = null; + if (StringUtils.isNoneBlank(token)) { + auth = AuthenticationFactory.token(token); + } + PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(cluster.getBootstraps()) + .authentication(auth) + .build(); + LOGGER.info("create pulsar client succ cluster:{}, token:{}", + cluster.getClusterId(), + cluster.getToken()); + PulsarClient oldClient = pulsarClients.putIfAbsent(cluster.getClusterId(), pulsarClient); + if (oldClient != null && !oldClient.isClosed()) { + LOGGER.warn("close new pulsar client for cluster={}", cluster); + pulsarClient.close(); + } + } catch (Exception e) { + LOGGER.error("create pulsar client error for cluster={}", cluster, e); + return; } + LOGGER.info("success to init pulsar client for cluster={}", cluster); } private List getCacheZoneClusters(InlongTopicTypeEnum type) {