From df6e7d24abb2179b3b844c731d5839d563fbae27 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 13 Jun 2026 11:22:02 -0700 Subject: [PATCH] docs: Add ConsumerRecord, RecordMetadata, and FutureRecordMetadata to docs --- docs/apidoc/misc/ConsumerRecord.rst | 5 ++ docs/apidoc/misc/FutureRecordMetadata.rst | 6 ++ docs/apidoc/misc/RecordMetadata.rst | 5 ++ docs/apidoc/modules.rst | 70 ++++++++++++++++++----- kafka/consumer/fetcher.py | 30 ++++++++++ kafka/partitioner/abc.py | 44 ++++++++++++++ kafka/producer/future.py | 38 ++++++++++++ 7 files changed, 185 insertions(+), 13 deletions(-) create mode 100644 docs/apidoc/misc/ConsumerRecord.rst create mode 100644 docs/apidoc/misc/FutureRecordMetadata.rst create mode 100644 docs/apidoc/misc/RecordMetadata.rst diff --git a/docs/apidoc/misc/ConsumerRecord.rst b/docs/apidoc/misc/ConsumerRecord.rst new file mode 100644 index 000000000..659c174b9 --- /dev/null +++ b/docs/apidoc/misc/ConsumerRecord.rst @@ -0,0 +1,5 @@ +ConsumerRecord +============== + +.. autoclass:: kafka.consumer.fetcher.ConsumerRecord + :members: diff --git a/docs/apidoc/misc/FutureRecordMetadata.rst b/docs/apidoc/misc/FutureRecordMetadata.rst new file mode 100644 index 000000000..4bf8970f1 --- /dev/null +++ b/docs/apidoc/misc/FutureRecordMetadata.rst @@ -0,0 +1,6 @@ +FutureRecordMetadata +==================== + +.. autoclass:: kafka.producer.future.FutureRecordMetadata + :members: + :exclude-members: rebind diff --git a/docs/apidoc/misc/RecordMetadata.rst b/docs/apidoc/misc/RecordMetadata.rst new file mode 100644 index 000000000..94a16f376 --- /dev/null +++ b/docs/apidoc/misc/RecordMetadata.rst @@ -0,0 +1,5 @@ +RecordMetadata +============== + +.. autoclass:: kafka.producer.future.RecordMetadata + :members: diff --git a/docs/apidoc/modules.rst b/docs/apidoc/modules.rst index 38e65ea4b..f65b55647 100644 --- a/docs/apidoc/modules.rst +++ b/docs/apidoc/modules.rst @@ -6,18 +6,69 @@ with Apache Kafka. The following sections group them by role; each class links to its own API reference page. -kafka -===== - Application code typically interacts with one of three top-level clients. Each owns a background IO thread and a shared async networking layer. +The sections below group each client with the data types and extension +points specific to it. + + +kafka.consumer +============== + +The high-level, group-aware message consumer and the data types it +produces. - :class:`~kafka.KafkaConsumer` - high-level, group-aware message consumer. Iterable, with manual or automatic offset commits, cooperative rebalance, pluggable deserializers, and transactional-read isolation. +- :class:`~kafka.consumer.fetcher.ConsumerRecord` - a single record + consumed from a topic partition, as yielded by + :meth:`~kafka.KafkaConsumer.poll` and consumer iteration. +- :class:`~kafka.ConsumerRebalanceListener` - base class for receiving + partition join/revoke callbacks during a group rebalance. Also includes + the async interface :class:`~kafka.AsyncConsumerRebalanceListener`. + +.. toctree:: + :maxdepth: 1 + :hidden: + + KafkaConsumer + ConsumerRecord + ConsumerRebalanceListener + + +kafka.producer +============== + +The high-level, asynchronous message producer and the data types and +extension points specific to it. + - :class:`~kafka.KafkaProducer` - high-level, asynchronous message producer. Batches records into a background sender thread, with optional idempotence, transactions, compression, and pluggable serializers. +- :class:`~kafka.producer.future.FutureRecordMetadata` - asynchronous + handle returned by :meth:`~kafka.KafkaProducer.send`; resolves to a + ``RecordMetadata`` once the record is acknowledged. +- :class:`~kafka.producer.future.RecordMetadata` - metadata about a + produced record after the broker has acknowledged it. +- :class:`~kafka.partitioner.Partitioner` - base class for pluggable + partition selection; controls which partition a record is assigned to. + +.. toctree:: + :maxdepth: 1 + :hidden: + + KafkaProducer + FutureRecordMetadata + RecordMetadata + Partitioner + + +kafka.admin +=========== + +Cluster administration operations. + - :class:`~kafka.KafkaAdminClient` - admin operations: topic, ACL, config, consumer group, partition, quota, log-directory, and quorum management. @@ -25,8 +76,6 @@ Each owns a background IO thread and a shared async networking layer. :maxdepth: 1 :hidden: - KafkaConsumer - KafkaProducer KafkaAdminClient @@ -68,7 +117,7 @@ driving the protocol layer directly from the REPL. other / misc ============ -Lightweight data types used throughout the client APIs (and useful when +Lightweight data types shared across the clients (and useful when working with the lower-level protocol layer). - :class:`~kafka.cluster.ClusterMetadata` - in-memory cache of brokers, @@ -81,11 +130,8 @@ working with the lower-level protocol layer). - :class:`~kafka.OffsetSpec` - enum for partition offset queries. - :class:`~kafka.IsolationLevel` - enum for transactional isolation. - :class:`~kafka.Serializer` - base class for serialization / deserialization - of key and value bytes. Includes helper classes `~kafka.DefaultSerializer` - and `~kafka.JsonSerializer`. -- :class:`~kafka.ConsumerRebalanceListener` - base class for consumer - class to receive join/rebalance group hooks. Also includes async - interface `~kafka.AsyncConsumerRebalanceListener`. + of key and value bytes. Includes helper classes :class:`~kafka.DefaultSerializer` + and :class:`~kafka.JsonSerializer`. .. toctree:: :maxdepth: 1 @@ -97,5 +143,3 @@ working with the lower-level protocol layer). OffsetSpec IsolationLevel Serializer - ConsumerRebalanceListener - Partitioner diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index c7ae376b5..ec7b76e4d 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -26,6 +26,36 @@ ConsumerRecord = collections.namedtuple("ConsumerRecord", ["topic", "partition", "leader_epoch", "offset", "timestamp", "timestamp_type", "key", "value", "headers", "checksum", "serialized_key_size", "serialized_value_size", "serialized_header_size"]) +ConsumerRecord.__doc__ = """A single record (message) consumed from a topic partition. + +Yielded by :meth:`~kafka.KafkaConsumer.poll` (inside the returned +``{TopicPartition: [ConsumerRecord, ...]}`` mapping) and by iterating +over a :class:`~kafka.KafkaConsumer`. ``key`` and ``value`` are decoded +by the consumer's configured deserializers. + +Keyword Arguments: + topic (str): The topic this record was received from. + partition (int): The partition this record was received from. + leader_epoch (int): The partition leader epoch for this record, or -1 + if unknown. + offset (int): The position of this record in the topic partition. + timestamp (int): The timestamp of this record, in milliseconds since + the epoch (UTC), or -1 if unknown. + timestamp_type (int): The type of the timestamp: 0 for CreateTime (set + by the producer) or 1 for LogAppendTime (set by the broker). + key: The (deserialized) key of the record, or None. + value: The (deserialized) value of the record, or None. + headers (list): A list of ``(key, value)`` header tuples, where key is + a str and value is bytes. + checksum (int): Deprecated. The CRC32 checksum of the record, or None. + Removed in message format v2 (Kafka 0.11+). + serialized_key_size (int): The size of the serialized, uncompressed key + in bytes, or -1 if the key is None. + serialized_value_size (int): The size of the serialized, uncompressed + value in bytes, or -1 if the value is None. + serialized_header_size (int): The size of the serialized, uncompressed + headers in bytes, or -1 if there are no headers. +""" CompletedFetch = collections.namedtuple("CompletedFetch", diff --git a/kafka/partitioner/abc.py b/kafka/partitioner/abc.py index efd070737..17cde6271 100644 --- a/kafka/partitioner/abc.py +++ b/kafka/partitioner/abc.py @@ -2,7 +2,51 @@ class Partitioner(ABC): + """Base class for pluggable partition selection strategies. + + A :class:`~kafka.KafkaProducer` consults its configured partitioner to + choose a partition for each record whose ``partition`` was not supplied + explicitly to :meth:`~kafka.KafkaProducer.send`. Subclass this and + implement :meth:`partition`, then pass an instance via the + ``partitioner`` config argument. + + Two built-in implementations are provided: ``DefaultPartitioner`` + (murmur2-hash keyed records, random partition for null keys) and + ``StickyPartitioner`` (KIP-480; null-key records stick to one partition + per topic until a new batch is started, then rotate). + + Sticky-style partitioners may additionally implement an optional + ``on_new_batch(self, topic, cluster, prev_partition)`` hook, which the + producer calls when a null-key record would have triggered a fresh + batch, giving the partitioner a chance to rotate off its current sticky + choice. The hook is looked up with ``getattr``, so it is entirely + optional. + """ + @abstractmethod def partition(self, topic, key, serialized_key, value, serialized_value, cluster): + """Choose a partition for a record. + + Arguments: + topic (str): The topic the record is destined for. + key: The user-supplied key, before serialization. May be None. + serialized_key (bytes): The post-serializer key bytes, or None + when the caller passed ``key=None``. + value: The user-supplied value, before serialization. + serialized_value (bytes): The post-serializer value bytes, or + None when the caller passed ``value=None``. + cluster (ClusterMetadata): A live cluster snapshot. Use + ``cluster.partitions_for_topic(topic)`` for all partitions + and ``cluster.available_partitions_for_topic(topic)`` for + those whose leader is currently known. + + Returns: + int: The partition id to assign the record to. + + Raises: + ValueError: If the topic is not present in cluster metadata. + Partitioner exceptions surface to the caller via the + returned :class:`~kafka.producer.future.FutureRecordMetadata`. + """ pass diff --git a/kafka/producer/future.py b/kafka/producer/future.py index dfc6b0c33..f04fa9632 100644 --- a/kafka/producer/future.py +++ b/kafka/producer/future.py @@ -30,6 +30,19 @@ def wait(self, timeout=None): class FutureRecordMetadata(Future): + """An asynchronous handle to the result of a single :meth:`~kafka.KafkaProducer.send`. + + :meth:`~kafka.KafkaProducer.send` returns one of these immediately, + before the record has been transmitted to the broker. Call :meth:`get` + to block until the record is acknowledged and obtain its + :class:`RecordMetadata`, or register callbacks via + :meth:`~kafka.future.Future.add_callback` / + :meth:`~kafka.future.Future.add_errback` to be notified without + blocking. The future resolves successfully once the containing batch is + acknowledged according to the producer's ``acks`` configuration, or + fails with the relevant exception (for example + :class:`~kafka.errors.KafkaTimeoutError`). + """ __slots__ = ('_produce_future', 'args') def __init__(self, produce_future, batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size): super().__init__() @@ -99,3 +112,28 @@ def get(self, timeout=None): RecordMetadata = collections.namedtuple( 'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp', 'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size']) +RecordMetadata.__doc__ = """Metadata about a record that has been acknowledged by the broker. + +Returned by :meth:`FutureRecordMetadata.get`, which resolves once the +batch containing the record has been acknowledged according to the +producer's ``acks`` configuration. + +Keyword Arguments: + topic (str): The topic the record was appended to. + partition (int): The partition the record was appended to. + topic_partition (TopicPartition): The ``(topic, partition)`` the record + was appended to. + offset (int): The offset of the record in the topic partition, or -1 if + the broker did not assign one (e.g. ``acks=0``). + timestamp (int): The timestamp of the record, in milliseconds since the + epoch (UTC). For CreateTime this is the producer-supplied timestamp; + for LogAppendTime it is the broker-assigned timestamp. + checksum (int): Deprecated. The CRC32 checksum of the record, or None. + Removed in message format v2 (Kafka 0.11+). + serialized_key_size (int): The size of the serialized, uncompressed key + in bytes, or -1 if the key is None. + serialized_value_size (int): The size of the serialized, uncompressed + value in bytes, or -1 if the value is None. + serialized_header_size (int): The size of the serialized, uncompressed + headers in bytes, or -1 if there are no headers. +"""