Skip to content

Commit f78775a

Browse files
committed
Issue #530 - Migrating from old maxInFlight Option to using the server assigned receive maximum.
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent e4ff13c commit f78775a

5 files changed

Lines changed: 7 additions & 60 deletions

File tree

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttConnectionOptions.java

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ enum UriType {
6565
private int automaticReconnectMaxDelay = 120; // Max time to wait for automatic reconnection attempts in seconds.
6666
private boolean useSubscriptionIdentifiers = true; // Whether to automatically assign subscription identifiers.
6767
private int keepAliveInterval = 60; // Keep Alive Interval
68-
private int maxInflight = 10; // Max inflight messages
6968
private int connectionTimeout = 30; // Connection timeout in seconds
7069
private boolean httpsHostnameVerificationEnabled = true;
7170
private int maxReconnectDelay = 128000;
@@ -281,34 +280,6 @@ public void setKeepAliveInterval(int keepAliveInterval){
281280
this.keepAliveInterval = keepAliveInterval;
282281
}
283282

284-
/**
285-
* Returns the "max inflight". The max inflight limits to how many messages we
286-
* can send without receiving acknowledgments.
287-
*
288-
* @see #setMaxInflight(int)
289-
* @return the max inflight
290-
*/
291-
public int getMaxInflight() {
292-
return maxInflight;
293-
}
294-
295-
/**
296-
* Sets the "max inflight". please increase this value in a high traffic
297-
* environment.
298-
* <p>
299-
* The default value is 10
300-
* </p>
301-
*
302-
* @param maxInflight
303-
* the number of maxInfligt messages
304-
*/
305-
public void setMaxInflight(int maxInflight) {
306-
if (maxInflight < 0) {
307-
throw new IllegalArgumentException();
308-
}
309-
this.maxInflight = maxInflight;
310-
}
311-
312283
/**
313284
* Returns the connection timeout value.
314285
*

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttConnectionOptionsBuilder.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,6 @@ public MqttConnectionOptionsBuilder keepAliveInterval(int keepAlive) {
3636
mqttConnectionOptions.setKeepAliveInterval(keepAlive);
3737
return this;
3838
}
39-
40-
public MqttConnectionOptionsBuilder maxInFlight(int maxInflight) {
41-
mqttConnectionOptions.setMaxInflight(maxInflight);
42-
return this;
43-
}
4439

4540
public MqttConnectionOptionsBuilder connectionTimeout(int connectionTimeout) {
4641
mqttConnectionOptions.setConnectionTimeout(connectionTimeout);

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientComms.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,6 @@ public void connect(MqttConnectionOptions options, MqttToken token) throws MqttE
341341

342342
this.clientState.setKeepAliveSecs(conOptions.getKeepAliveInterval());
343343
this.clientState.setCleanSession(conOptions.isCleanSession());
344-
this.clientState.setMaxInflight(conOptions.getMaxInflight());
345344

346345
tokenStore.open();
347346
ConnectBG conbg = new ConnectBG(this, token, connect, executorService);
@@ -946,7 +945,7 @@ class ReconnectDisconnectedBufferCallback implements IDisconnectedBufferCallback
946945

947946
public void publishBufferedMessage(BufferedMessage bufferedMessage) throws MqttException {
948947
if (isConnected()) {
949-
while (clientState.getActualInFlight() >= (clientState.getMaxInFlight() - 1)) {
948+
while (clientState.getActualInFlight() >= (mqttSession.getReceiveMaximum() - 1)) {
950949
// We need to Yield to the other threads to allow the in flight messages to
951950
// clear
952951
Thread.yield();

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,7 @@ public class ClientState implements MqttState {
125125
private long keepAlive;
126126
private boolean cleanSession;
127127
private MqttClientPersistence persistence;
128-
129-
private int maxInflight = 0;
128+
130129
private int actualInFlight = 0;
131130
private int inFlightPubRels = 0;
132131

@@ -185,11 +184,6 @@ protected ClientState(MqttClientPersistence persistence, CommsTokenStore tokenSt
185184
restoreState();
186185
}
187186

188-
protected void setMaxInflight(int maxInflight) {
189-
this.maxInflight = maxInflight;
190-
pendingMessages = new Vector<MqttWireMessage>(this.maxInflight);
191-
}
192-
193187
protected void setKeepAliveSecs(long keepAliveSecs) {
194188
this.keepAlive = keepAliveSecs * 1000;
195189
}
@@ -469,7 +463,7 @@ protected void restoreState() throws MqttException {
469463

470464
private void restoreInflightMessages() {
471465
final String methodName = "restoreInflightMessages";
472-
pendingMessages = new Vector<MqttWireMessage>(this.maxInflight);
466+
pendingMessages = new Vector<MqttWireMessage>(this.mqttSession.getReceiveMaximum());
473467
pendingFlows = new Vector<MqttWireMessage>();
474468

475469
Enumeration<Integer> keys = outboundQoS2.keys();
@@ -560,7 +554,7 @@ public void send(MqttWireMessage message, MqttToken token) throws MqttException
560554

561555
if (message instanceof MqttPublish) {
562556
synchronized (queueLock) {
563-
if (actualInFlight >= this.maxInflight) {
557+
if (actualInFlight >= this.mqttSession.getReceiveMaximum()) {
564558
// @TRACE 613= sending {0} msgs at max inflight window
565559
log.fine(CLASS_NAME, methodName, "613", new Object[] { Integer.valueOf(actualInFlight) });
566560

@@ -846,7 +840,7 @@ protected MqttWireMessage get() throws MqttException {
846840
// freed.
847841
// In both cases queueLock will be notified.
848842
if ((pendingMessages.isEmpty() && pendingFlows.isEmpty())
849-
|| (pendingFlows.isEmpty() && actualInFlight >= this.maxInflight)) {
843+
|| (pendingFlows.isEmpty() && actualInFlight >= this.mqttSession.getReceiveMaximum())) {
850844
try {
851845
// @TRACE 644=wait for new work or for space in the inflight window
852846
log.fine(CLASS_NAME, methodName, "644");
@@ -892,7 +886,7 @@ protected MqttWireMessage get() throws MqttException {
892886

893887
// If the inflight window is full then messages are not
894888
// processed until the inflight window has space.
895-
if (actualInFlight < this.maxInflight) {
889+
if (actualInFlight < this.mqttSession.getReceiveMaximum()) {
896890
// The in flight window is not full so process the
897891
// first message in the queue
898892
result = (MqttWireMessage) pendingMessages.elementAt(0);
@@ -1526,16 +1520,6 @@ public int getActualInFlight() {
15261520
return actualInFlight;
15271521
}
15281522

1529-
/*
1530-
* (non-Javadoc)
1531-
*
1532-
* @see org.eclipse.paho.mqttv5.client.internal.MqttState#getMaxInFlight()
1533-
*/
1534-
@Override
1535-
public int getMaxInFlight() {
1536-
return maxInflight;
1537-
}
1538-
15391523
/**
15401524
* Tidy up - ensure that tokens are released as they are maintained over a
15411525
* disconnect / connect cycle.
@@ -1576,7 +1560,7 @@ public Properties getDebug() {
15761560
props.put("In use msgids", inUseMsgIds);
15771561
props.put("pendingMessages", pendingMessages);
15781562
props.put("pendingFlows", pendingFlows);
1579-
props.put("maxInflight", Integer.valueOf(maxInflight));
1563+
props.put("serverReceiveMaximum", Integer.valueOf(this.mqttSession.getReceiveMaximum()));
15801564
props.put("nextMsgID", Integer.valueOf(nextMsgId));
15811565
props.put("actualInFlight", Integer.valueOf(actualInFlight));
15821566
props.put("inFlightPubRels", Integer.valueOf(inFlightPubRels));

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/MqttState.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,6 @@ public interface MqttState {
107107

108108
int getActualInFlight();
109109

110-
int getMaxInFlight();
111-
112110
Properties getDebug();
113111

114112
}

0 commit comments

Comments
 (0)