Skip to content

Commit 8bb476e

Browse files
committed
Separating Incoming and Outgoing Topic Aliases and handling protocol errors a better
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent effca14 commit 8bb476e

7 files changed

Lines changed: 63 additions & 39 deletions

File tree

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -890,6 +890,8 @@ public IMqttToken connect(MqttConnectionOptions options, Object userContext, Mqt
890890
if(this.connOpts.isCleanSession()) {
891891
this.mqttSession.clearSession();
892892
}
893+
894+
this.mqttSession.setIncomingTopicAliasMax(this.connOpts.getTopicAliasMaximum());
893895

894896
comms.setNetworkModuleIndex(0);
895897
connectActionListener.connect();

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttClientException.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ public class MqttClientException {
1010
*/
1111
public static final short REASON_CODE_INVALID_TOPIC_ALAS = 32301;
1212

13+
/**
14+
* The Server sent a publish message with an unknown topic alias and no topic string.
15+
*/
16+
public static final short REASON_CODE_UNKNOWN_TOPIC_ALIAS = 32302;
17+
1318
/**
1419
* Client timed out while waiting for a response from the server. The server is
1520
* no longer responding to keep-alive messages.

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -513,14 +513,14 @@ public void send(MqttWireMessage message, MqttToken token) throws MqttException
513513
if (message.isMessageIdRequired() && (message.getMessageId() == 0)) {
514514
if (message instanceof MqttPublish && (((MqttPublish) message).getMessage().getQos() != 0)) {
515515
message.setMessageId(getNextMessageId());
516-
if (this.mqttSession.getTopicAliasMaximum() > 0) {
516+
if (this.mqttSession.getOutgoingTopicAliasMaximum() > 0) {
517517
String topic = ((MqttPublish) message).getTopicName();
518518
if (outgoingTopicAliases.containsKey(topic)) {
519519
// Existing Topic Alias, Assign it and remove the topic string
520520
((MqttPublish) message).getProperties().setTopicAlias(outgoingTopicAliases.get(topic));
521521
((MqttPublish) message).setTopicName(null);
522522
} else {
523-
if (outgoingTopicAliasCount <= this.mqttSession.getTopicAliasMaximum()) {
523+
if (outgoingTopicAliasCount <= this.mqttSession.getOutgoingTopicAliasMaximum()) {
524524
// Create a new Topic Alias and increment the counter
525525
((MqttPublish) message).getProperties().setTopicAlias(outgoingTopicAliasCount);
526526
outgoingTopicAliases.put(((MqttPublish) message).getTopicName(), outgoingTopicAliasCount);
@@ -1117,41 +1117,44 @@ protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
11171117
if (!quiescing) {
11181118
if (message instanceof MqttPublish) {
11191119
MqttPublish send = (MqttPublish) message;
1120-
// If using Topic Aliases, restore the Topic Name or create the new Alias
1121-
if (send.getTopicName() != null && send.getProperties().getTopicAlias() != 0) {
1122-
// We've been sent an new topic alias
1123-
1124-
// Do we have space for this alias? / Are aliases enabled?
1125-
if (incomingTopicAliases.size() < clientComms.getConOptions().getTopicAliasMaximum()
1126-
&& clientComms.getConOptions().getTopicAliasMaximum() > 0) {
1120+
1121+
// Do we have an incoming topic Alias?
1122+
if(send.getProperties().getTopicAlias() != null && send.getProperties().getTopicAlias() != 0) {
1123+
int incomingTopicAlias = send.getProperties().getTopicAlias();
1124+
1125+
// Are incoming Topic Aliases enabled / is it a valid Alias?
1126+
if(incomingTopicAlias > this.mqttSession.getIncomingTopicAliasMax() || incomingTopicAlias == 0) {
1127+
// @TRACE 653=Invalid Topic Alias: topicAliasMax={0}, publishTopicAlias={1}
1128+
log.severe(CLASS_NAME, methodName, "653",
1129+
new Object[] { Integer.valueOf(this.mqttSession.getIncomingTopicAliasMax()),
1130+
Integer.valueOf(incomingTopicAlias) });
1131+
if (callback != null) {
1132+
callback.mqttErrorOccured(new MqttException(MqttException.REASON_CODE_INVALID_TOPIC_ALAS));
1133+
}
1134+
throw new MqttException(MqttClientException.REASON_CODE_INVALID_TOPIC_ALAS);
1135+
1136+
}
1137+
1138+
// Is this alias being sent with a topic string?
1139+
if(send.getTopicName() != null) {
11271140
// @TRACE 652=Setting Incoming New Topic Alias alias={0}, topicName={1}
11281141
log.fine(CLASS_NAME, methodName, "652",
11291142
new Object[] { Integer.valueOf(send.getProperties().getTopicAlias()), send.getTopicName() });
11301143
incomingTopicAliases.put(send.getProperties().getTopicAlias(), send.getTopicName());
11311144
} else {
1132-
// @TRACE 653=Invalid Topic Alias: topicAliasMax={0}, publishTopicAlias={1}
1133-
log.severe(CLASS_NAME, methodName, "653",
1134-
new Object[] { Integer.valueOf(clientComms.getConOptions().getTopicAliasMaximum()),
1135-
Integer.valueOf(send.getProperties().getTopicAlias()) });
1136-
throw new MqttException(MqttClientException.REASON_CODE_INVALID_TOPIC_ALAS);
1137-
}
1138-
} else if (send.getTopicName() == null && send.getProperties().getTopicAlias() != 0) {
1139-
// We've been sent an existing topic alias
1140-
if (incomingTopicAliases.get(send.getProperties().getTopicAlias()) != null) {
1141-
send.setTopicName(incomingTopicAliases.get(send.getProperties().getTopicAlias()));
1142-
}
1143-
} else if ((send.getTopicName() == null || send.getTopicName().length() == 0)
1144-
&& (send.getProperties().getTopicAlias() == 0
1145-
|| send.getProperties().getTopicAlias() > clientComms.getConOptions().getTopicAliasMaximum())) {
1146-
// No Topic String provided, topic alias is invalid
1147-
// @TRACE 653=Invalid Topic Alias: topicAliasMax={0}, publishTopicAlias={1}
1148-
log.fine(CLASS_NAME, methodName, "653",
1149-
new Object[] { Integer.valueOf(clientComms.getConOptions().getTopicAliasMaximum()),
1150-
Integer.valueOf(send.getProperties().getTopicAlias()) });
1151-
if (callback != null) {
1152-
callback.mqttErrorOccured(new MqttException(MqttException.REASON_CODE_INVALID_TOPIC_ALAS));
1145+
// No Topic String, so must be in incomingTopicAliases.
1146+
if(incomingTopicAliases.contains(incomingTopicAlias)) {
1147+
send.setTopicName(incomingTopicAliases.get(incomingTopicAlias));
1148+
} else {
1149+
// @TRACE 654=Unknown Topic Alias: Incoming Alias={1}
1150+
log.severe(CLASS_NAME, methodName, "654",
1151+
new Object[] { Integer.valueOf(send.getProperties().getTopicAlias()) });
1152+
throw new MqttException(MqttClientException.REASON_CODE_UNKNOWN_TOPIC_ALIAS);
1153+
}
11531154
}
11541155
}
1156+
1157+
11551158
switch (send.getMessage().getQos()) {
11561159
case 0:
11571160
case 1:

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ConnectActionListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void onSuccess(IMqttToken token) {
114114
mqttSession.setMaximumQoS(myToken.getMessageProperties().getMaximumQoS());
115115
mqttSession.setRetainAvailable(myToken.getMessageProperties().isRetainAvailable());
116116
mqttSession.setMaximumPacketSize(myToken.getMessageProperties().getMaximumPacketSize());
117-
mqttSession.setTopicAliasMaximum(myToken.getMessageProperties().getTopicAliasMaximum());
117+
mqttSession.setOutgoingTopicAliasMaximum(myToken.getMessageProperties().getTopicAliasMaximum());
118118
mqttSession
119119
.setWildcardSubscriptionsAvailable(myToken.getMessageProperties().isWildcardSubscriptionsAvailable());
120120
mqttSession.setSubscriptionIdentifiersAvailable(

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/MqttSession.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ public class MqttSession {
2929
private Integer maximumQoS = 2;
3030
private Boolean retainAvailable = true;
3131
private Long maximumPacketSize = -1L;
32-
private Integer topicAliasMaximum = 0;
32+
private Integer outgoingTopicAliasMaximum = 0;
33+
private Integer incomingTopicAliasMax = 0;
3334
private Boolean wildcardSubscriptionsAvailable = true;
3435
private Boolean subscriptionIdentifiersAvailable = true;
3536
private Boolean sharedSubscriptionsAvailable = true;
@@ -81,12 +82,12 @@ public void setMaximumPacketSize(Long maximumPacketSize) {
8182
this.maximumPacketSize = maximumPacketSize;
8283
}
8384

84-
public Integer getTopicAliasMaximum() {
85-
return topicAliasMaximum;
85+
public Integer getOutgoingTopicAliasMaximum() {
86+
return outgoingTopicAliasMaximum;
8687
}
8788

88-
public void setTopicAliasMaximum(Integer topicAliasMaximum) {
89-
this.topicAliasMaximum = topicAliasMaximum;
89+
public void setOutgoingTopicAliasMaximum(Integer topicAliasMaximum) {
90+
this.outgoingTopicAliasMaximum = topicAliasMaximum;
9091
}
9192

9293
public Boolean isWildcardSubscriptionsAvailable() {
@@ -126,4 +127,14 @@ public void setClientId(String clientId) {
126127
this.clientId = clientId;
127128
}
128129

130+
131+
public Integer getIncomingTopicAliasMax() {
132+
return incomingTopicAliasMax;
133+
}
134+
135+
136+
public void setIncomingTopicAliasMax(Integer incomingTopicAliasMax) {
137+
this.incomingTopicAliasMax = incomingTopicAliasMax;
138+
}
139+
129140
}

org.eclipse.paho.mqttv5.client/src/main/resources/org/eclipse/paho/mqttv5/client/internal/nls/logcat.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@
134134
651=received key={0} message={1}
135135
652=Setting Incoming New Topic Alias alias={0}, topicName={1}
136136
653=Invalid Topic Alias: topicAliasMax={0}, publishTopicAlias={1}
137+
654=Unknown Topic Alias: Incoming Alias={1}
137138
659=start timer for client:{0}
138139
660=Check schedule at {0}
139140
661=stop

org.eclipse.paho.mqttv5.testclient/src/main/java/org/eclipse/paho/mqttv5/testclient/V5Client.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ public V5Client() throws InterruptedException {
3636
MqttConnectionOptionsBuilder conOptsBuilder = new MqttConnectionOptionsBuilder();
3737
MqttConnectionOptions conOpts = conOptsBuilder.serverURI(broker).cleanSession(true)
3838
.sessionExpiryInterval(120L).automaticReconnect(true)
39-
.will(topic, new MqttMessage(willContent.getBytes(), qos, false, null)).topicAliasMaximum(1000).build();
39+
.topicAliasMaximum(0)
40+
.will(topic, new MqttMessage(willContent.getBytes(), qos, false, null)).build();
4041
asyncClient.setCallback(this);
4142

4243

@@ -135,18 +136,19 @@ public void disconnected(MqttDisconnectResponse disconnectResponse) {
135136

136137
@Override
137138
public void mqttErrorOccured(MqttException exception) {
138-
// TODO Auto-generated method stub
139+
System.out.println("An exception occured in the MQTT Client: " + exception.getMessage());
139140

140141
}
141142

142143
@Override
143144
public void connectComplete(boolean reconnect, String serverURI) {
145+
System.out.println("Client successfully connected, reconnect: " + reconnect + ", URI: " + serverURI);
144146

145147
}
146148

147149
@Override
148150
public void authPacketArrived(int reasonCode, MqttProperties properties) {
149-
151+
System.out.println("An auth packet was recieved");
150152
}
151153

152154

0 commit comments

Comments
 (0)