Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions test/consumer/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2587,3 +2587,178 @@ def ok_heartbeat(api_key, api_version, correlation_id, request_bytes):
coordinator.close(timeout_ms=0)
client.close()
manager.close()


def _apply_topic_metadata(cluster, topic, num_partitions, node_id=0):
"""Build a MetadataResponse advertising `topic` with `num_partitions` and
apply it to `cluster`, firing its metadata-update listeners (which includes
the coordinator's _handle_metadata_update)."""
Broker = MetadataResponse.MetadataResponseBroker
Topic = MetadataResponse.MetadataResponseTopic
Partition = Topic.MetadataResponsePartition
response = MetadataResponse(
version=8, throttle_time_ms=0,
brokers=[Broker(node_id=node_id, host='localhost', port=9092, rack=None, version=8)],
cluster_id='mock-cluster', controller_id=node_id,
topics=[Topic(version=8, error_code=0, name=topic, is_internal=False,
partitions=[
Partition(version=8, error_code=0, partition_index=p,
leader_id=node_id, leader_epoch=0,
replica_nodes=[node_id], isr_nodes=[node_id],
offline_replicas=[])
for p in range(num_partitions)])])
# Round-trip through the wire so every nested container carries _version,
# the same shape update_metadata sees from a real MetadataResponse.
cluster.update_metadata(MetadataResponse.decode(response.encode(), version=8))


def _single_member(topic, member_id='member-1'):
return [JoinGroupResponse.JoinGroupResponseMember(
member_id=member_id,
metadata=ConsumerProtocolSubscription(0, [topic], b'').encode())]


def test_leader_rejoins_when_metadata_changes_after_assignment(coordinator):
"""Once the leader has assigned partitions, a metadata change that alters
the subscribed topic's partition count must flip need_rejoin()
to True so the change is not silently lost."""
coordinator._subscription.subscribe(topics=['t'])
# Topic has 1 partition when the leader computes the assignment.
_apply_topic_metadata(coordinator._cluster, 't', num_partitions=1)
assert coordinator._metadata_snapshot == {'t': {0}}

# Perform assignment -> becomes leader and snapshots the metadata used.
coordinator._perform_assignment('member-1', 'range', _single_member('t'))
assert coordinator._is_leader
assert coordinator._assignment_snapshot == {'t': {0}}

# Simulate the rest of a completed join (the leader keeps its snapshot).
coordinator._joined_subscription = {'t'}
coordinator._generation = Generation(1, 'member-1', 'range')
coordinator.state = MemberState.STABLE
coordinator.rejoin_needed = False
assert not coordinator.need_rejoin()

# Topic grows to 2 partitions while the member is assigned. The metadata
# listener updates _metadata_snapshot; the assignment snapshot is now stale.
_apply_topic_metadata(coordinator._cluster, 't', num_partitions=2)
assert coordinator._metadata_snapshot == {'t': {0, 1}}
assert coordinator._assignment_snapshot == {'t': {0}}
assert coordinator.need_rejoin()


def test_follower_does_not_rejoin_on_metadata_change(coordinator):
"""Only the leader watches metadata for partition changes. A follower
(assignment_snapshot is None, cleared in _on_join_complete) must not rejoin
just because the topic's partition count changed."""
coordinator._subscription.subscribe(topics=['t'])
_apply_topic_metadata(coordinator._cluster, 't', num_partitions=1)

# Simulate a completed join as a follower: snapshot cleared, joined.
coordinator._assignment_snapshot = None
coordinator._is_leader = False
coordinator._joined_subscription = {'t'}
coordinator._generation = Generation(1, 'member-1', 'range')
coordinator.state = MemberState.STABLE
coordinator.rejoin_needed = False
assert not coordinator.need_rejoin()

# Metadata changes -- a follower must not be driven to rejoin by it.
_apply_topic_metadata(coordinator._cluster, 't', num_partitions=2)
assert coordinator._metadata_snapshot == {'t': {0, 1}}
assert not coordinator.need_rejoin()


def _metadata_topic(name, num_partitions, node_id=0):
"""A MetadataResponseTopic for MockCluster.set_metadata(topics=[...])."""
Topic = MetadataResponse.MetadataResponseTopic
Partition = Topic.MetadataResponsePartition
return Topic(version=8, error_code=0, name=name, is_internal=False,
partitions=[
Partition(version=8, error_code=0, partition_index=p,
leader_id=node_id, leader_epoch=0,
replica_nodes=[node_id], isr_nodes=[node_id],
offline_replicas=[])
for p in range(num_partitions)])


def _group_coordinator(net, metrics, mock_cluster, **configs):
"""Build a ConsumerCoordinator wired to a MockCluster, bootstrapped."""
manager = KafkaConnectionManager(
net, bootstrap_servers=mock_cluster.bootstrap_servers(),
api_version=mock_cluster.broker_version, request_timeout_ms=5000)
mock_cluster.attach(manager)
client = KafkaNetClient(net=net, manager=manager)
coordinator = ConsumerCoordinator(
client, SubscriptionState(), metrics=metrics,
api_version=mock_cluster.broker_version,
heartbeat_interval_ms=20, retry_backoff_ms=20, **configs)
manager.bootstrap(timeout_ms=5000)
return coordinator, manager, client


def test_mock_group_single_member_join_assigns_all_partitions(net, metrics):
"""Smoke test for MockCluster.add_group: a single consumer joins the group,
is elected leader, runs the assignor, and is assigned every partition of
the subscribed topic."""
mock_cluster = MockCluster(num_brokers=1)
mock_cluster.set_metadata(topics=[_metadata_topic('t', num_partitions=3)])
group = mock_cluster.add_group('my-group', coordinator=0)
coordinator, manager, client = _group_coordinator(
net, metrics, mock_cluster, group_id='my-group')
try:
coordinator._subscription.subscribe(topics=['t'])
# Ensure the cluster has the topic metadata the assignor needs.
client.cluster.request_update()

assert coordinator.ensure_active_group(timeout_ms=5000)

assert coordinator._subscription.assigned_partitions() == {
TopicPartition('t', 0), TopicPartition('t', 1), TopicPartition('t', 2)}
assert coordinator._is_leader
assert len(group.members) == 1
assert group.generation == 1
finally:
coordinator._close_heartbeat()
coordinator.reset_generation()
coordinator.close(timeout_ms=0)
client.close()
manager.close()


def test_metadata_growth_triggers_rejoin_end_to_end(net, metrics):
"""KAFKA-3949 end-to-end: a consumer is the group leader with a 1-partition
topic; the topic grows to 2 partitions; a metadata refresh must drive a
rejoin so the consumer ends up assigned both partitions (the change is not
lost)."""
mock_cluster = MockCluster(num_brokers=1)
mock_cluster.set_metadata(topics=[_metadata_topic('t', num_partitions=1)])
group = mock_cluster.add_group('my-group', coordinator=0)
coordinator, manager, client = _group_coordinator(
net, metrics, mock_cluster, group_id='my-group')
try:
coordinator._subscription.subscribe(topics=['t'])
assert coordinator.ensure_active_group(timeout_ms=5000)
assert coordinator._subscription.assigned_partitions() == {TopicPartition('t', 0)}
assert group.generation == 1

# The topic grows to 2 partitions. Refresh metadata so the coordinator's
# listener sees the change (the racing metadata update from KAFKA-3949).
mock_cluster.set_metadata(topics=[_metadata_topic('t', num_partitions=2)])
future = client.cluster.request_update()
client.poll(future=future, timeout_ms=5000)

# The leader's assignment snapshot is now stale -> must rejoin.
assert coordinator.need_rejoin()

# Rejoin picks up the new partition; the change was not lost.
assert coordinator.ensure_active_group(timeout_ms=5000)
assert coordinator._subscription.assigned_partitions() == {
TopicPartition('t', 0), TopicPartition('t', 1)}
assert group.generation == 2
finally:
coordinator._close_heartbeat()
coordinator.reset_generation()
coordinator.close(timeout_ms=0)
client.close()
manager.close()
135 changes: 135 additions & 0 deletions test/mock_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
import kafka.errors as Errors
from kafka.protocol.admin import DescribeClusterRequest, DescribeClusterResponse
from kafka.protocol.broker_version_data import BrokerVersionData
from kafka.protocol.consumer import (
HeartbeatRequest, HeartbeatResponse,
JoinGroupRequest, JoinGroupResponse,
LeaveGroupRequest, LeaveGroupResponse,
SyncGroupRequest, SyncGroupResponse,
)
from kafka.protocol.metadata import (
ApiVersionsRequest, ApiVersionsResponse, CoordinatorType,
FindCoordinatorRequest, FindCoordinatorResponse,
Expand Down Expand Up @@ -574,6 +580,7 @@ def __init__(self, num_brokers=3, broker_version=(4, 2), host='localhost', base_
]
self._by_addr = {(b.host, b.port): b for b in self.brokers}
self._coordinators = {}
self._groups = {}
self.set_metadata()

def __getitem__(self, node_id):
Expand Down Expand Up @@ -658,6 +665,25 @@ def _handle_find_coordinator(self, api_key, api_version, correlation_id, request
node_id=first.node_id, host=first.host, port=first.port,
coordinators=coordinators)

def add_group(self, group_id, coordinator=0):
"""Install a minimal classic-protocol group coordinator on a broker.

Points ``FindCoordinator`` for ``group_id`` at the ``coordinator``
broker (via :meth:`set_coordinator`) and registers a :class:`MockGroup`
there that auto-answers JoinGroup / SyncGroup / Heartbeat / LeaveGroup,
so a real consumer can complete a rebalance without the test scripting
each response. Returns the :class:`MockGroup` for inspection and
rebalance control (e.g. ``group.trigger_rebalance()``).

Arguments:
group_id (str): consumer group id.
coordinator (int): index of the coordinator broker. Default 0.
"""
self.set_coordinator(group_id, coordinator)
group = MockGroup(self.brokers[coordinator], group_id)
self._groups[group_id] = group
return group

def attach(self, manager):
"""Monkey-patch a KafkaConnectionManager to route each new connection
to the cluster member matching the target node's (host, port).
Expand Down Expand Up @@ -700,3 +726,112 @@ def factory(**kwargs):
return client

return factory


def _as_bytes(value):
"""Coerce a decoded metadata/assignment field (bytes or memoryview) to bytes."""
if isinstance(value, bytes):
return value
return bytes(value)


class MockGroup:
"""Minimal classic-protocol consumer-group coordinator for a MockBroker.

Auto-answers JoinGroup / SyncGroup / Heartbeat / LeaveGroup so a real
consumer (or ConsumerCoordinator) can complete a rebalance against a mock
without the test scripting each response. Models the common case:

- The first member to join (or whoever sent a non-empty member_id) is
elected leader; only the leader receives the full member list so it can
run the assignor.
- SyncGroup routes the leader-computed assignment back to each member --
the broker just echoes ``assignments[member_id]`` from the request.
- Heartbeat returns NoError, unless :meth:`trigger_rebalance` armed a
one-shot RebalanceInProgress to drive a broker-initiated rejoin.

This is deliberately not a full broker: no session-timeout eviction, no
generation fencing beyond what the handlers below do, classic
JoinGroup/SyncGroup only (not KIP-848). Use :attr:`members` /
:attr:`generation` / :attr:`leader` for assertions.

Typically created via :meth:`MockCluster.add_group`.
"""

def __init__(self, broker, group_id):
self.broker = broker
self.group_id = group_id
self.generation = 0
self.members = {} # member_id -> subscription metadata bytes
self.leader = None
self._next_member_id = 0
self._force_rebalance = False
broker.respond_always(JoinGroupRequest, self._on_join)
broker.respond_always(SyncGroupRequest, self._on_sync)
broker.respond_always(HeartbeatRequest, self._on_heartbeat)
broker.respond_always(LeaveGroupRequest, self._on_leave)

def trigger_rebalance(self):
"""Arm a one-shot RebalanceInProgress on the next Heartbeat, forcing
the consumer to rejoin (a broker-initiated rebalance)."""
self._force_rebalance = True

def _on_join(self, api_key, api_version, correlation_id, request_bytes):
request = JoinGroupRequest.decode(request_bytes, version=api_version, header=True)
member_id = request.member_id
if not member_id:
member_id = '%s-member-%d' % (self.group_id, self._next_member_id)
self._next_member_id += 1
# Record/refresh this member's subscription (first supported protocol).
protocol = request.protocols[0]
self.members[member_id] = _as_bytes(protocol.metadata)
if self.leader is None or self.leader not in self.members:
self.leader = member_id
self.generation += 1

Member = JoinGroupResponse.JoinGroupResponseMember
# Only the leader needs the member list (to compute the assignment).
members = []
if member_id == self.leader:
members = [Member(member_id=m, group_instance_id=None, metadata=md)
for m, md in self.members.items()]
return JoinGroupResponse(
throttle_time_ms=0, error_code=0,
generation_id=self.generation,
protocol_type='consumer',
protocol_name=protocol.name,
leader=self.leader,
member_id=member_id,
members=members)

def _on_sync(self, api_key, api_version, correlation_id, request_bytes):
request = SyncGroupRequest.decode(request_bytes, version=api_version, header=True)
# The leader carries every member's assignment; a follower sends none.
# Route this member's slice back (empty if not found).
assignment = b''
for entry in request.assignments:
if entry.member_id == request.member_id:
assignment = _as_bytes(entry.assignment)
break
return SyncGroupResponse(
throttle_time_ms=0, error_code=0,
protocol_type='consumer',
protocol_name=request.protocol_name,
assignment=assignment)

def _on_heartbeat(self, api_key, api_version, correlation_id, request_bytes):
if self._force_rebalance:
self._force_rebalance = False
return HeartbeatResponse(
throttle_time_ms=0,
error_code=Errors.RebalanceInProgressError.errno)
return HeartbeatResponse(throttle_time_ms=0, error_code=0)

def _on_leave(self, api_key, api_version, correlation_id, request_bytes):
request = LeaveGroupRequest.decode(request_bytes, version=api_version, header=True)
Member = LeaveGroupResponse.MemberResponse
members = []
for m in getattr(request, 'members', None) or []:
members.append(Member(member_id=m.member_id, error_code=0))
self.members.pop(m.member_id, None)
return LeaveGroupResponse(throttle_time_ms=0, error_code=0, members=members)
Loading
Loading