It looks like there are no tests of the new `KafkaProducer` for the various `acks` levels:
https://github.com/dpkp/kafka-python/blob/master/test/test_producer.py
These tests are present for the old/deprecated producers here:
```python
    ############################
    #   Producer ACK Tests     #
    ############################

    def test_acks_none(self):
        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
        start_offset = self.current_offset(self.topic, partition)

        producer = Producer(
            self.client,
            req_acks=Producer.ACK_NOT_REQUIRED,
        )
        resp = producer.send_messages(self.topic, partition, self.msg("one"))

        # No response from produce request with no acks required
        self.assertEqual(len(resp), 0)

        # But the message should still have been delivered
        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
        producer.stop()

    def test_acks_local_write(self):
        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
        start_offset = self.current_offset(self.topic, partition)

        producer = Producer(
            self.client,
            req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
        )
        resp = producer.send_messages(self.topic, partition, self.msg("one"))

        self.assert_produce_response(resp, start_offset)
        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])

        producer.stop()

    def test_acks_cluster_commit(self):
        partition = self.client.get_partition_ids_for_topic(self.topic)[0]
        start_offset = self.current_offset(self.topic, partition)

        producer = Producer(
            self.client,
            req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
        )

        resp = producer.send_messages(self.topic, partition, self.msg("one"))
        self.assert_produce_response(resp, start_offset)
        self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])

        producer.stop()
```
We should add/migrate tests of these scenarios to the new `KafkaProducer`.
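As a starting point, here is a rough sketch of what a migrated check could look like. It is not taken from the kafka-python test suite: `make_producer` and `read_values` are hypothetical hooks that the test fixture would have to provide (for example a `KafkaConsumer`-based reader), and the helper names are made up for illustration. It assumes `KafkaProducer.send()` returns a future whose `get()` yields metadata with `topic`, `partition` and `offset`, and that with `acks=0` no real offset comes back.

```python
ACKS_LEVELS = (0, 1, "all")  # values accepted by KafkaProducer's `acks` setting


def send_with_acks(make_producer, topic, partition, value, acks, timeout=10):
    """Send one message at the given acks level and return its metadata."""
    producer = make_producer(acks)
    try:
        future = producer.send(topic, value=value, partition=partition)
        # Blocks until the send completes; raises on error or timeout.
        return future.get(timeout=timeout)
    finally:
        producer.close()


def check_acks_level(make_producer, read_values, topic, partition, acks):
    """Rough equivalent of the old test_acks_* cases for one acks level.

    make_producer(acks) and read_values(topic, partition) are hypothetical
    hooks: the first builds a producer, the second returns every message
    value currently stored in the partition, in order.
    """
    before = read_values(topic, partition)
    value = ("acks-%s" % acks).encode("utf-8")

    metadata = send_with_acks(make_producer, topic, partition, value, acks)
    assert metadata.topic == topic
    assert metadata.partition == partition
    if acks != 0:
        # With acks >= 1 the broker should report a real offset.
        assert metadata.offset >= 0

    # Whatever the acks level, the message should have been delivered.
    assert read_values(topic, partition) == before + [value]
    return metadata


def kafka_producer_factory(bootstrap_servers):
    """Build a make_producer hook backed by a real KafkaProducer."""
    # Imported lazily so the helpers above carry no hard dependency.
    from kafka import KafkaProducer

    def make_producer(acks):
        return KafkaProducer(bootstrap_servers=bootstrap_servers, acks=acks)

    return make_producer
```

In an integration test this would be called once per entry in `ACKS_LEVELS`, with `make_producer = kafka_producer_factory(...)` pointed at the test broker. Passing the producer in as a factory keeps the check independent of how the fixture starts Kafka, so the same assertions can also be exercised against a stub.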
Reason:
We just observed a very confusing scenario in production: a `KafkaProducer`-based producer sent thousands of messages using `acks=1`, and none of the sends returned an error. However, two messages were missing when we dumped all messages from that topic. So I want to make sure this part of `kafka-python` has test coverage, to eliminate it as a potential root cause.
For reference, the old producer tests quoted above are in kafka-python/test/test_producer_integration.py, lines 417 to 466 at commit 3ff3d75.