From f4538966fa8474e43a3742bc09a642c439448043 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 13 Jun 2026 17:33:38 -0700 Subject: [PATCH 1/2] tests: Add ConsumerGroupRunner fixture to manage multi-consumer group integration tests --- test/integration/conftest.py | 122 +++++++++++++++ test/integration/test_admin_integration.py | 109 +++---------- test/integration/test_consumer_integration.py | 146 +++++------------- 3 files changed, 179 insertions(+), 198 deletions(-) diff --git a/test/integration/conftest.py b/test/integration/conftest.py index b49b6ffbb..2a1d26cdc 100644 --- a/test/integration/conftest.py +++ b/test/integration/conftest.py @@ -1,5 +1,8 @@ +import collections import contextlib import os +import threading +import time from urllib.parse import urlparse import uuid @@ -7,6 +10,7 @@ from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer from kafka.net.compat import KafkaNetClient +from kafka.structs import MemberState from kafka.util import TOPIC_LEGAL_CHARS, TOPIC_MAX_LENGTH, ensure_valid_topic_name from test.testutil import env_kafka_version, random_string from test.integration.fixtures import KafkaFixture, ZookeeperFixture, create_topics, client_params @@ -248,3 +252,121 @@ def _send_messages(number_range, partition=0, topic=topic, producer=producer, re return [msg for (msg, f) in messages_and_futures] return _send_messages + + +class ConsumerGroupRunner: + """Manage pools of group-consuming threads for integration tests. + + Each spawned consumer runs ``poll()`` in its own background thread until + the runner is torn down. Use :meth:`wait_for_stable` to block until every + member has joined and the group has finished rebalancing. Threads are + stopped and joined automatically when the ``consumer_group`` fixture tears + down, so tests don't need their own ``try/finally`` bookkeeping. + + consumers = consumer_group.spawn(group_id='g', count=4) + consumer_group.wait_for_stable() + # ... assert on consumers[i].assignment() ... + + ``wait_for_stable`` reads ``consumer.group_metadata()`` / ``assignment()`` + from the test thread while each consumer polls on its own thread. Both + reads are lock-protected snapshots, so this cross-thread observation is + safe even though KafkaConsumer is otherwise single-threaded. + """ + def __init__(self, consumer_factory): + self._factory = consumer_factory + self._members = [] # [{thread, stop, ready, consumer, error}, ...] + + @property + def consumers(self): + """Live consumers in spawn order (across all groups).""" + return [m['consumer'] for m in self._members] + + def spawn(self, group_id, count=1, client_id_prefix='consumer', **consumer_params): + """Start ``count`` consumer threads in ``group_id``; return their consumers. + + Blocks until each consumer has been constructed (or re-raises whatever + the consumer thread failed with during startup). Extra keyword args are + passed through to ``kafka_consumer_factory``. + """ + started = [] + for _ in range(count): + member = { + 'stop': threading.Event(), + 'ready': threading.Event(), + 'consumer': None, + 'error': None, + } + client_id = '%s-%d' % (client_id_prefix, len(self._members)) + + def run(member=member, client_id=client_id): + try: + with self._factory(group_id=group_id, client_id=client_id, + **consumer_params) as c: + member['consumer'] = c + member['ready'].set() + while not member['stop'].is_set(): + c.poll(timeout_ms=200) + except Exception as e: # surfaced via spawn() / stop() + member['error'] = e + finally: + member['ready'].set() + + member['thread'] = threading.Thread(target=run, name=client_id, daemon=True) + self._members.append(member) + member['thread'].start() + assert member['ready'].wait(timeout=15), \ + 'consumer %s failed to start within 15s' % client_id + if member['error'] is not None: + raise member['error'] + started.append(member['consumer']) + return started + + def wait_for_stable(self, timeout=30, poll_interval=1): + """Block until every member is STABLE, assigned, and on one generation. + + "One generation" is checked per group_id: a fully converged group has + all of its members reporting the same generation_id, which means the + most recent rebalance has propagated to everyone. + """ + deadline = time.monotonic() + timeout + while True: + generations = collections.defaultdict(set) + converged = True + for c in self.consumers: + meta = c.group_metadata() + if meta.state != MemberState.STABLE or not c.assignment(): + converged = False + break + generations[meta.group_id].add(meta.generation_id) + if converged and all(len(g) == 1 for g in generations.values()): + return + assert time.monotonic() < deadline, 'timeout waiting for stable group' + time.sleep(poll_interval) + + def stop(self): + """Signal all threads to stop, then join them (called at teardown).""" + for member in self._members: + member['stop'].set() + for member in self._members: + member['thread'].join(timeout=5) + assert not member['thread'].is_alive(), \ + 'consumer thread %s did not exit' % member['thread'].name + + +@pytest.fixture +def consumer_group(kafka_consumer_factory): + """Return a ConsumerGroupRunner for multi-threaded consumer group tests. + + Spawns group-consuming threads on demand and tears them all down (stop + + join) when the test finishes:: + + def test_something(consumer_group, topic): + consumer_group.spawn(group_id='g', count=3) + consumer_group.wait_for_stable() + ... + """ + runner = ConsumerGroupRunner(kafka_consumer_factory) + try: + yield runner + finally: + runner.stop() diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index b3718b075..9ae8fc168 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -1,5 +1,4 @@ from logging import info -from threading import Event, Thread from time import monotonic as time, sleep import pytest @@ -170,95 +169,31 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client): @pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') -def test_describe_group_exists(kafka_admin_client, kafka_consumer_factory, topic): - """Tests that the describe consumer group call returns valid consumer group information - This test takes inspiration from the test 'test_group' in test_consumer_group.py. - """ - consumers = {} - stop = {} - threads = {} +def test_describe_group_exists(kafka_admin_client, consumer_group, topic): + """Tests that the describe consumer group call returns valid consumer group information""" random_group_id = 'test-group-' + random_string(6) group_id_list = [random_group_id, random_group_id + '_2'] - generations = {group_id_list[0]: set(), group_id_list[1]: set()} - def consumer_thread(i, group_id): - assert i not in consumers - assert i not in stop - stop[i] = Event() - with kafka_consumer_factory(group_id=group_id) as c: - consumers[i] = c - while not stop[i].is_set(): - consumers[i].poll(timeout_ms=200) - consumers[i] = None - stop[i] = None - - num_consumers = 3 - for i in range(num_consumers): - group_id = group_id_list[i % 2] - t = Thread(target=consumer_thread, args=(i, group_id,)) - t.start() - threads[i] = t - - try: - timeout = time() + 35 - while True: - info('Checking consumers...') - for c in range(num_consumers): - - # Verify all consumers have been created - if c not in consumers: - break - - # Verify all consumers have an assignment - elif not consumers[c].assignment(): - break - - # If all consumers exist and have an assignment - else: - - info('All consumers have assignment... checking for stable group') - # Verify all consumers are in the same generation - # then log state and break while loop - - for consumer in consumers.values(): - generations[consumer.config['group_id']].add(consumer._coordinator._generation.generation_id) - - is_same_generation = any([len(consumer_generation) == 1 for consumer_generation in generations.values()]) - - # New generation assignment is not complete until - # coordinator.rejoining = False - rejoining = any([consumer._coordinator.rejoining - for consumer in list(consumers.values())]) - - if not rejoining and is_same_generation: - break - assert time() < timeout, "timeout waiting for assignments" - info('sleeping...') - sleep(1) - info('Group stabilized; verifying assignment') - output = kafka_admin_client.describe_groups(group_id_list) - assert len(output) == 2 - groups = set() - for group in output.values(): - assert(group['group_id'] in group_id_list) - if group['group_id'] == group_id_list[0]: - assert(len(group['members']) == 2) - else: - assert(len(group['members']) == 1) - for member in group['members']: - assert(member['member_metadata']['topics'] == [topic]) - assert(member['member_assignment']['assigned_partitions'][0]['topic'] == topic) - groups.add(group['group_id']) - assert(sorted(list(groups)) == group_id_list) - finally: - info('Shutting down %s consumers', num_consumers) - for c in range(num_consumers): - info('Stopping consumer %s', c) - stop[c].set() - for c in range(num_consumers): - info('Waiting for consumer thread %s', c) - threads[c].join() - threads[c] = None + # Two members in the first group, one in the second. + consumer_group.spawn(group_id=group_id_list[0], count=2) + consumer_group.spawn(group_id=group_id_list[1], count=1) + consumer_group.wait_for_stable(timeout=35) + + info('Group stabilized; verifying assignment') + output = kafka_admin_client.describe_groups(group_id_list) + assert len(output) == 2 + groups = set() + for group in output.values(): + assert(group['group_id'] in group_id_list) + if group['group_id'] == group_id_list[0]: + assert(len(group['members']) == 2) + else: + assert(len(group['members']) == 1) + for member in group['members']: + assert(member['member_metadata']['topics'] == [topic]) + assert(member['member_assignment']['assigned_partitions'][0]['topic'] == topic) + groups.add(group['group_id']) + assert(sorted(list(groups)) == group_id_list) @pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") diff --git a/test/integration/test_consumer_integration.py b/test/integration/test_consumer_integration.py index 402ecee64..c6cd4d415 100644 --- a/test/integration/test_consumer_integration.py +++ b/test/integration/test_consumer_integration.py @@ -1,6 +1,4 @@ -import collections import logging -import threading import time from unittest.mock import patch, ANY @@ -346,119 +344,45 @@ def test_kafka_consumer_position_after_seek_to_end(kafka_consumer_factory, topic @pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version') -def test_group(kafka_consumer_factory, topic): +def test_group(consumer_group, topic): num_partitions = 4 - consumers = {} - stop = {} - threads = {} - messages = collections.defaultdict(lambda: collections.defaultdict(list)) - group_id = 'test-group-' + random_string(6) - def consumer_thread(i): - assert i not in consumers - assert i not in stop - stop[i] = threading.Event() - # Tight session/request timeouts so close() can't outlast the - # join(timeout=5) below. request_timeout_ms must exceed - # session_timeout_ms, and both must exceed heartbeat_interval_ms - # (default 500 in the consumer_factory fixture). - with kafka_consumer_factory(group_id=group_id, - client_id="consumer_thread-%s" % i, - session_timeout_ms=3000, - request_timeout_ms=4000, - bootstrap_timeout_ms=5000) as c: - consumers[i] = c - while not stop[i].is_set(): - for tp, records in consumers[i].poll(timeout_ms=200).items(): - messages[i][tp].extend(records) - consumers[i] = None - stop[i] = None - num_consumers = 4 - for i in range(num_consumers): - t = threading.Thread(target=consumer_thread, args=(i,)) - t.daemon = True - t.start() - threads[i] = t - - try: - timeout = time.monotonic() + 15 - while True: - assert time.monotonic() < timeout, "timeout waiting for assignments" - # Verify all consumers have been created - missing_consumers = set(range(num_consumers)) - set(consumers.keys()) - if missing_consumers: - logging.info('Waiting on consumer threads: %s', missing_consumers) - time.sleep(1) - continue + group_id = 'test-group-' + random_string(6) - unassigned_consumers = {c for c, consumer in consumers.items() if not consumer.assignment()} - if unassigned_consumers: - logging.info('Waiting for consumer assignments: %s', unassigned_consumers) - time.sleep(1) + # Tight session/request timeouts so close() can't outlast the join(timeout=5) + # at teardown. request_timeout_ms must exceed session_timeout_ms, and both + # must exceed heartbeat_interval_ms (default 500 in the consumer factory). + consumers = consumer_group.spawn( + group_id=group_id, count=num_consumers, + session_timeout_ms=3000, request_timeout_ms=4000, bootstrap_timeout_ms=5000) + + consumer_group.wait_for_stable(timeout=15) + + logging.info('Group stabilized; verifying assignment') + group_assignment = set() + for c in consumers: + assert len(c.assignment()) != 0 + assert set.isdisjoint(c.assignment(), group_assignment) + group_assignment.update(c.assignment()) + + assert group_assignment == set([ + TopicPartition(topic, partition) + for partition in range(num_partitions)]) + logging.info('Assignment looks good!') + + logging.info('Verifying heartbeats') + while True: + for c in consumers: + heartbeat = c._coordinator.heartbeat + last_hb = time.monotonic() - 0.5 + if (heartbeat.heartbeat_failed or + heartbeat.last_receive < last_hb or + heartbeat.last_reset > last_hb): + time.sleep(0.1) continue - - # If all consumers exist and have an assignment - logging.info('All consumers have assignment... checking for stable group') - # Verify all consumers are in the same generation - # then log state and break while loop - generations = set([consumer._coordinator._generation.generation_id - for consumer in consumers.values()]) - - # New generation assignment is not complete until - # coordinator.rejoining = False - rejoining = set([c for c, consumer in consumers.items() if consumer._coordinator.rejoining]) - - if not rejoining and len(generations) == 1: - for c, consumer in consumers.items(): - logging.info("[%s] %s %s: %s", c, - consumer._coordinator._generation.generation_id, - consumer._coordinator._generation.member_id, - consumer.assignment()) - break - else: - logging.info('Rejoining: %s, generations: %s', rejoining, generations) - time.sleep(1) - continue - - logging.info('Group stabilized; verifying assignment') - group_assignment = set() - for c in range(num_consumers): - assert len(consumers[c].assignment()) != 0 - assert set.isdisjoint(consumers[c].assignment(), group_assignment) - group_assignment.update(consumers[c].assignment()) - - assert group_assignment == set([ - TopicPartition(topic, partition) - for partition in range(num_partitions)]) - logging.info('Assignment looks good!') - - logging.info('Verifying heartbeats') - while True: - for c in range(num_consumers): - heartbeat = consumers[c]._coordinator.heartbeat - last_hb = time.monotonic() - 0.5 - if (heartbeat.heartbeat_failed or - heartbeat.last_receive < last_hb or - heartbeat.last_reset > last_hb): - time.sleep(0.1) - continue - else: - break - logging.info('Heartbeats look good') - - finally: - logging.info('Shutting down %s consumers', num_consumers) - # Signal all stops first, then join. Serial stop-then-join causes the - # broker to process N back-to-back rebalances (one per LeaveGroup); - # parallel teardown lets all consumers close concurrently against a - # single rebalance pass and keeps each join() within budget. - for c in range(num_consumers): - logging.info('Stopping consumer %s', c) - stop[c].set() - for c in range(num_consumers): - threads[c].join(timeout=5) - assert not threads[c].is_alive() - threads[c] = None + else: + break + logging.info('Heartbeats look good') @pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version') From 717025f05e3fbd493653b97a0a71357bf72b7162 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 13 Jun 2026 17:47:56 -0700 Subject: [PATCH 2/2] pylint --- test/integration/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/conftest.py b/test/integration/conftest.py index 2a1d26cdc..f673b6a29 100644 --- a/test/integration/conftest.py +++ b/test/integration/conftest.py @@ -317,7 +317,7 @@ def run(member=member, client_id=client_id): assert member['ready'].wait(timeout=15), \ 'consumer %s failed to start within 15s' % client_id if member['error'] is not None: - raise member['error'] + raise member['error'] # pylint: disable=raising-bad-type started.append(member['consumer']) return started