Skip to content

Commit bf2745b

Browse files
author
Ranjan Dasgupta
authored
Merge pull request #862 from hylkevds/issue861
Fixes #861: Perfomance degradation with increasing subscription count.
2 parents 6d36f4b + d4c5c1c commit bf2745b

1 file changed

Lines changed: 25 additions & 10 deletions

File tree

  • org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,9 @@ public class CommsCallback implements Runnable {
5252
private static final int INBOUND_QUEUE_SIZE = 10;
5353
private MqttCallback mqttCallback;
5454
private MqttCallbackExtended reconnectInternalCallback;
55-
private Hashtable<String, IMqttMessageListener> callbacks; // topicFilter -> messageHandler
56-
private ClientComms clientComms;
55+
private final Hashtable<String, IMqttMessageListener> callbacksWildcards; // topicFilter with wildcards -> messageHandler
56+
private final Hashtable<String, IMqttMessageListener> callbacksDirect; // topicFilter without wildcards -> messageHandler
57+
private final ClientComms clientComms;
5758
private final Vector<MqttWireMessage> messageQueue;
5859
private final Vector<MqttToken> completeQueue;
5960

@@ -75,7 +76,8 @@ private enum State {STOPPED, RUNNING, QUIESCING}
7576
this.clientComms = clientComms;
7677
this.messageQueue = new Vector<MqttWireMessage>(INBOUND_QUEUE_SIZE);
7778
this.completeQueue = new Vector<MqttToken>(INBOUND_QUEUE_SIZE);
78-
this.callbacks = new Hashtable<String, IMqttMessageListener>();
79+
this.callbacksWildcards = new Hashtable<String, IMqttMessageListener>();
80+
this.callbacksDirect = new Hashtable<String, IMqttMessageListener>();
7981
log.setResourceName(clientComms.getClient().getClientId());
8082
}
8183

@@ -351,7 +353,7 @@ public void fireActionEvent(MqttToken token) {
351353
*/
352354
public void messageArrived(MqttPublish sendMessage) {
353355
final String methodName = "messageArrived";
354-
if (mqttCallback != null || callbacks.size() > 0) {
356+
if (mqttCallback != null || !callbacksWildcards.isEmpty() || !callbacksDirect.isEmpty()) {
355357
// If we already have enough messages queued up in memory, wait
356358
// until some more queue space becomes available. This helps
357359
// the client protect itself from getting flooded by messages
@@ -481,34 +483,47 @@ protected Thread getThread() {
481483

482484

483485
public void setMessageListener(String topicFilter, IMqttMessageListener messageListener) {
484-
this.callbacks.put(topicFilter, messageListener);
486+
if (topicFilter.contains("#") || topicFilter.contains("+")) {
487+
this.callbacksWildcards.put(topicFilter, messageListener);
488+
} else {
489+
this.callbacksDirect.put(topicFilter, messageListener);
490+
}
485491
}
486492

487493

488494
public void removeMessageListener(String topicFilter) {
489-
this.callbacks.remove(topicFilter); // no exception thrown if the filter was not present
495+
this.callbacksWildcards.remove(topicFilter); // no exception thrown if the filter was not present
496+
this.callbacksDirect.remove(topicFilter); // no exception thrown if the filter was not present
490497
}
491498

492499
public void removeMessageListeners() {
493-
this.callbacks.clear();
500+
this.callbacksWildcards.clear();
501+
this.callbacksDirect.clear();
494502
}
495503

496504

497505
protected boolean deliverMessage(String topicName, int messageId, MqttMessage aMessage) throws Exception
498506
{
499507
boolean delivered = false;
500508

501-
Enumeration<String> keys = callbacks.keys();
509+
IMqttMessageListener callback = this.callbacksDirect.get(topicName);
510+
if (callback != null) {
511+
aMessage.setId(messageId);
512+
callback.messageArrived(topicName, aMessage);
513+
delivered = true;
514+
}
515+
516+
Enumeration<String> keys = callbacksWildcards.keys();
502517
while (keys.hasMoreElements()) {
503518
String topicFilter = (String)keys.nextElement();
504519
// callback may already have been removed in the meantime, so a null check is necessary
505-
IMqttMessageListener callback = callbacks.get(topicFilter);
520+
callback = callbacksWildcards.get(topicFilter);
506521
if(callback == null) {
507522
continue;
508523
}
509524
if (MqttTopic.isMatched(topicFilter, topicName)) {
510525
aMessage.setId(messageId);
511-
((IMqttMessageListener)callback).messageArrived(topicName, aMessage);
526+
callback.messageArrived(topicName, aMessage);
512527
delivered = true;
513528
}
514529
}

0 commit comments

Comments
 (0)