Skip to content

Commit 783802c

Browse files
redboltzjpwsutton
authored andcommitted
Added removing message function. (#396)
Signed-off-by: Takatoshi Kondo <redboltz@gmail.com>
1 parent 26f7bd7 commit 783802c

5 files changed

Lines changed: 93 additions & 4 deletions

File tree

org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test/SendReceiveAsyncCallbackTest.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,15 +153,18 @@ public MqttMessage getNextMessage() {
153153

154154
public void messageArrived(String topic, MqttMessage message)
155155
throws Exception {
156-
157-
log.info("message arrived: '" + new String(message.getPayload())
156+
String msgstr = new String(message.getPayload());
157+
log.info("message arrived: '" + msgstr
158158
+ "' " + this.hashCode() + " "
159159
+ (message.isDuplicate() ? "duplicate" : ""));
160160

161161
if (!message.isDuplicate()) {
162162
synchronized (messages) {
163-
messages.add(message);
164-
messages.notifyAll();
163+
if (!msgstr.equals("might cancel")) {
164+
log.info("add message");
165+
messages.add(message);
166+
messages.notifyAll();
167+
}
165168
}
166169
}
167170
}
@@ -190,6 +193,16 @@ public void onSuccess(IMqttToken token) {
190193
token.getClient().publish(topicFilter, "my data".getBytes(), 2, false, null, myOnPublish);
191194
}
192195
else {
196+
IMqttDeliveryToken tokenToRemove1 = token.getClient().publish(topicFilter, "might cancel".getBytes(), 1, false, null, myOnPublish);
197+
boolean res1_1 = token.getClient().removeMessage(tokenToRemove1);
198+
Assert.assertTrue("message (QoS1) removed", res1_1);
199+
boolean res1_2 = token.getClient().removeMessage(tokenToRemove1);
200+
Assert.assertFalse("already removed message (QoS1) shoudn't be removed", res1_2);
201+
IMqttDeliveryToken tokenToRemove2 = token.getClient().publish(topicFilter, "might cancel".getBytes(), 2, false, null, myOnPublish);
202+
boolean res2_1 = token.getClient().removeMessage(tokenToRemove2);
203+
Assert.assertTrue("message (QoS2) removed", res2_1);
204+
boolean res2_2 = token.getClient().removeMessage(tokenToRemove2);
205+
Assert.assertFalse("already removed message (QoS2) shoudn't be removed", res2_2);
193206
log.info(methodName + ": all messages published");
194207
testFinished = true;
195208
}

org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
3030
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
3131
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
32+
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
3233
import org.eclipse.paho.client.mqttv3.MqttCallback;
3334
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
3435
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
@@ -200,6 +201,17 @@ public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttExce
200201
}
201202
}
202203

204+
/**
205+
* Removes the message corresponding to the token from the outbound queue and persistence.
206+
* @param token The {@link IMqttDeliveryMqttToken} to remove
207+
* @return if the message is removed, then true, otherwise false
208+
* @throws MqttException if an error occurs sending the message
209+
*/
210+
public boolean removeMessage(IMqttDeliveryToken token) throws MqttException {
211+
final String methodName = "removeMessage";
212+
return this.clientState.removeMessage(token);
213+
}
214+
203215
/**
204216
* Close and tidy up.
205217
*

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttAsyncClient.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -809,6 +809,22 @@ public IMqttToken unsubscribe(String topicFilter, Object userContext, IMqttActio
809809
public IMqttToken unsubscribe(String[] topicFilters, Object userContext, IMqttActionListener callback)
810810
throws MqttException;
811811

812+
/**
813+
* Removes a published message corresponding to the token.
814+
* <p>If a publish is requested with QoS1 or Qos2 and the publish callback is
815+
* not called yet, this function returns true, the publish called will never
816+
* be called, and a messageId corresponding to the token will become reusable.
817+
* </p>
818+
* <p>If the publish callback is already be called, this function returns false.
819+
* </p>
820+
* <p>This function might not stop sending the published message.
821+
* </p>
822+
* *
823+
* @param token the token of removing published message
824+
* @return if the message is removed then true, otherwise false
825+
* @throws MqttException if there was an error removing the message.
826+
*/
827+
public boolean removeMessage(IMqttDeliveryToken token) throws MqttException;
812828

813829
/**
814830
* Sets a callback listener to use for events that happen asynchronously.

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1237,6 +1237,15 @@ public IMqttToken unsubscribe(String[] topicFilters, Object userContext, IMqttAc
12371237
return token;
12381238
}
12391239

1240+
/*
1241+
* (non-Javadoc)
1242+
*
1243+
* @see IMqttAsyncClient#removeMessage(IMqttDeliveryToken)
1244+
*/
1245+
public boolean removeMessage(IMqttDeliveryToken token) throws MqttException {
1246+
return comms.removeMessage(token);
1247+
}
1248+
12401249
/*
12411250
* (non-Javadoc)
12421251
*

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientState.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Vector;
2929

3030
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
31+
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
3132
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
3233
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
3334
import org.eclipse.paho.client.mqttv3.MqttException;
@@ -191,6 +192,10 @@ private String getSendPersistenceKey(MqttWireMessage message) {
191192
return PERSISTENCE_SENT_PREFIX + message.getMessageId();
192193
}
193194

195+
private String getSendPersistenceKey(int messageId) {
196+
return PERSISTENCE_SENT_PREFIX + messageId;
197+
}
198+
194199
private String getSendConfirmPersistenceKey(MqttWireMessage message) {
195200
return PERSISTENCE_CONFIRMED_PREFIX + message.getMessageId();
196201
}
@@ -641,6 +646,39 @@ protected void undo(MqttPublish message) throws MqttPersistenceException {
641646
}
642647
}
643648

649+
/**
650+
* This removes the MqttSend message from the outbound queue and persistence.
651+
* @param message The {@link MqttPublish} message to remove
652+
* @return if the message is removed, then true, otherwise false
653+
* @throws MqttException if an exception occurs whilst removing the message
654+
*/
655+
protected boolean removeMessage(IMqttDeliveryToken token) throws MqttException {
656+
final String methodName = "removeMessage";
657+
MqttMessage message = token.getMessage();
658+
int messageId = token.getMessageId();
659+
boolean result = false;
660+
synchronized (queueLock) {
661+
if (message.getQos() == 1) {
662+
if (outboundQoS1.remove(new Integer(messageId)) != null) {
663+
result = true;
664+
}
665+
}
666+
if (message.getQos() == 2) {
667+
if (outboundQoS2.remove(new Integer(messageId)) != null) {
668+
result = true;
669+
}
670+
}
671+
if (pendingMessages.removeElement(message)) {
672+
result = true;
673+
}
674+
persistence.remove(getSendPersistenceKey(messageId));
675+
String key = new Integer(messageId).toString();
676+
tokenStore.removeToken(key);
677+
releaseMessageId(messageId);
678+
}
679+
return result;
680+
}
681+
644682
/**
645683
* Check and send a ping if needed and check for ping timeout.
646684
* Need to send a ping if nothing has been sent or received
@@ -861,6 +899,7 @@ protected void notifySent(MqttWireMessage message) {
861899
log.fine(CLASS_NAME,methodName,"625",new Object[]{message.getKey()});
862900

863901
MqttToken token = tokenStore.getToken(message);
902+
if (token == null) return;
864903
token.internalTok.notifySent();
865904
if (message instanceof MqttPingReq) {
866905
synchronized (pingOutstandingLock) {

0 commit comments

Comments
 (0)