From d2935115da6c13d6b3bccbfce1d4411537b01dd3 Mon Sep 17 00:00:00 2001 From: Gregory Freilikhman Date: Tue, 19 May 2026 11:20:58 +0300 Subject: [PATCH] feat: support null value messages (tombstones) for compacted topics Add support for sending and detecting null value messages, which are used as tombstones on compacted topics to delete entries for specific keys. This wraps the C++ client's MessageBuilder::setNullValue() and Message::hasNullValue() APIs added in pulsar-client-cpp#563. Changes: - Bump pulsar-cpp dependency to 4.2.0 - Add pybind11 bindings for set_null_value and has_null_value - Allow Producer.send(None) to produce a null value message - Add Message.has_null_value() to detect tombstone messages - Skip schema encoding when content is None (mirrors Java client) - Add integration tests for null values, compaction, and table view Requires pulsar-client-cpp >= 4.2.0 (not yet released). Co-authored-by: Cursor --- dependencies.yaml | 2 +- pulsar/__init__.py | 33 ++++++++-- src/message.cc | 2 + tests/pulsar_test.py | 140 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 171 insertions(+), 6 deletions(-) diff --git a/dependencies.yaml b/dependencies.yaml index 308aeec2..608d16fc 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -17,7 +17,7 @@ # under the License. # -pulsar-cpp: 4.1.0 +pulsar-cpp: 4.2.0 pybind11: 3.0.1 # The OpenSSL dependency is only used when building Python from source openssl: 1.1.1q diff --git a/pulsar/__init__.py b/pulsar/__init__.py index afcb6340..30f6bf11 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -366,6 +366,19 @@ def producer_name(self) -> str: """ return self._message.producer_name() + def has_null_value(self) -> bool: + """ + Check if the message has a null value (tombstone). + + Messages with null values are used on compacted topics to delete + the entry for a specific key. + + Returns + ---------- + True if the message has a null value, False otherwise. + """ + return self._message.has_null_value() + def encryption_context(self) -> EncryptionContext | None: """ Get the encryption context for this message or None if it's not encrypted. @@ -1693,7 +1706,9 @@ def send(self, content, ---------- content: - A ``bytes`` object with the message payload. + A ``bytes`` object with the message payload, or ``None`` to send a null value + message (tombstone). Null value messages are used on compacted topics to delete + the entry for a specific key. properties: optional A dict of application-defined string properties. partition_key: optional @@ -1775,7 +1790,9 @@ def callback(res, msg_id): ---------- content - A `bytes` object with the message payload. + A ``bytes`` object with the message payload, or ``None`` to send a null value + message (tombstone). Null value messages are used on compacted topics to delete + the entry for a specific key. callback A callback that is invoked once the message has been acknowledged by the broker. properties: optional @@ -1823,9 +1840,12 @@ def close(self): def _build_msg(self, content, properties, partition_key, ordering_key, sequence_id, replication_clusters, disable_replication, event_timestamp, deliver_at, deliver_after): - data = self._schema.encode(content) + if content is not None: + data = self._schema.encode(content) + _check_type(bytes, data, 'data') + else: + data = None - _check_type(bytes, data, 'data') _check_type_or_none(dict, properties, 'properties') _check_type_or_none(str, partition_key, 'partition_key') _check_type_or_none(str, ordering_key, 'ordering_key') @@ -1837,7 +1857,10 @@ def _build_msg(self, content, properties, partition_key, ordering_key, sequence_ _check_type_or_none(timedelta, deliver_after, 'deliver_after') mb = _pulsar.MessageBuilder() - mb.content(data) + if data is not None: + mb.content(data) + else: + mb.set_null_value() if properties: for k, v in properties.items(): mb.property(k, v) diff --git a/src/message.cc b/src/message.cc index f3247e65..c33780d6 100644 --- a/src/message.cc +++ b/src/message.cc @@ -46,6 +46,7 @@ void export_message(py::module_& m) { .def("event_timestamp", &MessageBuilder::setEventTimestamp, return_value_policy::reference) .def("replication_clusters", &MessageBuilder::setReplicationClusters, return_value_policy::reference) .def("disable_replication", &MessageBuilder::disableReplication, return_value_policy::reference) + .def("set_null_value", &MessageBuilder::setNullValue, return_value_policy::reference) .def("build", &MessageBuilder::build); class_(m, "MessageId") @@ -121,6 +122,7 @@ void export_message(py::module_& m) { .def("int_schema_version", &Message::getLongSchemaVersion) .def("schema_version", &Message::getSchemaVersion, return_value_policy::copy) .def("producer_name", &Message::getProducerName, return_value_policy::copy) + .def("has_null_value", &Message::hasNullValue) .def("encryption_context", &Message::getEncryptionContext, return_value_policy::reference); MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload, diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index be817c5d..3a3beb70 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -2136,6 +2136,146 @@ def router(msg: pulsar.Message, num_partitions: int): client.close() + def test_null_value_message(self): + client = Client(self.serviceUrl) + topic = "null-value-%s" % uuid.uuid4() + producer = client.create_producer(topic, batching_enabled=False) + consumer = client.subscribe(topic, "sub", initial_position=InitialPosition.Earliest) + + producer.send(b"not null", partition_key="k1") + producer.send(None, partition_key="k2") + producer.send(b"also not null", partition_key="k3") + + msg1 = consumer.receive(TM) + self.assertEqual(msg1.data(), b"not null") + self.assertFalse(msg1.has_null_value()) + + msg2 = consumer.receive(TM) + self.assertTrue(msg2.has_null_value()) + self.assertEqual(msg2.data(), b"") + + msg3 = consumer.receive(TM) + self.assertEqual(msg3.data(), b"also not null") + self.assertFalse(msg3.has_null_value()) + + consumer.close() + producer.close() + client.close() + + def test_null_value_vs_empty_bytes(self): + client = Client(self.serviceUrl) + topic = "null-vs-empty-%s" % uuid.uuid4() + producer = client.create_producer(topic, batching_enabled=False) + consumer = client.subscribe(topic, "sub", initial_position=InitialPosition.Earliest) + + producer.send(b"", partition_key="k1") + producer.send(None, partition_key="k2") + + msg1 = consumer.receive(TM) + self.assertFalse(msg1.has_null_value()) + self.assertEqual(msg1.data(), b"") + + msg2 = consumer.receive(TM) + self.assertTrue(msg2.has_null_value()) + + consumer.close() + producer.close() + client.close() + + def test_null_value_compaction(self): + client = Client(self.serviceUrl) + topic = "null-compact-%s" % uuid.uuid4() + producer = client.create_producer(topic, batching_enabled=False) + + consumer = client.subscribe(topic, "my-sub1", is_read_compacted=True) + consumer.close() + + # key1: value then tombstone -> removed after compaction + producer.send(b"hello-1", partition_key="key1") + producer.send(None, partition_key="key1") + + # key2: value only -> survives + producer.send(b"hello-2", partition_key="key2") + + # key3: value then tombstone -> removed + producer.send(b"hello-3", partition_key="key3") + producer.send(None, partition_key="key3") + + # key4: value only -> survives + producer.send(b"hello-4", partition_key="key4") + producer.close() + + url = "%s/admin/v2/persistent/public/default/%s/compaction" % (self.adminUrl, topic) + doHttpPut(url, "") + while True: + s = doHttpGet(url).decode("utf-8") + if "RUNNING" in s: + time.sleep(0.2) + else: + self.assertTrue("SUCCESS" in s) + break + + # The compacted ledger cursor update is async on the broker side, + # wait for it to be persisted before reading. + time.sleep(1.0) + + consumer = client.subscribe(topic, "my-sub1", is_read_compacted=True) + messages = [] + while True: + try: + msg = consumer.receive(2000) + messages.append(msg) + except pulsar.Timeout: + break + + keys = [m.partition_key() for m in messages] + self.assertIn("key2", keys) + self.assertIn("key4", keys) + self.assertNotIn("key1", keys) + self.assertNotIn("key3", keys) + self.assertEqual(len(messages), 2) + + consumer.close() + client.close() + + def test_null_value_table_view(self): + client = Client(self.serviceUrl) + topic = "null-tv-%s" % uuid.uuid4() + producer = client.create_producer(topic, batching_enabled=False) + + producer.send(b"hello", partition_key="key1") + + tv = client.create_table_view(topic) + self.assertEqual(tv.get("key1"), b"hello") + + producer.send(None, partition_key="key1") + for _ in range(50): + if tv.get("key1") is None: + break + time.sleep(0.1) + self.assertIsNone(tv.get("key1")) + + tv.close() + producer.close() + client.close() + + def test_null_value_with_properties(self): + client = Client(self.serviceUrl) + topic = "null-props-%s" % uuid.uuid4() + producer = client.create_producer(topic, batching_enabled=False) + consumer = client.subscribe(topic, "sub", initial_position=InitialPosition.Earliest) + + producer.send(None, partition_key="k1", properties={"action": "delete"}) + + msg = consumer.receive(TM) + self.assertTrue(msg.has_null_value()) + self.assertEqual(msg.properties(), {"action": "delete"}) + self.assertEqual(msg.partition_key(), "k1") + + consumer.close() + producer.close() + client.close() + if __name__ == "__main__": main()