Skip to content

Commit 43cc7b1

Browse files
committed
Issue #582 - Possible fix in v5 client by setting inflight window to 4 less, aim to dynamically size window based on QoS in future.
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent 01d9e34 commit 43cc7b1

5 files changed

Lines changed: 45 additions & 35 deletions

File tree

org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test/utilities/ConnectionManipulationProxyServer.java

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,8 @@
77
import java.net.Socket;
88
import java.util.logging.Logger;
99

10-
import org.eclipse.paho.client.mqttv3.test.automaticReconnect.AutomaticReconnectTest;
11-
1210
public class ConnectionManipulationProxyServer implements Runnable {
13-
static final Class<?> cclass = AutomaticReconnectTest.class;
11+
static final Class<?> cclass = ConnectionManipulationProxyServer.class;
1412
private static final String className = cclass.getName();
1513
private static final Logger log = Logger.getLogger(className);
1614
private int localPort;
@@ -34,7 +32,7 @@ public ConnectionManipulationProxyServer(String host, int remotePort, int localP
3432
}
3533

3634
public void startProxy(){
37-
log.info("[CPMS Proxy] - Starting Proxy");
35+
log.info("[CMPS Proxy] - Starting Proxy");
3836
synchronized (enableLock) {
3937
enableProxy = true;
4038
}
@@ -43,7 +41,7 @@ public void startProxy(){
4341
}
4442

4543
public void enableProxy(){
46-
log.info("[CPMS Proxy] - Enabling Proxy");
44+
log.info("[CMPS Proxy] - Enabling Proxy");
4745
synchronized (enableLock) {
4846
enableProxy = true;
4947
}
@@ -54,7 +52,7 @@ public void enableProxy(){
5452
}
5553

5654
public void disableProxy(){
57-
log.info("[CPMS Proxy] - Disabling Proxy");
55+
log.info("[CMPS Proxy] - Disabling Proxy");
5856
synchronized (enableLock) {
5957
enableProxy = false;
6058
}
@@ -69,7 +67,7 @@ public void disableProxy(){
6967
}
7068

7169
public void stopProxy(){
72-
log.info("[CPMS Proxy] - Stopping Proxy");
70+
log.info("[CMPS Proxy] - Stopping Proxy");
7371
synchronized (enableLock) {
7472
enableProxy = false;
7573
}
@@ -78,7 +76,7 @@ public void stopProxy(){
7876
}
7977

8078
private void killOpenSockets(){
81-
log.info("[CPMS Proxy] - killOpenSockets Called.");
79+
log.info("[CMPS Proxy] - killOpenSockets Called.");
8280
try {
8381
if(serverSocket != null){
8482
serverSocket.close();
@@ -96,7 +94,7 @@ private void killOpenSockets(){
9694

9795
@Override
9896
public void run() {
99-
log.info("[CPMS Proxy] - Proxy Thread running.");
97+
log.info("[CMPS Proxy] - Proxy Thread running.");
10098
try {
10199

102100
final byte[] request = new byte[1024];
@@ -122,14 +120,14 @@ public void run() {
122120

123121

124122

125-
log.info("[CPMS Proxy] - Waiting for incoming connection..");
123+
log.info("[CMPS Proxy] - Waiting for incoming connection..");
126124

127125
try {
128126
// Wait for a connection on the local Port
129127
client = serverSocket.accept();
130128

131129

132-
log.info("[CPMS Proxy] - Client Opened Connection to Proxy...");
130+
log.info("[CMPS Proxy] - Client Opened Connection to Proxy...");
133131

134132
final InputStream streamFromClient = client.getInputStream();
135133
final OutputStream streamToClient = client.getOutputStream();
@@ -157,7 +155,7 @@ public void run() {
157155
streamToServer.flush();
158156
}
159157
} catch (IOException ex){
160-
log.warning("[CPMS Proxy] - IOException in client to server stream: " + ex.getMessage());
158+
log.warning("[CMPS Proxy] - IOException in client to server stream: " + ex.getMessage());
161159
try {
162160
client.close();
163161
server.close();
@@ -180,8 +178,8 @@ public void run() {
180178
}
181179

182180
} catch (IOException ex){
183-
log.warning("[CPMS Proxy] - IOException in server to client stream: " + ex.getMessage());
184-
log.info("[CPMS Proxy] - ");
181+
log.warning("[CMPS Proxy] - IOException in server to client stream: " + ex.getMessage());
182+
log.info("[CMPS Proxy] - ");
185183
client.close();
186184
server.close();
187185
}
@@ -190,7 +188,7 @@ public void run() {
190188

191189

192190
} catch (IOException ex) {
193-
log.warning("[CPMS Proxy] - General IO Exception caught in main Thread: " + ex.getMessage());
191+
log.warning("[CMPS Proxy] - General IO Exception caught in main Thread: " + ex.getMessage());
194192
break;
195193
} finally {
196194
try {
@@ -201,23 +199,23 @@ public void run() {
201199
client.close();
202200
}
203201
} catch(IOException ex) {
204-
log.warning("[CPMS Proxy] - IOException caught whilst closing proxy connection.: " + ex.getMessage());
202+
log.warning("[CMPS Proxy] - IOException caught whilst closing proxy connection.: " + ex.getMessage());
205203
}
206204
}
207205

208206

209207

210208
}
211209
}
212-
log.info("[CPMS Proxy] - Proxy Thread finishing..");
210+
log.info("[CMPS Proxy] - Proxy Thread finishing..");
213211

214212
if(!serverSocket.isClosed()){
215213
serverSocket.close();
216214
}
217-
log.info("[CPMS Proxy] - Server Socket Closed, returning...");
215+
log.info("[CMPS Proxy] - Server Socket Closed, returning...");
218216

219217
} catch(IOException ex) {
220-
log.warning("[CPMS Proxy] - Thread Connection lost: " + ex.getMessage());
218+
log.warning("[CMPS Proxy] - Thread Connection lost: " + ex.getMessage());
221219
ex.printStackTrace();
222220
}
223221

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientComms.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -946,11 +946,11 @@ class ReconnectDisconnectedBufferCallback implements IDisconnectedBufferCallback
946946

947947
public void publishBufferedMessage(BufferedMessage bufferedMessage) throws MqttException {
948948
if (isConnected()) {
949-
while (clientState.getActualInFlight() >= (mqttConnection.getReceiveMaximum() - 1)) {
949+
//int qos = ((MqttPublish) bufferedMessage.getMessage()).getQos();
950+
while (clientState.getActualInFlight() >= (mqttConnection.getReceiveMaximum() - 4)) {
950951
// We need to Yield to the other threads to allow the in flight messages to
951952
// clear
952953
Thread.yield();
953-
954954
}
955955
// @TRACE 510=Publising Buffered message message={0}
956956
log.fine(CLASS_NAME, methodName, "510", new Object[] { bufferedMessage.getMessage().getKey() });

org.eclipse.paho.mqttv5.client/src/test/java/org/eclipse/paho/mqttv5/client/test/automaticReconnect/OfflineBufferingTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,9 @@ public void testManyMessageBufferAndDeliver() throws Exception {
165165
DisconnectedBufferOptions disconnectedOpts = new DisconnectedBufferOptions();
166166
disconnectedOpts.setBufferEnabled(true);
167167
client.setBufferOpts(disconnectedOpts);
168-
168+
169169
// Create subscription client that won't be affected by proxy
170-
MqttAsyncClient subClient = new MqttAsyncClient(serverURIString, methodName + "sub-client");
170+
MqttAsyncClient subClient = new MqttAsyncClient(serverURIString, methodName + "sub-client", new TestMemoryPersistence());
171171
MqttV5Receiver mqttV3Receiver = new MqttV5Receiver(clientId, LoggingUtilities.getPrintStream());
172172
subClient.setCallback(mqttV3Receiver);
173173
IMqttToken subConnectToken = subClient.connect();
@@ -438,7 +438,7 @@ public void testUnPersistBufferedMessagesOnNewClient() throws Exception {
438438
// Create Subscription client to watch for the message being published
439439
// as soon as the main client connects
440440
log.info("Creating subscription client");
441-
MqttAsyncClient subClient = new MqttAsyncClient(serverURIString, clientId);
441+
MqttAsyncClient subClient = new MqttAsyncClient(serverURIString, clientId, new TestMemoryPersistence());
442442
MqttV5Receiver mqttV3Receiver = new MqttV5Receiver(clientId, LoggingUtilities.getPrintStream());
443443
subClient.setCallback(mqttV3Receiver);
444444
IMqttToken subConnectToken = subClient.connect();

org.eclipse.paho.mqttv5.client/src/test/java/org/eclipse/paho/mqttv5/client/test/utilities/ConnectionManipulationProxyServer.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public ConnectionManipulationProxyServer(String host, int remotePort, int localP
3232
}
3333

3434
public void startProxy(){
35+
log.info("[CMPS Proxy] - Starting Proxy");
3536
synchronized (enableLock) {
3637
enableProxy = true;
3738
}
@@ -40,6 +41,7 @@ public void startProxy(){
4041
}
4142

4243
public void enableProxy(){
44+
log.info("[CMPS Proxy] - Enabling Proxy");
4345
synchronized (enableLock) {
4446
enableProxy = true;
4547
}
@@ -50,6 +52,7 @@ public void enableProxy(){
5052
}
5153

5254
public void disableProxy(){
55+
log.info("[CMPS Proxy] - Disabling Proxy");
5356
synchronized (enableLock) {
5457
enableProxy = false;
5558
}
@@ -64,6 +67,7 @@ public void disableProxy(){
6467
}
6568

6669
public void stopProxy(){
70+
log.info("[CMPS Proxy] - Stopping Proxy");
6771
synchronized (enableLock) {
6872
enableProxy = false;
6973
}
@@ -72,6 +76,7 @@ public void stopProxy(){
7276
}
7377

7478
private void killOpenSockets(){
79+
log.info("[CMPS Proxy] - killOpenSockets Called.");
7580
try {
7681
if(serverSocket != null){
7782
serverSocket.close();
@@ -89,6 +94,7 @@ private void killOpenSockets(){
8994

9095
@Override
9196
public void run() {
97+
log.info("[CMPS Proxy] - Proxy Thread running.");
9298
try {
9399

94100
final byte[] request = new byte[1024];
@@ -114,13 +120,14 @@ public void run() {
114120

115121

116122

117-
//log.fine("Proxy: Waiting for incoming connection");
123+
log.info("[CMPS Proxy] - Waiting for incoming connection..");
118124

119125
try {
120126
// Wait for a connection on the local Port
121127
client = serverSocket.accept();
122128

123-
log.fine("Proxy: Client Opened Connection to Proxy...");
129+
130+
log.info("[CMPS Proxy] - Client Opened Connection to Proxy...");
124131

125132
final InputStream streamFromClient = client.getInputStream();
126133
final OutputStream streamToClient = client.getOutputStream();
@@ -133,7 +140,7 @@ public void run() {
133140
client.close();
134141
continue;
135142
}
136-
log.fine("Proxy: Proxy Connected to Server");
143+
log.info("Proxy: Proxy Connected to Server");
137144

138145
// Get Server Streams
139146
final InputStream streamFromServer = server.getInputStream();
@@ -148,7 +155,7 @@ public void run() {
148155
streamToServer.flush();
149156
}
150157
} catch (IOException ex){
151-
//log.warning("Proxy: 1 Connection lost: " + ex.getMessage());
158+
log.warning("[CMPS Proxy] - IOException in client to server stream: " + ex.getMessage());
152159
try {
153160
client.close();
154161
server.close();
@@ -171,7 +178,8 @@ public void run() {
171178
}
172179

173180
} catch (IOException ex){
174-
//log.warning("Proxy: 2 Connection lost: " + ex.getMessage());
181+
log.warning("[CMPS Proxy] - IOException in server to client stream: " + ex.getMessage());
182+
log.info("[CMPS Proxy] - ");
175183
client.close();
176184
server.close();
177185
}
@@ -180,8 +188,7 @@ public void run() {
180188

181189

182190
} catch (IOException ex) {
183-
//log.warning("Proxy: 3 Connection lost: " + ex.getMessage());
184-
//ex.printStackTrace();
191+
log.warning("[CMPS Proxy] - General IO Exception caught in main Thread: " + ex.getMessage());
185192
break;
186193
} finally {
187194
try {
@@ -192,22 +199,23 @@ public void run() {
192199
client.close();
193200
}
194201
} catch(IOException ex) {
195-
//log.warning("Proxy: 4 Connection lost: " + ex.getMessage());
202+
log.warning("[CMPS Proxy] - IOException caught whilst closing proxy connection.: " + ex.getMessage());
196203
}
197204
}
198205

199206

200207

201208
}
202209
}
203-
log.fine("Proxy: Proxy Thread finishing..");
210+
log.info("[CMPS Proxy] - Proxy Thread finishing..");
211+
204212
if(!serverSocket.isClosed()){
205213
serverSocket.close();
206214
}
207-
log.fine("Proxy: Server Socket Closed, returning...");
215+
log.info("[CMPS Proxy] - Server Socket Closed, returning...");
208216

209217
} catch(IOException ex) {
210-
log.warning("Proxy: 5 Thread Connection lost: " + ex.getMessage());
218+
log.warning("[CMPS Proxy] - Thread Connection lost: " + ex.getMessage());
211219
ex.printStackTrace();
212220
}
213221

org.eclipse.paho.mqttv5.common/src/main/java/org/eclipse/paho/mqttv5/common/packet/MqttPublish.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,10 @@ public void setMessage(MqttMessage message) {
184184
public String getTopicName() {
185185
return topicName;
186186
}
187+
188+
public int getQoS() {
189+
return qos;
190+
}
187191

188192
public void setTopicName(String topicName) {
189193
this.topicName = topicName;

0 commit comments

Comments
 (0)