Skip to content

Commit 66835df

Browse files
author
Ranjan Dasgupta
authored
Merge pull request #775 from 51systems/abstract-hrt
Implementation specific timer for ClientState
2 parents fb0abaa + 99eb959 commit 66835df

5 files changed

Lines changed: 208 additions & 12 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ public class ClientComms {
9595
* @param executorService the {@link ExecutorService}
9696
* @throws MqttException if an exception occurs whilst communicating with the server
9797
*/
98-
public ClientComms(IMqttAsyncClient client, MqttClientPersistence persistence, MqttPingSender pingSender, ExecutorService executorService) throws MqttException {
98+
public ClientComms(IMqttAsyncClient client, MqttClientPersistence persistence, MqttPingSender pingSender,
99+
ExecutorService executorService, HighResolutionTimer highResolutionTimer) throws MqttException {
99100
this.conState = DISCONNECTED;
100101
this.client = client;
101102
this.persistence = persistence;
@@ -105,7 +106,7 @@ public ClientComms(IMqttAsyncClient client, MqttClientPersistence persistence, M
105106

106107
this.tokenStore = new CommsTokenStore(getClient().getClientId());
107108
this.callback = new CommsCallback(this);
108-
this.clientState = new ClientState(persistence, tokenStore, this.callback, this, pingSender);
109+
this.clientState = new ClientState(persistence, tokenStore, this.callback, this, pingSender, highResolutionTimer);
109110

110111
callback.setClientState(clientState);
111112
log.setResourceName(getClient().getClientId());

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.eclipse.paho.client.mqttv3.internal.ConnectActionListener;
3434
import org.eclipse.paho.client.mqttv3.internal.DisconnectedMessageBuffer;
3535
import org.eclipse.paho.client.mqttv3.internal.ExceptionHelper;
36+
import org.eclipse.paho.client.mqttv3.internal.HighResolutionTimer;
37+
import org.eclipse.paho.client.mqttv3.internal.SystemHighResolutionTimer;
3638
import org.eclipse.paho.client.mqttv3.internal.NetworkModule;
3739
import org.eclipse.paho.client.mqttv3.internal.NetworkModuleService;
3840
import org.eclipse.paho.client.mqttv3.internal.wire.MqttDisconnect;
@@ -433,6 +435,124 @@ public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence
433435
*/
434436
public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence,
435437
MqttPingSender pingSender, ScheduledExecutorService executorService) throws MqttException {
438+
this(serverURI, clientId, persistence, pingSender, executorService, null);
439+
}
440+
/**
441+
* Create an MqttAsyncClient that is used to communicate with an MQTT
442+
* server.
443+
* <p>
444+
* The address of a server can be specified on the constructor.
445+
* Alternatively a list containing one or more servers can be specified
446+
* using the {@link MqttConnectOptions#setServerURIs(String[])
447+
* setServerURIs} method on MqttConnectOptions.
448+
*
449+
* <p>
450+
* The <code>serverURI</code> parameter is typically used with the the
451+
* <code>clientId</code> parameter to form a key. The key is used to store
452+
* and reference messages while they are being delivered. Hence the
453+
* serverURI specified on the constructor must still be specified even if a
454+
* list of servers is specified on an MqttConnectOptions object. The
455+
* serverURI on the constructor must remain the same across restarts of the
456+
* client for delivery of messages to be maintained from a given client to a
457+
* given server or set of servers.
458+
*
459+
* <p>
460+
* The address of the server to connect to is specified as a URI. Two types
461+
* of connection are supported <code>tcp://</code> for a TCP connection and
462+
* <code>ssl://</code> for a TCP connection secured by SSL/TLS. For example:
463+
* </p>
464+
* <ul>
465+
* <li><code>tcp://localhost:1883</code></li>
466+
* <li><code>ssl://localhost:8883</code></li>
467+
* </ul>
468+
* <p>
469+
* If the port is not specified, it will default to 1883 for
470+
* <code>tcp://</code>" URIs, and 8883 for <code>ssl://</code> URIs.
471+
* </p>
472+
*
473+
* <p>
474+
* A client identifier <code>clientId</code> must be specified and be less
475+
* that 65535 characters. It must be unique across all clients connecting to
476+
* the same server. The clientId is used by the server to store data related
477+
* to the client, hence it is important that the clientId remain the same
478+
* when connecting to a server if durable subscriptions or reliable
479+
* messaging are required.
480+
* <p>
481+
* A convenience method is provided to generate a random client id that
482+
* should satisfy this criteria - {@link #generateClientId()}. As the client
483+
* identifier is used by the server to identify a client when it reconnects,
484+
* the client must use the same identifier between connections if durable
485+
* subscriptions or reliable delivery of messages is required.
486+
* </p>
487+
* <p>
488+
* In Java SE, SSL can be configured in one of several ways, which the
489+
* client will use in the following order:
490+
* </p>
491+
* <ul>
492+
* <li><strong>Supplying an <code>SSLSocketFactory</code></strong> -
493+
* applications can use
494+
* {@link MqttConnectOptions#setSocketFactory(SocketFactory)} to supply a
495+
* factory with the appropriate SSL settings.</li>
496+
* <li><strong>SSL Properties</strong> - applications can supply SSL
497+
* settings as a simple Java Properties using
498+
* {@link MqttConnectOptions#setSSLProperties(Properties)}.</li>
499+
* <li><strong>Use JVM settings</strong> - There are a number of standard
500+
* Java system properties that can be used to configure key and trust
501+
* stores.</li>
502+
* </ul>
503+
*
504+
* <p>
505+
* In Java ME, the platform settings are used for SSL connections.
506+
* </p>
507+
* <p>
508+
* A persistence mechanism is used to enable reliable messaging. For
509+
* messages sent at qualities of service (QoS) 1 or 2 to be reliably
510+
* delivered, messages must be stored (on both the client and server) until
511+
* the delivery of the message is complete. If messages are not safely
512+
* stored when being delivered then a failure in the client or server can
513+
* result in lost messages. A pluggable persistence mechanism is supported
514+
* via the {@link MqttClientPersistence} interface. An implementer of this
515+
* interface that safely stores messages must be specified in order for
516+
* delivery of messages to be reliable. In addition
517+
* {@link MqttConnectOptions#setCleanSession(boolean)} must be set to false.
518+
* In the event that only QoS 0 messages are sent or received or
519+
* cleanSession is set to true then a safe store is not needed.
520+
* </p>
521+
* <p>
522+
* An implementation of file-based persistence is provided in class
523+
* {@link MqttDefaultFilePersistence} which will work in all Java SE based
524+
* systems. If no persistence is needed, the persistence parameter can be
525+
* explicitly set to <code>null</code>.
526+
* </p>
527+
*
528+
* @param serverURI
529+
* the address of the server to connect to, specified as a URI.
530+
* Can be overridden using
531+
* {@link MqttConnectOptions#setServerURIs(String[])}
532+
* @param clientId
533+
* a client identifier that is unique on the server being
534+
* connected to
535+
* @param persistence
536+
* the persistence class to use to store in-flight message. If
537+
* null then the default persistence mechanism is used
538+
* @param pingSender
539+
* Custom {@link MqttPingSender} implementation.
540+
* @param executorService
541+
* used for managing threads. If null no executor service is used.
542+
* @param highResolutionTimer
543+
* used for providing time values for keepalive ping scheduling. If {@code null}, a default timer is used
544+
* @throws IllegalArgumentException
545+
* if the URI does not start with "tcp://", "ssl://" or
546+
* "local://"
547+
* @throws IllegalArgumentException
548+
* if the clientId is null or is greater than 65535 characters
549+
* in length
550+
* @throws MqttException
551+
* if any other problem was encountered
552+
*/
553+
public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence,
554+
MqttPingSender pingSender, ScheduledExecutorService executorService,
555+
HighResolutionTimer highResolutionTimer) throws MqttException {
436556
final String methodName = "MqttAsyncClient";
437557

438558
log.setResourceName(clientId);
@@ -461,13 +581,17 @@ public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence
461581
this.persistence = new MemoryPersistence();
462582
}
463583

584+
if (highResolutionTimer == null) {
585+
highResolutionTimer = new SystemHighResolutionTimer();
586+
}
587+
464588
this.executorService = executorService;
465589

466590
// @TRACE 101=<init> ClientID={0} ServerURI={1} PersistenceType={2}
467591
log.fine(CLASS_NAME, methodName, "101", new Object[] { clientId, serverURI, persistence });
468592

469593
this.persistence.open(clientId, serverURI);
470-
this.comms = new ClientComms(this, this.persistence, pingSender, this.executorService);
594+
this.comms = new ClientComms(this, this.persistence, pingSender, this.executorService, highResolutionTimer);
471595
this.persistence.close();
472596
this.topics = new Hashtable();
473597

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientState.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ public class ClientState {
122122
private long keepAliveNanos; // nanoseconds time
123123
private boolean cleanSession;
124124
private MqttClientPersistence persistence;
125+
private HighResolutionTimer highResolutionTimer;
125126

126127
private int maxInflight = 0;
127128
private int actualInFlight = 0;
@@ -148,7 +149,8 @@ public class ClientState {
148149
private MqttPingSender pingSender = null;
149150

150151
protected ClientState(MqttClientPersistence persistence, CommsTokenStore tokenStore,
151-
CommsCallback callback, ClientComms clientComms, MqttPingSender pingSender) throws MqttException {
152+
CommsCallback callback, ClientComms clientComms, MqttPingSender pingSender,
153+
HighResolutionTimer highResolutionTimer) throws MqttException {
152154

153155
log.setResourceName(clientComms.getClient().getClientId());
154156
log.finer(CLASS_NAME, "<Init>", "" );
@@ -168,6 +170,7 @@ protected ClientState(MqttClientPersistence persistence, CommsTokenStore tokenSt
168170
this.tokenStore = tokenStore;
169171
this.clientComms = clientComms;
170172
this.pingSender = pingSender;
173+
this.highResolutionTimer = highResolutionTimer;
171174

172175
restoreState();
173176
}
@@ -719,7 +722,7 @@ public MqttToken checkForActivity(IMqttActionListener pingCallback) throws MqttE
719722
long nextPingTime = TimeUnit.NANOSECONDS.toMillis(this.keepAliveNanos); // milliseconds relative time
720723

721724
if (connected && this.keepAliveNanos > 0) {
722-
long time = System.nanoTime();
725+
long time = highResolutionTimer.nanoTime();
723726
// Below might not be necessary since move to nanoTime (Issue #278)
724727
//Reduce schedule frequency since System.currentTimeMillis is no accurate, add a buffer
725728
//It is 1/10 in minimum keepalive unit.
@@ -897,7 +900,7 @@ public void setKeepAliveInterval(long interval) {
897900
public void notifySentBytes(int sentBytesCount) {
898901
final String methodName = "notifySentBytes";
899902
if (sentBytesCount > 0) {
900-
this.lastOutboundActivity = System.nanoTime();
903+
this.lastOutboundActivity = highResolutionTimer.nanoTime();
901904
}
902905
// @TRACE 643=sent bytes count={0}
903906
log.fine(CLASS_NAME, methodName, "643", new Object[] {
@@ -912,7 +915,7 @@ public void notifySentBytes(int sentBytesCount) {
912915
protected void notifySent(MqttWireMessage message) {
913916
final String methodName = "notifySent";
914917

915-
this.lastOutboundActivity = System.nanoTime();
918+
this.lastOutboundActivity = highResolutionTimer.nanoTime();
916919
//@TRACE 625=key={0}
917920
log.fine(CLASS_NAME,methodName,"625",new Object[]{message.getKey()});
918921

@@ -924,7 +927,7 @@ protected void notifySent(MqttWireMessage message) {
924927
token.internalTok.notifySent();
925928
if (message instanceof MqttPingReq) {
926929
synchronized (pingOutstandingLock) {
927-
long time = System.nanoTime();
930+
long time = highResolutionTimer.nanoTime();
928931
synchronized (pingOutstandingLock) {
929932
lastPing = time;
930933
pingOutstanding++;
@@ -978,7 +981,7 @@ protected boolean checkQuiesceLock() {
978981
public void notifyReceivedBytes(int receivedBytesCount) {
979982
final String methodName = "notifyReceivedBytes";
980983
if (receivedBytesCount > 0) {
981-
this.lastInboundActivity = System.nanoTime();
984+
this.lastInboundActivity = highResolutionTimer.nanoTime();
982985
}
983986
// @TRACE 630=received bytes count={0}
984987
log.fine(CLASS_NAME, methodName, "630", new Object[] {
@@ -993,7 +996,7 @@ public void notifyReceivedBytes(int receivedBytesCount) {
993996
*/
994997
protected void notifyReceivedAck(MqttAck ack) throws MqttException {
995998
final String methodName = "notifyReceivedAck";
996-
this.lastInboundActivity = System.nanoTime();
999+
this.lastInboundActivity = highResolutionTimer.nanoTime();
9971000

9981001
// @TRACE 627=received key={0} message={1}
9991002
log.fine(CLASS_NAME, methodName, "627", new Object[] {
@@ -1075,7 +1078,7 @@ protected void notifyReceivedAck(MqttAck ack) throws MqttException {
10751078
*/
10761079
protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
10771080
final String methodName = "notifyReceivedMsg";
1078-
this.lastInboundActivity = System.nanoTime();
1081+
this.lastInboundActivity = highResolutionTimer.nanoTime();
10791082

10801083
// @TRACE 651=received key={0} message={1}
10811084
log.fine(CLASS_NAME, methodName, "651", new Object[] {
@@ -1435,7 +1438,8 @@ protected void close() {
14351438
callback = null;
14361439
clientComms = null;
14371440
persistence = null;
1438-
pingCommand = null;
1441+
pingCommand = null;
1442+
highResolutionTimer = null;
14391443
}
14401444

14411445
public Properties getDebug() {
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2009, 2018 IBM Corp.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Public License v2.0
6+
* and Eclipse Distribution License v1.0 which accompany this distribution.
7+
*
8+
* The Eclipse Public License is available at
9+
* https://www.eclipse.org/legal/epl-2.0
10+
* and the Eclipse Distribution License is available at
11+
* https://www.eclipse.org/org/documents/edl-v10.php
12+
*
13+
* Contributors:
14+
* Dustin Thomson - initial API and implementation and/or initial documentation
15+
*/
16+
17+
package org.eclipse.paho.client.mqttv3.internal;
18+
19+
/**
20+
* A high-resolution timer source.
21+
*
22+
* Implementations must use clocks that are guaranteed to be monotonic and continue to run
23+
* even if the CPU enters a low-power state.
24+
*/
25+
public interface HighResolutionTimer {
26+
27+
/**
28+
* Returns the current value of a high-resolution time source, in nanoseconds.
29+
*
30+
* This method can only be used to measure elapsed time and may return negative values.
31+
*
32+
* @return the current value of a high-resolution time source, in nanoseconds.
33+
*/
34+
public long nanoTime();
35+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2009, 2018 IBM Corp.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Public License v2.0
6+
* and Eclipse Distribution License v1.0 which accompany this distribution.
7+
*
8+
* The Eclipse Public License is available at
9+
* https://www.eclipse.org/legal/epl-2.0
10+
* and the Eclipse Distribution License is available at
11+
* https://www.eclipse.org/org/documents/edl-v10.php
12+
*
13+
* Contributors:
14+
* Dustin Thomson - initial API and implementation and/or initial documentation
15+
*/
16+
17+
package org.eclipse.paho.client.mqttv3.internal;
18+
19+
/**
20+
* A high resolution timer appropriate for use by most JVMs.
21+
*
22+
* This implementation delegates {@link #nanoTime()} to {@link System#nanoTime()}.
23+
*
24+
* Note: This implementation is not appropriate for use on Android, as the clock backing {@link System#nanoTime()} stops
25+
* when the system enters deep sleep.
26+
*/
27+
public class SystemHighResolutionTimer implements HighResolutionTimer {
28+
@Override
29+
public long nanoTime() {
30+
return System.nanoTime();
31+
}
32+
}

0 commit comments

Comments
 (0)