Skip to content

Commit ec97a95

Browse files
shiehnpinjpwsutton
authored andcommitted
Resolved issue Ignoring InnerClasses attribute for an anonymous inner class. (#368)
Signed-off-by: Hsieh Enping <shieh.npin@gmail.com>
1 parent 46edc58 commit ec97a95

5 files changed

Lines changed: 141 additions & 93 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -838,30 +838,39 @@ public void notifyReconnect() {
838838
if(disconnectedMessageBuffer != null){
839839
//@TRACE 509=Client Reconnected, Offline Buffer Available. Sending Buffered Messages.
840840
log.fine(CLASS_NAME, methodName, "509");
841-
disconnectedMessageBuffer.setPublishCallback(new IDisconnectedBufferCallback() {
842-
843-
public void publishBufferedMessage(BufferedMessage bufferedMessage) throws MqttException {
844-
if (isConnected()) {
845-
while(clientState.getActualInFlight() >= (clientState.getMaxInFlight()-1)){
846-
// We need to Yield to the other threads to allow the in flight messages to clear
847-
Thread.yield();
848-
849-
}
850-
//@TRACE 510=Publising Buffered message message={0}
851-
log.fine(CLASS_NAME, methodName, "510", new Object[] {bufferedMessage.getMessage().getKey()});
852-
internalSend(bufferedMessage.getMessage(), bufferedMessage.getToken());
853-
// Delete from persistence if in there
854-
clientState.unPersistBufferedMessage(bufferedMessage.getMessage());
855-
} else {
856-
//@TRACE 208=failed: not connected
857-
log.fine(CLASS_NAME, methodName, "208");
858-
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
859-
}
860-
}
861-
});
841+
842+
disconnectedMessageBuffer.setPublishCallback(new ReconnectDisconnectedBufferCallback(methodName));
862843
executorService.execute(disconnectedMessageBuffer);
863844
}
864845
}
846+
847+
class ReconnectDisconnectedBufferCallback implements IDisconnectedBufferCallback{
848+
849+
final String methodName;
850+
851+
ReconnectDisconnectedBufferCallback(String methodName) {
852+
this.methodName = methodName;
853+
}
854+
855+
public void publishBufferedMessage(BufferedMessage bufferedMessage) throws MqttException {
856+
if (isConnected()) {
857+
while(clientState.getActualInFlight() >= (clientState.getMaxInFlight()-1)){
858+
// We need to Yield to the other threads to allow the in flight messages to clear
859+
Thread.yield();
860+
861+
}
862+
//@TRACE 510=Publising Buffered message message={0}
863+
log.fine(CLASS_NAME, methodName, "510", new Object[] {bufferedMessage.getMessage().getKey()});
864+
internalSend(bufferedMessage.getMessage(), bufferedMessage.getToken());
865+
// Delete from persistence if in there
866+
clientState.unPersistBufferedMessage(bufferedMessage.getMessage());
867+
} else {
868+
//@TRACE 208=failed: not connected
869+
log.fine(CLASS_NAME, methodName, "208");
870+
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
871+
}
872+
}
873+
}
865874

866875
public int getActualInFlight() {
867876
return this.clientState.getActualInFlight();

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java

Lines changed: 54 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -638,26 +638,7 @@ public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttA
638638
userContext,
639639
callback });
640640
comms.setNetworkModules(createNetworkModules(serverURI, options));
641-
comms.setReconnectCallback(new MqttCallbackExtended() {
642-
643-
public void messageArrived(String topic, MqttMessage message) throws Exception {
644-
}
645-
public void deliveryComplete(IMqttDeliveryToken token) {
646-
}
647-
public void connectComplete(boolean reconnect, String serverURI) {
648-
}
649-
650-
public void connectionLost(Throwable cause) {
651-
if(automaticReconnect){
652-
// Automatic reconnect is set so make sure comms is in resting state
653-
comms.setRestingState(true);
654-
reconnecting = true;
655-
startReconnectCycle();
656-
}
657-
}
658-
});
659-
660-
641+
comms.setReconnectCallback(new MqttReconnectCallback(automaticReconnect));
661642

662643

663644
// Insert our own callback to iterate through the URIs till the connect succeeds
@@ -676,6 +657,7 @@ public void connectionLost(Throwable cause) {
676657

677658
return userToken;
678659
}
660+
679661

680662
/* (non-Javadoc)
681663
* @see org.eclipse.paho.client.mqttv3.IMqttAsyncClient#disconnect(java.lang.Object, org.eclipse.paho.client.mqttv3.IMqttActionListener)
@@ -1185,24 +1167,7 @@ private void attemptReconnect(){
11851167
//@Trace 500=Attempting to reconnect client: {0}
11861168
log.fine(CLASS_NAME, methodName, "500", new Object[]{this.clientId});
11871169
try {
1188-
connect(this.connOpts, this.userContext,new IMqttActionListener() {
1189-
1190-
public void onSuccess(IMqttToken asyncActionToken) {
1191-
//@Trace 501=Automatic Reconnect Successful: {0}
1192-
log.fine(CLASS_NAME, methodName, "501", new Object[]{asyncActionToken.getClient().getClientId()});
1193-
comms.setRestingState(false);
1194-
stopReconnectCycle();
1195-
}
1196-
1197-
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
1198-
//@Trace 502=Automatic Reconnect failed, rescheduling: {0}
1199-
log.fine(CLASS_NAME, methodName, "502", new Object[]{asyncActionToken.getClient().getClientId()});
1200-
if(reconnectDelay < 128000){
1201-
reconnectDelay = reconnectDelay * 2;
1202-
}
1203-
rescheduleReconnectCycle(reconnectDelay);
1204-
}
1205-
});
1170+
connect(this.connOpts, this.userContext,new MqttReconnectActionListener(methodName));
12061171
} catch (MqttSecurityException ex) {
12071172
//@TRACE 804=exception
12081173
log.fine(CLASS_NAME,methodName,"804",null, ex);
@@ -1263,7 +1228,58 @@ public void run() {
12631228
attemptReconnect();
12641229
}
12651230
}
1231+
1232+
class MqttReconnectCallback implements MqttCallbackExtended{
1233+
1234+
final boolean automaticReconnect;
1235+
1236+
MqttReconnectCallback(boolean isAutomaticReconnect){
1237+
automaticReconnect = isAutomaticReconnect;
1238+
}
1239+
1240+
public void connectionLost(Throwable cause) {
1241+
if(automaticReconnect){
1242+
// Automatic reconnect is set so make sure comms is in resting state
1243+
comms.setRestingState(true);
1244+
reconnecting = true;
1245+
startReconnectCycle();
1246+
}
1247+
}
1248+
1249+
public void messageArrived(String topic, MqttMessage message) throws Exception {}
1250+
1251+
public void deliveryComplete(IMqttDeliveryToken token) {}
1252+
1253+
public void connectComplete(boolean reconnect, String serverURI) {}
1254+
1255+
}
1256+
1257+
class MqttReconnectActionListener implements IMqttActionListener{
1258+
1259+
final String methodName;
1260+
1261+
MqttReconnectActionListener(String methodName) {
1262+
this.methodName = methodName;
1263+
}
12661264

1265+
public void onSuccess(IMqttToken asyncActionToken) {
1266+
//@Trace 501=Automatic Reconnect Successful: {0}
1267+
log.fine(CLASS_NAME, methodName, "501", new Object[]{asyncActionToken.getClient().getClientId()});
1268+
comms.setRestingState(false);
1269+
stopReconnectCycle();
1270+
}
1271+
1272+
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
1273+
//@Trace 502=Automatic Reconnect failed, rescheduling: {0}
1274+
log.fine(CLASS_NAME, methodName, "502", new Object[]{asyncActionToken.getClient().getClientId()});
1275+
if(reconnectDelay < 128000){
1276+
reconnectDelay = reconnectDelay * 2;
1277+
}
1278+
rescheduleReconnectCycle(reconnectDelay);
1279+
}
1280+
1281+
}
1282+
12671283
/**
12681284
* Sets the DisconnectedBufferOptions for this client
12691285
* @param bufferOpts the {@link DisconnectedBufferOptions}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package org.eclipse.paho.client.mqttv3.internal.websocket;
2+
3+
import java.io.ByteArrayOutputStream;
4+
import java.io.IOException;
5+
import java.io.OutputStream;
6+
import java.nio.ByteBuffer;
7+
8+
import org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule;
9+
10+
class ExtendedByteArrayOutputStream extends ByteArrayOutputStream {
11+
12+
final WebSocketNetworkModule webSocketNetworkModule;
13+
final WebSocketSecureNetworkModule webSocketSecureNetworkModule;
14+
15+
ExtendedByteArrayOutputStream(WebSocketNetworkModule module) {
16+
this.webSocketNetworkModule = module;
17+
this.webSocketSecureNetworkModule = null;
18+
}
19+
20+
ExtendedByteArrayOutputStream(WebSocketSecureNetworkModule module) {
21+
this.webSocketNetworkModule = null;
22+
this.webSocketSecureNetworkModule = module;
23+
}
24+
25+
public void flush() throws IOException {
26+
final ByteBuffer byteBuffer;
27+
synchronized (this) {
28+
byteBuffer = ByteBuffer.wrap(toByteArray());
29+
reset();
30+
}
31+
WebSocketFrame frame = new WebSocketFrame((byte)0x02, true, byteBuffer.array());
32+
byte[] rawFrame = frame.encodeFrame();
33+
getSocketOutputStream().write(rawFrame);
34+
getSocketOutputStream().flush();
35+
36+
}
37+
38+
OutputStream getSocketOutputStream() throws IOException {
39+
40+
if(webSocketNetworkModule != null ){
41+
return webSocketNetworkModule.getSocketOutputStream();
42+
}
43+
if(webSocketSecureNetworkModule != null){
44+
return webSocketSecureNetworkModule.getSocketOutputStream();
45+
}
46+
return null;
47+
}
48+
49+
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketNetworkModule.java

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,21 +46,7 @@ public class WebSocketNetworkModule extends TCPNetworkModule {
4646
* This allows us to encode the MQTT payload into a WebSocket
4747
* Frame before passing it through to the real socket.
4848
*/
49-
private ByteArrayOutputStream outputStream = new ByteArrayOutputStream(){
50-
51-
public void flush() throws IOException {
52-
final ByteBuffer byteBuffer;
53-
synchronized (this) {
54-
byteBuffer = ByteBuffer.wrap(toByteArray());
55-
reset();
56-
}
57-
WebSocketFrame frame = new WebSocketFrame((byte)0x02, true, byteBuffer.array());
58-
byte[] rawFrame = frame.encodeFrame();
59-
getSocketOutputStream().write(rawFrame);
60-
getSocketOutputStream().flush();
61-
62-
}
63-
};
49+
private ByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(this);
6450

6551
public WebSocketNetworkModule(SocketFactory factory, String uri, String host, int port, String resourceContext){
6652
super(factory, host, port, resourceContext);
@@ -80,11 +66,11 @@ public void start() throws IOException, MqttException {
8066
webSocketReceiver.start("webSocketReceiver");
8167
}
8268

83-
private OutputStream getSocketOutputStream() throws IOException {
69+
OutputStream getSocketOutputStream() throws IOException {
8470
return super.getOutputStream();
8571
}
8672

87-
private InputStream getSocketInputStream() throws IOException {
73+
InputStream getSocketInputStream() throws IOException {
8874
return super.getInputStream();
8975
}
9076

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketSecureNetworkModule.java

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,21 +46,7 @@ public class WebSocketSecureNetworkModule extends SSLNetworkModule{
4646
* This allows us to encode the MQTT payload into a WebSocket
4747
* Frame before passing it through to the real socket.
4848
*/
49-
private ByteArrayOutputStream outputStream = new ByteArrayOutputStream(){
50-
51-
public void flush() throws IOException {
52-
final ByteBuffer byteBuffer;
53-
synchronized (this) {
54-
byteBuffer = ByteBuffer.wrap(toByteArray());
55-
reset();
56-
}
57-
WebSocketFrame frame = new WebSocketFrame((byte)0x02, true, byteBuffer.array());
58-
byte[] rawFrame = frame.encodeFrame();
59-
getSocketOutputStream().write(rawFrame);
60-
getSocketOutputStream().flush();
61-
62-
}
63-
};
49+
private ByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(this);
6450

6551
public WebSocketSecureNetworkModule(SSLSocketFactory factory, String uri, String host, int port, String clientId) {
6652
super(factory, host, port, clientId);
@@ -80,11 +66,11 @@ public void start() throws IOException, MqttException {
8066

8167
}
8268

83-
private OutputStream getSocketOutputStream() throws IOException {
69+
OutputStream getSocketOutputStream() throws IOException {
8470
return super.getOutputStream();
8571
}
8672

87-
private InputStream getSocketInputStream() throws IOException {
73+
InputStream getSocketInputStream() throws IOException {
8874
return super.getInputStream();
8975
}
9076

@@ -112,6 +98,8 @@ public void stop() throws IOException {
11298
public String getServerURI() {
11399
return "wss://" + host + ":" + port;
114100
}
101+
102+
115103

116104

117105
}

0 commit comments

Comments
 (0)