Skip to content

Commit

Permalink
add readTimeout option - fixes #109
Browse files Browse the repository at this point in the history
  • Loading branch information
Sammers21 committed Nov 14, 2018
1 parent 2d51276 commit 9e65fe2
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
20 changes: 20 additions & 0 deletions src/main/java/io/vertx/mqtt/MqttClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class MqttClientOptions extends NetClientOptions {
public static final String DEFAULT_HOST = "localhost";
public static final int DEFAULT_WILL_QOS = 0;
public static final int DEFAULT_KEEP_ALIVE_TIME_SECONDS = 30;
public static final int DEFAULT_READ_TIMEOUT = 0;
public static final int DEFAULT_MAX_INFLIGHT_QUEUE = 10;
public static final boolean DEFAULT_CLEAN_SESSION = true;
public static final boolean DEFAULT_WILL_FLAG = false;
Expand All @@ -55,6 +56,7 @@ public class MqttClientOptions extends NetClientOptions {
private int willQoS = DEFAULT_WILL_QOS;
private boolean willRetain = DEFAULT_WILL_RETAIN;
private int keepAliveTimeSeconds = DEFAULT_KEEP_ALIVE_TIME_SECONDS;
private int readTimeout = DEFAULT_READ_TIMEOUT;
private boolean isAutoKeepAlive = true;
private boolean isAutoGeneratedClientId = true;
private int maxInflightQueue = DEFAULT_MAX_INFLIGHT_QUEUE;
Expand Down Expand Up @@ -335,6 +337,24 @@ public MqttClientOptions setAutoGeneratedClientId(boolean isAutoGeneratedClientI
return this;
}

/**
* @return the period of time in millis, If no read was performed for the period,
* the connection is going to be closed. {@code 0} means that the feature is disabled
*/
public int getReadTimeout() {
return readTimeout;
}

/**
* Set the period of time in millis, If no read was performed for the period, the connection is going to be closed.
* Set to {@code 0} to disable.
*
* @param readTimeout read timeout
*/
public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
}

/**
* @return if the PINGREQ is handled automatically
*/
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ private void initChannel(ChannelPipeline pipeline) {
this.options.getKeepAliveTimeSeconds() != 0) {

pipeline.addBefore("handler", "idle",
new IdleStateHandler(0, this.options.getKeepAliveTimeSeconds(), 0));
new IdleStateHandler(this.options.getReadTimeout(), this.options.getKeepAliveTimeSeconds(), 0));
pipeline.addBefore("handler", "keepAliveHandler", new ChannelDuplexHandler() {

@Override
Expand All @@ -664,6 +664,9 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.WRITER_IDLE) {
ping();
} else if (e.state() == IdleState.READER_IDLE) {
// implicitly triggers #handleClosed
ctx.close();
}
}
}
Expand Down

0 comments on commit 9e65fe2

Please sign in to comment.