Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
#

pulsar-cpp: 4.1.0
pulsar-cpp: 4.2.0
pybind11: 3.0.1
# The OpenSSL dependency is only used when building Python from source
openssl: 1.1.1q
33 changes: 28 additions & 5 deletions pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,19 @@ def producer_name(self) -> str:
"""
return self._message.producer_name()

def has_null_value(self) -> bool:
"""
Check if the message has a null value (tombstone).

Messages with null values are used on compacted topics to delete
the entry for a specific key.

Returns
----------
True if the message has a null value, False otherwise.
"""
return self._message.has_null_value()

def encryption_context(self) -> EncryptionContext | None:
"""
Get the encryption context for this message or None if it's not encrypted.
Expand Down Expand Up @@ -1693,7 +1706,9 @@ def send(self, content,
----------

content:
A ``bytes`` object with the message payload.
A ``bytes`` object with the message payload, or ``None`` to send a null value
message (tombstone). Null value messages are used on compacted topics to delete
the entry for a specific key.
properties: optional
A dict of application-defined string properties.
partition_key: optional
Expand Down Expand Up @@ -1775,7 +1790,9 @@ def callback(res, msg_id):
----------

content
A `bytes` object with the message payload.
A ``bytes`` object with the message payload, or ``None`` to send a null value
message (tombstone). Null value messages are used on compacted topics to delete
the entry for a specific key.
callback
A callback that is invoked once the message has been acknowledged by the broker.
properties: optional
Expand Down Expand Up @@ -1823,9 +1840,12 @@ def close(self):
def _build_msg(self, content, properties, partition_key, ordering_key, sequence_id,
replication_clusters, disable_replication, event_timestamp,
deliver_at, deliver_after):
data = self._schema.encode(content)
if content is not None:
data = self._schema.encode(content)
_check_type(bytes, data, 'data')
else:
data = None

_check_type(bytes, data, 'data')
_check_type_or_none(dict, properties, 'properties')
_check_type_or_none(str, partition_key, 'partition_key')
_check_type_or_none(str, ordering_key, 'ordering_key')
Expand All @@ -1837,7 +1857,10 @@ def _build_msg(self, content, properties, partition_key, ordering_key, sequence_
_check_type_or_none(timedelta, deliver_after, 'deliver_after')

mb = _pulsar.MessageBuilder()
mb.content(data)
if data is not None:
mb.content(data)
else:
mb.set_null_value()
if properties:
for k, v in properties.items():
mb.property(k, v)
Expand Down
2 changes: 2 additions & 0 deletions src/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ void export_message(py::module_& m) {
.def("event_timestamp", &MessageBuilder::setEventTimestamp, return_value_policy::reference)
.def("replication_clusters", &MessageBuilder::setReplicationClusters, return_value_policy::reference)
.def("disable_replication", &MessageBuilder::disableReplication, return_value_policy::reference)
.def("set_null_value", &MessageBuilder::setNullValue, return_value_policy::reference)
.def("build", &MessageBuilder::build);

class_<MessageId>(m, "MessageId")
Expand Down Expand Up @@ -121,6 +122,7 @@ void export_message(py::module_& m) {
.def("int_schema_version", &Message::getLongSchemaVersion)
.def("schema_version", &Message::getSchemaVersion, return_value_policy::copy)
.def("producer_name", &Message::getProducerName, return_value_policy::copy)
.def("has_null_value", &Message::hasNullValue)
.def("encryption_context", &Message::getEncryptionContext, return_value_policy::reference);

MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload,
Expand Down
140 changes: 140 additions & 0 deletions tests/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2136,6 +2136,146 @@ def router(msg: pulsar.Message, num_partitions: int):

client.close()

def test_null_value_message(self):
client = Client(self.serviceUrl)
topic = "null-value-%s" % uuid.uuid4()
producer = client.create_producer(topic, batching_enabled=False)
consumer = client.subscribe(topic, "sub", initial_position=InitialPosition.Earliest)

producer.send(b"not null", partition_key="k1")
producer.send(None, partition_key="k2")
producer.send(b"also not null", partition_key="k3")

msg1 = consumer.receive(TM)
self.assertEqual(msg1.data(), b"not null")
self.assertFalse(msg1.has_null_value())

msg2 = consumer.receive(TM)
self.assertTrue(msg2.has_null_value())
self.assertEqual(msg2.data(), b"")

msg3 = consumer.receive(TM)
self.assertEqual(msg3.data(), b"also not null")
self.assertFalse(msg3.has_null_value())

consumer.close()
producer.close()
client.close()

def test_null_value_vs_empty_bytes(self):
client = Client(self.serviceUrl)
topic = "null-vs-empty-%s" % uuid.uuid4()
producer = client.create_producer(topic, batching_enabled=False)
consumer = client.subscribe(topic, "sub", initial_position=InitialPosition.Earliest)

producer.send(b"", partition_key="k1")
producer.send(None, partition_key="k2")

msg1 = consumer.receive(TM)
self.assertFalse(msg1.has_null_value())
self.assertEqual(msg1.data(), b"")

msg2 = consumer.receive(TM)
self.assertTrue(msg2.has_null_value())

consumer.close()
producer.close()
client.close()

def test_null_value_compaction(self):
client = Client(self.serviceUrl)
topic = "null-compact-%s" % uuid.uuid4()
producer = client.create_producer(topic, batching_enabled=False)

consumer = client.subscribe(topic, "my-sub1", is_read_compacted=True)
consumer.close()

# key1: value then tombstone -> removed after compaction
producer.send(b"hello-1", partition_key="key1")
producer.send(None, partition_key="key1")

# key2: value only -> survives
producer.send(b"hello-2", partition_key="key2")

# key3: value then tombstone -> removed
producer.send(b"hello-3", partition_key="key3")
producer.send(None, partition_key="key3")

# key4: value only -> survives
producer.send(b"hello-4", partition_key="key4")
producer.close()

url = "%s/admin/v2/persistent/public/default/%s/compaction" % (self.adminUrl, topic)
doHttpPut(url, "")
while True:
s = doHttpGet(url).decode("utf-8")
if "RUNNING" in s:
time.sleep(0.2)
else:
self.assertTrue("SUCCESS" in s)
break

# The compacted ledger cursor update is async on the broker side,
# wait for it to be persisted before reading.
time.sleep(1.0)

consumer = client.subscribe(topic, "my-sub1", is_read_compacted=True)
messages = []
while True:
try:
msg = consumer.receive(2000)
messages.append(msg)
except pulsar.Timeout:
break

keys = [m.partition_key() for m in messages]
self.assertIn("key2", keys)
self.assertIn("key4", keys)
self.assertNotIn("key1", keys)
self.assertNotIn("key3", keys)
self.assertEqual(len(messages), 2)

consumer.close()
client.close()

def test_null_value_table_view(self):
client = Client(self.serviceUrl)
topic = "null-tv-%s" % uuid.uuid4()
producer = client.create_producer(topic, batching_enabled=False)

producer.send(b"hello", partition_key="key1")

tv = client.create_table_view(topic)
self.assertEqual(tv.get("key1"), b"hello")

producer.send(None, partition_key="key1")
for _ in range(50):
if tv.get("key1") is None:
break
time.sleep(0.1)
self.assertIsNone(tv.get("key1"))

tv.close()
producer.close()
client.close()

def test_null_value_with_properties(self):
client = Client(self.serviceUrl)
topic = "null-props-%s" % uuid.uuid4()
producer = client.create_producer(topic, batching_enabled=False)
consumer = client.subscribe(topic, "sub", initial_position=InitialPosition.Earliest)

producer.send(None, partition_key="k1", properties={"action": "delete"})

msg = consumer.receive(TM)
self.assertTrue(msg.has_null_value())
self.assertEqual(msg.properties(), {"action": "delete"})
self.assertEqual(msg.partition_key(), "k1")

consumer.close()
producer.close()
client.close()


if __name__ == "__main__":
main()