Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change isConnected flag only in handleClose #113

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/main/java/io/vertx/mqtt/MqttClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class MqttClientOptions extends NetClientOptions {
public static final boolean DEFAULT_WILL_FLAG = false;
public static final boolean DEFAULT_WILL_RETAIN = false;
public static final int DEFAULT_MAX_MESSAGE_SIZE = -1;
public static final int DEFAULT_CONNACK_TIMEOUT_SECONDS = 10;

private String clientId;
private String username;
Expand All @@ -59,6 +60,7 @@ public class MqttClientOptions extends NetClientOptions {
private boolean isAutoGeneratedClientId = true;
private int maxInflightQueue = DEFAULT_MAX_INFLIGHT_QUEUE;
private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
private int connackTimeoutSeconds = DEFAULT_CONNACK_TIMEOUT_SECONDS;

/**
* Default constructor
Expand Down Expand Up @@ -184,6 +186,23 @@ public String getWillMessage() {
return willMessage;
}

/**
* @return CONNACK packet receive timeout
*/
public int getConnackTimeoutSeconds() {
return connackTimeoutSeconds;
}

/**
* Set the CONNACK packet receive timeout
*
* @param connackTimeoutSeconds CONNACK packet receive timeout
*/
public MqttClientOptions setConnackTimeoutSeconds(int connackTimeoutSeconds) {
this.connackTimeoutSeconds = connackTimeoutSeconds;
return this;
}

/**
* Set the client identifier
*
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class MqttClientImpl implements MqttClient {
private static final int MAX_TOPIC_LEN = 65535;
private static final int MIN_TOPIC_LEN = 1;
private static final String PROTOCOL_NAME = "MQTT";
private static final String CONNACK_IDLE_HANDLER_NAME = "CONNACK_IDLE_HANDLER";
private static final String CONNACK_TIMEOUT_HANDLER_NAME = "CONNACK_TIMEOUT_HANDLER";
private static final int PROTOCOL_VERSION = 4;
private static final int DEFAULT_IDLE_TIMEOUT = 0;

Expand Down Expand Up @@ -650,6 +652,23 @@ private void initChannel(ChannelPipeline pipeline) {
pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder());
}

pipeline.addBefore("handler", CONNACK_IDLE_HANDLER_NAME, new IdleStateHandler(0, 0, options.getConnackTimeoutSeconds()));
pipeline.addBefore("handler", CONNACK_TIMEOUT_HANDLER_NAME, new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.ALL_IDLE) {
connectHandler().handle(
Future.failedFuture(String.format("Connection timeout. Have not received CONNACK in %d seconds",
options.getConnackTimeoutSeconds()))
);
}
}
}
});

if (this.options.isAutoKeepAlive() &&
this.options.getKeepAliveTimeSeconds() != 0) {

Expand Down Expand Up @@ -960,6 +979,10 @@ private void handlePubrel(int pubrelMessageId) {
* @param msg connection response message
*/
private void handleConnack(MqttConnAckMessage msg) {
ChannelPipeline pipeline = connection.channelHandlerContext().pipeline();

pipeline.remove(CONNACK_TIMEOUT_HANDLER_NAME);
pipeline.remove(CONNACK_IDLE_HANDLER_NAME);

synchronized (this) {
this.isConnected = msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.vertx.mqtt.test.client;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttServer;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
* MQTT client testing about behaviour of the client when server don't response with CONNACK.
*/
@RunWith(VertxUnitRunner.class)
public class MqttClientNoConnectPacketTest {

private final static Integer REPRODUCER_SERVER_PORT = 1884;
private final static String REPRODUCER_SERVER_HOST = "0.0.0.0";

@Test
public void connectionTimeoutIfNoPacketSent(TestContext context) {
int connackTimeoutSeconds = 2;
int serverResponseInSeconds = 2;
Vertx vertx = Vertx.vertx();
Async async = context.async(1);
MqttServer mqttServer = MqttServer.create(vertx);
mqttServer.endpointHandler(mqttEndpoint -> {
vertx.setTimer(serverResponseInSeconds * 1000, action -> mqttEndpoint.close());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see a reason to set a timer that closes the endpoint.
It is sufficient to do nothing to make the test pass.

Keeping this action could lead to the conclusion that your waiting for the TCP-Connection to be closed, which is not what is intended

}).listen(REPRODUCER_SERVER_PORT);
MqttClient mqttClient = MqttClient.create(vertx, new MqttClientOptions().setConnackTimeoutSeconds(connackTimeoutSeconds));
mqttClient.connect(REPRODUCER_SERVER_PORT, REPRODUCER_SERVER_HOST, connected -> {
if (connected.succeeded()) {
context.fail("Connect handles should not succeed without receiving ack");
} else {
async.countDown();
}
});
async.await((connackTimeoutSeconds + 2) * 1_000);
}
}