From 766d8b92cb7438ed7b3261f0b8888fc5224ab88b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 13 Jun 2026 11:11:04 -0700 Subject: [PATCH] Add MockGroup and test coordinator rebalance / metadata snapshot changes --- test/consumer/test_coordinator.py | 175 ++++++++++++++++++++++++++++++ test/mock_broker.py | 135 +++++++++++++++++++++++ test/test_mock_broker.py | 64 +++++++++++ 3 files changed, 374 insertions(+) diff --git a/test/consumer/test_coordinator.py b/test/consumer/test_coordinator.py index 38facda21..4a94aaedc 100644 --- a/test/consumer/test_coordinator.py +++ b/test/consumer/test_coordinator.py @@ -2587,3 +2587,178 @@ def ok_heartbeat(api_key, api_version, correlation_id, request_bytes): coordinator.close(timeout_ms=0) client.close() manager.close() + + +def _apply_topic_metadata(cluster, topic, num_partitions, node_id=0): + """Build a MetadataResponse advertising `topic` with `num_partitions` and + apply it to `cluster`, firing its metadata-update listeners (which includes + the coordinator's _handle_metadata_update).""" + Broker = MetadataResponse.MetadataResponseBroker + Topic = MetadataResponse.MetadataResponseTopic + Partition = Topic.MetadataResponsePartition + response = MetadataResponse( + version=8, throttle_time_ms=0, + brokers=[Broker(node_id=node_id, host='localhost', port=9092, rack=None, version=8)], + cluster_id='mock-cluster', controller_id=node_id, + topics=[Topic(version=8, error_code=0, name=topic, is_internal=False, + partitions=[ + Partition(version=8, error_code=0, partition_index=p, + leader_id=node_id, leader_epoch=0, + replica_nodes=[node_id], isr_nodes=[node_id], + offline_replicas=[]) + for p in range(num_partitions)])]) + # Round-trip through the wire so every nested container carries _version, + # the same shape update_metadata sees from a real MetadataResponse. + cluster.update_metadata(MetadataResponse.decode(response.encode(), version=8)) + + +def _single_member(topic, member_id='member-1'): + return [JoinGroupResponse.JoinGroupResponseMember( + member_id=member_id, + metadata=ConsumerProtocolSubscription(0, [topic], b'').encode())] + + +def test_leader_rejoins_when_metadata_changes_after_assignment(coordinator): + """Once the leader has assigned partitions, a metadata change that alters + the subscribed topic's partition count must flip need_rejoin() + to True so the change is not silently lost.""" + coordinator._subscription.subscribe(topics=['t']) + # Topic has 1 partition when the leader computes the assignment. + _apply_topic_metadata(coordinator._cluster, 't', num_partitions=1) + assert coordinator._metadata_snapshot == {'t': {0}} + + # Perform assignment -> becomes leader and snapshots the metadata used. + coordinator._perform_assignment('member-1', 'range', _single_member('t')) + assert coordinator._is_leader + assert coordinator._assignment_snapshot == {'t': {0}} + + # Simulate the rest of a completed join (the leader keeps its snapshot). + coordinator._joined_subscription = {'t'} + coordinator._generation = Generation(1, 'member-1', 'range') + coordinator.state = MemberState.STABLE + coordinator.rejoin_needed = False + assert not coordinator.need_rejoin() + + # Topic grows to 2 partitions while the member is assigned. The metadata + # listener updates _metadata_snapshot; the assignment snapshot is now stale. + _apply_topic_metadata(coordinator._cluster, 't', num_partitions=2) + assert coordinator._metadata_snapshot == {'t': {0, 1}} + assert coordinator._assignment_snapshot == {'t': {0}} + assert coordinator.need_rejoin() + + +def test_follower_does_not_rejoin_on_metadata_change(coordinator): + """Only the leader watches metadata for partition changes. A follower + (assignment_snapshot is None, cleared in _on_join_complete) must not rejoin + just because the topic's partition count changed.""" + coordinator._subscription.subscribe(topics=['t']) + _apply_topic_metadata(coordinator._cluster, 't', num_partitions=1) + + # Simulate a completed join as a follower: snapshot cleared, joined. + coordinator._assignment_snapshot = None + coordinator._is_leader = False + coordinator._joined_subscription = {'t'} + coordinator._generation = Generation(1, 'member-1', 'range') + coordinator.state = MemberState.STABLE + coordinator.rejoin_needed = False + assert not coordinator.need_rejoin() + + # Metadata changes -- a follower must not be driven to rejoin by it. + _apply_topic_metadata(coordinator._cluster, 't', num_partitions=2) + assert coordinator._metadata_snapshot == {'t': {0, 1}} + assert not coordinator.need_rejoin() + + +def _metadata_topic(name, num_partitions, node_id=0): + """A MetadataResponseTopic for MockCluster.set_metadata(topics=[...]).""" + Topic = MetadataResponse.MetadataResponseTopic + Partition = Topic.MetadataResponsePartition + return Topic(version=8, error_code=0, name=name, is_internal=False, + partitions=[ + Partition(version=8, error_code=0, partition_index=p, + leader_id=node_id, leader_epoch=0, + replica_nodes=[node_id], isr_nodes=[node_id], + offline_replicas=[]) + for p in range(num_partitions)]) + + +def _group_coordinator(net, metrics, mock_cluster, **configs): + """Build a ConsumerCoordinator wired to a MockCluster, bootstrapped.""" + manager = KafkaConnectionManager( + net, bootstrap_servers=mock_cluster.bootstrap_servers(), + api_version=mock_cluster.broker_version, request_timeout_ms=5000) + mock_cluster.attach(manager) + client = KafkaNetClient(net=net, manager=manager) + coordinator = ConsumerCoordinator( + client, SubscriptionState(), metrics=metrics, + api_version=mock_cluster.broker_version, + heartbeat_interval_ms=20, retry_backoff_ms=20, **configs) + manager.bootstrap(timeout_ms=5000) + return coordinator, manager, client + + +def test_mock_group_single_member_join_assigns_all_partitions(net, metrics): + """Smoke test for MockCluster.add_group: a single consumer joins the group, + is elected leader, runs the assignor, and is assigned every partition of + the subscribed topic.""" + mock_cluster = MockCluster(num_brokers=1) + mock_cluster.set_metadata(topics=[_metadata_topic('t', num_partitions=3)]) + group = mock_cluster.add_group('my-group', coordinator=0) + coordinator, manager, client = _group_coordinator( + net, metrics, mock_cluster, group_id='my-group') + try: + coordinator._subscription.subscribe(topics=['t']) + # Ensure the cluster has the topic metadata the assignor needs. + client.cluster.request_update() + + assert coordinator.ensure_active_group(timeout_ms=5000) + + assert coordinator._subscription.assigned_partitions() == { + TopicPartition('t', 0), TopicPartition('t', 1), TopicPartition('t', 2)} + assert coordinator._is_leader + assert len(group.members) == 1 + assert group.generation == 1 + finally: + coordinator._close_heartbeat() + coordinator.reset_generation() + coordinator.close(timeout_ms=0) + client.close() + manager.close() + + +def test_metadata_growth_triggers_rejoin_end_to_end(net, metrics): + """KAFKA-3949 end-to-end: a consumer is the group leader with a 1-partition + topic; the topic grows to 2 partitions; a metadata refresh must drive a + rejoin so the consumer ends up assigned both partitions (the change is not + lost).""" + mock_cluster = MockCluster(num_brokers=1) + mock_cluster.set_metadata(topics=[_metadata_topic('t', num_partitions=1)]) + group = mock_cluster.add_group('my-group', coordinator=0) + coordinator, manager, client = _group_coordinator( + net, metrics, mock_cluster, group_id='my-group') + try: + coordinator._subscription.subscribe(topics=['t']) + assert coordinator.ensure_active_group(timeout_ms=5000) + assert coordinator._subscription.assigned_partitions() == {TopicPartition('t', 0)} + assert group.generation == 1 + + # The topic grows to 2 partitions. Refresh metadata so the coordinator's + # listener sees the change (the racing metadata update from KAFKA-3949). + mock_cluster.set_metadata(topics=[_metadata_topic('t', num_partitions=2)]) + future = client.cluster.request_update() + client.poll(future=future, timeout_ms=5000) + + # The leader's assignment snapshot is now stale -> must rejoin. + assert coordinator.need_rejoin() + + # Rejoin picks up the new partition; the change was not lost. + assert coordinator.ensure_active_group(timeout_ms=5000) + assert coordinator._subscription.assigned_partitions() == { + TopicPartition('t', 0), TopicPartition('t', 1)} + assert group.generation == 2 + finally: + coordinator._close_heartbeat() + coordinator.reset_generation() + coordinator.close(timeout_ms=0) + client.close() + manager.close() diff --git a/test/mock_broker.py b/test/mock_broker.py index 36ed7360a..cd61d355d 100644 --- a/test/mock_broker.py +++ b/test/mock_broker.py @@ -38,6 +38,12 @@ import kafka.errors as Errors from kafka.protocol.admin import DescribeClusterRequest, DescribeClusterResponse from kafka.protocol.broker_version_data import BrokerVersionData +from kafka.protocol.consumer import ( + HeartbeatRequest, HeartbeatResponse, + JoinGroupRequest, JoinGroupResponse, + LeaveGroupRequest, LeaveGroupResponse, + SyncGroupRequest, SyncGroupResponse, +) from kafka.protocol.metadata import ( ApiVersionsRequest, ApiVersionsResponse, CoordinatorType, FindCoordinatorRequest, FindCoordinatorResponse, @@ -574,6 +580,7 @@ def __init__(self, num_brokers=3, broker_version=(4, 2), host='localhost', base_ ] self._by_addr = {(b.host, b.port): b for b in self.brokers} self._coordinators = {} + self._groups = {} self.set_metadata() def __getitem__(self, node_id): @@ -658,6 +665,25 @@ def _handle_find_coordinator(self, api_key, api_version, correlation_id, request node_id=first.node_id, host=first.host, port=first.port, coordinators=coordinators) + def add_group(self, group_id, coordinator=0): + """Install a minimal classic-protocol group coordinator on a broker. + + Points ``FindCoordinator`` for ``group_id`` at the ``coordinator`` + broker (via :meth:`set_coordinator`) and registers a :class:`MockGroup` + there that auto-answers JoinGroup / SyncGroup / Heartbeat / LeaveGroup, + so a real consumer can complete a rebalance without the test scripting + each response. Returns the :class:`MockGroup` for inspection and + rebalance control (e.g. ``group.trigger_rebalance()``). + + Arguments: + group_id (str): consumer group id. + coordinator (int): index of the coordinator broker. Default 0. + """ + self.set_coordinator(group_id, coordinator) + group = MockGroup(self.brokers[coordinator], group_id) + self._groups[group_id] = group + return group + def attach(self, manager): """Monkey-patch a KafkaConnectionManager to route each new connection to the cluster member matching the target node's (host, port). @@ -700,3 +726,112 @@ def factory(**kwargs): return client return factory + + +def _as_bytes(value): + """Coerce a decoded metadata/assignment field (bytes or memoryview) to bytes.""" + if isinstance(value, bytes): + return value + return bytes(value) + + +class MockGroup: + """Minimal classic-protocol consumer-group coordinator for a MockBroker. + + Auto-answers JoinGroup / SyncGroup / Heartbeat / LeaveGroup so a real + consumer (or ConsumerCoordinator) can complete a rebalance against a mock + without the test scripting each response. Models the common case: + + - The first member to join (or whoever sent a non-empty member_id) is + elected leader; only the leader receives the full member list so it can + run the assignor. + - SyncGroup routes the leader-computed assignment back to each member -- + the broker just echoes ``assignments[member_id]`` from the request. + - Heartbeat returns NoError, unless :meth:`trigger_rebalance` armed a + one-shot RebalanceInProgress to drive a broker-initiated rejoin. + + This is deliberately not a full broker: no session-timeout eviction, no + generation fencing beyond what the handlers below do, classic + JoinGroup/SyncGroup only (not KIP-848). Use :attr:`members` / + :attr:`generation` / :attr:`leader` for assertions. + + Typically created via :meth:`MockCluster.add_group`. + """ + + def __init__(self, broker, group_id): + self.broker = broker + self.group_id = group_id + self.generation = 0 + self.members = {} # member_id -> subscription metadata bytes + self.leader = None + self._next_member_id = 0 + self._force_rebalance = False + broker.respond_always(JoinGroupRequest, self._on_join) + broker.respond_always(SyncGroupRequest, self._on_sync) + broker.respond_always(HeartbeatRequest, self._on_heartbeat) + broker.respond_always(LeaveGroupRequest, self._on_leave) + + def trigger_rebalance(self): + """Arm a one-shot RebalanceInProgress on the next Heartbeat, forcing + the consumer to rejoin (a broker-initiated rebalance).""" + self._force_rebalance = True + + def _on_join(self, api_key, api_version, correlation_id, request_bytes): + request = JoinGroupRequest.decode(request_bytes, version=api_version, header=True) + member_id = request.member_id + if not member_id: + member_id = '%s-member-%d' % (self.group_id, self._next_member_id) + self._next_member_id += 1 + # Record/refresh this member's subscription (first supported protocol). + protocol = request.protocols[0] + self.members[member_id] = _as_bytes(protocol.metadata) + if self.leader is None or self.leader not in self.members: + self.leader = member_id + self.generation += 1 + + Member = JoinGroupResponse.JoinGroupResponseMember + # Only the leader needs the member list (to compute the assignment). + members = [] + if member_id == self.leader: + members = [Member(member_id=m, group_instance_id=None, metadata=md) + for m, md in self.members.items()] + return JoinGroupResponse( + throttle_time_ms=0, error_code=0, + generation_id=self.generation, + protocol_type='consumer', + protocol_name=protocol.name, + leader=self.leader, + member_id=member_id, + members=members) + + def _on_sync(self, api_key, api_version, correlation_id, request_bytes): + request = SyncGroupRequest.decode(request_bytes, version=api_version, header=True) + # The leader carries every member's assignment; a follower sends none. + # Route this member's slice back (empty if not found). + assignment = b'' + for entry in request.assignments: + if entry.member_id == request.member_id: + assignment = _as_bytes(entry.assignment) + break + return SyncGroupResponse( + throttle_time_ms=0, error_code=0, + protocol_type='consumer', + protocol_name=request.protocol_name, + assignment=assignment) + + def _on_heartbeat(self, api_key, api_version, correlation_id, request_bytes): + if self._force_rebalance: + self._force_rebalance = False + return HeartbeatResponse( + throttle_time_ms=0, + error_code=Errors.RebalanceInProgressError.errno) + return HeartbeatResponse(throttle_time_ms=0, error_code=0) + + def _on_leave(self, api_key, api_version, correlation_id, request_bytes): + request = LeaveGroupRequest.decode(request_bytes, version=api_version, header=True) + Member = LeaveGroupResponse.MemberResponse + members = [] + for m in getattr(request, 'members', None) or []: + members.append(Member(member_id=m.member_id, error_code=0)) + self.members.pop(m.member_id, None) + return LeaveGroupResponse(throttle_time_ms=0, error_code=0, members=members) diff --git a/test/test_mock_broker.py b/test/test_mock_broker.py index 3be83667b..6c2c1df76 100644 --- a/test/test_mock_broker.py +++ b/test/test_mock_broker.py @@ -506,3 +506,67 @@ def test_stop_and_start(self): assert client.is_ready(0) finally: client.close() + + +class TestMockGroup: + """The MockCluster.add_group auto-coordinator (JoinGroup/SyncGroup/ + Heartbeat/LeaveGroup), tested at the protocol-handler level. + + The handlers return response objects directly (the broker encodes them + on the way out), so the tests can feed an encoded request in and assert + on the response object's fields. + """ + + def _join(self, group, member_id='', topics=('t',), api_version=7): + from kafka.coordinator.assignors.abstract import ConsumerProtocolSubscription + from kafka.protocol.consumer import JoinGroupRequest + request = JoinGroupRequest( + group_id=group.group_id, session_timeout_ms=30000, + rebalance_timeout_ms=300000, member_id=member_id, + group_instance_id=None, protocol_type='consumer', + protocols=[('range', ConsumerProtocolSubscription(0, list(topics), b'').encode())], + max_version=api_version) + request.API_VERSION = api_version + request.with_header(correlation_id=1) + return group._on_join(JoinGroupRequest.API_KEY, api_version, 1, + request.encode(header=True)) + + def test_add_group_elects_first_member_leader(self): + cluster = MockCluster(num_brokers=1) + group = cluster.add_group('g', coordinator=0) + # First join with an empty member_id: broker assigns one, elects it leader. + resp = self._join(group, member_id='') + assert resp.error_code == 0 + assert resp.leader == resp.member_id # this member is leader + assert resp.member_id # a member_id was assigned + assert resp.generation_id == 1 + assert len(resp.members) == 1 # leader gets the member list + assert group.leader == resp.member_id + assert list(group.members) == [resp.member_id] + + # A rejoin reusing the member_id keeps it leader and bumps generation. + resp2 = self._join(group, member_id=resp.member_id) + assert resp2.member_id == resp.member_id + assert resp2.leader == resp.member_id + assert resp2.generation_id == 2 + assert list(group.members) == [resp.member_id] # no duplicate member + + def test_trigger_rebalance_is_one_shot(self): + import kafka.errors as Errors + from kafka.protocol.consumer import HeartbeatRequest + cluster = MockCluster(num_brokers=1) + group = cluster.add_group('g', coordinator=0) + + def heartbeat(): + req = HeartbeatRequest(group_id='g', generation_id=1, member_id='m', + group_instance_id=None) + req.API_VERSION = 3 + req.with_header(correlation_id=1) + return group._on_heartbeat(HeartbeatRequest.API_KEY, 3, 1, req.encode(header=True)) + + # Default: healthy heartbeats. + assert heartbeat().error_code == 0 + # Armed: next heartbeat reports a rebalance, then heals. + group.trigger_rebalance() + assert heartbeat().error_code == Errors.RebalanceInProgressError.errno + assert heartbeat().error_code == 0