@@ -21,13 +21,24 @@ no matter the total size (tested with 80MB) *)
2121
2222Begin ["`Private`" ];
2323
24- {SocketWriteByteArrayFunc , SocketReadByteArrayFunc } = If [
24+ (*Define the most efficient pair of write bytearray and non-blocking read, for various WL versions. *)
25+ {SocketWriteByteArrayFunc , SocketReadByteArrayFuncNoWait } = If [
2526 $VersionNumber < 12 ,
26- iSocketWriteByteArray [socket_ ,ba_ ByteArray ] := ZeroMQLink ` Private ` ZMQWriteInternal [socket , Normal [ba ]];
27- iSocketReadByteArray [uuid_ String , flags_ Integer ]:= ByteArray @ iRecvSingleMultipartMessageSocket [uuid , flags ];
28- {iSocketWriteByteArray , iSocketReadByteArray }
27+ {
28+ Function [{socketOut , ba }, ZeroMQLink ` Private ` ZMQWriteInternal [socketOut , Normal [ba ]]],
29+ Function [{socketIn },
30+ Block [
31+ {data = ByteArray @ iRecvSingleMultipartMessageSocket [First @ socketIn , 1 (*Flag NOWAIT*) ]},
32+ If [Length [data ] >= 3 , Part [data ,4 ;; ], {}]
33+ ]
34+ ]
35+ }
2936 ,
30- {ZMQSocketWriteMessage , iRecvSingleMultipartBinaryMessageSocket }
37+ {
38+ ZMQSocketWriteMessage ,
39+ SocketReadMessage [# , "Blocking" -> False ] &
40+ }
41+
3142];
3243
3344$DEBUG = 1 ;
@@ -169,14 +180,13 @@ no matter the total size (tested with 80MB) *)
169180 $VersionNumber < $TaskSupportMinVersion ,
170181 (* Low CPU wait but need synchronous loop. *)
171182 evaluationLoop [socketIn_ SocketObject ]:= With [
172- {maxPause = $MaxIdlePause , minPause = $MinIdlePause , incr = $PauseIncrement ,
173- uuidIn = First @ socketIn , poller = {socketIn }},
183+ {maxPause = $MaxIdlePause , minPause = $MinIdlePause , incr = $PauseIncrement , poller = {socketIn }},
174184 Block [{msg },
175185 SendAck [];
176186 While [True ,
177- msg = SocketReadByteArrayFunc [ uuidIn , 1 (* NOWAIT *) ];
178- If [Length [msg ]> 3 ,
179- socketEventHandler [msg [[ 4 ;; ]] ];
187+ msg = SocketReadByteArrayFuncNoWait [ socketIn ];
188+ If [Length [msg ]> 0 ,
189+ socketEventHandler [msg ];
180190 ,
181191 SocketWaitNext [poller ];
182192 ]
@@ -186,14 +196,13 @@ no matter the total size (tested with 80MB) *)
186196 True ,
187197 (* Version with fixed asynchronous tasks *)
188198 evaluationLoop [socketIn_ SocketObject ]:= With [
189- {maxPause = $MaxIdlePause , minPause = $MinIdlePause , incr = $PauseIncrement ,
190- uuidIn = First @ socketIn , poller = {socketIn }},
199+ {maxPause = $MaxIdlePause , minPause = $MinIdlePause , incr = $PauseIncrement , poller = {socketIn }},
191200 $Task = SessionSubmit [
192201 ScheduledTask [
193202 (
194- msg = SocketReadByteArrayFunc [ uuidIn , 1 (* NOWAIT *) ];
195- If [Length [msg ]> 3 ,
196- socketEventHandler [msg [[ 4 ;; ]] ];
203+ msg = SocketReadByteArrayFuncNoWait [ socketIn ];
204+ If [Length [msg ]> 0 ,
205+ socketEventHandler [msg ];
197206 ,
198207 SocketWaitNext [poller ];
199208 ]
0 commit comments