Skip to content

Commit 9c9b344

Browse files
committed
Merge pull request #192 from johnrotach/develop
Allow WebSocket query parameters. Disconnect fix.
2 parents f3dcdac + 9e8872b commit 9c9b344

6 files changed

Lines changed: 102 additions & 56 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ else if ((factory instanceof SSLSocketFactory) == false) {
430430
else if (factory instanceof SSLSocketFactory) {
431431
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
432432
}
433-
netModule = new WebSocketNetworkModule(factory, host, port, clientId);
433+
netModule = new WebSocketNetworkModule(factory, address, host, port, clientId);
434434
((WebSocketNetworkModule)netModule).setConnectTimeout(options.getConnectionTimeout());
435435
break;
436436
case MqttConnectOptions.URI_TYPE_WSS:
@@ -451,7 +451,7 @@ else if ((factory instanceof SSLSocketFactory) == false) {
451451
}
452452

453453
// Create the network module...
454-
netModule = new WebSocketSecureNetworkModule((SSLSocketFactory) factory, host, port, clientId);
454+
netModule = new WebSocketSecureNetworkModule((SSLSocketFactory) factory, address, host, port, clientId);
455455
((WebSocketSecureNetworkModule)netModule).setSSLhandshakeTimeout(options.getConnectionTimeout());
456456
// Ciphers suites need to be set, if they are available
457457
if (wSSFactoryFactory != null) {
@@ -478,18 +478,24 @@ private int getPort(String uri, int defaultPort) {
478478
port = defaultPort;
479479
}
480480
else {
481-
port = Integer.parseInt(uri.substring(portIndex + 1));
481+
int slashIndex = uri.indexOf('/');
482+
if (slashIndex == -1) {
483+
slashIndex = uri.length();
484+
}
485+
port = Integer.parseInt(uri.substring(portIndex + 1, slashIndex));
482486
}
483487
return port;
484488
}
485489

486490
private String getHostName(String uri) {
487-
int schemeIndex = uri.lastIndexOf('/');
488-
int portIndex = uri.lastIndexOf(':');
491+
int portIndex = uri.indexOf(':');
492+
if (portIndex == -1) {
493+
portIndex = uri.indexOf('/');
494+
}
489495
if (portIndex == -1) {
490496
portIndex = uri.length();
491497
}
492-
return uri.substring(schemeIndex + 1, portIndex);
498+
return uri.substring(0, portIndex);
493499
}
494500

495501
/* (non-Javadoc)
@@ -532,7 +538,7 @@ public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttA
532538
if (comms.isClosed()) {
533539
throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
534540
}
535-
541+
536542
this.connOpts = options;
537543
this.userContext = userContext;
538544
final boolean automaticReconnect = options.isAutomaticReconnect();
@@ -576,7 +582,7 @@ public void connectionLost(Throwable cause) {
576582
ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback, reconnecting);
577583
userToken.setActionCallback(connectActionListener);
578584
userToken.setUserContext(this);
579-
585+
580586
// If we are using the MqttCallbackExtended, set it on the connectActionListener
581587
if(this.mqttCallback instanceof MqttCallbackExtended){
582588
connectActionListener.setMqttCallbackExtended((MqttCallbackExtended)this.mqttCallback);
@@ -1025,7 +1031,7 @@ public IMqttDeliveryToken publish(String topic, MqttMessage message, Object user
10251031

10261032
return token;
10271033
}
1028-
1034+
10291035
/**
10301036
* User triggered attempt to reconnect
10311037
* @throws MqttException

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttConnectOptions.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,13 @@ public void setServerURIs(String[] array) {
486486
protected static int validateURI(String srvURI) {
487487
try {
488488
URI vURI = new URI(srvURI);
489+
if (vURI.getScheme().equals("ws")){
490+
return URI_TYPE_WS;
491+
}
492+
else if (vURI.getScheme().equals("wss")) {
493+
return URI_TYPE_WSS;
494+
}
495+
489496
if (!vURI.getPath().equals("")) {
490497
throw new IllegalArgumentException(srvURI);
491498
}
@@ -498,12 +505,6 @@ else if (vURI.getScheme().equals("ssl")) {
498505
else if (vURI.getScheme().equals("local")) {
499506
return URI_TYPE_LOCAL;
500507
}
501-
else if (vURI.getScheme().equals("ws")){
502-
return URI_TYPE_WS;
503-
}
504-
else if (vURI.getScheme().equals("wss")) {
505-
return URI_TYPE_WSS;
506-
}
507508
else {
508509
throw new IllegalArgumentException(srvURI);
509510
}
@@ -529,7 +530,7 @@ public void setMqttVersion(int MqttVersion)throws IllegalArgumentException {
529530
}
530531
this.MqttVersion = MqttVersion;
531532
}
532-
533+
533534
/**
534535
* Returns whether the client will automatically attempt to reconnect to the
535536
* server if the connection is lost

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketHandshake.java

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.io.InputStreamReader;
2222
import java.io.OutputStream;
2323
import java.io.PrintWriter;
24+
import java.net.URI;
25+
import java.net.URISyntaxException;
2426
import java.security.MessageDigest;
2527
import java.security.NoSuchAlgorithmException;
2628
import java.util.ArrayList;
@@ -39,16 +41,22 @@ public class WebSocketHandshake {
3941
private static final String HTTP_HEADER_UPGRADE_WEBSOCKET = "websocket";
4042
private static final String EMPTY = "";
4143
private static final String LINE_SEPARATOR = "\r\n";
42-
44+
45+
private static final String HTTP_HEADER_CONNECTION = "connection";
46+
private static final String HTTP_HEADER_CONNECTION_VALUE = "upgrade";
47+
private static final String HTTP_HEADER_SEC_WEBSOCKET_PROTOCOL = "sec-websocket-protocol";
48+
4349
InputStream input;
4450
OutputStream output;
51+
String uri;
4552
String host;
4653
int port;
4754

4855

49-
public WebSocketHandshake(InputStream input, OutputStream output, String host, int port){
56+
public WebSocketHandshake(InputStream input, OutputStream output, String uri, String host, int port){
5057
this.input = input;
5158
this.output = output;
59+
this.uri = uri;
5260
this.host = host;
5361
this.port = port;
5462
}
@@ -73,16 +81,29 @@ public void execute() throws IOException {
7381
* @throws IOException
7482
*/
7583
private void sendHandshakeRequest(String key) throws IOException{
76-
PrintWriter pw = new PrintWriter(output);
77-
pw.print("GET /mqtt HTTP/1.1" + LINE_SEPARATOR);
78-
pw.print("Host: " + host + ":" + port + LINE_SEPARATOR);
79-
pw.print("Upgrade: websocket" + LINE_SEPARATOR);
80-
pw.print("Connection: Upgrade" + LINE_SEPARATOR);
81-
pw.print("Sec-WebSocket-Key: " + key + LINE_SEPARATOR);
82-
pw.print("Sec-WebSocket-Protocol: mqtt" + LINE_SEPARATOR);
83-
pw.print("Sec-WebSocket-Version: 13" + LINE_SEPARATOR);
84-
pw.print(LINE_SEPARATOR);
85-
pw.flush();
84+
try {
85+
String path = "/mqtt";
86+
URI srvUri = new URI(uri);
87+
if (srvUri.getRawPath() != null && !srvUri.getRawPath().isEmpty()) {
88+
path = srvUri.getRawPath();
89+
if (srvUri.getRawQuery() != null && !srvUri.getRawQuery().isEmpty()) {
90+
path += "?" + srvUri.getRawQuery();
91+
}
92+
}
93+
94+
PrintWriter pw = new PrintWriter(output);
95+
pw.print("GET " + path + " HTTP/1.1" + LINE_SEPARATOR);
96+
pw.print("Host: " + host + ":" + port + LINE_SEPARATOR);
97+
pw.print("Upgrade: websocket" + LINE_SEPARATOR);
98+
pw.print("Connection: Upgrade" + LINE_SEPARATOR);
99+
pw.print("Sec-WebSocket-Key: " + key + LINE_SEPARATOR);
100+
pw.print("Sec-WebSocket-Protocol: mqttv3.1" + LINE_SEPARATOR);
101+
pw.print("Sec-WebSocket-Version: 13" + LINE_SEPARATOR);
102+
pw.print(LINE_SEPARATOR);
103+
pw.flush();
104+
} catch (URISyntaxException e) {
105+
throw new IllegalStateException(e);
106+
}
86107
}
87108

88109
/**
@@ -102,11 +123,22 @@ private void receiveHandshakeResponse(String key) throws IOException {
102123
line = in.readLine();
103124
}
104125
Map headerMap = getHeaders(responseLines);
126+
127+
String connectionHeader = (String) headerMap.get(HTTP_HEADER_CONNECTION);
128+
if (connectionHeader == null || connectionHeader.equalsIgnoreCase(HTTP_HEADER_CONNECTION_VALUE)) {
129+
throw new IOException("WebSocket Response header: Incorrect connection header");
130+
}
131+
105132
String upgradeHeader = (String) headerMap.get(HTTP_HEADER_UPGRADE);
106133
if(!upgradeHeader.toLowerCase().contains(HTTP_HEADER_UPGRADE_WEBSOCKET)){
107134
throw new IOException("WebSocket Response header: Incorrect upgrade.");
108135
}
109-
136+
137+
String secWebsocketProtocolHeader = (String) headerMap.get(HTTP_HEADER_SEC_WEBSOCKET_PROTOCOL);
138+
if (secWebsocketProtocolHeader == null) {
139+
throw new IOException("WebSocket Response header: empty sec-websocket-protocol");
140+
}
141+
110142
if(!headerMap.containsKey(HTTP_HEADER_SEC_WEBSOCKET_ACCEPT)){
111143
throw new IOException("WebSocket Response header: Missing Sec-WebSocket-Accept");
112144
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketNetworkModule.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.io.InputStream;
2121
import java.io.OutputStream;
2222
import java.io.PipedInputStream;
23-
import java.net.SocketException;
2423
import java.nio.ByteBuffer;
2524

2625
import javax.net.SocketFactory;
@@ -35,6 +34,7 @@ public class WebSocketNetworkModule extends TCPNetworkModule {
3534
private static final String CLASS_NAME = WebSocketNetworkModule.class.getName();
3635
private static final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
3736

37+
private String uri;
3838
private String host;
3939
private int port;
4040
private PipedInputStream pipedInputStream;
@@ -62,8 +62,9 @@ public void flush() throws IOException {
6262
}
6363
};
6464

65-
public WebSocketNetworkModule(SocketFactory factory, String host, int port, String resourceContext){
65+
public WebSocketNetworkModule(SocketFactory factory, String uri, String host, int port, String resourceContext){
6666
super(factory, host, port, resourceContext);
67+
this.uri = uri;
6768
this.host = host;
6869
this.port = port;
6970
this.pipedInputStream = new PipedInputStream();
@@ -73,7 +74,7 @@ public WebSocketNetworkModule(SocketFactory factory, String host, int port, Stri
7374

7475
public void start() throws IOException, MqttException {
7576
super.start();
76-
WebSocketHandshake handshake = new WebSocketHandshake(getSocketInputStream(), getSocketOutputStream(), host, port);
77+
WebSocketHandshake handshake = new WebSocketHandshake(getSocketInputStream(), getSocketOutputStream(), uri, host, port);
7778
handshake.execute();
7879
this.webSocketReceiver = new WebSocketReceiver(getSocketInputStream(), pipedInputStream);
7980
webSocketReceiver.start("webSocketReceiver");
@@ -104,7 +105,7 @@ public void stop() throws IOException {
104105
byte[] rawFrame = frame.encodeFrame();
105106
getSocketOutputStream().write(rawFrame);
106107
getSocketOutputStream().flush();
107-
108+
108109
if(webSocketReceiver != null){
109110
webSocketReceiver.stop();
110111
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketReceiver.java

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,24 @@
2424
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;
2525

2626
public class WebSocketReceiver implements Runnable{
27-
27+
2828
private static final String CLASS_NAME = WebSocketReceiver.class.getName();
2929
private static final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
30-
30+
3131
private boolean running = false;
3232
private boolean stopping = false;
3333
private Object lifecycle = new Object();
3434
private InputStream input;
3535
private Thread receiverThread = null;
3636
private volatile boolean receiving;
3737
private PipedOutputStream pipedOutputStream;
38-
38+
3939
public WebSocketReceiver(InputStream input, PipedInputStream pipedInputStream) throws IOException{
4040
this.input = input;
4141
this.pipedOutputStream = new PipedOutputStream();
4242
pipedInputStream.connect(pipedOutputStream);
4343
}
44-
44+
4545
/**
4646
* Starts up the WebSocketReceiver's thread
4747
*/
@@ -57,7 +57,7 @@ public void start(String threadName){
5757
}
5858
}
5959
}
60-
60+
6161
/**
6262
* Stops this WebSocketReceiver's thread.
6363
* This call will block.
@@ -89,51 +89,49 @@ public void stop() {
8989

9090
public void run() {
9191
final String methodName = "run";
92-
92+
9393
while (running && (input != null)) {
9494
try {
9595
//@TRACE 852=network read message
9696
log.fine(CLASS_NAME, methodName, "852");
9797
receiving = input.available() > 0;
9898
WebSocketFrame incomingFrame = new WebSocketFrame(input);
9999
if(!incomingFrame.isCloseFlag()){
100-
for(int i = 0; i < incomingFrame.getPayload().length; i++){
101-
pipedOutputStream.write(incomingFrame.getPayload()[i]);
102-
}
103-
104-
pipedOutputStream.flush();
100+
for(int i = 0; i < incomingFrame.getPayload().length; i++){
101+
pipedOutputStream.write(incomingFrame.getPayload()[i]);
102+
}
103+
104+
pipedOutputStream.flush();
105105
} else {
106106
if(!stopping){
107107
throw new IOException("Server sent a WebSocket Frame with the Stop OpCode");
108108
}
109109
}
110-
110+
111111
receiving = false;
112-
112+
113113
} catch (IOException ex) {
114-
// Exception occurred whilst reading the stream.
115-
System.out.println("WebSocketReceiver.java : run(): Exception Occured.");
116-
ex.printStackTrace();
117-
closeOutputStream();
114+
// Exception occurred whilst reading the stream.
115+
this.stop();
118116
}
119117
}
120118
}
121-
119+
122120
private void closeOutputStream(){
123121
try {
124122
pipedOutputStream.close();
125123
} catch (IOException e) {
126124
}
127125
}
128-
126+
129127

130128
public boolean isRunning() {
131129
return running;
132130
}
133-
131+
134132
/**
135133
* Returns the receiving state.
136-
*
134+
*
137135
* @return true if the receiver is receiving data, false otherwise.
138136
*/
139137
public boolean isReceiving(){

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketSecureNetworkModule.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class WebSocketSecureNetworkModule extends SSLNetworkModule{
3636

3737
private PipedInputStream pipedInputStream;
3838
private WebSocketReceiver webSocketReceiver;
39+
private String uri;
3940
private String host;
4041
private int port;
4142
ByteBuffer recievedPayload;
@@ -61,8 +62,9 @@ public void flush() throws IOException {
6162
}
6263
};
6364

64-
public WebSocketSecureNetworkModule(SSLSocketFactory factory, String host, int port, String clientId) {
65+
public WebSocketSecureNetworkModule(SSLSocketFactory factory, String uri, String host, int port, String clientId) {
6566
super(factory, host, port, clientId);
67+
this.uri = uri;
6668
this.host = host;
6769
this.port = port;
6870
this.pipedInputStream = new PipedInputStream();
@@ -71,7 +73,7 @@ public WebSocketSecureNetworkModule(SSLSocketFactory factory, String host, int p
7173

7274
public void start() throws IOException, MqttException {
7375
super.start();
74-
WebSocketHandshake handshake = new WebSocketHandshake(super.getInputStream(), super.getOutputStream(), host, port);
76+
WebSocketHandshake handshake = new WebSocketHandshake(super.getInputStream(), super.getOutputStream(), uri, host, port);
7577
handshake.execute();
7678
this.webSocketReceiver = new WebSocketReceiver(getSocketInputStream(), pipedInputStream);
7779
webSocketReceiver.start("WssSocketReceiver");
@@ -95,6 +97,12 @@ public OutputStream getOutputStream() throws IOException {
9597
}
9698

9799
public void stop() throws IOException {
100+
// Creating Close Frame
101+
WebSocketFrame frame = new WebSocketFrame((byte)0x08, true, "1000".getBytes());
102+
byte[] rawFrame = frame.encodeFrame();
103+
getSocketOutputStream().write(rawFrame);
104+
getSocketOutputStream().flush();
105+
98106
if(webSocketReceiver != null){
99107
webSocketReceiver.stop();
100108
}

0 commit comments

Comments
 (0)