Skip to content

Commit acfd605

Browse files
committed
Fixes #861: Perfomance degradation with increasing subscription count.
Iterating over all subscriptions for each incoming message is really slow when there are many subscriptions. There can only ever be one Subscription with no wildcards that matches a message topic, so iterating over all such subscriptions is not needed. A simple map lookup is enough. Wildcard subscriptions do need to be iterated over, since there is no easy way to see which match and witch do not. Signed-off-by: Hylke van der Schaaf <hylke.vds@gmail.com>
1 parent 240f82f commit acfd605

1 file changed

Lines changed: 25 additions & 10 deletions

File tree

  • org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,9 @@ public class CommsCallback implements Runnable {
5252
private static final int INBOUND_QUEUE_SIZE = 10;
5353
private MqttCallback mqttCallback;
5454
private MqttCallbackExtended reconnectInternalCallback;
55-
private Hashtable<String, IMqttMessageListener> callbacks; // topicFilter -> messageHandler
56-
private ClientComms clientComms;
55+
private final Hashtable<String, IMqttMessageListener> callbacksWildcards; // topicFilter with wildcards -> messageHandler
56+
private final Hashtable<String, IMqttMessageListener> callbacksDirect; // topicFilter without wildcards -> messageHandler
57+
private final ClientComms clientComms;
5758
private final Vector<MqttWireMessage> messageQueue;
5859
private final Vector<MqttToken> completeQueue;
5960

@@ -75,7 +76,8 @@ private enum State {STOPPED, RUNNING, QUIESCING}
7576
this.clientComms = clientComms;
7677
this.messageQueue = new Vector<MqttWireMessage>(INBOUND_QUEUE_SIZE);
7778
this.completeQueue = new Vector<MqttToken>(INBOUND_QUEUE_SIZE);
78-
this.callbacks = new Hashtable<String, IMqttMessageListener>();
79+
this.callbacksWildcards = new Hashtable<String, IMqttMessageListener>();
80+
this.callbacksDirect = new Hashtable<String, IMqttMessageListener>();
7981
log.setResourceName(clientComms.getClient().getClientId());
8082
}
8183

@@ -351,7 +353,7 @@ public void fireActionEvent(MqttToken token) {
351353
*/
352354
public void messageArrived(MqttPublish sendMessage) {
353355
final String methodName = "messageArrived";
354-
if (mqttCallback != null || callbacks.size() > 0) {
356+
if (mqttCallback != null || !callbacksWildcards.isEmpty() || !callbacksDirect.isEmpty()) {
355357
// If we already have enough messages queued up in memory, wait
356358
// until some more queue space becomes available. This helps
357359
// the client protect itself from getting flooded by messages
@@ -481,34 +483,47 @@ protected Thread getThread() {
481483

482484

483485
public void setMessageListener(String topicFilter, IMqttMessageListener messageListener) {
484-
this.callbacks.put(topicFilter, messageListener);
486+
if (topicFilter.contains("#") || topicFilter.contains("+")) {
487+
this.callbacksWildcards.put(topicFilter, messageListener);
488+
} else {
489+
this.callbacksDirect.put(topicFilter, messageListener);
490+
}
485491
}
486492

487493

488494
public void removeMessageListener(String topicFilter) {
489-
this.callbacks.remove(topicFilter); // no exception thrown if the filter was not present
495+
this.callbacksWildcards.remove(topicFilter); // no exception thrown if the filter was not present
496+
this.callbacksDirect.remove(topicFilter); // no exception thrown if the filter was not present
490497
}
491498

492499
public void removeMessageListeners() {
493-
this.callbacks.clear();
500+
this.callbacksWildcards.clear();
501+
this.callbacksWildcards.clear();
494502
}
495503

496504

497505
protected boolean deliverMessage(String topicName, int messageId, MqttMessage aMessage) throws Exception
498506
{
499507
boolean delivered = false;
500508

501-
Enumeration<String> keys = callbacks.keys();
509+
IMqttMessageListener callback = this.callbacksDirect.get(topicName);
510+
if (callback != null) {
511+
aMessage.setId(messageId);
512+
callback.messageArrived(topicName, aMessage);
513+
delivered = true;
514+
}
515+
516+
Enumeration<String> keys = callbacksWildcards.keys();
502517
while (keys.hasMoreElements()) {
503518
String topicFilter = (String)keys.nextElement();
504519
// callback may already have been removed in the meantime, so a null check is necessary
505-
IMqttMessageListener callback = callbacks.get(topicFilter);
520+
callback = callbacksWildcards.get(topicFilter);
506521
if(callback == null) {
507522
continue;
508523
}
509524
if (MqttTopic.isMatched(topicFilter, topicName)) {
510525
aMessage.setId(messageId);
511-
((IMqttMessageListener)callback).messageArrived(topicName, aMessage);
526+
callback.messageArrived(topicName, aMessage);
512527
delivered = true;
513528
}
514529
}

0 commit comments

Comments
 (0)