Skip to content

Commit 513a77a

Browse files
committed
Bug: 482432 - WebSocket handles frames over 125 byte incorrectly
Fixed WebSocketFrame and added test to make sure that this works. Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent b2ff952 commit 513a77a

2 files changed

Lines changed: 74 additions & 3 deletions

File tree

  • org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test
  • org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket

org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test/WebSocketTest.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@
1313

1414
package org.eclipse.paho.client.mqttv3.test;
1515

16+
import java.io.IOException;
1617
import java.net.URI;
1718
import java.util.ArrayList;
19+
import java.util.Arrays;
20+
import java.util.Random;
1821
import java.util.logging.Level;
1922
import java.util.logging.Logger;
2023

@@ -183,6 +186,62 @@ public void testWebSocketPubSub() throws Exception {
183186
}
184187
}
185188
}
189+
190+
/**
191+
* Tests Websocker support for packets over 16KB
192+
* Prompted by Bug: 482432
193+
* https://bugs.eclipse.org/bugs/show_bug.cgi?id=482432
194+
* This test connects to a broker via WebSockets, subscribes
195+
* to a topic, publishes a large payload to it and checks
196+
* that it recieves the same payload.
197+
* @throws Exception
198+
*/
199+
@Test
200+
public void largePayloadTest() throws Exception{
201+
// Generate large byte array;
202+
byte[] largeByteArray = new byte[32000];
203+
new Random().nextBytes(largeByteArray);
204+
String methodName = Utility.getMethodName();
205+
LoggingUtilities.banner(log, cclass, methodName);
206+
207+
IMqttClient client = null;
208+
try {
209+
String topicStr = "topic_largeFile_01";
210+
String clientId = methodName;
211+
client = clientFactory.createMqttClient(serverURI, clientId);
212+
213+
log.info("Assigning callback...");
214+
MessageListener listener = new MessageListener();
215+
client.setCallback(listener);
216+
217+
log.info("Connecting... serverURI:" + serverURI + ", ClientId:" + clientId);
218+
client.connect();
219+
220+
log.info("Subscribing to..." + topicStr);
221+
client.subscribe(topicStr);
222+
223+
log.info("Publishing to..." + topicStr);
224+
MqttTopic topic = client.getTopic(topicStr);
225+
MqttMessage message = new MqttMessage(largeByteArray);
226+
topic.publish(message);
227+
228+
log.info("Checking msg");
229+
MqttMessage msg = listener.getNextMessage();
230+
Assert.assertNotNull(msg);
231+
Assert.assertTrue(Arrays.equals(largeByteArray, msg.getPayload()));
232+
log.info("Disconnecting...");
233+
client.disconnect();
234+
log.info("Disconnected...");
235+
} catch (Exception e){
236+
e.printStackTrace();
237+
} finally {
238+
if (client != null) {
239+
log.info("Close...");
240+
client.close();
241+
}
242+
}
243+
244+
}
186245

187246

188247

@@ -211,7 +270,8 @@ public MqttMessage getNextMessage() {
211270
synchronized (messages) {
212271
if (messages.size() == 0) {
213272
try {
214-
messages.wait(1000);
273+
// Wait a bit longer than usual because of the largePayloadTest
274+
messages.wait(10000);
215275
}
216276
catch (InterruptedException e) {
217277
// empty

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketFrame.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,10 @@ public WebSocketFrame(InputStream input) throws IOException {
144144
}
145145

146146
// Decode the payload length
147-
while (--byteCount > 0){
147+
if(byteCount > 0){
148+
payloadLength = 0;
149+
}
150+
while (--byteCount >= 0){
148151
maskLengthByte = (byte) input.read();
149152
payloadLength |= (maskLengthByte & 0xFF) << (8 * byteCount);
150153
}
@@ -157,7 +160,15 @@ public WebSocketFrame(InputStream input) throws IOException {
157160
}
158161

159162
this.payload = new byte[payloadLength];
160-
input.read(this.payload,0,payloadLength);
163+
int offsetIndex = 0;
164+
int tempLength = payloadLength;
165+
int bytesRead = 0;
166+
while (offsetIndex != payloadLength){
167+
bytesRead = input.read(this.payload,offsetIndex,tempLength);
168+
offsetIndex += bytesRead;
169+
tempLength -= bytesRead;
170+
}
171+
161172

162173
// Demask if needed
163174
if(masked)

0 commit comments

Comments
 (0)