diff --git a/org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java b/org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java index b0c433498..d0248c592 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java @@ -22,8 +22,7 @@ import java.util.Enumeration; import java.util.Properties; import java.util.Vector; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.eclipse.paho.client.mqttv3.BufferedMessage; import org.eclipse.paho.client.mqttv3.IMqttActionListener; @@ -271,9 +270,8 @@ public void close(boolean force) throws MqttException { * @param token The {@link MqttToken} to track the connection * @throws MqttException if an error occurs when connecting */ - public void connect(MqttConnectOptions options, MqttToken token) throws MqttException { + public synchronized void connect(MqttConnectOptions options, MqttToken token) throws MqttException { final String methodName = "connect"; - synchronized (conLock) { if (isDisconnected() && !closePending) { //@TRACE 214=state=CONNECTING log.fine(CLASS_NAME,methodName,"214"); @@ -297,7 +295,11 @@ public void connect(MqttConnectOptions options, MqttToken token) throws MqttExce tokenStore.open(); ConnectBG conbg = new ConnectBG(this, token, connect, executorService); - conbg.start(); + try { + conbg.start(); + } catch (MqttException e){ + throw e; + } } else { // @TRACE 207=connect failed: not disconnected {0} @@ -312,7 +314,6 @@ public void connect(MqttConnectOptions options, MqttToken token) throws MqttExce throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED); } } - } } public void connectComplete( MqttConnack cack, MqttException mex) throws MqttException { @@ -400,7 +401,8 @@ public void shutdownConnection(MqttToken token, MqttException reason) { // Clean session handling and tidy up clientState.disconnected(reason); if (clientState.getCleanSession()) - callback.removeMessageListeners(); + + callback.removeMessageListeners(); }catch(Exception ex) { // Ignore as we are shutting down } @@ -417,8 +419,10 @@ public void shutdownConnection(MqttToken token, MqttException reason) { } }catch(Exception ex) { + // Ignore as we are shutting down } + // All disconnect logic has been completed allowing the // client to be marked as disconnected. synchronized(conLock) { @@ -427,6 +431,7 @@ public void shutdownConnection(MqttToken token, MqttException reason) { conState = DISCONNECTED; stoppingComms = false; + } // Internal disconnect processing has completed. If there @@ -436,6 +441,7 @@ public void shutdownConnection(MqttToken token, MqttException reason) { // any outstanding tokens and unblock any waiters if (endToken != null && callback != null) { callback.asyncOperationComplete(endToken); + } if (wasConnected && callback != null) { @@ -449,6 +455,7 @@ public void shutdownConnection(MqttToken token, MqttException reason) { try { close(true); } catch (Exception e) { // ignore any errors as closing + } } } @@ -692,11 +699,17 @@ private class ConnectBG implements Runnable { threadName = "MQTT Con: "+getClient().getClientId(); } - void start() { + void start() throws MqttException { + Future exceptionFuture = null; if (executorService == null) { - new Thread(this).start(); + exceptionFuture = Executors.newSingleThreadExecutor().submit(this); } else { - executorService.execute(this); + exceptionFuture = executorService.submit(this); + } + try { + Object g = exceptionFuture.get();; + } catch (Exception e) { + throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR); } } @@ -734,6 +747,7 @@ public void run() { //@TRACE 212=connect failed: unexpected exception log.fine(CLASS_NAME, methodName, "212", null, ex); mqttEx = ex; + } catch (Exception ex) { //@TRACE 209=connect failed: unexpected exception log.fine(CLASS_NAME, methodName, "209", null, ex); @@ -741,6 +755,8 @@ public void run() { } if (mqttEx != null) { + if(mqttEx.getReasonCode() == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) + throw new RuntimeException(mqttEx); shutdownConnection(conToken, mqttEx); } } diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java index a5c0a8c28..13807b45e 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java @@ -763,7 +763,11 @@ public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttA } comms.setNetworkModuleIndex(0); - connectActionListener.connect(); + try { + connectActionListener.connect(); + } catch (MqttException e) { + throw e; + } return userToken; } diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ConnectActionListener.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ConnectActionListener.java index 5e8740a99..ec1d83ab6 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ConnectActionListener.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ConnectActionListener.java @@ -136,7 +136,7 @@ public void onFailure(IMqttToken token, Throwable exception) { try { connect(); } - catch (MqttPersistenceException e) { + catch (MqttException e) { onFailure(token, e); // try the next URI in the list } } @@ -166,7 +166,7 @@ public void onFailure(IMqttToken token, Throwable exception) { * Start the connect processing * @throws MqttPersistenceException if an error is thrown whilst setting up persistence */ - public void connect() throws MqttPersistenceException { + public void connect() throws MqttException { MqttToken token = new MqttToken(client.getClientId()); token.setActionCallback(this); token.setUserContext(this); @@ -185,6 +185,8 @@ public void connect() throws MqttPersistenceException { comms.connect(options, token); } catch (MqttException e) { + if(e.getReasonCode() == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) + throw e; onFailure(token, e); } }