Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions pychunkedgraph/ingest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import Dict, Generator, Tuple

import numpy as np
from kvdbclient import BigTableConfig, HBaseConfig
from kvdbclient import get_config_class
from rich import box
from rich.console import Group
from rich.live import Live
Expand Down Expand Up @@ -62,10 +62,7 @@ def bootstrap(
TEST_RUN=test_run,
)
backend_type = config["backend_client"].get("TYPE", "bigtable")
if backend_type == "hbase":
client_config = HBaseConfig(**config["backend_client"]["CONFIG"])
else:
client_config = BigTableConfig(**config["backend_client"]["CONFIG"])
client_config = get_config_class(backend_type)(**config["backend_client"]["CONFIG"])
client_info = BackendClientInfo(backend_type, client_config)

graph_config = GraphConfig(
Expand Down
292 changes: 31 additions & 261 deletions pychunkedgraph/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,170 +1,49 @@
import atexit
import os
import signal
import subprocess
from functools import partial
from contextlib import ExitStack
from datetime import timedelta
from functools import partial

import numpy as np
import pytest

# Skip the old monolithic test file if it still exists (e.g., during branch transitions)
collect_ignore = ["test_uncategorized.py"]
import numpy as np
from google.auth import credentials
from google.cloud import bigtable
from kvdbclient_testing import backends as _backend_harnesses

from ..ingest.utils import bootstrap
from ..graph.edges import Edges
from ..graph.chunkedgraph import ChunkedGraph
from ..ingest.create.parent_layer import add_parent_chunk

from ..graph.edges import Edges
from ..ingest.utils import bootstrap
from .helpers import (
CloudVolumeMock,
TensorStoreMock,
mock_ws_info,
create_chunk,
to_label,
get_layer_chunk_bounds,
mock_ws_info,
)
from .hbase_mock_server import start_hbase_mock_server

# Handle of the emulator subprocess; assigned by the bigtable_emulator fixture
# and read by _cleanup_emulator. None until the emulator has been started.
_emulator_proc = None
# Set to True by _cleanup_emulator so that repeated invocations (fixture
# finalizer, atexit hook) only tear the emulator down once.
_emulator_cleaned = False


def _delete_test_table(graph, backend="bigtable"):
    """Test-only helper: remove the table that backs ``graph`` during cleanup.

    For ``backend == "bigtable"`` the client's admin table is deleted.
    Any other value takes the REST path: a DELETE is issued against the
    table's ``/schema`` URL, and every status other than 200 or 404 is passed
    to ``raise_for_status()``.
    """
    client = graph.client
    if backend != "bigtable":
        response = client._session.delete(client._table_url("/schema"))
        # 404 means the table is already gone, which is fine for cleanup.
        if response.status_code not in (200, 404):
            response.raise_for_status()
        return
    client._admin_table.delete()

# Skip the old monolithic test file if it still exists (e.g., during branch transitions)
collect_ignore = ["test_uncategorized.py"]

def _cleanup_emulator():
    """Terminate the emulator process group; a no-op after the first call.

    Sends SIGTERM to the emulator's process group and waits up to 5 seconds;
    if the process is still alive, escalates to SIGKILL and waits again.
    OS-level errors while signalling (e.g. the process is already gone) are
    ignored. Finally ``pkill -9 cbtemulator`` is run unconditionally.
    """
    global _emulator_cleaned
    if _emulator_proc is None or _emulator_cleaned:
        return
    _emulator_cleaned = True

    try:
        group_id = os.getpgid(_emulator_proc.pid)
        os.killpg(group_id, signal.SIGTERM)
        try:
            _emulator_proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # Graceful shutdown timed out: force-kill the whole group.
            os.killpg(group_id, signal.SIGKILL)
            _emulator_proc.wait(timeout=5)
    except OSError:
        # Covers ProcessLookupError and ChildProcessError (both subclasses of
        # OSError): the process or its group no longer exists.
        pass

    # Hard kill cbtemulator in case it survived the process group signal
    subprocess.run(["pkill", "-9", "cbtemulator"], stderr=subprocess.DEVNULL)


def setup_emulator_env():
    """Export the emulator host and probe whether the emulator is reachable.

    Runs ``gcloud beta emulators bigtable env-init``, stores the text after
    the last ``=`` of its output in ``BIGTABLE_EMULATOR_HOST``, then tries to
    create ``emulated_table`` in ``emulated_instance`` with an anonymous admin
    client.

    Returns:
        True if the table was created, False if the attempt raised (the error
        is printed).
    """
    env_init = subprocess.run(
        ["gcloud", "beta", "emulators", "bigtable", "env-init"],
        stdout=subprocess.PIPE,
    )
    env_line = env_init.stdout.decode("utf-8").strip()
    os.environ["BIGTABLE_EMULATOR_HOST"] = env_line.rpartition("=")[2]

    admin_client = bigtable.Client(
        project="IGNORE_ENVIRONMENT_PROJECT",
        credentials=credentials.AnonymousCredentials(),
        admin=True,
    )
    probe_table = admin_client.instance("emulated_instance").table("emulated_table")

    try:
        probe_table.create()
    except Exception as err:
        print("Bigtable Emulator not yet ready: %s" % err)
        return False
    return True
# Backends are discovered from KVDbClient; a new backend there is tested with no change here.
# Maps each harness's ``name`` to the harness object; the gen_graph fixture is
# parametrized over these names.
_HARNESSES = {h.name: h for h in _backend_harnesses()}


@pytest.fixture(scope="session", autouse=True)
def bigtable_emulator(request):
    """Start the gcloud Bigtable emulator once for the whole test session.

    Steps:
      1. Kill any leftover ``cbtemulator`` process from a previous run.
      2. Launch the emulator on ``localhost:8539`` in its own session/process
         group so ``_cleanup_emulator`` can signal the whole group.
      3. Poll ``setup_emulator_env`` up to 5 times, sleeping 5 s after each
         failed attempt.

    ``_cleanup_emulator`` is registered both with ``atexit`` (safety net for
    abnormal exits) and as the fixture finalizer.

    If the emulator never becomes ready, it is cleaned up and the pytest
    session is aborted with return code 1.
    """
    global _emulator_proc, _emulator_cleaned
    from time import sleep

    _emulator_cleaned = False

    # Kill any leftover emulator processes from previous runs
    subprocess.run(["pkill", "-9", "cbtemulator"], stderr=subprocess.DEVNULL)

    # Start Emulator
    _emulator_proc = subprocess.Popen(
        [
            "gcloud",
            "beta",
            "emulators",
            "bigtable",
            "start",
            "--host-port=localhost:8539",
        ],
        # Equivalent to preexec_fn=os.setsid, without running Python code
        # between fork and exec.
        start_new_session=True,
        # The output is never read; an undrained PIPE can fill up and block
        # the emulator, so discard it instead.
        stdout=subprocess.DEVNULL,
    )

    # Register atexit handler as safety net for abnormal exits
    atexit.register(_cleanup_emulator)

    # Wait for Emulator to start up
    print("Waiting for BigTables Emulator to start up...", end="")
    for _ in range(5):
        if setup_emulator_env() is True:
            break
        sleep(5)
    else:
        # Every attempt failed.
        print(
            "\nCouldn't start Bigtable Emulator. Make sure it is installed correctly."
        )
        _cleanup_emulator()
        # pytest.exit aborts the whole session; a bare exit() raises
        # SystemExit inside the fixture, which pytest is likely to report as
        # a setup error for each test rather than stopping the run.
        pytest.exit("Bigtable emulator failed to start", returncode=1)

    request.addfinalizer(_cleanup_emulator)


@pytest.fixture(scope="session")
def hbase_emulator():
    """Run an in-process mock HBase REST server for the duration of the session.

    Yields the port of the mock server and shuts the server down on teardown.
    """
    _, mock_server, mock_port = start_hbase_mock_server()
    yield mock_port
    mock_server.shutdown()


@pytest.fixture(scope="function", params=["bigtable", "hbase"])
def gen_graph(request, bigtable_emulator, hbase_emulator):
backend = request.param
def _backend_servers():
    """Bring up the local instance of each available backend once per session.

    Yields a dict mapping backend name to the handle returned by entering that
    harness's ``server()`` context manager. Harnesses whose ``available()``
    is falsy are left out. All entered contexts are closed together when the
    generator is resumed after the yield.
    """
    with ExitStack() as stack:
        handles = {
            name: stack.enter_context(harness.server())
            for name, harness in _HARNESSES.items()
            if harness.available()
        }
        yield handles


@pytest.fixture(scope="function", params=list(_HARNESSES))
def gen_graph(request, _backend_servers):
name = request.param
if name not in _backend_servers:
pytest.skip(f"backend {name!r} unavailable in this environment")
harness = _HARNESSES[name]
handle = _backend_servers[name]

def _cgraph(request, n_layers=10, atomic_chunk_bounds: np.ndarray = np.array([])):
if backend == "bigtable":
backend_client = {
"TYPE": "bigtable",
"CONFIG": {
"ADMIN": True,
"READ_ONLY": False,
"PROJECT": "IGNORE_ENVIRONMENT_PROJECT",
"INSTANCE": "emulated_instance",
"CREDENTIALS": credentials.AnonymousCredentials(),
"MAX_ROW_KEY_COUNT": 1000,
},
}
else:
backend_client = {
"TYPE": "hbase",
"CONFIG": {
"BASE_URL": f"http://127.0.0.1:{hbase_emulator}",
"MAX_ROW_KEY_COUNT": 1000,
},
}

config = {
"data_source": {
"EDGES": "gs://chunked-graph/minnie65_0/edges",
Expand All @@ -176,9 +55,9 @@ def _cgraph(request, n_layers=10, atomic_chunk_bounds: np.ndarray = np.array([])
"FANOUT": 2,
"SPATIAL_BITS": 10,
"ID_PREFIX": "",
"ROOT_LOCK_EXPIRY": timedelta(seconds=5),
"ROOT_LOCK_EXPIRY": timedelta(seconds=1),
},
"backend_client": backend_client,
"backend_client": harness.backend_client(handle),
"ingest_config": {},
}

Expand All @@ -196,123 +75,14 @@ def _cgraph(request, n_layers=10, atomic_chunk_bounds: np.ndarray = np.array([])
graph.create()

def fin():
_delete_test_table(graph, backend)
harness.delete_table(graph)

request.addfinalizer(fin)
return graph

return partial(_cgraph, request)


@pytest.fixture(scope="function", params=["bigtable", "hbase"])
def gen_graph_with_edges(request, tmp_path, bigtable_emulator, hbase_emulator):
    """Variant of gen_graph that does real edge/component I/O on the local filesystem.

    Edges and components are read/written under ``tmp_path`` through the
    ``file://`` protocol. The fixture is parametrized over the "bigtable" and
    "hbase" backends and returns a factory; each graph the factory creates has
    its backing table deleted when the test finishes.
    """
    backend = request.param

    def _client_settings():
        # Built on demand so only the selected backend's settings are created.
        if backend != "bigtable":
            return {
                "TYPE": "hbase",
                "CONFIG": {
                    "BASE_URL": f"http://127.0.0.1:{hbase_emulator}",
                    "MAX_ROW_KEY_COUNT": 1000,
                },
            }
        return {
            "TYPE": "bigtable",
            "CONFIG": {
                "ADMIN": True,
                "READ_ONLY": False,
                "PROJECT": "IGNORE_ENVIRONMENT_PROJECT",
                "INSTANCE": "emulated_instance",
                "CREDENTIALS": credentials.AnonymousCredentials(),
                "MAX_ROW_KEY_COUNT": 1000,
            },
        }

    def _ws_ts_scale(mip=0):
        return TensorStoreMock()

    def _cgraph(request, n_layers=10, atomic_chunk_bounds: np.ndarray = np.array([])):
        meta, _, client_info, _ = bootstrap(
            "test",
            config={
                "data_source": {
                    "EDGES": f"file://{tmp_path}/edges",
                    "COMPONENTS": f"file://{tmp_path}/components",
                    "WATERSHED": "gs://microns-seunglab/minnie65/ws_minnie65_0",
                },
                "graph_config": {
                    "CHUNK_SIZE": [512, 512, 64],
                    "FANOUT": 2,
                    "SPATIAL_BITS": 10,
                    "ID_PREFIX": "",
                    "ROOT_LOCK_EXPIRY": timedelta(seconds=5),
                },
                "backend_client": _client_settings(),
                "ingest_config": {},
            },
        )
        graph = ChunkedGraph(graph_id="test", meta=meta, client_info=client_info)

        # No mock_edges - use real I/O via file:// protocol
        graph.meta._ws_cv = CloudVolumeMock()
        graph.meta._ws_info_d = mock_ws_info()
        graph.meta.ws_ts_scale = _ws_ts_scale
        graph.meta.layer_count = n_layers
        graph.meta.layer_chunk_bounds = get_layer_chunk_bounds(
            n_layers, atomic_chunk_bounds=atomic_chunk_bounds
        )

        graph.create()

        # Drop the backing table once the requesting test is done.
        request.addfinalizer(lambda: _delete_test_table(graph, backend))
        return graph

    return partial(_cgraph, request)


@pytest.fixture(scope="function")
def gen_graph_simplequerytest(request, gen_graph):
    """
    Build a 4-layer graph over three chunks A, B and C.

    ┌─────┬─────┬─────┐
    │  A¹ │  B¹ │  C¹ │
    │  1  │ 3━2━┿━━4  │
    │     │     │     │
    └─────┴─────┴─────┘

    Chunk A holds one isolated vertex. Chunk B holds two vertices joined by an
    edge of affinity 0.5, one of which is also joined to the single vertex of
    chunk C by an edge of infinite affinity (listed in both chunks). Parent
    chunks are then added for layer 3 at [0, 0, 0] and [1, 0, 0] and for
    layer 4 at [0, 0, 0].
    """
    from math import inf

    graph = gen_graph(n_layers=4)
    # Binds (graph, 1) as the first two to_label arguments; presumably the
    # remaining four are (x, y, z, segment id) - confirm against helpers.to_label.
    sv = partial(to_label, graph, 1)

    # Chunk A
    create_chunk(graph, vertices=[sv(0, 0, 0, 0)], edges=[])

    # Chunk B
    create_chunk(
        graph,
        vertices=[sv(1, 0, 0, 0), sv(1, 0, 0, 1)],
        edges=[
            (sv(1, 0, 0, 0), sv(1, 0, 0, 1), 0.5),
            (sv(1, 0, 0, 0), sv(2, 0, 0, 0), inf),
        ],
    )

    # Chunk C
    create_chunk(
        graph,
        vertices=[sv(2, 0, 0, 0)],
        edges=[(sv(2, 0, 0, 0), sv(1, 0, 0, 0), inf)],
    )

    # Parent chunks, in the same order as before: layer 3 twice, then layer 4.
    for layer, coords in ((3, [0, 0, 0]), (3, [1, 0, 0]), (4, [0, 0, 0])):
        add_parent_chunk(graph, layer, coords, n_threads=1)

    return graph


@pytest.fixture(scope="session")
def sv_data():
test_data_dir = "pychunkedgraph/tests/data"
Expand Down
Loading
Loading