From 9738a1fd69b3ad93c6a7c4e5a212278bd9219c94 Mon Sep 17 00:00:00 2001 From: Kevin Abramczyk Date: Tue, 16 Jun 2026 01:23:47 +0200 Subject: [PATCH 1/2] Add Electrum protocol version negotiation Moves server.version handling out of _ping_loop into a dedicated `handshaking` state in the monitor loop state machine. The handshake runs synchronously after recv/write/notify threads are started but before the ping thread is launched, ensuring the protocol ordering constraint (server.version before server.ping, required since protocol 1.2) is enforced by the architecture, not just by call order within a thread. Changes: - New `handshaking` state between `creating_threads` and `execute_recreation_callback`; ping thread only starts after a successful handshake - Protocol range raised from ["1.2", "1.4"] to ["1.3", "1.4"]; blockchain.block.header, used throughout spectrum.py, was added in 1.3 - Negotiated server software version and protocol version stored as `server_software_version` and `negotiated_protocol` on the socket instance and logged at INFO level - All failure paths (RPCError, timeout, unexpected response format, sub-minimum negotiated version) trigger reconnect via `broken_killing_threads` - Fix logger.exception() missing argument in _create_threads Tested against libbitcoin, ElectrumX, Fulcrum and electrs-esplora. --- src/cryptoadvance/spectrum/elsock.py | 68 +++++++++++++++++++++++++--- 1 file changed, 61 insertions(+), 7 deletions(-) diff --git a/src/cryptoadvance/spectrum/elsock.py b/src/cryptoadvance/spectrum/elsock.py index 5b7dc98..216b57f 100644 --- a/src/cryptoadvance/spectrum/elsock.py +++ b/src/cryptoadvance/spectrum/elsock.py @@ -17,6 +17,14 @@ logger = logging.getLogger(__name__) +def _ver(version_str: str) -> tuple: + """Parse a "X.Y" version string into a comparable tuple.""" + try: + return tuple(int(x) for x in str(version_str).split(".")) + except ValueError: + return (0,) + + class ElSockTimeoutException(Exception): """Called in different contexts where a timeout is relevant""" @@ -54,6 +62,10 @@ class ElectrumSocket: sleep_recv_loop = 0.01 # seconds , the shorter the better performance but 0.001 might be much worse sleep_write_loop = 0.01 # seconds , the shorter the better performance but 0.001 might be much worse socket_timeout = 10 # seconds for self._socket.recv(2048) (won't show up in the logs) + + CLIENT_NAME = "Spectrum/1.0" + PROTOCOL_MIN = "1.3" # blockchain.block.header was added in 1.3 + PROTOCOL_MAX = "1.4" # fmt: on def __init__( @@ -108,6 +120,8 @@ def __init__( self._requests = [] self._notifications = [] self._wanted_status = "ok" # "ok" or "down" + self.server_software_version = None # set during version negotiation, e.g. "/libbitcoin:4.0.0/" + self.negotiated_protocol = None # set during version negotiation, e.g. "1.4" # The monitor-thread will create the other threads self._monitor_thread = create_and_start_bg_thread(self._monitor_loop) while not (self.status == "ok" or self.status.startswith("broken_")): @@ -231,11 +245,10 @@ def _create_threads(self) -> bool: try: self._recv_thread = create_and_start_bg_thread(self.recv_loop) self._write_thread = create_and_start_bg_thread(self._write_loop) - self._ping_thread = create_and_start_bg_thread(self._ping_loop) self._notify_thread = create_and_start_bg_thread(self._notify_loop) return True except Exception as e: - logger.exception() + logger.exception(e) return False def is_socket_closed(self) -> bool: @@ -257,14 +270,15 @@ def _monitor_loop(self): If the ping thread is not alive, the socket connection and threads will be recreated via walking through this state-machine: - [![](https://mermaid.ink/img/pako:eNqdkj9vAyEMxb_KyWOVWzre0KkZM3VrqU4OOAmCwxFn-kdRvnu5I0mVKmEok-H9eDbiHUCzIehgFBR6triNOLQfjyo0eb09vDdt-9ToSCg2bPuRtSMpIrtZW0d2FHpnvZ8I2WXWjAW5rd23rPCGP0OBpuq-xZ_Da_BquFvkaYDacP9oUH13haUv0kmoj1Rkzt3R-zVqV25VgNmAHSxgoDigNfmPD9MtBbKjgRR0uTS0weRFgQrHjKa9ySlYGiscodugH2kBmIRfvoOGTmKiM3SKyoXaY3hl_t3TbLIq4ZozdvwB373ZuA?type=png)](https://mermaid-js.github.io/mermaid-live-editor/edit#pako:eNqdkj9vAyEMxb_KyWOVWzre0KkZM3VrqU4OOAmCwxFn-kdRvnu5I0mVKmEok-H9eDbiHUCzIehgFBR6triNOLQfjyo0eb09vDdt-9ToSCg2bPuRtSMpIrtZW0d2FHpnvZ8I2WXWjAW5rd23rPCGP0OBpuq-xZ_Da_BquFvkaYDacP9oUH13haUv0kmoj1Rkzt3R-zVqV25VgNmAHSxgoDigNfmPD9MtBbKjgRR0uTS0weRFgQrHjKa9ySlYGiscodugH2kBmIRfvoOGTmKiM3SKyoXaY3hl_t3TbLIq4ZozdvwB373ZuA) + [![](https://mermaid.ink/img/pako:eNqdkk0OwiAQha9CWBq7cenClbdQ04wwFsJfQqmaNL27FFxYQzGRFXnvm5lHmJEyx5HuCe0DBDxK6DyY5r47WxLPaXMhTXMgzCMEabu2d0xhyKZTybt6p9C2Smo9E0FElvcZKXvrLSs8dw-bofm23uJLXIKLcCXyHaAW7o8BP99dJAVY3gtQUc_Uh5AAfCIbArYec72L4UDrKzBVLqh9VaVZqnWKbgk16A1IHhdmpEGgSavD8QaDDnSaXhlMzyQ?type=png)](https://mermaid-js.github.io/mermaid-live-editor/edit#pako:eNqdkk0OwiAQha9CWBq7cenClbdQ04wwFsJfQqmaNL27FFxYQzGRFXnvm5lHmJEyx5HuCe0DBDxK6DyY5r47WxLPaXMhTXMgzCMEabu2d0xhyKZTybt6p9C2Smo9E0FElvcZKXvrLSs8dw-bofm23uJLXIKLcCXyHaAW7o8BP99dJAVY3gtQUc_Uh5AAfCIbArYec72L4UDrKzBVLqh9VaVZqnWKbgk16A1IHhdmpEGgSavD8QaDDnSaXhlMzyQ) The states are stored in the `ElectrumSocket.state` property. The Constructor of the `ElectrumSocket` is hardly doing more than just setting up the `_monitor_thread` which is an endless loop going through these states: * `creating_sockets` will create the sockets and pass to `creating_threads` or to `broken_creating_sockets` if that fails * `broken_creating_sockets` will try to create the socket and sleep for some time if that fails (and endlessly try to do that) - * `creating_threads` will create the write/recv/ping/notify threads and start them + * `creating_threads` will create the write/recv/notify threads (not ping yet) and start them + * `handshaking` calls server.version synchronously. On success it stores the negotiated protocol version and starts the ping thread. On failure it transitions to `broken_killing_threads`. * `execute_recreation_callback` will call that callback after setting the status to `ok` - * the `ok` state will now simply check the other thready and if one of them is no longer alive (probably the ping-thread as he will exit if ping fails for 4 times) it will transition to `broken_killing_threads` + * the `ok` state will now simply check the other threads and if one of them is no longer alive (probably the ping-thread as he will exit if ping fails for 4 times) it will transition to `broken_killing_threads` * `broken_killing_threads` will set `self.running` to false and wait for the threads to terminate. Especially the `recv` thread might not terminate until he get internet connection (again). This might take forever. If all threads are terminated, it will transition to `creating_socket` """ @@ -296,7 +310,47 @@ def _monitor_loop(self): if not self._create_threads(): time.sleep(10) continue - self.status = "execute_recreation_callback" + self.status = "handshaking" + + if self.status == "handshaking": + self.server_software_version = None + self.negotiated_protocol = None + _handshake_ok = False + try: + result = self.call( + "server.version", + [self.CLIENT_NAME, [self.PROTOCOL_MIN, self.PROTOCOL_MAX]], + ) + if isinstance(result, list) and len(result) == 2: + self.server_software_version, self.negotiated_protocol = result + logger.info( + f"Electrum handshake complete: server={self.server_software_version!r} " + f"protocol={self.negotiated_protocol!r}" + ) + if _ver(self.negotiated_protocol) < _ver(self.PROTOCOL_MIN): + logger.error( + f"Negotiated protocol {self.negotiated_protocol!r} is below " + f"required minimum {self.PROTOCOL_MIN!r}. Reconnecting." + ) + else: + _handshake_ok = True + else: + logger.error(f"Unexpected server.version response: {result!r}. Reconnecting.") + except RPCError as e: + logger.error( + f"server.version rejected (code={e.code}): {e.message!r}. " + f"Server may not support protocol range " + f"[{self.PROTOCOL_MIN}, {self.PROTOCOL_MAX}]. Reconnecting." + ) + except ElSockTimeoutException: + logger.error("server.version timed out. Reconnecting.") + except Exception as e: + logger.error(f"server.version failed: {e}. Reconnecting.") + if not _handshake_ok: + self.status = "broken_killing_threads" + else: + self._ping_thread = create_and_start_bg_thread(self._ping_loop) + self.status = "execute_recreation_callback" if self.status == "execute_recreation_callback": # set the new status here before we call the callback @@ -502,7 +556,7 @@ def _ping_loop(self): try: self.ping() tries = 0 - except ElSockTimeoutException as e: + except ElSockTimeoutException: tries = tries + 1 logger.error( f"Timeout in ping-loop ({tries}th time, next try in {self.sleep_ping_loop} seconds if threshold not met" From 47dc34030df01dd2b08b959e333b6ddeb43c44ac Mon Sep 17 00:00:00 2001 From: al-munazzim Date: Sun, 28 Jun 2026 20:49:44 +0000 Subject: [PATCH 2/2] test: update Electrum socket state expectations --- tests/integration/elsock_test.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/tests/integration/elsock_test.py b/tests/integration/elsock_test.py index 5b5dbde..2ce1a33 100644 --- a/tests/integration/elsock_test.py +++ b/tests/integration/elsock_test.py @@ -54,7 +54,13 @@ def callback(something): ) assert ( caplog.text.count( - "ElectrumSocket Status changed from creating_threads to execute_recreation_callback" + "ElectrumSocket Status changed from creating_threads to handshaking" + ) + == 1 + ) + assert ( + caplog.text.count( + "ElectrumSocket Status changed from handshaking to execute_recreation_callback" ) == 1 ) @@ -74,6 +80,14 @@ def callback(something): f"...................................... timer: {i} seconds passed (elsock.is_socket_closed() returns {elsock.is_socket_closed()})" ) time.sleep(1) + for i in range(0, 20): + if elsock.status == "ok": + break + logger.info( + f"...................................... waiting for socket recovery: {i} seconds passed (status={elsock.status})" + ) + time.sleep(1) + assert elsock.status == "ok" logger.info( f"{datetime.now()}========================The socket connection should now work properly again================================" ) @@ -81,7 +95,7 @@ def callback(something): ts = elsock.ping() logger.info(f"second working ping in {ts} ms") assert ts < 10 - assert caplog.text.count("ElectrumSocket Status changed") == 9 + assert caplog.text.count("ElectrumSocket Status changed") == 11 assert ( caplog.text.count( @@ -103,7 +117,13 @@ def callback(something): ) assert ( caplog.text.count( - "ElectrumSocket Status changed from creating_threads to execute_recreation_callback" + "ElectrumSocket Status changed from creating_threads to handshaking" + ) + == 2 + ) + assert ( + caplog.text.count( + "ElectrumSocket Status changed from handshaking to execute_recreation_callback" ) == 2 )