Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions kafka/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,31 @@


class Future:
__slots__ = ('is_done', 'value', 'exception', '_callbacks', '_errbacks', '_lock')
error_on_callbacks = False # and errbacks
__slots__ = ('is_done', 'value', 'exception', '_callbacks', '_errbacks', '_lock', '_error_on_callbacks')

def __init__(self):
# Class-level default for error_on_callbacks. Per-instance values passed to
# __init__ take precedence; this remains overridable globally (e.g. test
# suites set it to surface callback exceptions).
_default_error_on_callbacks = False

def __init__(self, error_on_callbacks=None):
self.is_done = False
self.value = None
self.exception = None
self._callbacks = []
self._errbacks = []
self._lock = threading.Lock()
# None means "inherit the class-level default"; an explicit bool
# overrides it for this instance only.
self._error_on_callbacks = error_on_callbacks

@property
def error_on_callbacks(self):
"""When True, exceptions raised inside callbacks/errbacks are re-raised
to the caller instead of only being logged."""
if self._error_on_callbacks is None:
return self._default_error_on_callbacks
return self._error_on_callbacks

def succeeded(self):
return self.is_done and self.exception is None
Expand Down
4 changes: 2 additions & 2 deletions kafka/producer/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class FutureRecordMetadata(Future):
:class:`~kafka.errors.KafkaTimeoutError`).
"""
__slots__ = ('_produce_future', 'args')
def __init__(self, produce_future, batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size):
super().__init__()
def __init__(self, produce_future, batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size, error_on_callbacks=None):
super().__init__(error_on_callbacks=error_on_callbacks)
self._produce_future = produce_future
# packing args as a tuple is a minor speed optimization
self.args = (batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size)
Expand Down
6 changes: 6 additions & 0 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ class KafkaProducer:
acknowledgement. Users should generally prefer to leave this config
unset and instead use delivery_timeout_ms to control retry behavior.
Default: float('inf') (infinite)
error_on_callbacks (bool): If True, exceptions raised inside callbacks
and errbacks registered on the future returned by send() are
re-raised to the caller instead of only being logged. If None, the
Future class-level default is used (False). Default: None.
batch_size (int): Requests sent to brokers will contain multiple
batches, one for each partition with data available to be sent.
A small batch size will make batching less common and may reduce
Expand Down Expand Up @@ -414,6 +418,7 @@ class KafkaProducer:
'acks': -1,
'compression_type': None,
'retries': float('inf'),
'error_on_callbacks': None,
'batch_size': 16384,
'linger_ms': 0,
'partitioner': DefaultPartitioner(),
Expand Down Expand Up @@ -936,6 +941,7 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
len(key_bytes) if key_bytes is not None else -1,
len(value_bytes) if value_bytes is not None else -1,
sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1,
error_on_callbacks=self.config['error_on_callbacks'],
).failure(e)

# Track if the user passed an explicit partition b/c sticky logic does not apply
Expand Down
9 changes: 7 additions & 2 deletions kafka/producer/producer_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class FinalState(IntEnum):


class ProducerBatch:
def __init__(self, tp, records, now=None):
def __init__(self, tp, records, now=None, error_on_callbacks=None):
now = time.monotonic() if now is None else now
self.max_record_size = 0
self.created = now
Expand All @@ -29,6 +29,10 @@ def __init__(self, tp, records, now=None):
self.produce_future = FutureProduceResult(tp)
self._record_futures = []
self._retry = False
# Propagated to each per-record FutureRecordMetadata so user callbacks
# registered via send().add_callback()/add_errback() can opt in to
# re-raising exceptions instead of only logging them.
self._error_on_callbacks = error_on_callbacks
self._final_state = None

@property
Expand Down Expand Up @@ -71,7 +75,8 @@ def try_append(self, timestamp_ms, key, value, headers, now=None):
metadata.crc,
len(key) if key is not None else -1,
len(value) if value is not None else -1,
sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1,
error_on_callbacks=self._error_on_callbacks)
self._record_futures.append(future)
return future

Expand Down
7 changes: 4 additions & 3 deletions kafka/producer/record_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class RecordAccumulator:
'retry_backoff_ms': 100,
'transaction_manager': None,
'message_version': 2,
'error_on_callbacks': None,
}

def __init__(self, **configs):
Expand Down Expand Up @@ -182,7 +183,7 @@ def append(self, tp, timestamp_ms, key, value, headers, now=None,
self.config['batch_size']
)

batch = ProducerBatch(tp, records, now=now)
batch = ProducerBatch(tp, records, now=now, error_on_callbacks=self.config['error_on_callbacks'])
future = batch.try_append(timestamp_ms, key, value, headers, now=now)
if not future:
raise Exception()
Expand Down Expand Up @@ -271,7 +272,7 @@ def split_and_reenqueue(self, batch, now=None):
self.config['compression_attrs'],
self.config['batch_size'],
)
current_batch = ProducerBatch(tp, builder, now=now)
current_batch = ProducerBatch(tp, builder, now=now, error_on_callbacks=self.config['error_on_callbacks'])
current_batch.created = batch.created

for record in group:
Expand All @@ -285,7 +286,7 @@ def split_and_reenqueue(self, batch, now=None):
self.config['compression_attrs'],
self.config['batch_size'],
)
current_batch = ProducerBatch(tp, builder, now=now)
current_batch = ProducerBatch(tp, builder, now=now, error_on_callbacks=self.config['error_on_callbacks'])
current_batch.created = batch.created
metadata = builder.append(record.timestamp, record.key, record.value, record.headers)

Expand Down
2 changes: 1 addition & 1 deletion test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@


from kafka.future import Future
Future.error_on_callbacks = True # always fail during testing
Future._default_error_on_callbacks = True # always fail during testing
18 changes: 18 additions & 0 deletions test/producer/test_producer_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

from kafka.errors import IllegalStateError, KafkaError
from kafka.future import Future
from kafka.producer.future import FutureRecordMetadata, RecordMetadata
from kafka.producer.producer_batch import ProducerBatch
from kafka.record.memory_records import MemoryRecordsBuilder
Expand All @@ -24,6 +25,23 @@ def batch(tp, memory_records_builder):
return ProducerBatch(tp, memory_records_builder)


def test_producer_batch_error_on_callbacks_default(tp, memory_records_builder):
# Unset -> record futures inherit the Future class-level default (#2366).
batch = ProducerBatch(tp, memory_records_builder)
future = batch.try_append(0, b'key', b'value', [])
assert future.error_on_callbacks is Future._default_error_on_callbacks


def test_producer_batch_error_on_callbacks_propagates(tp, memory_records_builder):
# ProducerBatch threads an explicit error_on_callbacks down to each record
# future, overriding the class-level default in both directions (#2366).
batch = ProducerBatch(tp, memory_records_builder, error_on_callbacks=True)
assert batch.try_append(0, b'k', b'v', []).error_on_callbacks is True

batch_off = ProducerBatch(tp, memory_records_builder, error_on_callbacks=False)
assert batch_off.try_append(0, b'k', b'v', []).error_on_callbacks is False


def test_producer_batch_producer_id(tp, memory_records_builder):
batch = ProducerBatch(tp, memory_records_builder)
assert batch.producer_id == -1
Expand Down
42 changes: 42 additions & 0 deletions test/test_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,45 @@ def test_chain_failure(self):
f1.failure(ValueError('err'))
assert f2.failed()
assert isinstance(f2.exception, ValueError)


def _raise(exc):
raise exc


class TestFutureErrorOnCallbacks:
"""error_on_callbacks is now a per-instance option (see #2366).

An explicit value passed to ``Future(error_on_callbacks=...)`` takes
precedence over the class-level default. (Note: the test suite sets the
class-level default to True via ``test/__init__.py``, so these tests pass
explicit values to assert override behavior independent of that default.)
"""

def test_none_inherits_class_default(self):
assert Future().error_on_callbacks is Future._default_error_on_callbacks

def test_explicit_false_overrides_class_default(self):
f = Future(error_on_callbacks=False)
assert f.error_on_callbacks is False
f.add_callback(lambda v: _raise(ValueError('boom')))
f.success(1) # suppressed despite class default True
assert f.succeeded()

def test_callback_exception_raised_when_enabled(self):
f = Future(error_on_callbacks=True)
f.add_callback(lambda v: _raise(ValueError('boom')))
with pytest.raises(ValueError, match='boom'):
f.success(1)

def test_errback_exception_raised_when_enabled(self):
f = Future(error_on_callbacks=True)
f.add_errback(lambda e: _raise(RuntimeError('boom')))
with pytest.raises(RuntimeError, match='boom'):
f.failure(ValueError('orig'))

def test_already_done_callback_raises_when_enabled(self):
f = Future(error_on_callbacks=True)
f.success(1)
with pytest.raises(ValueError, match='boom'):
f.add_callback(lambda v: _raise(ValueError('boom')))
Loading