Skip to content

Commit 4c28d24

Browse files
Ian CraggsGerrit Code Review @ Eclipse.org
authored andcommitted
Merge "In the WebSocket client code, a close WebSocket frame was not being sent when the Client was closing the connection causing long hangs. When the Mqtt Client tries to disconnect, the WebSocket client sends the correct opcode to close the connection. Signed-off-by: James Sutton <james.sutton@uk.ibm.com>" into develop
2 parents 3641c5f + e07b419 commit 4c28d24

3 files changed

Lines changed: 79 additions & 48 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketFrame.java

Lines changed: 60 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class WebSocketFrame {
2727
private byte opcode;
2828
private boolean fin;
2929
private byte payload[];
30+
private boolean closeFlag = false;
3031

3132
public byte getOpcode() {
3233
return opcode;
@@ -39,6 +40,10 @@ public boolean isFin() {
3940
public byte[] getPayload() {
4041
return payload;
4142
}
43+
44+
public boolean isCloseFlag() {
45+
return closeFlag;
46+
}
4247

4348

4449
/**
@@ -127,57 +132,62 @@ private void setFinAndOpCode(byte incomingByte){
127132
public WebSocketFrame(InputStream input) throws IOException {
128133
byte firstByte = (byte) input.read();
129134
setFinAndOpCode(firstByte);
130-
if(this.opcode != 2){
131-
throw new IOException("Invalid Frame");
132-
}
133-
134-
byte maskLengthByte = (byte) input.read();
135-
boolean masked = ((maskLengthByte & 0x80) != 0);
136-
int payloadLength = (byte)(0x7F & maskLengthByte);
137-
int byteCount = 0;
138-
if(payloadLength == 0X7F){
139-
// 8 Byte Extended payload length
140-
byteCount = 8;
141-
} else if (payloadLength == 0X7E){
142-
// 2 bytes extended payload length
143-
byteCount = 2;
144-
}
145-
146-
// Decode the payload length
147-
if(byteCount > 0){
148-
payloadLength = 0;
149-
}
150-
while (--byteCount >= 0){
151-
maskLengthByte = (byte) input.read();
152-
payloadLength |= (maskLengthByte & 0xFF) << (8 * byteCount);
153-
}
154-
155-
// Get the masking key
156-
byte maskingKey[] = null;
157-
if(masked) {
158-
maskingKey = new byte[4];
159-
input.read(maskingKey,0,4);
160-
}
135+
if(this.opcode == 2){
136+
byte maskLengthByte = (byte) input.read();
137+
boolean masked = ((maskLengthByte & 0x80) != 0);
138+
int payloadLength = (byte)(0x7F & maskLengthByte);
139+
int byteCount = 0;
140+
if(payloadLength == 0X7F){
141+
// 8 Byte Extended payload length
142+
byteCount = 8;
143+
} else if (payloadLength == 0X7E){
144+
// 2 bytes extended payload length
145+
byteCount = 2;
146+
}
147+
148+
// Decode the payload length
149+
if(byteCount > 0){
150+
payloadLength = 0;
151+
}
152+
while (--byteCount >= 0){
153+
maskLengthByte = (byte) input.read();
154+
payloadLength |= (maskLengthByte & 0xFF) << (8 * byteCount);
155+
}
156+
157+
// Get the masking key
158+
byte maskingKey[] = null;
159+
if(masked) {
160+
maskingKey = new byte[4];
161+
input.read(maskingKey,0,4);
162+
}
161163

162-
this.payload = new byte[payloadLength];
163-
int offsetIndex = 0;
164-
int tempLength = payloadLength;
165-
int bytesRead = 0;
166-
while (offsetIndex != payloadLength){
167-
bytesRead = input.read(this.payload,offsetIndex,tempLength);
168-
offsetIndex += bytesRead;
169-
tempLength -= bytesRead;
164+
this.payload = new byte[payloadLength];
165+
int offsetIndex = 0;
166+
int tempLength = payloadLength;
167+
int bytesRead = 0;
168+
while (offsetIndex != payloadLength){
169+
bytesRead = input.read(this.payload,offsetIndex,tempLength);
170+
offsetIndex += bytesRead;
171+
tempLength -= bytesRead;
172+
}
173+
174+
175+
// Demask if needed
176+
if(masked)
177+
{
178+
for(int i = 0; i < this.payload.length; i++){
179+
this.payload[i] ^= maskingKey[i % 4];
180+
}
181+
}
182+
return;
183+
} else if(this.opcode == 8){
184+
// Closing connection with server
185+
closeFlag = true;
186+
} else {
187+
throw new IOException("Invalid Frame: Opcode: " +this.opcode);
170188
}
171189

172190

173-
// Demask if needed
174-
if(masked)
175-
{
176-
for(int i = 0; i < this.payload.length; i++){
177-
this.payload[i] ^= maskingKey[i % 4];
178-
}
179-
}
180-
return;
181191
}
182192

183193

@@ -288,6 +298,9 @@ public static byte[] generateMaskingKey(){
288298
int d = randomGenerator.nextInt(255);
289299
return new byte[] {(byte) a,(byte) b,(byte) c,(byte) d};
290300
}
301+
302+
303+
291304

292305

293306
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketNetworkModule.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.InputStream;
2121
import java.io.OutputStream;
2222
import java.io.PipedInputStream;
23+
import java.net.SocketException;
2324
import java.nio.ByteBuffer;
2425

2526
import javax.net.SocketFactory;
@@ -98,6 +99,12 @@ public OutputStream getOutputStream() throws IOException {
9899
* Stops the module, by closing the TCP socket.
99100
*/
100101
public void stop() throws IOException {
102+
// Creating Close Frame
103+
WebSocketFrame frame = new WebSocketFrame((byte)0x08, true, "1000".getBytes());
104+
byte[] rawFrame = frame.encodeFrame();
105+
getSocketOutputStream().write(rawFrame);
106+
getSocketOutputStream().flush();
107+
101108
if(webSocketReceiver != null){
102109
webSocketReceiver.stop();
103110
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketReceiver.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public class WebSocketReceiver implements Runnable{
2929
private static final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
3030

3131
private boolean running = false;
32+
private boolean stopping = false;
3233
private Object lifecycle = new Object();
3334
private InputStream input;
3435
private Thread receiverThread = null;
@@ -63,6 +64,7 @@ public void start(String threadName){
6364
*/
6465
public void stop() {
6566
final String methodName = "stop";
67+
stopping = true;
6668
synchronized (lifecycle) {
6769
//@TRACE 850=stopping
6870
log.fine(CLASS_NAME,methodName, "850");
@@ -94,18 +96,27 @@ public void run() {
9496
log.fine(CLASS_NAME, methodName, "852");
9597
receiving = input.available() > 0;
9698
WebSocketFrame incomingFrame = new WebSocketFrame(input);
99+
if(!incomingFrame.isCloseFlag()){
97100
for(int i = 0; i < incomingFrame.getPayload().length; i++){
98101
pipedOutputStream.write(incomingFrame.getPayload()[i]);
99102
}
103+
100104
pipedOutputStream.flush();
105+
} else {
106+
if(!stopping){
107+
throw new IOException("Server sent a WebSocket Frame with the Stop OpCode");
108+
}
109+
}
110+
101111
receiving = false;
102112

103113
} catch (IOException ex) {
104114
// Exception occurred whilst reading the stream.
115+
System.out.println("WebSocketReceiver.java : run(): Exception Occured.");
116+
ex.printStackTrace();
105117
closeOutputStream();
106118
}
107119
}
108-
109120
}
110121

111122
private void closeOutputStream(){

0 commit comments

Comments
 (0)