From 97555590fc5f7a8fd7c845d2955785f712a1731e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=B5=9C=EC=9D=80=EA=B5=90?= Date: Tue, 21 Nov 2023 19:11:49 +0900 Subject: [PATCH 1/2] fix : modify to throw MqttException if MqttAsyncClient.connect() is called when there is no message broker (message broker service is down due to a failure, or address is incorrect...) For users who use the library, debugging is very difficult unless exceptions are thrown in such cases. Currently, if the message broker attempts to connect to a situation that does not exist, there is a risk of performing the next logic without any exceptions being returned. Of course, the getException() of the IMqttToken instance returned to the connect() call can be checked for an Exception, but it is difficult to use because it is not synchronized. I think it's right to make an exception in this particular case. --- .../client/mqttv3/internal/ClientComms.java | 36 +++++++++++++------ .../paho/client/mqttv3/MqttAsyncClient.java | 6 +++- .../internal/ConnectActionListener.java | 6 ++-- 3 files changed, 35 insertions(+), 13 deletions(-) diff --git a/org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java b/org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java index b0c433498..6de97e922 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java @@ -22,8 +22,7 @@ import java.util.Enumeration; import java.util.Properties; import java.util.Vector; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.eclipse.paho.client.mqttv3.BufferedMessage; import org.eclipse.paho.client.mqttv3.IMqttActionListener; @@ -271,9 +270,8 @@ public void close(boolean force) throws MqttException { * @param token The {@link MqttToken} to track the connection * @throws MqttException if an error occurs when connecting */ - public void connect(MqttConnectOptions options, MqttToken token) throws MqttException { + public synchronized void connect(MqttConnectOptions options, MqttToken token) throws MqttException { final String methodName = "connect"; - synchronized (conLock) { if (isDisconnected() && !closePending) { //@TRACE 214=state=CONNECTING log.fine(CLASS_NAME,methodName,"214"); @@ -297,7 +295,11 @@ public void connect(MqttConnectOptions options, MqttToken token) throws MqttExce tokenStore.open(); ConnectBG conbg = new ConnectBG(this, token, connect, executorService); - conbg.start(); + try { + conbg.start(); + } catch (MqttException e){ + throw e; + } } else { // @TRACE 207=connect failed: not disconnected {0} @@ -312,7 +314,6 @@ public void connect(MqttConnectOptions options, MqttToken token) throws MqttExce throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED); } } - } } public void connectComplete( MqttConnack cack, MqttException mex) throws MqttException { @@ -400,7 +401,8 @@ public void shutdownConnection(MqttToken token, MqttException reason) { // Clean session handling and tidy up clientState.disconnected(reason); if (clientState.getCleanSession()) - callback.removeMessageListeners(); + + callback.removeMessageListeners(); }catch(Exception ex) { // Ignore as we are shutting down } @@ -417,8 +419,10 @@ public void shutdownConnection(MqttToken token, MqttException reason) { } }catch(Exception ex) { + // Ignore as we are shutting down } + // All disconnect logic has been completed allowing the // client to be marked as disconnected. synchronized(conLock) { @@ -427,6 +431,7 @@ public void shutdownConnection(MqttToken token, MqttException reason) { conState = DISCONNECTED; stoppingComms = false; + } // Internal disconnect processing has completed. If there @@ -436,6 +441,7 @@ public void shutdownConnection(MqttToken token, MqttException reason) { // any outstanding tokens and unblock any waiters if (endToken != null && callback != null) { callback.asyncOperationComplete(endToken); + } if (wasConnected && callback != null) { @@ -449,6 +455,7 @@ public void shutdownConnection(MqttToken token, MqttException reason) { try { close(true); } catch (Exception e) { // ignore any errors as closing + } } } @@ -692,11 +699,17 @@ private class ConnectBG implements Runnable { threadName = "MQTT Con: "+getClient().getClientId(); } - void start() { + void start() throws MqttException { + Future test = null; if (executorService == null) { - new Thread(this).start(); + test = Executors.newSingleThreadExecutor().submit(this); } else { - executorService.execute(this); + test = executorService.submit(this); + } + try { + Object g = test.get();; + } catch (Exception e) { + throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR); } } @@ -734,6 +747,7 @@ public void run() { //@TRACE 212=connect failed: unexpected exception log.fine(CLASS_NAME, methodName, "212", null, ex); mqttEx = ex; + } catch (Exception ex) { //@TRACE 209=connect failed: unexpected exception log.fine(CLASS_NAME, methodName, "209", null, ex); @@ -741,6 +755,8 @@ public void run() { } if (mqttEx != null) { + if(mqttEx.getReasonCode() == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) + throw new RuntimeException(mqttEx); shutdownConnection(conToken, mqttEx); } } diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java index a5c0a8c28..13807b45e 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java @@ -763,7 +763,11 @@ public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttA } comms.setNetworkModuleIndex(0); - connectActionListener.connect(); + try { + connectActionListener.connect(); + } catch (MqttException e) { + throw e; + } return userToken; } diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ConnectActionListener.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ConnectActionListener.java index 5e8740a99..ec1d83ab6 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ConnectActionListener.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ConnectActionListener.java @@ -136,7 +136,7 @@ public void onFailure(IMqttToken token, Throwable exception) { try { connect(); } - catch (MqttPersistenceException e) { + catch (MqttException e) { onFailure(token, e); // try the next URI in the list } } @@ -166,7 +166,7 @@ public void onFailure(IMqttToken token, Throwable exception) { * Start the connect processing * @throws MqttPersistenceException if an error is thrown whilst setting up persistence */ - public void connect() throws MqttPersistenceException { + public void connect() throws MqttException { MqttToken token = new MqttToken(client.getClientId()); token.setActionCallback(this); token.setUserContext(this); @@ -185,6 +185,8 @@ public void connect() throws MqttPersistenceException { comms.connect(options, token); } catch (MqttException e) { + if(e.getReasonCode() == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) + throw e; onFailure(token, e); } } From eece5838610006d64f196c79146311560f36f0b4 Mon Sep 17 00:00:00 2001 From: ek Date: Tue, 21 Nov 2023 19:30:44 +0900 Subject: [PATCH 2/2] fix : modify to throw MqttException if MqttAsyncClient.connect() is called when there is no message broker (message broker service is down due to a failure, or address is incorrect...) For users who use the library, debugging is very difficult unless exceptions are thrown in such cases. Currently, if the message broker attempts to connect to a situation that does not exist, there is a risk of performing the next logic without any exceptions being returned. Of course, the getException() of the IMqttToken instance returned to the connect() call can be checked for an Exception, but it is difficult to use because it is not synchronized. I think it's right to make an exception in this particular case. --- .../eclipse/paho/client/mqttv3/internal/ClientComms.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java b/org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java index 6de97e922..d0248c592 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java @@ -700,14 +700,14 @@ private class ConnectBG implements Runnable { } void start() throws MqttException { - Future test = null; + Future exceptionFuture = null; if (executorService == null) { - test = Executors.newSingleThreadExecutor().submit(this); + exceptionFuture = Executors.newSingleThreadExecutor().submit(this); } else { - test = executorService.submit(this); + exceptionFuture = executorService.submit(this); } try { - Object g = test.get();; + Object g = exceptionFuture.get();; } catch (Exception e) { throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR); }