Skip to content

Commit

Permalink
Feat[bmqtool]: collect ACK latencies (#591)
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
Co-authored-by: Anton Pryakhin <[email protected]>
  • Loading branch information
678098 and waldgange authored Feb 10, 2025
1 parent fef718b commit fa69fcb
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 221 deletions.
2 changes: 1 addition & 1 deletion src/applications/bmqtool/bmqtool.m.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ int main(int argc, const char* argv[])
? &ta
: bslma::Default::allocator();

Application app(&parameters, &shutdownSemaphore, allocator);
Application app(parameters, &shutdownSemaphore, allocator);
if (app.start() != 0) {
return 2; // RETURN
}
Expand Down
323 changes: 185 additions & 138 deletions src/applications/bmqtool/m_bmqtool_application.cpp

Large diffs are not rendered by default.

47 changes: 34 additions & 13 deletions src/applications/bmqtool/m_bmqtool_application.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include <bdlbb_blob.h>
#include <bdlbb_pooledblobbufferfactory.h>
#include <bdlmt_eventscheduler.h>
#include <bdlmt_throttle.h>
#include <bsl_list.h>
#include <bslma_allocator.h>
#include <bslma_managedptr.h>
Expand Down Expand Up @@ -83,9 +84,10 @@ class Application : public bmqa::SessionEventHandler {
bslma::Allocator* d_allocator_p;
// Held, not owned

Parameters* d_parameters_p;
// Command-line parameters. Held, not
// owned
/// Run parameters
/// Copy is made to ensure lifetime of the Parameters object used by this
/// Application
const Parameters d_parameters;

bslmt::Semaphore* d_shutdownSemaphore_p;
// Semaphore holding the main thread
Expand Down Expand Up @@ -132,11 +134,26 @@ class Application : public bmqa::SessionEventHandler {
Interactive d_interactive;
// CLI handler.

bsl::list<bsls::Types::Int64> d_latencies;
// List of all message latencies (in
// ns). Only populated when requested
// to generate a latency report (with
// --latency-report).
/// A throttle object used to control confirm latency logging on a consumer
bdlmt::Throttle d_confirmLatencyThrottle;

/// List of all confirm message latencies (in ns).
/// Confirm message latency is the end-to-end time to deliver a message,
/// starting from producer post and ending on a consumer.
/// Only populated when requested to generate a latency report (with
/// --latency-report).
bsl::list<bsls::Types::Int64> d_confirmLatencies;

/// A throttle object used to control ack latency logging on a consumer
bdlmt::Throttle d_ackLatencyThrottle;

/// List of all ack message latencies (in ns).
/// Ack message latency is the time between posting a message and getting
/// an ACK for it, meaning that the message was at least replicated with
/// a needed quorum (delivery might not have happened yet).
/// Only populated when requested to generate a latency report (with
/// --latency-report).
bsl::list<bsls::Types::Int64> d_ackLatencies;

bsls::AtomicBool d_autoReadInProgress;
// Auto-consume mode only. True if a
Expand Down Expand Up @@ -198,8 +215,12 @@ class Application : public bmqa::SessionEventHandler {
/// Print the final stats to the standard output, at exit time.
void printFinalStats();

/// Generate the latency report.
void generateLatencyReport();
/// Generate the latency report from the specified `latencies`.
/// The specified `name` represents the origin of the latencies, it is
/// either end-to-end latency (producer->consumer) or ack latency
/// (producer->cluster->producer-ack).
void generateLatencyReport(const bsl::list<bsls::Types::Int64>& latencies,
const bslstl::StringRef& name);

/// Do any `pre` run initialization, such as connecting to bmqbrkr,
/// opening a queue, preparing the blob to publish, ... Return 0 on
Expand All @@ -219,9 +240,9 @@ class Application : public bmqa::SessionEventHandler {
// CREATORS

/// Constructor
Application(m_bmqtool::Parameters* parameters,
bslmt::Semaphore* shutdownSemaphore,
bslma::Allocator* allocator);
Application(const m_bmqtool::Parameters& parameters,
bslmt::Semaphore* shutdownSemaphore,
bslma::Allocator* allocator);

~Application() BSLS_KEYWORD_OVERRIDE;

Expand Down
19 changes: 9 additions & 10 deletions src/applications/bmqtool/m_bmqtool_interactive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ void Interactive::processCommand(const StartCommand& command)
<< "<-- session.start(5.0) => " << bmqt::GenericResult::Enum(rc)
<< " (" << rc << ")";

if (d_parameters_p->noSessionEventHandler()) {
if (d_parameters.noSessionEventHandler()) {
BALL_LOG_INFO << "Creating processing threads";
for (int i = 0; i < d_parameters_p->numProcessingThreads(); ++i) {
for (int i = 0; i < d_parameters.numProcessingThreads(); ++i) {
bslmt::ThreadUtil::Handle threadHandle;
rc = bslmt::ThreadUtil::create(
&threadHandle,
Expand Down Expand Up @@ -218,7 +218,7 @@ void Interactive::processCommand(const StopCommand& command)

BALL_LOG_INFO << "<-- session.stop()";

if (d_parameters_p->noSessionEventHandler()) {
if (d_parameters.noSessionEventHandler()) {
// Join on all threads
BALL_LOG_INFO << "Joining event handler threads";
for (size_t i = 0; i < d_eventHandlerThreads.size(); ++i) {
Expand Down Expand Up @@ -733,7 +733,7 @@ void Interactive::processCommand(const BatchPostCommand& command)
}

bsl::shared_ptr<PostingContext> postingContext =
d_poster_p->createPostingContext(d_session_p, &parameters, queueId);
d_poster_p->createPostingContext(d_session_p, parameters, queueId);

while (postingContext->pendingPost()) {
postingContext->postNext();
Expand Down Expand Up @@ -887,20 +887,19 @@ void Interactive::eventHandlerThread()
BALL_LOG_INFO << "EventHandlerThread terminated";
}

Interactive::Interactive(Parameters* parameters,
Interactive::Interactive(const Parameters& parameters,
Poster* poster,
bslma::Allocator* allocator)
: d_session_p(0)
, d_sessionEventHandler_p(0)
, d_parameters_p(parameters)
, d_parameters(parameters)
, d_uris(allocator)
, d_eventHandlerThreads(allocator)
, d_producerIdProperty("** NONE **", allocator)
, d_poster_p(poster)
, d_allocator_p(allocator)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(parameters);
BSLS_ASSERT_SAFE(poster);

bdls::ProcessUtil::getProcessName(&d_producerIdProperty); // ignore rc
Expand Down Expand Up @@ -1088,7 +1087,7 @@ void Interactive::onMessage(const bmqa::Message& message)

void Interactive::onOpenQueueStatus(const bmqa::OpenQueueStatus& status)
{
if (d_parameters_p->verbosity() != ParametersVerbosity::e_SILENT) {
if (d_parameters.verbosity() != ParametersVerbosity::e_SILENT) {
BALL_LOG_INFO << "==> OPEN_QUEUE_RESULT received: " << status;
}

Expand All @@ -1112,7 +1111,7 @@ void Interactive::onOpenQueueStatus(const bmqa::OpenQueueStatus& status)
void Interactive::onConfigureQueueStatus(
const bmqa::ConfigureQueueStatus& status)
{
if (d_parameters_p->verbosity() != ParametersVerbosity::e_SILENT) {
if (d_parameters.verbosity() != ParametersVerbosity::e_SILENT) {
BALL_LOG_INFO << "==> CONFIGURE_QUEUE_RESULT received: " << status;
}

Expand All @@ -1126,7 +1125,7 @@ void Interactive::onConfigureQueueStatus(

void Interactive::onCloseQueueStatus(const bmqa::CloseQueueStatus& status)
{
if (d_parameters_p->verbosity() != ParametersVerbosity::e_SILENT) {
if (d_parameters.verbosity() != ParametersVerbosity::e_SILENT) {
BALL_LOG_INFO << "==> CLOSE_QUEUE_RESULT received: " << status;
}

Expand Down
7 changes: 4 additions & 3 deletions src/applications/bmqtool/m_bmqtool_interactive.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ class Interactive {
// Session handler to use for events
// processing, if using custom event handler

Parameters* d_parameters_p;
// Parameters to use
/// Parameters to use, the object under this reference is owned by
/// the Application that uses this Interactive object
const Parameters& d_parameters;

bslmt::Mutex d_mutex;
// Mutex to protect below map
Expand Down Expand Up @@ -171,7 +172,7 @@ class Interactive {

/// Constructor using the specified `parameters`, 'poster',
/// and `allocator`.
Interactive(Parameters* parameters,
Interactive(const Parameters& parameters,
Poster* poster,
bslma::Allocator* allocator);

Expand Down
2 changes: 1 addition & 1 deletion src/applications/bmqtool/m_bmqtool_parameters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ bool Parameters::from(bsl::ostream& stream,
return true;
}

void Parameters::dump(bsl::ostream& stream)
void Parameters::dump(bsl::ostream& stream) const
{
print(stream, 0, -1);
}
Expand Down
6 changes: 3 additions & 3 deletions src/applications/bmqtool/m_bmqtool_parameters.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,14 @@ class Parameters {
/// error in the specified `stream` and return false on failure.
bool from(bsl::ostream& stream, const CommandLineParameters& params);

/// Do a nicer pretty print of all the parameters aligned.
void dump(bsl::ostream& stream);

/// Validate the consistency of all settings.
bool validate(bsl::string* error);

// ACCESSORS

/// Do a nicer pretty print of all the parameters aligned.
void dump(bsl::ostream& stream) const;

/// Format this object to the specified output `stream` at the (absolute
/// value of) the optionally specified indentation `level` and return a
/// reference to `stream`. If `level` is specified, optionally specify
Expand Down
Loading

0 comments on commit fa69fcb

Please sign in to comment.