Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions test/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import collections
import contextlib
import os
import threading
import time
from urllib.parse import urlparse
import uuid

import pytest

from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
from kafka.net.compat import KafkaNetClient
from kafka.structs import MemberState
from kafka.util import TOPIC_LEGAL_CHARS, TOPIC_MAX_LENGTH, ensure_valid_topic_name
from test.testutil import env_kafka_version, random_string
from test.integration.fixtures import KafkaFixture, ZookeeperFixture, create_topics, client_params
Expand Down Expand Up @@ -248,3 +252,121 @@ def _send_messages(number_range, partition=0, topic=topic, producer=producer, re
return [msg for (msg, f) in messages_and_futures]

return _send_messages


class ConsumerGroupRunner:
    """Manage pools of group-consuming threads for integration tests.

    Each spawned consumer runs ``poll()`` in its own background thread until
    the runner is torn down. Use :meth:`wait_for_stable` to block until every
    member has joined and the group has finished rebalancing. Threads are
    stopped and joined automatically when the ``consumer_group`` fixture tears
    down, so tests don't need their own ``try/finally`` bookkeeping.

        consumers = consumer_group.spawn(group_id='g', count=4)
        consumer_group.wait_for_stable()
        # ... assert on consumers[i].assignment() ...

    ``wait_for_stable`` reads ``consumer.group_metadata()`` / ``assignment()``
    from the test thread while each consumer polls on its own thread. Both
    reads are lock-protected snapshots, so this cross-thread observation is
    safe even though KafkaConsumer is otherwise single-threaded.

    An exception raised inside a consumer thread is recorded on that member
    and re-raised exactly once in the test thread, by whichever of
    :meth:`spawn`, :meth:`wait_for_stable` or :meth:`stop` notices it first.
    """

    def __init__(self, consumer_factory):
        """Store the factory; no thread is started until :meth:`spawn`.

        ``consumer_factory`` is called with keyword arguments and must return
        a context manager that yields a consumer exposing ``poll()``.
        """
        self._factory = consumer_factory
        self._members = []  # [{thread, stop, ready, consumer, error}, ...]

    @property
    def consumers(self):
        """Consumers in spawn order (across all groups).

        Members whose consumer was never constructed (startup raised or timed
        out) are skipped, so callers never receive ``None`` entries.
        """
        return [m['consumer'] for m in self._members
                if m['consumer'] is not None]

    def spawn(self, group_id, count=1, client_id_prefix='consumer', **consumer_params):
        """Start ``count`` consumer threads in ``group_id``; return their consumers.

        Blocks until each consumer has been constructed (or re-raises whatever
        the consumer thread failed with during startup). Extra keyword args are
        passed through to ``kafka_consumer_factory``.

        Raises:
            AssertionError: a consumer was not constructed within 15 seconds.
            Exception: whatever the consumer thread raised during startup.
        """
        started = []
        for _ in range(count):
            member = {
                'stop': threading.Event(),
                'ready': threading.Event(),
                'consumer': None,
                'error': None,
            }
            # The index is global across groups, so client ids stay unique.
            client_id = '%s-%d' % (client_id_prefix, len(self._members))
            member['thread'] = threading.Thread(
                target=self._run_member,
                args=(member, group_id, client_id, consumer_params),
                name=client_id,
                daemon=True,
            )
            # Register before starting so stop() always knows about the thread,
            # even when startup fails below.
            self._members.append(member)
            member['thread'].start()
            assert member['ready'].wait(timeout=15), \
                'consumer %s failed to start within 15s' % client_id
            self._raise_member_error(member)
            started.append(member['consumer'])
        return started

    def _run_member(self, member, group_id, client_id, consumer_params):
        """Thread body: build one consumer and poll it until told to stop."""
        try:
            with self._factory(group_id=group_id, client_id=client_id,
                               **consumer_params) as consumer:
                member['consumer'] = consumer
                member['ready'].set()
                while not member['stop'].is_set():
                    consumer.poll(timeout_ms=200)
        except Exception as e:  # surfaced via spawn() / wait_for_stable() / stop()
            member['error'] = e
        finally:
            # Also set on failure so spawn() wakes up instead of timing out.
            member['ready'].set()

    @staticmethod
    def _raise_member_error(member):
        """Re-raise the exception recorded by ``member``'s thread, at most once."""
        error = member['error']
        if error is not None:
            # Clear it so the same failure is not reported again at teardown.
            member['error'] = None
            raise error

    def wait_for_stable(self, timeout=30, poll_interval=1):
        """Block until every member is STABLE, assigned, and on one generation.

        "One generation" is checked per group_id: a fully converged group has
        all of its members reporting the same generation_id, which means the
        most recent rebalance has propagated to everyone.

        Raises:
            AssertionError: the groups did not converge within ``timeout``
                seconds.
            Exception: whatever a consumer thread died with, raised right away
                rather than after waiting out the timeout.
        """
        deadline = time.monotonic() + timeout
        while True:
            # A member whose thread has died cannot become stable; fail fast
            # with the real cause.
            for member in self._members:
                self._raise_member_error(member)
            generations = collections.defaultdict(set)
            converged = True
            for c in self.consumers:
                meta = c.group_metadata()
                if meta.state != MemberState.STABLE or not c.assignment():
                    converged = False
                    break
                generations[meta.group_id].add(meta.generation_id)
            if converged and all(len(g) == 1 for g in generations.values()):
                return
            assert time.monotonic() < deadline, 'timeout waiting for stable group'
            time.sleep(poll_interval)

    def stop(self):
        """Signal all threads to stop, then join them (called at teardown).

        Every thread is joined before anything is reported, so one stuck or
        failed member cannot prevent the others from being cleaned up.

        Raises:
            AssertionError: a thread was still alive 5 seconds after being
                asked to stop.
            Exception: the first not-yet-reported error raised by a consumer
                thread.
        """
        for member in self._members:
            member['stop'].set()
        stuck = []
        for member in self._members:
            member['thread'].join(timeout=5)
            if member['thread'].is_alive():
                stuck.append(member['thread'].name)
        assert not stuck, \
            'consumer thread %s did not exit' % ', '.join(stuck)
        for member in self._members:
            self._raise_member_error(member)


@pytest.fixture
def consumer_group(kafka_consumer_factory):
    """Provide a ConsumerGroupRunner for multi-threaded consumer group tests.

    The runner starts group-consuming threads on request; once the test is
    over, every thread it started is stopped and joined::

        def test_something(consumer_group, topic):
            consumer_group.spawn(group_id='g', count=3)
            consumer_group.wait_for_stable()
            ...
    """
    with contextlib.ExitStack() as teardown:
        group_runner = ConsumerGroupRunner(kafka_consumer_factory)
        # Registered up front so stop() runs whether the test passes or fails.
        teardown.callback(group_runner.stop)
        yield group_runner
109 changes: 22 additions & 87 deletions test/integration/test_admin_integration.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from logging import info
from threading import Event, Thread
from time import monotonic as time, sleep

import pytest
Expand Down Expand Up @@ -170,95 +169,31 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_group_exists(kafka_admin_client, consumer_group, topic):
    """Tests that the describe consumer group call returns valid consumer group information.

    Three consumers are started through the ``consumer_group`` fixture, two in
    one group and one in another, and ``describe_groups`` must report both
    groups with the right member counts. Thread startup, the wait for the
    groups to settle, and shutdown are all left to the fixture, so the test
    does no thread bookkeeping of its own.
    """
    random_group_id = 'test-group-' + random_string(6)
    group_id_list = [random_group_id, random_group_id + '_2']
    # Two members in the first group, one in the second.
    expected_member_counts = {group_id_list[0]: 2, group_id_list[1]: 1}

    consumer_group.spawn(group_id=group_id_list[0], count=2)
    consumer_group.spawn(group_id=group_id_list[1], count=1)
    consumer_group.wait_for_stable(timeout=35)

    info('Group stabilized; verifying assignment')
    output = kafka_admin_client.describe_groups(group_id_list)
    assert len(output) == 2
    groups = set()
    for group in output.values():
        assert group['group_id'] in group_id_list
        assert len(group['members']) == expected_member_counts[group['group_id']]
        for member in group['members']:
            # Each member must be subscribed to, and assigned partitions of,
            # the test topic only.
            assert member['member_metadata']['topics'] == [topic]
            assert member['member_assignment']['assigned_partitions'][0]['topic'] == topic
        groups.add(group['group_id'])
    # The '_2' suffix sorts after the bare id, so group_id_list is already sorted.
    assert sorted(groups) == group_id_list


@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1")
Expand Down
Loading
Loading