Skip to content

Commit e7f041d

Browse files
author
Akira Saito
committed
Fix TCPNetworkModule to close SSLSocket.
- Remove unnecessary shutdownInput call in TCPNetworkModule#stop TCPNetworkModule#stop calls shutdownInput before calling close. The comment says it is for enabling SSL session resumption, but close is never called if the socket is SSLSocket as shutdownInput always throws exception. - Fix CommsReceiver to properly use `runningSemaphore` for stopping the receiver task In `run()` method, `runningSemaphore` is inproperly released on every while loop run. This change moves `runningSemaphore.release()` to outside of while loop. Signed-off-by: Akira Saito <saiaki@jp.ibm.com>
1 parent b84cad6 commit e7f041d

2 files changed

Lines changed: 75 additions & 75 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsReceiver.java

Lines changed: 75 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.ExecutorService;
2121
import java.util.concurrent.Future;
2222
import java.util.concurrent.Semaphore;
23+
import java.util.concurrent.TimeUnit;
2324

2425
import org.eclipse.paho.client.mqttv3.MqttException;
2526
import org.eclipse.paho.client.mqttv3.MqttToken;
@@ -39,7 +40,7 @@ public class CommsReceiver implements Runnable {
3940
private static final String CLASS_NAME = CommsReceiver.class.getName();
4041
private static final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
4142

42-
private boolean running = false;
43+
private volatile boolean running = false;
4344
private Object lifecycle = new Object();
4445
private ClientState clientState = null;
4546
private ClientComms clientComms = null;
@@ -95,7 +96,22 @@ public void stop() {
9596
if (!Thread.currentThread().equals(recThread)) {
9697
try {
9798
// Wait for the thread to finish.
98-
runningSemaphore.acquire();
99+
// We need to set a certain timeout since there is no universal way
100+
// to wake up the worker thread from blocking read.
101+
// If network module is websocket, the worker thread returns from read with InterruptedIOException
102+
// in response to receiverFuture.cancel(true) above.
103+
//
104+
// If network module is tcp or ssl, however, InterruptedIOException is not thrown on read.
105+
// In normal cases, the worker thread returns from read with EOFException when
106+
// the connection is terminated from the broker side in response to DISCONNECT packet.
107+
// In this case, the SSL session is preserved and available for reuse in later connections.
108+
//
109+
// If stop is called because the broker connection is unexpectedly lost,
110+
// the worker thread will keep waiting on read, and tryAcquire below will return on timeout expiration.
111+
// The worker thread wakes up with SocketException when the socket is closed later.
112+
// In this case, the SSL session is invalidated by SSLSocket implementation.
113+
//
114+
runningSemaphore.tryAcquire(3000, TimeUnit.MILLISECONDS);
99115
}
100116
catch (InterruptedException ex) {
101117
} finally {
@@ -124,67 +140,70 @@ public void run() {
124140
running = false;
125141
return;
126142
}
127-
128-
while (running && (in != null)) {
129-
try {
130-
//@TRACE 852=network read message
131-
log.fine(CLASS_NAME,methodName,"852");
132-
receiving = in.available() > 0;
133-
MqttWireMessage message = in.readMqttWireMessage();
134-
receiving = false;
135-
136-
// instanceof checks if message is null
137-
if (message instanceof MqttAck) {
138-
token = tokenStore.getToken(message);
139-
if (token!=null) {
140-
synchronized (token) {
141-
// Ensure the notify processing is done under a lock on the token
142-
// This ensures that the send processing can complete before the
143-
// receive processing starts! ( request and ack and ack processing
144-
// can occur before request processing is complete if not!
145-
clientState.notifyReceivedAck((MqttAck)message);
143+
144+
try {
145+
while (running && (in != null)) {
146+
try {
147+
//@TRACE 852=network read message
148+
log.fine(CLASS_NAME,methodName,"852");
149+
receiving = in.available() > 0;
150+
MqttWireMessage message = in.readMqttWireMessage();
151+
receiving = false;
152+
153+
// instanceof checks if message is null
154+
if (message instanceof MqttAck) {
155+
token = tokenStore.getToken(message);
156+
if (token!=null) {
157+
synchronized (token) {
158+
// Ensure the notify processing is done under a lock on the token
159+
// This ensures that the send processing can complete before the
160+
// receive processing starts! ( request and ack and ack processing
161+
// can occur before request processing is complete if not!
162+
clientState.notifyReceivedAck((MqttAck)message);
163+
}
164+
} else if(message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) {
165+
//This is an ack for a message we no longer have a ticket for.
166+
//This probably means we already received this message and it's being send again
167+
//because of timeouts, crashes, disconnects, restarts etc.
168+
//It should be safe to ignore these unexpected messages.
169+
log.fine(CLASS_NAME, methodName, "857");
170+
} else {
171+
// It its an ack and there is no token then something is not right.
172+
// An ack should always have a token assoicated with it.
173+
throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
146174
}
147-
} else if(message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) {
148-
//This is an ack for a message we no longer have a ticket for.
149-
//This probably means we already received this message and it's being send again
150-
//because of timeouts, crashes, disconnects, restarts etc.
151-
//It should be safe to ignore these unexpected messages.
152-
log.fine(CLASS_NAME, methodName, "857");
153175
} else {
154-
// It its an ack and there is no token then something is not right.
155-
// An ack should always have a token assoicated with it.
156-
throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
176+
if (message != null) {
177+
// A new message has arrived
178+
clientState.notifyReceivedMsg(message);
179+
}
157180
}
158-
} else {
159-
if (message != null) {
160-
// A new message has arrived
161-
clientState.notifyReceivedMsg(message);
181+
}
182+
catch (MqttException ex) {
183+
//@TRACE 856=Stopping, MQttException
184+
log.fine(CLASS_NAME,methodName,"856",null,ex);
185+
running = false;
186+
// Token maybe null but that is handled in shutdown
187+
clientComms.shutdownConnection(token, ex);
188+
}
189+
catch (IOException ioe) {
190+
//@TRACE 853=Stopping due to IOException
191+
log.fine(CLASS_NAME,methodName,"853");
192+
193+
running = false;
194+
// An EOFException could be raised if the broker processes the
195+
// DISCONNECT and ends the socket before we complete. As such,
196+
// only shutdown the connection if we're not already shutting down.
197+
if (!clientComms.isDisconnecting()) {
198+
clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
162199
}
163200
}
164-
}
165-
catch (MqttException ex) {
166-
//@TRACE 856=Stopping, MQttException
167-
log.fine(CLASS_NAME,methodName,"856",null,ex);
168-
running = false;
169-
// Token maybe null but that is handled in shutdown
170-
clientComms.shutdownConnection(token, ex);
171-
}
172-
catch (IOException ioe) {
173-
//@TRACE 853=Stopping due to IOException
174-
log.fine(CLASS_NAME,methodName,"853");
175-
176-
running = false;
177-
// An EOFException could be raised if the broker processes the
178-
// DISCONNECT and ends the socket before we complete. As such,
179-
// only shutdown the connection if we're not already shutting down.
180-
if (!clientComms.isDisconnecting()) {
181-
clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
201+
finally {
202+
receiving = false;
182203
}
183204
}
184-
finally {
185-
receiving = false;
186-
runningSemaphore.release();
187-
}
205+
} finally {
206+
runningSemaphore.release();
188207
}
189208

190209
//@TRACE 854=<

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/TCPNetworkModule.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -102,25 +102,6 @@ public OutputStream getOutputStream() throws IOException {
102102
*/
103103
public void stop() throws IOException {
104104
if (socket != null) {
105-
// CDA: an attempt is made to stop the receiver cleanly before closing the socket.
106-
// If the socket is forcibly closed too early, the blocking socket read in
107-
// the receiver thread throws a SocketException.
108-
// While this causes the receiver thread to exit, it also invalidates the
109-
// SSL session preventing to perform an accelerated SSL handshake in the
110-
// next connection.
111-
//
112-
// Also note that due to the blocking socket reads in the receiver thread,
113-
// it's not possible to interrupt the thread. Using non blocking reads in
114-
// combination with a socket timeout (see setSoTimeout()) would be a better approach.
115-
//
116-
// Please note that the Javadoc only says that an EOF is returned on
117-
// subsequent reads of the socket stream.
118-
// Anyway, at least with Oracle Java SE 7 on Linux systems, this causes a blocked read
119-
// to return EOF immediately.
120-
// This workaround should not cause any harm in general but you might
121-
// want to move it in SSLNetworkModule.
122-
123-
socket.shutdownInput();
124105
socket.close();
125106
}
126107
}

0 commit comments

Comments
 (0)