@@ -15,8 +15,10 @@ public class EasyModbus2Mqtt
1515 private ModbusClient modbusClient = new ModbusClient ( ) ;
1616 List < ReadOrder > readOrders = new List < ReadOrder > ( ) ;
1717 string mqttBrokerAddress = "www.mqtt-dashboard.com" ;
18+ int mqttBrokerPort = 1883 ;
1819 string mqttRootTopic = "easymodbusclient" ;
1920 uPLibrary . Networking . M2Mqtt . MqttClient mqttClient ;
21+ public bool AutomaticReconnect { get ; set ; } = true ;
2022
2123
2224 public EasyModbus2Mqtt ( )
@@ -114,14 +116,16 @@ public void AddReadOrder(FunctionCode functionCode, int quantity, int startingAd
114116
115117 public void start ( )
116118 {
119+
117120 this . shouldStop = false ;
118121 if ( mqttBrokerAddress == null )
119122 throw new ArgumentOutOfRangeException ( "Mqtt Broker Address not initialized" ) ;
123+ mqttClient = new uPLibrary . Networking . M2Mqtt . MqttClient ( mqttBrokerAddress , mqttBrokerPort , false , null , null , uPLibrary . Networking . M2Mqtt . MqttSslProtocols . None ) ;
124+
125+ string clientID = new Guid ( ) . ToString ( ) ;
126+ mqttClient . Connect ( clientID ) ;
120127 if ( ! modbusClient . Connected )
121128 modbusClient . Connect ( ) ;
122- mqttClient = new uPLibrary . Networking . M2Mqtt . MqttClient ( mqttBrokerAddress ) ;
123- string clientID = new Guid ( ) . ToString ( ) ;
124- mqttClient . Connect ( clientID ) ;
125129 for ( int i = 0 ; i < readOrders . Count ; i ++ )
126130 {
127131 readOrders [ i ] . thread = new System . Threading . Thread ( new ParameterizedThreadStart ( ProcessData ) ) ;
@@ -137,7 +141,7 @@ public void publish(string[] topic, string[] payload, string mqttBrokerAddress)
137141 mqttClient . Disconnect ( ) ;
138142 if ( topic . Length != payload . Length )
139143 throw new ArgumentOutOfRangeException ( "Array topic and payload must be the same size" ) ;
140- mqttClient = new uPLibrary . Networking . M2Mqtt . MqttClient ( mqttBrokerAddress ) ;
144+ mqttClient = new uPLibrary . Networking . M2Mqtt . MqttClient ( mqttBrokerAddress , mqttBrokerPort , false , null , null , uPLibrary . Networking . M2Mqtt . MqttSslProtocols . None ) ;
141145 string clientID = Guid . NewGuid ( ) . ToString ( ) ;
142146 if ( ! mqttClient . IsConnected )
143147 mqttClient . Connect ( clientID ) ;
@@ -159,61 +163,92 @@ private void ProcessData(object param)
159163 {
160164 while ( ! shouldStop )
161165 {
166+
162167 ReadOrder readOrder = ( ReadOrder ) param ;
163168 lock ( lockProcessData )
164169 {
165-
166- if ( readOrder . FunctionCode == FunctionCode . ReadCoils )
170+ try
167171 {
168- bool [ ] value = modbusClient . ReadCoils ( readOrder . StartingAddress , readOrder . Quantity ) ;
169- for ( int i = 0 ; i < value . Length ; i ++ )
172+ if ( readOrder . FunctionCode == FunctionCode . ReadCoils )
170173 {
171- if ( readOrder . oldvalue [ i ] == null )
172- mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
173- else if ( ( bool ) readOrder . oldvalue [ i ] != value [ i ] )
174- mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
175- readOrder . oldvalue [ i ] = value [ i ] ;
176- }
174+ bool [ ] value = modbusClient . ReadCoils ( readOrder . StartingAddress , readOrder . Quantity ) ;
175+ for ( int i = 0 ; i < value . Length ; i ++ )
176+ {
177+ if ( readOrder . oldvalue [ i ] == null )
178+ mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
179+ else if ( ( bool ) readOrder . oldvalue [ i ] != value [ i ] )
180+ mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
181+ readOrder . oldvalue [ i ] = value [ i ] ;
182+ }
177183
178- }
179- if ( readOrder . FunctionCode == FunctionCode . ReadDiscreteInputs )
180- {
181- bool [ ] value = modbusClient . ReadDiscreteInputs ( readOrder . StartingAddress , readOrder . Quantity ) ;
182- for ( int i = 0 ; i < value . Length ; i ++ )
184+ }
185+ if ( readOrder . FunctionCode == FunctionCode . ReadDiscreteInputs )
183186 {
184- if ( readOrder . oldvalue [ i ] == null )
185- mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
186- else if ( ( bool ) readOrder . oldvalue [ i ] != value [ i ] )
187- mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
188- readOrder . oldvalue [ i ] = value [ i ] ;
187+ bool [ ] value = modbusClient . ReadDiscreteInputs ( readOrder . StartingAddress , readOrder . Quantity ) ;
188+ for ( int i = 0 ; i < value . Length ; i ++ )
189+ {
190+ if ( readOrder . oldvalue [ i ] == null )
191+ mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
192+ else if ( ( bool ) readOrder . oldvalue [ i ] != value [ i ] )
193+ mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
194+ readOrder . oldvalue [ i ] = value [ i ] ;
195+ }
189196 }
190- }
191- if ( readOrder . FunctionCode == FunctionCode . ReadHoldingRegisters )
192- {
193- int [ ] value = modbusClient . ReadHoldingRegisters ( readOrder . StartingAddress , readOrder . Quantity ) ;
194- for ( int i = 0 ; i < value . Length ; i ++ )
197+ if ( readOrder . FunctionCode == FunctionCode . ReadHoldingRegisters )
195198 {
196- if ( readOrder . oldvalue [ i ] == null )
197- mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
198- else if ( ( int ) readOrder . oldvalue [ i ] != value [ i ] )
199- mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
200- readOrder . oldvalue [ i ] = value [ i ] ;
199+ int [ ] value = modbusClient . ReadHoldingRegisters ( readOrder . StartingAddress , readOrder . Quantity ) ;
200+ for ( int i = 0 ; i < value . Length ; i ++ )
201+ {
202+ float scale = readOrder . Scale != null ? ( readOrder . Scale [ i ] == 0 ) ? 1 : readOrder . Scale [ i ] : 1 ;
203+ if ( readOrder . oldvalue [ i ] == null )
204+ {
205+ mqttClient . Publish ( readOrder . Topic [ i ] , ( readOrder . Unit == null ? Encoding . UTF8 . GetBytes ( ( ( float ) value [ i ] * scale ) . ToString ( ) ) : Encoding . UTF8 . GetBytes ( ( ( float ) value [ i ] * scale ) + " " + readOrder . Unit [ i ] ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
206+ readOrder . oldvalue [ i ] = value [ i ] ;
207+ }
208+ else if ( ( ( int ) readOrder . oldvalue [ i ] != value [ i ] ) && ( readOrder . Hysteresis != null ? ( ( value [ i ] < ( int ) readOrder . oldvalue [ i ] - ( int ) readOrder . Hysteresis [ i ] ) | ( value [ i ] > ( int ) readOrder . oldvalue [ i ] + ( int ) readOrder . Hysteresis [ i ] ) ) : true ) )
209+ {
210+ mqttClient . Publish ( readOrder . Topic [ i ] , ( readOrder . Unit == null ? Encoding . UTF8 . GetBytes ( ( ( float ) value [ i ] * scale ) . ToString ( ) ) : Encoding . UTF8 . GetBytes ( ( ( float ) value [ i ] * scale ) . ToString ( ) + " " + readOrder . Unit [ i ] ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
211+ readOrder . oldvalue [ i ] = value [ i ] ;
212+ }
213+ }
201214 }
202- }
203215
204- if ( readOrder . FunctionCode == FunctionCode . ReadInputRegisters )
216+ if ( readOrder . FunctionCode == FunctionCode . ReadInputRegisters )
217+ {
218+ int [ ] value = modbusClient . ReadInputRegisters ( readOrder . StartingAddress , readOrder . Quantity ) ;
219+ for ( int i = 0 ; i < value . Length ; i ++ )
220+ {
221+ float scale = readOrder . Scale != null ? ( readOrder . Scale [ i ] == 0 ) ? 1 : readOrder . Scale [ i ] : 1 ;
222+ if ( readOrder . oldvalue [ i ] == null )
223+ {
224+ mqttClient . Publish ( readOrder . Topic [ i ] , ( readOrder . Unit == null ? Encoding . UTF8 . GetBytes ( ( ( float ) value [ i ] * scale ) . ToString ( ) ) : Encoding . UTF8 . GetBytes ( ( ( float ) value [ i ] * scale ) . ToString ( ) + " " + readOrder . Unit [ i ] ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
225+ readOrder . oldvalue [ i ] = value [ i ] ;
226+ }
227+ else if ( ( ( int ) readOrder . oldvalue [ i ] != value [ i ] ) && ( readOrder . Hysteresis != null ? ( ( value [ i ] < ( int ) readOrder . oldvalue [ i ] - ( int ) readOrder . Hysteresis [ i ] ) | ( value [ i ] > ( int ) readOrder . oldvalue [ i ] + ( int ) readOrder . Hysteresis [ i ] ) ) : true ) )
228+ {
229+ mqttClient . Publish ( readOrder . Topic [ i ] , ( readOrder . Unit == null ? Encoding . UTF8 . GetBytes ( ( ( float ) value [ i ] * scale ) . ToString ( ) ) : Encoding . UTF8 . GetBytes ( ( ( float ) value [ i ] * scale ) . ToString ( ) + " " + readOrder . Unit [ i ] ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
230+ readOrder . oldvalue [ i ] = value [ i ] ;
231+ }
232+ }
233+ }
234+ }
235+ catch ( Exception exc )
205236 {
206- int [ ] value = modbusClient . ReadInputRegisters ( readOrder . StartingAddress , readOrder . Quantity ) ;
207- for ( int i = 0 ; i < value . Length ; i ++ )
237+ modbusClient . Disconnect ( ) ;
238+ Thread . Sleep ( 2000 ) ;
239+ if ( ! AutomaticReconnect )
240+ throw exc ;
241+ if ( ! modbusClient . Connected )
208242 {
209- if ( readOrder . oldvalue [ i ] == null )
210- mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
211- else if ( ( int ) readOrder . oldvalue [ i ] != value [ i ] )
212- mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
213- readOrder . oldvalue [ i ] = value [ i ] ;
243+ try
244+ {
245+
246+ modbusClient . Connect ( ) ;
247+ }
248+ catch ( Exception ) { }
214249 }
215250 }
216- }
251+ }
217252 System . Threading . Thread . Sleep ( readOrder . CylceTime ) ;
218253 }
219254 }
@@ -231,6 +266,18 @@ public string MqttBrokerAddress
231266 }
232267 }
233268
269+ public int MqttBrokerPort
270+ {
271+ get
272+ {
273+ return this . mqttBrokerPort ;
274+ }
275+ set
276+ {
277+ this . mqttBrokerPort = value ;
278+ }
279+ }
280+
234281 public string MqttRootTopic
235282 {
236283 get
@@ -396,13 +443,16 @@ public string SerialPort
396443
397444 public class ReadOrder
398445 {
399- public FunctionCode FunctionCode ; //Function Code to execute
400- public int CylceTime = 500 ; //Polling intervall in ms
401- public int StartingAddress ; //First Modbus Register to Read (0-based)
446+ public FunctionCode FunctionCode ; //Function Code to execute
447+ public int CylceTime = 500 ; //Polling intervall in ms
448+ public int StartingAddress ; //First Modbus Register to Read (0-based)
402449 public int Quantity ;
403- public string [ ] Topic ; //Symbolnames can by replaced, by default we Push to the topic e.g. for Coils: /modbusclient/coils/1
450+ public string [ ] Topic ; //Symbolnames can by replaced, by default we Push to the topic e.g. for Coils: /modbusclient/coils/1
451+ public int [ ] Hysteresis ; //Values for 16-Bit Registers will be published of the dieffreence is greater than Hysteresis
452+ public string [ ] Unit ; //Unit for Analog Values (Holding Registers and Input Registers)
453+ public float [ ] Scale ; //Scale for Analog Values (Holding Registers and Input Registers)
404454 internal System . Threading . Thread thread ;
405- internal object [ ] oldvalue ;
455+ internal object [ ] oldvalue ;
406456 }
407457
408458 public enum FunctionCode
0 commit comments