Skip to content

Commit 70b6398

Browse files
committed
Fixed #1820: Improved internal message bus performance
1 parent 2c593b6 commit 70b6398

2 files changed

Lines changed: 13 additions & 3 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
* Fixed unknown version giving 200 instead of 404.
3737
* Fixed JSON-Patch that changed nothing giving an error back.
3838
* Fixed memory leak when creating Entities using MQTT.
39+
* Improved internal message bus when MQTT is not in use.
3940

4041

4142
## Release version 2.1.0

FROST-Server.Core/src/main/java/de/fraunhofer/iosb/ilt/frostserver/messagebus/InternalMessageBus.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package de.fraunhofer.iosb.ilt.frostserver.messagebus;
1919

20+
import static de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings.PREFIX_BUS;
21+
2022
import de.fraunhofer.iosb.ilt.frostserver.model.EntityChangedMessage;
2123
import de.fraunhofer.iosb.ilt.frostserver.settings.BusSettings;
2224
import de.fraunhofer.iosb.ilt.frostserver.settings.ConfigDefaults;
@@ -53,13 +55,15 @@ public class InternalMessageBus implements MessageBus, ConfigDefaults {
5355
private BlockingQueue<EntityChangedMessage> entityChangedMessageQueue;
5456
private ExecutorService entityChangedExecutorService;
5557
private final List<MessageListener> listeners = new CopyOnWriteArrayList<>();
58+
private int poolSize;
59+
private int queueSize;
5660

5761
@Override
5862
public void init(CoreSettings settings) {
5963
BusSettings busSettings = settings.getBusSettings();
6064
Settings customSettings = busSettings.getCustomSettings();
61-
int poolSize = customSettings.getInt(TAG_WORKER_COUNT, defaultValueInt(TAG_WORKER_COUNT));
62-
int queueSize = customSettings.getInt(TAG_QUEUE_SIZE, defaultValueInt(TAG_QUEUE_SIZE));
65+
poolSize = customSettings.getInt(TAG_WORKER_COUNT, defaultValueInt(TAG_WORKER_COUNT));
66+
queueSize = customSettings.getInt(TAG_QUEUE_SIZE, defaultValueInt(TAG_QUEUE_SIZE));
6367

6468
entityChangedMessageQueue = new ArrayBlockingQueue<>(queueSize);
6569
entityChangedExecutorService = ProcessorHelper.createProcessors(
@@ -86,8 +90,13 @@ public void stop() {
8690

8791
@Override
8892
public void sendMessage(EntityChangedMessage message) {
93+
if (listeners.isEmpty()) {
94+
// No listeners, no point in doing anything.
95+
return;
96+
}
8997
if (!entityChangedMessageQueue.offer(message)) {
90-
LOGGER.error("Failed to add message to queue. Increase the queue size to allow a bigger buffer, or increase the worker pool size to empty the buffer quicker.");
98+
LOGGER.error("Failed to add message to message bus. Increase {}{} (currently {}) to allow a bigger buffer, or increase {}{} (currently {}) to empty the buffer quicker.",
99+
PREFIX_BUS, TAG_QUEUE_SIZE, queueSize, PREFIX_BUS, TAG_WORKER_COUNT, poolSize);
91100
}
92101
}
93102

0 commit comments

Comments
 (0)