Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,13 +662,17 @@ def commit(self, offsets=None, timeout_ms=None):
self._coordinator.commit_offsets_sync(offsets, timeout_ms=timeout_ms)

def group_metadata(self):
"""Return a snapshot of this consumer's group membership (KIP-447).
"""Return a snapshot of this consumer's group membership.

Pass the result to KafkaProducer.send_offsets_to_transaction() so the
broker can fence stale instances of this consumer when committing
offsets inside a transaction. The snapshot is always safe to call:
if no group_id is configured (manual assignment) the returned
ConsumerGroupMetadata has group_id=None.
offsets inside a transaction (KIP-447). The snapshot also exposes the
current MemberState (``state``), so callers can observe whether the
consumer has converged on a stable assignment.

This method is always safe to call: if no group_id is configured
(manual assignment), the returned ConsumerGroupMetadata has
group_id=None and is permanently unjoined.

Returns:
ConsumerGroupMetadata
Expand Down
18 changes: 8 additions & 10 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,13 @@
HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest,
DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID,
)
from kafka.structs import ConsumerGroupMetadata
from kafka.structs import ConsumerGroupMetadata, MemberState
from kafka.util import Timer

log = logging.getLogger('kafka.coordinator')
heartbeat_log = logging.getLogger('kafka.coordinator.heartbeat')


class MemberState:
UNJOINED = '<unjoined>' # the client is not part of a group
REBALANCING = '<rebalancing>' # the client has begun rebalancing
STABLE = '<stable>' # the client has joined and is sending heartbeats


class Generation:
def __init__(self, generation_id, member_id, protocol):
self.generation_id = generation_id
Expand Down Expand Up @@ -891,20 +885,24 @@ def generation_if_stable(self):
return self._generation

def group_metadata(self):
"""Return a snapshot of this member's group identity (KIP-447).
"""Return a snapshot of this member's group membership.

Returns the current generation_id / member_id / group_instance_id even
when the group is not stable; the caller (typically
KafkaProducer.send_offsets_to_transaction) needs whatever is current
so the broker can fence stale instances. If the consumer has never
joined, the snapshot has the no-generation defaults.
so the broker can fence stale instances (KIP-447). If the consumer has
never joined, the snapshot has the no-generation defaults.

The snapshot also carries the current MemberState (``state``) so callers
can observe whether the group has converged; ``state`` is ignored by the
fencing path.
"""
with self._lock:
return ConsumerGroupMetadata(
group_id=self.group_id,
generation_id=self._generation.generation_id,
member_id=self._generation.member_id,
group_instance_id=self.group_instance_id,
state=self.state,
)

# deprecated
Expand Down
30 changes: 22 additions & 8 deletions kafka/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,33 @@
"""


class MemberState:
UNJOINED = '<unjoined>' # the client is not part of a group
REBALANCING = '<rebalancing>' # the client has begun rebalancing
STABLE = '<stable>' # the client has joined and is sending heartbeats


ConsumerGroupMetadata = namedtuple("ConsumerGroupMetadata",
["group_id", "generation_id", "member_id", "group_instance_id"],
defaults=[-1, '', None])
ConsumerGroupMetadata.__doc__ = """A snapshot of a consumer's group membership (KIP-447).
["group_id", "generation_id", "member_id", "group_instance_id", "state"],
defaults=[None, -1, '', None, MemberState.UNJOINED])
ConsumerGroupMetadata.__doc__ = """A snapshot of a consumer's group membership.

The first four fields are the KIP-447 fencing identity: pass the snapshot to
KafkaProducer.send_offsets_to_transaction() so the broker can fence stale
consumer instances when committing offsets inside a transaction. The broker
uses member_id + generation_id + group_instance_id to verify the producer is
acting on behalf of the current group generation.

Passed to KafkaProducer.send_offsets_to_transaction() so the broker can fence
stale consumer instances when committing offsets inside a transaction. The
broker uses member_id + generation_id + group_instance_id to verify the
producer is acting on behalf of the current group generation.
The ``state`` field records the MemberState at the moment the snapshot was
taken; it is ignored by the producer/fencing path. It lets callers observe
whether the consumer has converged on a stable assignment - useful for
monitoring and for tests that wait for a group to finish rebalancing.

Keyword Arguments:
group_id (str): The consumer group id.
group_id (str): The consumer group id, or None for manual assignment.
generation_id (int): The current generation id (-1 if unjoined).
member_id (str): The current member id ('' if unjoined).
group_instance_id (str): The static membership instance id, or None.
state (str): The current MemberState (one of MemberState.UNJOINED,
MemberState.REBALANCING, MemberState.STABLE).
"""
13 changes: 10 additions & 3 deletions test/consumer/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,13 @@ def test_group_metadata_unjoined(coordinator):
assert gm.member_id == '' # UNKNOWN_MEMBER_ID is ''
assert gm.generation_id == -1 # DEFAULT_GENERATION_ID
assert gm.group_instance_id is None
# Live coordination state: unjoined.
assert gm.state == MemberState.UNJOINED


def test_group_metadata_after_join(coordinator):
"""After joining, group_metadata() reflects the live generation."""
"""After joining, group_metadata() reflects the live generation and the
MemberState."""
coordinator._generation = Generation(generation_id=42,
member_id='mbr-1',
protocol='range')
Expand All @@ -89,11 +92,15 @@ def test_group_metadata_after_join(coordinator):
assert gm.member_id == 'mbr-1'
# group_instance_id comes from config (None by default for this fixture).
assert gm.group_instance_id is None
assert gm.state == MemberState.STABLE

# Still returns the snapshot even while rebalancing - the producer needs
# *something* to send and the broker handles fencing.
# *something* to send and the broker handles fencing. The state field
# tracks the in-progress (re)join.
coordinator.state = MemberState.REBALANCING
assert coordinator.group_metadata().generation_id == 42
gm = coordinator.group_metadata()
assert gm.generation_id == 42
assert gm.state == MemberState.REBALANCING


def test_group_protocols(coordinator):
Expand Down
Loading