diff --git a/dependencies.yaml b/dependencies.yaml index 308aeec..608d16f 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -17,7 +17,7 @@ # under the License. # -pulsar-cpp: 4.1.0 +pulsar-cpp: 4.2.0 pybind11: 3.0.1 # The OpenSSL dependency is only used when building Python from source openssl: 1.1.1q diff --git a/pulsar/__init__.py b/pulsar/__init__.py index afcb634..30f6bf1 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -366,6 +366,19 @@ def producer_name(self) -> str: """ return self._message.producer_name() + def has_null_value(self) -> bool: + """ + Check if the message has a null value (tombstone). + + Messages with null values are used on compacted topics to delete + the entry for a specific key. + + Returns + ---------- + True if the message has a null value, False otherwise. + """ + return self._message.has_null_value() + def encryption_context(self) -> EncryptionContext | None: """ Get the encryption context for this message or None if it's not encrypted. @@ -1693,7 +1706,9 @@ def send(self, content, ---------- content: - A ``bytes`` object with the message payload. + A ``bytes`` object with the message payload, or ``None`` to send a null value + message (tombstone). Null value messages are used on compacted topics to delete + the entry for a specific key. properties: optional A dict of application-defined string properties. partition_key: optional @@ -1775,7 +1790,9 @@ def callback(res, msg_id): ---------- content - A `bytes` object with the message payload. + A ``bytes`` object with the message payload, or ``None`` to send a null value + message (tombstone). Null value messages are used on compacted topics to delete + the entry for a specific key. callback A callback that is invoked once the message has been acknowledged by the broker. properties: optional @@ -1823,9 +1840,12 @@ def close(self): def _build_msg(self, content, properties, partition_key, ordering_key, sequence_id, replication_clusters, disable_replication, event_timestamp, deliver_at, deliver_after): - data = self._schema.encode(content) + if content is not None: + data = self._schema.encode(content) + _check_type(bytes, data, 'data') + else: + data = None - _check_type(bytes, data, 'data') _check_type_or_none(dict, properties, 'properties') _check_type_or_none(str, partition_key, 'partition_key') _check_type_or_none(str, ordering_key, 'ordering_key') @@ -1837,7 +1857,10 @@ def _build_msg(self, content, properties, partition_key, ordering_key, sequence_ _check_type_or_none(timedelta, deliver_after, 'deliver_after') mb = _pulsar.MessageBuilder() - mb.content(data) + if data is not None: + mb.content(data) + else: + mb.set_null_value() if properties: for k, v in properties.items(): mb.property(k, v) diff --git a/src/message.cc b/src/message.cc index f3247e6..c33780d 100644 --- a/src/message.cc +++ b/src/message.cc @@ -46,6 +46,7 @@ void export_message(py::module_& m) { .def("event_timestamp", &MessageBuilder::setEventTimestamp, return_value_policy::reference) .def("replication_clusters", &MessageBuilder::setReplicationClusters, return_value_policy::reference) .def("disable_replication", &MessageBuilder::disableReplication, return_value_policy::reference) + .def("set_null_value", &MessageBuilder::setNullValue, return_value_policy::reference) .def("build", &MessageBuilder::build); class_(m, "MessageId") @@ -121,6 +122,7 @@ void export_message(py::module_& m) { .def("int_schema_version", &Message::getLongSchemaVersion) .def("schema_version", &Message::getSchemaVersion, return_value_policy::copy) .def("producer_name", &Message::getProducerName, return_value_policy::copy) + .def("has_null_value", &Message::hasNullValue) .def("encryption_context", &Message::getEncryptionContext, return_value_policy::reference); MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload, diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index be817c5..3a3beb7 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -2136,6 +2136,146 @@ def router(msg: pulsar.Message, num_partitions: int): client.close() + def test_null_value_message(self): + client = Client(self.serviceUrl) + topic = "null-value-%s" % uuid.uuid4() + producer = client.create_producer(topic, batching_enabled=False) + consumer = client.subscribe(topic, "sub", initial_position=InitialPosition.Earliest) + + producer.send(b"not null", partition_key="k1") + producer.send(None, partition_key="k2") + producer.send(b"also not null", partition_key="k3") + + msg1 = consumer.receive(TM) + self.assertEqual(msg1.data(), b"not null") + self.assertFalse(msg1.has_null_value()) + + msg2 = consumer.receive(TM) + self.assertTrue(msg2.has_null_value()) + self.assertEqual(msg2.data(), b"") + + msg3 = consumer.receive(TM) + self.assertEqual(msg3.data(), b"also not null") + self.assertFalse(msg3.has_null_value()) + + consumer.close() + producer.close() + client.close() + + def test_null_value_vs_empty_bytes(self): + client = Client(self.serviceUrl) + topic = "null-vs-empty-%s" % uuid.uuid4() + producer = client.create_producer(topic, batching_enabled=False) + consumer = client.subscribe(topic, "sub", initial_position=InitialPosition.Earliest) + + producer.send(b"", partition_key="k1") + producer.send(None, partition_key="k2") + + msg1 = consumer.receive(TM) + self.assertFalse(msg1.has_null_value()) + self.assertEqual(msg1.data(), b"") + + msg2 = consumer.receive(TM) + self.assertTrue(msg2.has_null_value()) + + consumer.close() + producer.close() + client.close() + + def test_null_value_compaction(self): + client = Client(self.serviceUrl) + topic = "null-compact-%s" % uuid.uuid4() + producer = client.create_producer(topic, batching_enabled=False) + + consumer = client.subscribe(topic, "my-sub1", is_read_compacted=True) + consumer.close() + + # key1: value then tombstone -> removed after compaction + producer.send(b"hello-1", partition_key="key1") + producer.send(None, partition_key="key1") + + # key2: value only -> survives + producer.send(b"hello-2", partition_key="key2") + + # key3: value then tombstone -> removed + producer.send(b"hello-3", partition_key="key3") + producer.send(None, partition_key="key3") + + # key4: value only -> survives + producer.send(b"hello-4", partition_key="key4") + producer.close() + + url = "%s/admin/v2/persistent/public/default/%s/compaction" % (self.adminUrl, topic) + doHttpPut(url, "") + while True: + s = doHttpGet(url).decode("utf-8") + if "RUNNING" in s: + time.sleep(0.2) + else: + self.assertTrue("SUCCESS" in s) + break + + # The compacted ledger cursor update is async on the broker side, + # wait for it to be persisted before reading. + time.sleep(1.0) + + consumer = client.subscribe(topic, "my-sub1", is_read_compacted=True) + messages = [] + while True: + try: + msg = consumer.receive(2000) + messages.append(msg) + except pulsar.Timeout: + break + + keys = [m.partition_key() for m in messages] + self.assertIn("key2", keys) + self.assertIn("key4", keys) + self.assertNotIn("key1", keys) + self.assertNotIn("key3", keys) + self.assertEqual(len(messages), 2) + + consumer.close() + client.close() + + def test_null_value_table_view(self): + client = Client(self.serviceUrl) + topic = "null-tv-%s" % uuid.uuid4() + producer = client.create_producer(topic, batching_enabled=False) + + producer.send(b"hello", partition_key="key1") + + tv = client.create_table_view(topic) + self.assertEqual(tv.get("key1"), b"hello") + + producer.send(None, partition_key="key1") + for _ in range(50): + if tv.get("key1") is None: + break + time.sleep(0.1) + self.assertIsNone(tv.get("key1")) + + tv.close() + producer.close() + client.close() + + def test_null_value_with_properties(self): + client = Client(self.serviceUrl) + topic = "null-props-%s" % uuid.uuid4() + producer = client.create_producer(topic, batching_enabled=False) + consumer = client.subscribe(topic, "sub", initial_position=InitialPosition.Earliest) + + producer.send(None, partition_key="k1", properties={"action": "delete"}) + + msg = consumer.receive(TM) + self.assertTrue(msg.has_null_value()) + self.assertEqual(msg.properties(), {"action": "delete"}) + self.assertEqual(msg.partition_key(), "k1") + + consumer.close() + producer.close() + client.close() + if __name__ == "__main__": main()