Skip to content

Commit

Permalink
test client against CONNACK timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Sammers21 committed Dec 3, 2018
1 parent 2d51276 commit 8c7e60a
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 26 deletions.
27 changes: 20 additions & 7 deletions src/main/java/io/vertx/mqtt/MqttClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,7 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.Arguments;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.PemTrustOptions;
import io.vertx.core.net.PfxOptions;
import io.vertx.core.net.TrustOptions;
import io.vertx.core.net.*;

/**
* Represents options used by the MQTT client.
Expand All @@ -44,6 +38,7 @@ public class MqttClientOptions extends NetClientOptions {
public static final boolean DEFAULT_WILL_FLAG = false;
public static final boolean DEFAULT_WILL_RETAIN = false;
public static final int DEFAULT_MAX_MESSAGE_SIZE = -1;
public static final int DEFAULT_CONNACK_TIMEOUT_SECONDS = 10;

private String clientId;
private String username;
Expand All @@ -59,6 +54,7 @@ public class MqttClientOptions extends NetClientOptions {
private boolean isAutoGeneratedClientId = true;
private int maxInflightQueue = DEFAULT_MAX_INFLIGHT_QUEUE;
private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
private int connackTimeoutSeconds = DEFAULT_CONNACK_TIMEOUT_SECONDS;

/**
* Default constructor
Expand Down Expand Up @@ -184,6 +180,23 @@ public String getWillMessage() {
return willMessage;
}

/**
* @return CONNACK packet receive timeout
*/
public int getConnackTimeoutSeconds() {
return connackTimeoutSeconds;
}

/**
* Set the CONNACK packet receive timeout
*
* @param connackTimeoutSeconds CONNACK packet receive timeout
*/
public MqttClientOptions setConnackTimeoutSeconds(int connackTimeoutSeconds) {
this.connackTimeoutSeconds = connackTimeoutSeconds;
return this;
}

/**
* Set the client identifier
*
Expand Down
44 changes: 25 additions & 19 deletions src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
Expand All @@ -58,11 +45,7 @@

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand All @@ -84,6 +67,8 @@ public class MqttClientImpl implements MqttClient {
private static final int MAX_TOPIC_LEN = 65535;
private static final int MIN_TOPIC_LEN = 1;
private static final String PROTOCOL_NAME = "MQTT";
private static final String CONNACK_IDLE_HANDLER_NAME = "CONNACK_IDLE_HANDLER";
private static final String CONNACK_TIMEOUT_HANDLER_NAME = "CONNACK_TIMEOUT_HANDLER";
private static final int PROTOCOL_VERSION = 4;
private static final int DEFAULT_IDLE_TIMEOUT = 0;

Expand Down Expand Up @@ -650,6 +635,23 @@ private void initChannel(ChannelPipeline pipeline) {
pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder());
}

pipeline.addBefore("handler", CONNACK_IDLE_HANDLER_NAME, new IdleStateHandler(0, 0, options.getConnackTimeoutSeconds()));
pipeline.addBefore("handler", CONNACK_TIMEOUT_HANDLER_NAME, new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.ALL_IDLE) {
connectHandler().handle(
Future.failedFuture(String.format("Connection timeout. Have not received CONNACK in %d seconds",
options.getConnackTimeoutSeconds()))
);
}
}
}
});

if (this.options.isAutoKeepAlive() &&
this.options.getKeepAliveTimeSeconds() != 0) {

Expand Down Expand Up @@ -960,6 +962,10 @@ private void handlePubrel(int pubrelMessageId) {
* @param msg connection response message
*/
private void handleConnack(MqttConnAckMessage msg) {
ChannelPipeline pipeline = connection.channelHandlerContext().pipeline();

pipeline.remove(CONNACK_TIMEOUT_HANDLER_NAME);
pipeline.remove(CONNACK_IDLE_HANDLER_NAME);

synchronized (this) {
this.isConnected = msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.vertx.mqtt.test.client;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttServer;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
* MQTT client testing about behaviour of the client when server don't response with CONNACK.
*/
@RunWith(VertxUnitRunner.class)
public class MqttClientNoConnectPacketTest {

private final static Integer REPRODUCER_SERVER_PORT = 1884;
private final static String REPRODUCER_SERVER_HOST = "0.0.0.0";

@Test
public void connectionTimeoutIfNoPacketSent(TestContext context) {
int connackTimeoutSeconds = 2;
int serverResponseInSeconds = 2;
Vertx vertx = Vertx.vertx();
Async async = context.async(1);
MqttServer mqttServer = MqttServer.create(vertx);
mqttServer.endpointHandler(mqttEndpoint -> {
vertx.setTimer(serverResponseInSeconds * 1000, action -> mqttEndpoint.close());
}).listen(REPRODUCER_SERVER_PORT);
MqttClient mqttClient = MqttClient.create(vertx, new MqttClientOptions().setConnackTimeoutSeconds(connackTimeoutSeconds));
mqttClient.connect(REPRODUCER_SERVER_PORT, REPRODUCER_SERVER_HOST, connected -> {
if (connected.succeeded()) {
context.fail("Connect handles should not succeed without receiving ack");
} else {
async.countDown();
}
});
async.await((connackTimeoutSeconds + 2) * 1_000);
}
}

0 comments on commit 8c7e60a

Please sign in to comment.