Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/apidoc/misc/ConsumerRecord.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ConsumerRecord
==============

.. autoclass:: kafka.consumer.fetcher.ConsumerRecord
:members:
6 changes: 6 additions & 0 deletions docs/apidoc/misc/FutureRecordMetadata.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
FutureRecordMetadata
====================

.. autoclass:: kafka.producer.future.FutureRecordMetadata
:members:
:exclude-members: rebind
5 changes: 5 additions & 0 deletions docs/apidoc/misc/RecordMetadata.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
RecordMetadata
==============

.. autoclass:: kafka.producer.future.RecordMetadata
:members:
70 changes: 57 additions & 13 deletions docs/apidoc/modules.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,76 @@ with Apache Kafka. The following sections group them by role; each class
links to its own API reference page.


kafka
=====

Application code typically interacts with one of three top-level clients.
Each owns a background IO thread and a shared async networking layer.
The sections below group each client with the data types and extension
points specific to it.


kafka.consumer
==============

The high-level, group-aware message consumer and the data types it
produces.

- :class:`~kafka.KafkaConsumer` - high-level, group-aware message consumer.
Iterable, with manual or automatic offset commits, cooperative rebalance,
pluggable deserializers, and transactional-read isolation.
- :class:`~kafka.consumer.fetcher.ConsumerRecord` - a single record
consumed from a topic partition, as yielded by
:meth:`~kafka.KafkaConsumer.poll` and consumer iteration.
- :class:`~kafka.ConsumerRebalanceListener` - base class for receiving
partition join/revoke callbacks during a group rebalance. Also includes
the async interface :class:`~kafka.AsyncConsumerRebalanceListener`.

.. toctree::
:maxdepth: 1
:hidden:

KafkaConsumer <KafkaConsumer>
ConsumerRecord <misc/ConsumerRecord>
ConsumerRebalanceListener <misc/ConsumerRebalanceListener>


kafka.producer
==============

The high-level, asynchronous message producer and the data types and
extension points specific to it.

- :class:`~kafka.KafkaProducer` - high-level, asynchronous message producer.
Batches records into a background sender thread, with optional
idempotence, transactions, compression, and pluggable serializers.
- :class:`~kafka.producer.future.FutureRecordMetadata` - asynchronous
handle returned by :meth:`~kafka.KafkaProducer.send`; resolves to a
``RecordMetadata`` once the record is acknowledged.
- :class:`~kafka.producer.future.RecordMetadata` - metadata about a
produced record after the broker has acknowledged it.
- :class:`~kafka.partitioner.Partitioner` - base class for pluggable
partition selection; controls which partition a record is assigned to.

.. toctree::
:maxdepth: 1
:hidden:

KafkaProducer <KafkaProducer>
FutureRecordMetadata <misc/FutureRecordMetadata>
RecordMetadata <misc/RecordMetadata>
Partitioner <misc/Partitioner>


kafka.admin
===========

Cluster administration operations.

- :class:`~kafka.KafkaAdminClient` - admin operations: topic, ACL, config,
consumer group, partition, quota, log-directory, and quorum management.

.. toctree::
:maxdepth: 1
:hidden:

KafkaConsumer <KafkaConsumer>
KafkaProducer <KafkaProducer>
KafkaAdminClient <KafkaAdminClient>


Expand Down Expand Up @@ -68,7 +117,7 @@ driving the protocol layer directly from the REPL.
other / misc
============

Lightweight data types used throughout the client APIs (and useful when
Lightweight data types shared across the clients (and useful when
working with the lower-level protocol layer).

- :class:`~kafka.cluster.ClusterMetadata` - in-memory cache of brokers,
Expand All @@ -81,11 +130,8 @@ working with the lower-level protocol layer).
- :class:`~kafka.OffsetSpec` - enum for partition offset queries.
- :class:`~kafka.IsolationLevel` - enum for transactional isolation.
- :class:`~kafka.Serializer` - base class for serialization / deserialization
of key and value bytes. Includes helper classes `~kafka.DefaultSerializer`
and `~kafka.JsonSerializer`.
- :class:`~kafka.ConsumerRebalanceListener` - base class for consumer
class to receive join/rebalance group hooks. Also includes async
interface `~kafka.AsyncConsumerRebalanceListener`.
of key and value bytes. Includes helper classes :class:`~kafka.DefaultSerializer`
and :class:`~kafka.JsonSerializer`.

.. toctree::
:maxdepth: 1
Expand All @@ -97,5 +143,3 @@ working with the lower-level protocol layer).
OffsetSpec <misc/OffsetSpec>
IsolationLevel <misc/IsolationLevel>
Serializer <misc/Serializer>
ConsumerRebalanceListener <misc/ConsumerRebalanceListener>
Partitioner <misc/Partitioner>
30 changes: 30 additions & 0 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,36 @@
ConsumerRecord = collections.namedtuple("ConsumerRecord",
["topic", "partition", "leader_epoch", "offset", "timestamp", "timestamp_type",
"key", "value", "headers", "checksum", "serialized_key_size", "serialized_value_size", "serialized_header_size"])
ConsumerRecord.__doc__ = """A single record (message) consumed from a topic partition.

Yielded by :meth:`~kafka.KafkaConsumer.poll` (inside the returned
``{TopicPartition: [ConsumerRecord, ...]}`` mapping) and by iterating
over a :class:`~kafka.KafkaConsumer`. ``key`` and ``value`` are decoded
by the consumer's configured deserializers.

Keyword Arguments:
topic (str): The topic this record was received from.
partition (int): The partition this record was received from.
leader_epoch (int): The partition leader epoch for this record, or -1
if unknown.
offset (int): The position of this record in the topic partition.
timestamp (int): The timestamp of this record, in milliseconds since
the epoch (UTC), or -1 if unknown.
timestamp_type (int): The type of the timestamp: 0 for CreateTime (set
by the producer) or 1 for LogAppendTime (set by the broker).
key: The (deserialized) key of the record, or None.
value: The (deserialized) value of the record, or None.
headers (list): A list of ``(key, value)`` header tuples, where key is
a str and value is bytes.
checksum (int): Deprecated. The CRC32 checksum of the record, or None.
Removed in message format v2 (Kafka 0.11+).
serialized_key_size (int): The size of the serialized, uncompressed key
in bytes, or -1 if the key is None.
serialized_value_size (int): The size of the serialized, uncompressed
value in bytes, or -1 if the value is None.
serialized_header_size (int): The size of the serialized, uncompressed
headers in bytes, or -1 if there are no headers.
"""


CompletedFetch = collections.namedtuple("CompletedFetch",
Expand Down
44 changes: 44 additions & 0 deletions kafka/partitioner/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,51 @@


class Partitioner(ABC):
"""Base class for pluggable partition selection strategies.

A :class:`~kafka.KafkaProducer` consults its configured partitioner to
choose a partition for each record whose ``partition`` was not supplied
explicitly to :meth:`~kafka.KafkaProducer.send`. Subclass this and
implement :meth:`partition`, then pass an instance via the
``partitioner`` config argument.

Two built-in implementations are provided: ``DefaultPartitioner``
(murmur2-hash keyed records, random partition for null keys) and
``StickyPartitioner`` (KIP-480; null-key records stick to one partition
per topic until a new batch is started, then rotate).

Sticky-style partitioners may additionally implement an optional
``on_new_batch(self, topic, cluster, prev_partition)`` hook, which the
producer calls when a null-key record would have triggered a fresh
batch, giving the partitioner a chance to rotate off its current sticky
choice. The hook is looked up with ``getattr``, so it is entirely
optional.
"""

@abstractmethod
def partition(self, topic, key, serialized_key, value, serialized_value, cluster):
"""Choose a partition for a record.

Arguments:
topic (str): The topic the record is destined for.
key: The user-supplied key, before serialization. May be None.
serialized_key (bytes): The post-serializer key bytes, or None
when the caller passed ``key=None``.
value: The user-supplied value, before serialization.
serialized_value (bytes): The post-serializer value bytes, or
None when the caller passed ``value=None``.
cluster (ClusterMetadata): A live cluster snapshot. Use
``cluster.partitions_for_topic(topic)`` for all partitions
and ``cluster.available_partitions_for_topic(topic)`` for
those whose leader is currently known.

Returns:
int: The partition id to assign the record to.

Raises:
ValueError: If the topic is not present in cluster metadata.
Partitioner exceptions surface to the caller via the
returned :class:`~kafka.producer.future.FutureRecordMetadata`.
"""
pass

38 changes: 38 additions & 0 deletions kafka/producer/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ def wait(self, timeout=None):


class FutureRecordMetadata(Future):
"""An asynchronous handle to the result of a single :meth:`~kafka.KafkaProducer.send`.

:meth:`~kafka.KafkaProducer.send` returns one of these immediately,
before the record has been transmitted to the broker. Call :meth:`get`
to block until the record is acknowledged and obtain its
:class:`RecordMetadata`, or register callbacks via
:meth:`~kafka.future.Future.add_callback` /
:meth:`~kafka.future.Future.add_errback` to be notified without
blocking. The future resolves successfully once the containing batch is
acknowledged according to the producer's ``acks`` configuration, or
fails with the relevant exception (for example
:class:`~kafka.errors.KafkaTimeoutError`).
"""
__slots__ = ('_produce_future', 'args')
def __init__(self, produce_future, batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size):
super().__init__()
Expand Down Expand Up @@ -99,3 +112,28 @@ def get(self, timeout=None):
RecordMetadata = collections.namedtuple(
'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
RecordMetadata.__doc__ = """Metadata about a record that has been acknowledged by the broker.

Returned by :meth:`FutureRecordMetadata.get`, which resolves once the
batch containing the record has been acknowledged according to the
producer's ``acks`` configuration.

Keyword Arguments:
topic (str): The topic the record was appended to.
partition (int): The partition the record was appended to.
topic_partition (TopicPartition): The ``(topic, partition)`` the record
was appended to.
offset (int): The offset of the record in the topic partition, or -1 if
the broker did not assign one (e.g. ``acks=0``).
timestamp (int): The timestamp of the record, in milliseconds since the
epoch (UTC). For CreateTime this is the producer-supplied timestamp;
for LogAppendTime it is the broker-assigned timestamp.
checksum (int): Deprecated. The CRC32 checksum of the record, or None.
Removed in message format v2 (Kafka 0.11+).
serialized_key_size (int): The size of the serialized, uncompressed key
in bytes, or -1 if the key is None.
serialized_value_size (int): The size of the serialized, uncompressed
value in bytes, or -1 if the value is None.
serialized_header_size (int): The size of the serialized, uncompressed
headers in bytes, or -1 if there are no headers.
"""
Loading