Skip to content

Commit d669d22

Browse files
vit21ikvitalii-temy
authored andcommitted
Added custom headers support for WebSocket connection (502)
Signed-off-by: Vitalii <vitalii.vlasiuk@temy.co>
1 parent dbb3e9f commit d669d22

11 files changed

Lines changed: 106 additions & 18 deletions

File tree

MQTTv3.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,20 @@ public class MqttPublishSample {
9898
}
9999
```
100100

101+
## Adding custom headers for Websocket connection
102+
103+
The included code below is a extended basic sample that connects to a server with custom headers.
104+
105+
```
106+
MqttClient client = new MqttClient("wss://<BROKER_URI>", "MyClient");
107+
108+
MqttConnectOptions connectOptions = new MqttConnectOptions();
109+
Properties properties = new Properties();
110+
properties.setProperty("X-Amz-CustomAuthorizer-Name", <SOME_VALUE>);
111+
properties.setProperty("X-Amz-CustomAuthorizer-Signature", <SOME_VALUE>);
112+
properties.setProperty(<SOME_VALUE>, <SOME_VALUE>);
113+
connectOptions.setCustomWebSocketHeaders(properties);
114+
115+
client.connect(connectOptions);
116+
117+
```

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -635,7 +635,7 @@ else if ((factory instanceof SSLSocketFactory) == false) {
635635
else if (factory instanceof SSLSocketFactory) {
636636
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
637637
}
638-
netModule = new WebSocketNetworkModule(factory, address, host, port, clientId);
638+
netModule = new WebSocketNetworkModule(factory, address, host, port, clientId, options.getCustomWebSocketHeaders());
639639
((WebSocketNetworkModule)netModule).setConnectTimeout(options.getConnectionTimeout());
640640
break;
641641
case MqttConnectOptions.URI_TYPE_WSS:
@@ -656,7 +656,7 @@ else if ((factory instanceof SSLSocketFactory) == false) {
656656
}
657657

658658
// Create the network module...
659-
netModule = new WebSocketSecureNetworkModule((SSLSocketFactory) factory, address, host, port, clientId);
659+
netModule = new WebSocketSecureNetworkModule((SSLSocketFactory) factory, address, host, port, clientId, options.getCustomWebSocketHeaders());
660660
((WebSocketSecureNetworkModule)netModule).setSSLhandshakeTimeout(options.getConnectionTimeout());
661661
((WebSocketSecureNetworkModule)netModule).setSSLHostnameVerifier(options.getSSLHostnameVerifier());
662662
((WebSocketSecureNetworkModule)netModule).setHttpsHostnameVerificationEnabled(options.isHttpsHostnameVerificationEnabled());

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttConnectOptions.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public class MqttConnectOptions {
8282
private int mqttVersion = MQTT_VERSION_DEFAULT;
8383
private boolean automaticReconnect = false;
8484
private int maxReconnectDelay = 128000;
85+
private Properties customWebSocketHeaders = null;
8586

8687
/**
8788
* Constructs a new <code>MqttConnectOptions</code> object using the
@@ -650,6 +651,20 @@ public Properties getDebug() {
650651
return p;
651652
}
652653

654+
/**
655+
* Sets the Custom WebSocket Headers for the WebSocket Connection.
656+
*
657+
* @param props The custom websocket headers {@link Properties}
658+
*/
659+
660+
public void setCustomWebSocketHeaders(Properties props) {
661+
this.customWebSocketHeaders = props;
662+
}
663+
664+
public Properties getCustomWebSocketHeaders() {
665+
return customWebSocketHeaders;
666+
}
667+
653668
public String toString() {
654669
return Debug.dumpProperties(getDebug(), "Connection options");
655670
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketHandshake.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
import java.util.HashMap;
3030
import java.util.Map;
3131
import java.util.UUID;
32+
import java.util.Properties;
33+
import java.util.Set;
34+
import java.util.Iterator;
3235
/**
3336
* Helper class to execute a WebSocket Handshake.
3437
*/
@@ -52,14 +55,15 @@ public class WebSocketHandshake {
5255
String uri;
5356
String host;
5457
int port;
58+
Properties customWebSocketHeaders;
5559

56-
57-
public WebSocketHandshake(InputStream input, OutputStream output, String uri, String host, int port){
60+
public WebSocketHandshake(InputStream input, OutputStream output, String uri, String host, int port, Properties customWebSocketHeaders){
5861
this.input = input;
5962
this.output = output;
6063
this.uri = uri;
6164
this.host = host;
6265
this.port = port;
66+
this.customWebSocketHeaders = customWebSocketHeaders;
6367
}
6468

6569

@@ -108,6 +112,16 @@ private void sendHandshakeRequest(String key) throws IOException{
108112
pw.print("Sec-WebSocket-Protocol: mqtt" + LINE_SEPARATOR);
109113
pw.print("Sec-WebSocket-Version: 13" + LINE_SEPARATOR);
110114

115+
if (customWebSocketHeaders != null) {
116+
Set keys = customWebSocketHeaders.keySet();
117+
Iterator i = keys.iterator();
118+
while (i.hasNext()) {
119+
String k = (String) i.next();
120+
String value = customWebSocketHeaders.getProperty(k);
121+
pw.print(k + ": " + value + LINE_SEPARATOR);
122+
}
123+
}
124+
111125
String userInfo = srvUri.getUserInfo();
112126
if(userInfo != null) {
113127
pw.print("Authorization: Basic " + Base64.encode(userInfo) + LINE_SEPARATOR);

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketNetworkModule.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.OutputStream;
2222
import java.io.PipedInputStream;
2323
import java.nio.ByteBuffer;
24+
import java.util.Properties;
2425

2526
import javax.net.SocketFactory;
2627

@@ -37,6 +38,7 @@ public class WebSocketNetworkModule extends TCPNetworkModule {
3738
private String uri;
3839
private String host;
3940
private int port;
41+
private Properties customWebsocketHeaders;
4042
private PipedInputStream pipedInputStream;
4143
private WebSocketReceiver webSocketReceiver;
4244
ByteBuffer recievedPayload;
@@ -47,20 +49,21 @@ public class WebSocketNetworkModule extends TCPNetworkModule {
4749
* Frame before passing it through to the real socket.
4850
*/
4951
private ByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(this);
50-
51-
public WebSocketNetworkModule(SocketFactory factory, String uri, String host, int port, String resourceContext){
52+
53+
public WebSocketNetworkModule(SocketFactory factory, String uri, String host, int port, String resourceContext, Properties customWebsocketHeaders){
5254
super(factory, host, port, resourceContext);
5355
this.uri = uri;
5456
this.host = host;
5557
this.port = port;
58+
this.customWebsocketHeaders = customWebsocketHeaders;
5659
this.pipedInputStream = new PipedInputStream();
5760

5861
log.setResourceName(resourceContext);
5962
}
6063

6164
public void start() throws IOException, MqttException {
6265
super.start();
63-
WebSocketHandshake handshake = new WebSocketHandshake(getSocketInputStream(), getSocketOutputStream(), uri, host, port);
66+
WebSocketHandshake handshake = new WebSocketHandshake(getSocketInputStream(), getSocketOutputStream(), uri, host, port, customWebsocketHeaders);
6467
handshake.execute();
6568
this.webSocketReceiver = new WebSocketReceiver(getSocketInputStream(), pipedInputStream);
6669
webSocketReceiver.start("webSocketReceiver");

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketSecureNetworkModule.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.io.OutputStream;
2222
import java.io.PipedInputStream;
2323
import java.nio.ByteBuffer;
24-
24+
import java.util.Properties;
2525
import javax.net.ssl.SSLSocketFactory;
2626

2727
import org.eclipse.paho.client.mqttv3.MqttException;
@@ -39,6 +39,7 @@ public class WebSocketSecureNetworkModule extends SSLNetworkModule{
3939
private String uri;
4040
private String host;
4141
private int port;
42+
private Properties customWebSocketHeaders;
4243
ByteBuffer recievedPayload;
4344

4445
/**
@@ -48,18 +49,19 @@ public class WebSocketSecureNetworkModule extends SSLNetworkModule{
4849
*/
4950
private ByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(this);
5051

51-
public WebSocketSecureNetworkModule(SSLSocketFactory factory, String uri, String host, int port, String clientId) {
52+
public WebSocketSecureNetworkModule(SSLSocketFactory factory, String uri, String host, int port, String clientId, Properties customWebSocketHeaders) {
5253
super(factory, host, port, clientId);
5354
this.uri = uri;
5455
this.host = host;
5556
this.port = port;
57+
this.customWebSocketHeaders = customWebSocketHeaders;
5658
this.pipedInputStream = new PipedInputStream();
5759
log.setResourceName(clientId);
5860
}
5961

6062
public void start() throws IOException, MqttException {
6163
super.start();
62-
WebSocketHandshake handshake = new WebSocketHandshake(super.getInputStream(), super.getOutputStream(), uri, host, port);
64+
WebSocketHandshake handshake = new WebSocketHandshake(super.getInputStream(), super.getOutputStream(), uri, host, port, customWebSocketHeaders);
6365
handshake.execute();
6466
this.webSocketReceiver = new WebSocketReceiver(getSocketInputStream(), pipedInputStream);
6567
webSocketReceiver.start("WssSocketReceiver");

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,7 @@ private NetworkModule createNetworkModule(String address, MqttConnectionOptions
773773
}
774774
netModule = new WebSocketNetworkModule(factory, address, host, port, this.mqttSession.getClientId());
775775
((WebSocketNetworkModule) netModule).setConnectTimeout(options.getConnectionTimeout());
776+
((WebSocketNetworkModule) netModule).setCustomWebSocketHeaders(options.getCustomWebSocketHeaders());
776777
break;
777778
case WSS:
778779
if (port == -1) {
@@ -794,6 +795,7 @@ private NetworkModule createNetworkModule(String address, MqttConnectionOptions
794795
netModule = new WebSocketSecureNetworkModule((SSLSocketFactory) factory, address, host, port,
795796
this.mqttSession.getClientId());
796797
((WebSocketSecureNetworkModule) netModule).setSSLhandshakeTimeout(options.getConnectionTimeout());
798+
((WebSocketSecureNetworkModule) netModule).setCustomWebSocketHeaders(options.getCustomWebSocketHeaders());
797799
// Ciphers suites need to be set, if they are available
798800
if (wSSFactoryFactory != null) {
799801
String[] enabledCiphers = wSSFactoryFactory.getEnabledCipherSuites(null);

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttConnectionOptions.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import java.net.URI;
2222
import java.net.URISyntaxException;
23+
import java.util.Collections;
2324
import java.util.List;
25+
import java.util.Map;
2426
import java.util.Properties;
2527

2628
import javax.net.SocketFactory;
@@ -117,7 +119,7 @@ public void setWillMessageProperties(MqttProperties willMessageProperties) {
117119
private SocketFactory socketFactory; // SocketFactory to be used to connect
118120
private Properties sslClientProps = null; // SSL Client Properties
119121
private HostnameVerifier sslHostnameVerifier = null; // SSL Hostname Verifier
120-
122+
private Map<String, String> customWebSocketHeaders;
121123
/**
122124
* Returns the MQTT version.
123125
*
@@ -960,6 +962,19 @@ public Properties getDebug() {
960962
return p;
961963
}
962964

965+
/**
966+
* Sets the Custom WebSocket Headers for the WebSocket Connection.
967+
*
968+
* @param headers The custom websocket headers {@link Properties}
969+
*/
970+
public void setCustomWebSocketHeaders(Map<String, String> headers) {
971+
this.customWebSocketHeaders = Collections.unmodifiableMap(headers);
972+
}
973+
974+
public Map<String, String> getCustomWebSocketHeaders() {
975+
return customWebSocketHeaders;
976+
}
977+
963978
public String toString() {
964979
return Debug.dumpProperties(getDebug(), "Connection options");
965980
}

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/websocket/WebSocketHandshake.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,15 @@ public class WebSocketHandshake {
5252
String uri;
5353
String host;
5454
int port;
55+
Map<String, String> customWebSocketHeaders;
5556

56-
public WebSocketHandshake(InputStream input, OutputStream output, String uri, String host, int port) {
57+
public WebSocketHandshake(InputStream input, OutputStream output, String uri, String host, int port, Map<String, String> customWebSocketHeaders) {
5758
this.input = input;
5859
this.output = output;
5960
this.uri = uri;
6061
this.host = host;
6162
this.port = port;
63+
this.customWebSocketHeaders = customWebSocketHeaders;
6264
}
6365

6466
/**
@@ -108,6 +110,12 @@ private void sendHandshakeRequest(String key) {
108110
pw.print("Sec-WebSocket-Protocol: mqtt" + LINE_SEPARATOR);
109111
pw.print("Sec-WebSocket-Version: 13" + LINE_SEPARATOR);
110112

113+
if (customWebSocketHeaders != null) {
114+
customWebSocketHeaders.entrySet().forEach(entry ->
115+
pw.print(entry.getKey() + ": " + entry.getValue() + LINE_SEPARATOR)
116+
);
117+
}
118+
111119
String userInfo = srvUri.getUserInfo();
112120
if (userInfo != null) {
113121
pw.print("Authorization: Basic " + Base64.encode(userInfo) + LINE_SEPARATOR);

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/websocket/WebSocketNetworkModule.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
import java.io.OutputStream;
2222
import java.io.PipedInputStream;
2323
import java.nio.ByteBuffer;
24-
24+
import java.util.Collections;
25+
import java.util.Map;
2526
import javax.net.SocketFactory;
2627

2728
import org.eclipse.paho.mqttv5.client.internal.TCPNetworkModule;
@@ -40,7 +41,8 @@ public class WebSocketNetworkModule extends TCPNetworkModule {
4041
private PipedInputStream pipedInputStream;
4142
private WebSocketReceiver webSocketReceiver;
4243
ByteBuffer recievedPayload;
43-
44+
Map<String, String> customWebSocketHeaders;
45+
4446
/**
4547
* Overrides the flush method.
4648
* This allows us to encode the MQTT payload into a WebSocket
@@ -60,7 +62,7 @@ public WebSocketNetworkModule(SocketFactory factory, String uri, String host, in
6062

6163
public void start() throws IOException, MqttException {
6264
super.start();
63-
WebSocketHandshake handshake = new WebSocketHandshake(getSocketInputStream(), getSocketOutputStream(), uri, host, port);
65+
WebSocketHandshake handshake = new WebSocketHandshake(getSocketInputStream(), getSocketOutputStream(), uri, host, port, customWebSocketHeaders);
6466
handshake.execute();
6567
this.webSocketReceiver = new WebSocketReceiver(getSocketInputStream(), pipedInputStream);
6668
webSocketReceiver.start("webSocketReceiver");
@@ -81,7 +83,11 @@ public InputStream getInputStream() throws IOException {
8183
public OutputStream getOutputStream() throws IOException {
8284
return outputStream;
8385
}
84-
86+
87+
public void setCustomWebSocketHeaders(Map<String, String> customWebSocketHeaders) {
88+
this.customWebSocketHeaders = customWebSocketHeaders;
89+
}
90+
8591
/**
8692
* Stops the module, by closing the TCP socket.
8793
*/

0 commit comments

Comments
 (0)