Skip to content

Commit

Permalink
repair CSL on mismatch with Domain config
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Feb 4, 2025
1 parent 7b30704 commit a8cf24c
Show file tree
Hide file tree
Showing 28 changed files with 758 additions and 772 deletions.
28 changes: 9 additions & 19 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2848,30 +2848,20 @@ void Cluster::onDomainReconfigured(const mqbi::Domain& domain,
newDefn.mode().fanout().appIDs().cend(),
d_allocator_p);

bsl::unordered_set<bsl::string> addedIds, removedIds;
bsl::vector<bsl::string> addedIds(d_allocator_p);
bsl::vector<bsl::string> removedIds(d_allocator_p);
mqbc::StorageUtil::loadAddedAndRemovedEntries(&addedIds,
&removedIds,
oldCfgAppIds,
newCfgAppIds);

// TODO: This should be one call - one QueueUpdateAdvisory for all Apps
bsl::unordered_set<bsl::string>::const_iterator it = addedIds.cbegin();
for (; it != addedIds.cend(); ++it) {
dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterOrchestrator::registerAppId,
&d_clusterOrchestrator,
*it,
bsl::ref(domain)),
this);
}
for (it = removedIds.cbegin(); it != removedIds.cend(); ++it) {
dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterOrchestrator::unregisterAppId,
&d_clusterOrchestrator,
*it,
bsl::ref(domain)),
this);
}
dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterOrchestrator::updateAppIds,
&d_clusterOrchestrator,
addedIds,
removedIds,
domain.name()),
this);
}

int Cluster::processCommand(mqbcmd::ClusterResult* result,
Expand Down
44 changes: 25 additions & 19 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,18 +289,31 @@ void ClusterOrchestrator::registerQueueInfo(const bmqt::Uri& uri,
const AppInfos& appIdInfos,
bool forceUpdate)
{
// TODO_CSL: Legacy only. Remove.

// executed by the *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(!d_cluster_p->isRemote());
BSLS_ASSERT_SAFE(uri.isCanonical());

d_stateManager_mp->registerQueueInfo(uri,
partitionId,
queueKey,
appIdInfos,
forceUpdate);
bmqp_ctrlmsg::QueueInfo advisory(d_allocator_p);

advisory.uri() = uri.canonical();
advisory.partitionId() = partitionId;
queueKey.loadBinary(&advisory.key());

advisory.appIds().resize(appIdInfos.size());
int i = 0;
for (AppInfos::const_iterator cit = appIdInfos.cbegin();
cit != appIdInfos.cend();
++cit, ++i) {
advisory.appIds()[i].appId() = cit->first;
cit->second.loadBinary(&advisory.appIds()[i].appKey());
}

d_stateManager_mp->registerQueueInfo(advisory, forceUpdate);
}

void ClusterOrchestrator::onPartitionPrimaryStatusDispatched(
Expand Down Expand Up @@ -1938,26 +1951,19 @@ void ClusterOrchestrator::validateClusterStateLedger()
d_stateManager_mp->validateClusterStateLedger();
}

void ClusterOrchestrator::registerAppId(bsl::string appId,
const mqbi::Domain& domain)
{
// executed by the cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));

d_stateManager_mp->registerAppId(appId, &domain);
}

void ClusterOrchestrator::unregisterAppId(bsl::string appId,
const mqbi::Domain& domain)
void ClusterOrchestrator::updateAppIds(const bsl::vector<bsl::string>& added,
const bsl::vector<bsl::string>& removed,
const bsl::string& domainName)
{
// executed by the cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));

d_stateManager_mp->unregisterAppId(appId, &domain);
d_stateManager_mp->updateAppIds(added,
removed,
domainName,
""); // for all queues
}

void ClusterOrchestrator::onPartitionPrimaryStatus(int partitionId,
Expand Down
22 changes: 7 additions & 15 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -488,21 +488,12 @@ class ClusterOrchestrator {
/// IncoreCSL.
void validateClusterStateLedger();

/// Invoked by @bbref{mqbblp::Cluster} to register a new `appId` for
/// `domain`.
///
/// Note: As this function is dispatched from a separate thread, `appId`
/// is taken by value to ensure it survives the lifetime of this
/// function call.
void registerAppId(bsl::string appId, const mqbi::Domain& domain);

/// Invoked by @bbref{mqbblp::Cluster} to unregister an `appId` for
/// `domain`.
///
/// Note: As this function is dispatched from a separate thread, `appId`
/// is taken by value to ensure it survives the lifetime of this
/// function call.
void unregisterAppId(bsl::string appId, const mqbi::Domain& domain);
/// Unregister the specified 'removed' and register the specified `added`
/// for the specified `domain`.
/// Invoked by @bbref{mqbblp::Cluster}.
void updateAppIds(const bsl::vector<bsl::string>& added,
const bsl::vector<bsl::string>& removed,
const bsl::string& domainName);

/// Register a queue info for the queue with the specified `uri`,
/// `partitionId`, `queueKey` and `appIdInfos`. If the specified
Expand All @@ -511,6 +502,7 @@ class ClusterOrchestrator {
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
/// TODO_CSL: Legacy only. Remove.
void registerQueueInfo(const bmqt::Uri& uri,
int partitionId,
const mqbu::StorageKey& queueKey,
Expand Down
Loading

0 comments on commit a8cf24c

Please sign in to comment.