11(* ::Package:: *)
22
3- (* Not useful since we apparently never receive multipart messages,
4- no matter the total size (tested with 80MB) *)
3+
54$NotSupportedVersionErrNo = 10 ;
65$MinVersionSupported = 11.3 ;
76If [$VersionNumber < $MinVersionSupported , Exit [$NotSupportedVersionErrNo ]];
@@ -44,11 +43,15 @@ no matter the total size (tested with 80MB) *)
4443 ]
4544 ]
4645 },
47- True
48- ,
46+ PacletFind ["ZeroMQLink" -> "1.2*" ] == {},
4947 {
5048 ZMQSocketWriteMessage ,
5149 SocketReadMessage [# , "Blocking" -> False ] &
50+ },
51+ True ,
52+ {
53+ SocketWriteMessage [## , "Blocking" -> False ] & ,
54+ SocketReadMessage [# , "Blocking" -> False ] &
5255 }
5356];
5457
@@ -185,10 +188,10 @@ no matter the total size (tested with 80MB) *)
185188$MaxIdlePause =. 001 ;
186189$MinIdlePause = 0.0001 ;
187190$PauseIncrement = 0.0001 ;
188- $TaskSupportMinVersion = Infinity ;
189-
191+ (*$ListenerSupportMinVersion = 12.3;*)
192+ $ListenerSupportMinVersion = Infinity ;
190193Which [
191- $VersionNumber < $TaskSupportMinVersion ,
194+ $VersionNumber < $ListenerSupportMinVersion ,
192195 (* Low CPU wait but need synchronous loop. *)
193196 evaluationLoop [socketIn_ SocketObject ]:= With [
194197 {maxPause = $MaxIdlePause , minPause = $MinIdlePause , incr = $PauseIncrement , poller = {socketIn }},
@@ -204,8 +207,20 @@ no matter the total size (tested with 80MB) *)
204207 ]
205208 ]
206209 ],
210+ (* Version with SocketListen, code is cleaner is 3 times slower as the version above.
211+ Possibly during the async events and pre-emptive evaluation.
212+ *)
207213 True ,
208- (* Version with fixed asynchronous tasks *)
214+ evaluationLoop [socketIn_ SocketObject ]:= (
215+ $SocketListener = SocketListen [
216+ socketIn ,
217+ socketEventHandler [# DataByteArray ]&
218+ ];
219+ SendAck [];
220+ Pause [2 ^ 60 ];
221+ );
222+ (*
223+ Version with fixed asynchronous tasks
209224 evaluationLoop[socketIn_SocketObject]:= With[
210225 {maxPause=$MaxIdlePause, minPause=$MinIdlePause, incr=$PauseIncrement, poller={socketIn}},
211226 $Task = SessionSubmit[
@@ -218,32 +233,33 @@ no matter the total size (tested with 80MB) *)
218233 SocketWaitNext[poller];
219234 ]
220235 ),
221- 0.0001 (* negligeable compared to IO operations ~1ms. We basically need 0 but can't use this value. *)
236+ 0.0001 negligeable compared to IO operations ~1ms. We basically need 0 but can't use this value.
222237 ],
223238 Method->"Idle",
224239 HandlerFunctions-><|"TaskStarted"->SendAck[]|>
225240 ];
226241 ];
242+ *)
227243];
228244(* can be useful for loopback connections which are available only if a task can be used.
229245Does not kill the kernel *)
230246ClientLibrary ` disconnect [] := Quit [];
231- ClientLibrary ` disconnect [] /; ($Task =!= None ) := (
232- TaskRemove [ $Task ];
247+ ClientLibrary ` disconnect [] /; ($SocketListener =!= None ) := (
248+ DeleteObject [ $SocketListener ];
233249 Scan [
234250 If [# =!= None , Close [# ]] & ,
235251 {$LoggerSocket , $OutputSocket , $InputSocket }
236252 ]
237253);
238- $Task = None ;
254+ $SocketListener = None ;
239255$LoggerSocket = None ;
240256$OutputSocket = None ;
241257$InputSocket = None ;
242258
243259$MaxMessagesReturned = 31 ;
244260$NoMessage = ByteArray [{0 }];
245261
246- SlaveKernelPrivateStart [inputsocket_ String , outputsocket_ String , logsocket_ String , loglevel_ Integer ] := (
262+ KernelPrivateStart [inputsocket_ String , outputsocket_ String , logsocket_ String , loglevel_ Integer ] := (
247263 $LoggerSocket = SocketConnect [logsocket ,"ZMQ_PUB" ];
248264 If [FailureQ [$LoggerSocket ],
249265 Print ["Failed to connect to logging socket: " , logsocket ]
@@ -252,12 +268,12 @@ no matter the total size (tested with 80MB) *)
252268 setLogLevel [loglevel ];
253269 addMessageHandler [];
254270 addPrintHandler [];
255- SlaveKernelPrivateStart [inputsocket , outputsocket ]
271+ KernelPrivateStart [inputsocket , outputsocket ]
256272 ];
257273);
258274
259275
260- SlaveKernelPrivateStart [inputsocket_ String , outputsocket_ String ] := Block [
276+ KernelPrivateStart [inputsocket_ String , outputsocket_ String ] := Block [
261277 {listener , msg },
262278 $InputSocket = SocketConnect [inputsocket , "ZMQ_Pull" ];
263279 $OutputSocket = SocketConnect [outputsocket , "ZMQ_Push" ];
0 commit comments