Skip to content

Commit

Permalink
Add test for calling onFailure when removing queued publish #1542
Browse files Browse the repository at this point in the history
  • Loading branch information
icraggs committed Dec 20, 2024
1 parent cb4facf commit 25bca9d
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 41 deletions.
37 changes: 18 additions & 19 deletions src/MQTTAsyncUtils.c
Original file line number Diff line number Diff line change
Expand Up @@ -906,28 +906,27 @@ int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size)
#if !defined(NO_PERSISTENCE)
if (command->client->c->persistence)
MQTTAsync_unpersistCommand(first_publish);
#endif
#endif
if (first_publish->command.onFailure)
{
MQTTAsync_failureData data;

data.token = first_publish->command.token;
data.code = MQTTASYNC_MAX_BUFFERED_MESSAGES;
data.message = NULL;
Log(TRACE_MIN, -1, "Cache is full, messages have been removed.");
(*(first_publish->command.onFailure))(first_publish->command.context, &data);
}
else if (first_publish->command.onFailure5)
MQTTAsync_failureData data;

data.token = first_publish->command.token;
data.code = MQTTASYNC_MAX_BUFFERED_MESSAGES;
data.message = NULL;
Log(TRACE_MIN, -1, "Calling connect failure for client %s, rc %d", command->client->c->clientID, data.code);
(*(first_publish->command.onFailure))(first_publish->command.context, &data);
} else if (first_publish->command.onFailure5)
{
MQTTAsync_failureData5 data;

data.token = first_publish->command.token;
data.code = MQTTASYNC_MAX_BUFFERED_MESSAGES;
data.message = NULL;
data.packet_type = PUBLISH;
Log(TRACE_MIN, -1, "Cache is full, messages have been removed.");
(*(first_publish->command.onFailure5))(first_publish->command.context, &data);
}
MQTTAsync_failureData5 data;

data.token = first_publish->command.token;
data.code = MQTTASYNC_MAX_BUFFERED_MESSAGES;
data.message = NULL;
data.packet_type = PUBLISH;
Log(TRACE_MIN, -1, "Calling connect failure for client %s, rc %d", command->client->c->clientID, data.code);
(*(first_publish->command.onFailure5))(first_publish->command.context, &data);
}
MQTTAsync_freeCommand(first_publish);
}
}
Expand Down
77 changes: 55 additions & 22 deletions test/test9.c
Original file line number Diff line number Diff line change
Expand Up @@ -2501,6 +2501,24 @@ void test10cOnConnect(void* context, MQTTAsync_successData* response)
test10cConnected = 1;
}

int test10onSendFailureCalled = 0;
int test10onSendSuccessCalled = 0;

void test10onSendFailure(void* context, MQTTAsync_failureData* response)
{
MQTTAsync c = (MQTTAsync)context;

MyLog(LOGA_INFO, "In send onFailure callback for client c rc %s", MQTTAsync_strerror(response->code));
test10onSendFailureCalled++;
}

void test10onSendSuccess(void* context, MQTTAsync_successData* response)
{
MQTTAsync c = (MQTTAsync)context;

MyLog(LOGA_DEBUG, "In send onSuccess callback for client c");
test10onSendSuccessCalled++;
}

int test10(struct Options options)
{
Expand All @@ -2523,6 +2541,8 @@ int test10(struct Options options)

test10Finished = 0;
failures = 0;
test10onSendFailureCalled = 0;
test10onSendSuccessCalled = 0;
MyLog(LOGA_INFO, "Starting Offline buffering 10 - delete oldest buffered messages first");
fprintf(xml, "<testcase classname=\"test9\" name=\"%s\"", testname);
global_start_time = start_clock();
Expand Down Expand Up @@ -2575,17 +2595,19 @@ int test10(struct Options options)
/* send some messages while disconnected */
for (i = 0; i < test10MessagesToSend; ++i)
{
char buf[50];

MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
pubmsg.qos = i % 3;
sprintf(buf, "%d message no, QoS %d", i, pubmsg.qos);
pubmsg.payload = buf;
pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
char buf[50];
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;

opts.onSuccess = test10onSendSuccess;
opts.onFailure = test10onSendFailure;
pubmsg.qos = i % 3;
sprintf(buf, "%d message no, QoS %d", i, pubmsg.qos);
pubmsg.payload = buf;
pubmsg.payloadlen = (int) (strlen(pubmsg.payload) + 1);
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
}

assert3PendingTokens(c);
Expand Down Expand Up @@ -2614,8 +2636,13 @@ int test10(struct Options options)
MySleep(100);

waitForNoPendingTokens(c);
MyLog(LOGA_INFO, "Send onSuccess %d onFailure %d", test10onSendSuccessCalled, test10onSendFailureCalled);
assert("test10onSendSuccessCalled should be 3", test10onSendSuccessCalled == 3, "test10onSendSuccessCalled was %d", test10onSendSuccessCalled);
assert("test10onSendFailureCalled should be 3", test10onSendFailureCalled == 3, "test10onSendFailureCalled was %d", test10onSendFailureCalled);

/* Now try the same thing, but force messages to be persisted and re-read */
test10onSendFailureCalled = 0;
test10onSendSuccessCalled = 0;

/* disconnect so we buffer some messages again */
rc = MQTTAsync_disconnect(c, NULL);
Expand All @@ -2624,17 +2651,19 @@ int test10(struct Options options)
/* send some messages while disconnected */
for (i = 0; i < test10MessagesToSend; ++i)
{
char buf[50];

MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
pubmsg.qos = i % 3;
sprintf(buf, "%d message no, QoS %d", i, pubmsg.qos);
pubmsg.payload = buf;
pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
char buf[50];
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;

opts.onSuccess = test10onSendSuccess;
opts.onFailure = test10onSendFailure;
pubmsg.qos = i % 3;
sprintf(buf, "%d message no, QoS %d", i, pubmsg.qos);
pubmsg.payload = buf;
pubmsg.payloadlen = (int) (strlen(pubmsg.payload) + 1);
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
}

assert3PendingTokens(c);
Expand Down Expand Up @@ -2675,6 +2704,10 @@ int test10(struct Options options)
rc = MQTTAsync_disconnect(d, NULL);
assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);

MyLog(LOGA_INFO, "Send onSuccess %d onFailure %d", test10onSendSuccessCalled, test10onSendFailureCalled);
assert("test10onSendSuccessCalled should be 0", test10onSendSuccessCalled == 0, "test10onSendSuccessCalled was %d", test10onSendSuccessCalled);
assert("test10onSendFailureCalled should be 6", test10onSendFailureCalled == 6, "test10onSendFailureCalled was %d", test10onSendFailureCalled);

exit:
MySleep(200);
MQTTAsync_destroy(&c);
Expand Down

0 comments on commit 25bca9d

Please sign in to comment.