diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 26f243735..6f33ad6cb 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -662,13 +662,17 @@ def commit(self, offsets=None, timeout_ms=None): self._coordinator.commit_offsets_sync(offsets, timeout_ms=timeout_ms) def group_metadata(self): - """Return a snapshot of this consumer's group membership (KIP-447). + """Return a snapshot of this consumer's group membership. Pass the result to KafkaProducer.send_offsets_to_transaction() so the broker can fence stale instances of this consumer when committing - offsets inside a transaction. The snapshot is always safe to call: - if no group_id is configured (manual assignment) the returned - ConsumerGroupMetadata has group_id=None. + offsets inside a transaction (KIP-447). The snapshot also exposes the + current MemberState (``state``), so callers can observe whether the + consumer has converged on a stable assignment. + + The snapshot is always safe to call: if no group_id is configured + (manual assignment) the returned ConsumerGroupMetadata has + group_id=None and is permanently unjoined. Returns: ConsumerGroupMetadata diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 074e29a7f..1e20f0d26 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -16,19 +16,13 @@ HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest, DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, ) -from kafka.structs import ConsumerGroupMetadata +from kafka.structs import ConsumerGroupMetadata, MemberState from kafka.util import Timer log = logging.getLogger('kafka.coordinator') heartbeat_log = logging.getLogger('kafka.coordinator.heartbeat') -class MemberState: - UNJOINED = '' # the client is not part of a group - REBALANCING = '' # the client has begun rebalancing - STABLE = '' # the client has joined and is sending heartbeats - - class Generation: def __init__(self, generation_id, member_id, protocol): self.generation_id = generation_id @@ -891,13 +885,16 @@ def generation_if_stable(self): return self._generation def group_metadata(self): - """Return a snapshot of this member's group identity (KIP-447). + """Return a snapshot of this member's group membership. Returns the current generation_id / member_id / group_instance_id even when the group is not stable; the caller (typically KafkaProducer.send_offsets_to_transaction) needs whatever is current - so the broker can fence stale instances. If the consumer has never - joined, the snapshot has the no-generation defaults. + so the broker can fence stale instances (KIP-447). If the consumer has + never joined, the snapshot has the no-generation defaults. + + Also carries the live MemberState (``state``) so callers can observe + whether the group has converged (it is ignored by the fencing path). """ with self._lock: return ConsumerGroupMetadata( @@ -905,6 +902,7 @@ def group_metadata(self): generation_id=self._generation.generation_id, member_id=self._generation.member_id, group_instance_id=self.group_instance_id, + state=self.state, ) # deprecated diff --git a/kafka/structs.py b/kafka/structs.py index a6b590e97..cdb509722 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -51,19 +51,33 @@ """ +class MemberState: + UNJOINED = '' # the client is not part of a group + REBALANCING = '' # the client has begun rebalancing + STABLE = '' # the client has joined and is sending heartbeats + + ConsumerGroupMetadata = namedtuple("ConsumerGroupMetadata", - ["group_id", "generation_id", "member_id", "group_instance_id"], - defaults=[-1, '', None]) -ConsumerGroupMetadata.__doc__ = """A snapshot of a consumer's group membership (KIP-447). + ["group_id", "generation_id", "member_id", "group_instance_id", "state"], + defaults=[None, -1, '', None, MemberState.UNJOINED]) +ConsumerGroupMetadata.__doc__ = """A snapshot of a consumer's group membership. + +The first four fields are the KIP-447 fencing identity: pass the snapshot to +KafkaProducer.send_offsets_to_transaction() so the broker can fence stale +consumer instances when committing offsets inside a transaction. The broker +uses member_id + generation_id + group_instance_id to verify the producer is +acting on behalf of the current group generation. -Passed to KafkaProducer.send_offsets_to_transaction() so the broker can fence -stale consumer instances when committing offsets inside a transaction. The -broker uses member_id + generation_id + group_instance_id to verify the -producer is acting on behalf of the current group generation. +The ``state`` field exposes the live MemberState (it is ignored by the +producer/fencing path). It lets callers observe whether the consumer has +converged on a stable assignment - useful for monitoring and for tests that +wait for a group to finish rebalancing. Keyword Arguments: - group_id (str): The consumer group id. + group_id (str): The consumer group id, or None for manual assignment. generation_id (int): The current generation id (-1 if unjoined). member_id (str): The current member id ('' if unjoined). group_instance_id (str): The static membership instance id, or None. + state (str): The current MemberState (one of MemberState.UNJOINED, + MemberState.REBALANCING, MemberState.STABLE). """ diff --git a/test/consumer/test_coordinator.py b/test/consumer/test_coordinator.py index 4a94aaedc..97082a3ac 100644 --- a/test/consumer/test_coordinator.py +++ b/test/consumer/test_coordinator.py @@ -76,10 +76,13 @@ def test_group_metadata_unjoined(coordinator): assert gm.member_id == '' # UNKNOWN_MEMBER_ID is '' assert gm.generation_id == -1 # DEFAULT_GENERATION_ID assert gm.group_instance_id is None + # Live coordination state: unjoined. + assert gm.state == MemberState.UNJOINED def test_group_metadata_after_join(coordinator): - """After joining, group_metadata() reflects the live generation.""" + """After joining, group_metadata() reflects the live generation and the + MemberState.""" coordinator._generation = Generation(generation_id=42, member_id='mbr-1', protocol='range') @@ -89,11 +92,15 @@ def test_group_metadata_after_join(coordinator): assert gm.member_id == 'mbr-1' # group_instance_id comes from config (None by default for this fixture). assert gm.group_instance_id is None + assert gm.state == MemberState.STABLE # Still returns the snapshot even while rebalancing - the producer needs - # *something* to send and the broker handles fencing. + # *something* to send and the broker handles fencing. The state field + # tracks the in-progress (re)join. coordinator.state = MemberState.REBALANCING - assert coordinator.group_metadata().generation_id == 42 + gm = coordinator.group_metadata() + assert gm.generation_id == 42 + assert gm.state == MemberState.REBALANCING def test_group_protocols(coordinator):