From 1b0662722a07c5e393d24485466b2ba86800c807 Mon Sep 17 00:00:00 2001 From: chenyuxuan <458254969@qq.com> Date: Sun, 14 Jun 2026 15:44:42 +0800 Subject: [PATCH 1/2] fix(watcher): degrade on watch exhaustion and prolonged lock contention --- __tests__/watcher.test.ts | 87 ++++++++++++++++++++++++++++++++- src/sync/watcher.ts | 100 +++++++++++++++++++++++++++++++++----- 2 files changed, 174 insertions(+), 13 deletions(-) diff --git a/__tests__/watcher.test.ts b/__tests__/watcher.test.ts index b372fc3d6..9e4451bfa 100644 --- a/__tests__/watcher.test.ts +++ b/__tests__/watcher.test.ts @@ -18,6 +18,7 @@ */ import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import type { EventEmitter } from 'events'; import * as fs from 'fs'; import * as path from 'path'; import * as os from 'os'; @@ -25,6 +26,7 @@ import { FileWatcher, LockUnavailableError, __emitWatchEventForTests, + __setFsWatchForTests, type WatchOptions, } from '../src/sync/watcher'; import CodeGraph from '../src/index'; @@ -69,6 +71,8 @@ describe('FileWatcher', () => { }); afterEach(() => { + __setFsWatchForTests(null); + vi.restoreAllMocks(); if (fs.existsSync(testDir)) { fs.rmSync(testDir, { recursive: true, force: true }); } @@ -87,6 +91,58 @@ describe('FileWatcher', () => { expect(watcher.isActive()).toBe(false); }); + it('should not start when fs.watch setup exhausts watch/file resources', () => { + const syncFn = vi.fn().mockResolvedValue({ filesChanged: 0, durationMs: 0 }); + const watcher = new FileWatcher(testDir, syncFn, { debounceMs: 100 }); + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + __setFsWatchForTests(() => { + const err = new Error('too many open files') as NodeJS.ErrnoException; + err.code = 'EMFILE'; + throw err; + }); + + expect(watcher.start()).toBe(false); + expect(watcher.isActive()).toBe(false); + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining('File watcher disabled'), + expect.objectContaining({ + reason: expect.stringContaining('auto-sync disabled'), + }) + ); + }); + + it('should degrade once when the recursive watcher emits EMFILE at runtime', async () => { + const syncFn = vi.fn().mockResolvedValue({ filesChanged: 0, durationMs: 0 }); + const handlers = new Map void>>(); + const fakeWatcher = { + on: vi.fn((event: string, handler: (arg?: unknown) => void) => { + const list = handlers.get(event) ?? []; + list.push(handler); + handlers.set(event, list); + return fakeWatcher; + }), + close: vi.fn(), + } as unknown as fs.FSWatcher & EventEmitter; + __setFsWatchForTests(() => fakeWatcher); + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + const watcher = new FileWatcher(testDir, syncFn, { debounceMs: 100 }); + + expect(watcher.start()).toBe(true); + expect(watcher.isActive()).toBe(true); + + const err = new Error('too many open files') as NodeJS.ErrnoException; + err.code = 'EMFILE'; + for (const handler of handlers.get('error') ?? []) handler(err); + for (const handler of handlers.get('error') ?? []) handler(err); + + expect(watcher.isActive()).toBe(false); + expect(fakeWatcher.close).toHaveBeenCalledTimes(1); + const disableCalls = warnSpy.mock.calls.filter( + (call) => typeof call[0] === 'string' && String(call[0]).includes('File watcher disabled') + ); + expect(disableCalls).toHaveLength(1); + }); + it('should be idempotent on double start', () => { const syncFn = vi.fn().mockResolvedValue({ filesChanged: 0, durationMs: 0 }); const watcher = newWatcher(syncFn); @@ -306,7 +362,7 @@ describe('FileWatcher', () => { expect(onSyncError).not.toHaveBeenCalled(); expect(onSyncComplete).not.toHaveBeenCalled(); - await waitFor(() => syncFn.mock.calls.length >= 2); + await waitFor(() => syncFn.mock.calls.length >= 2, 3000); await waitFor( () => !watcher.getPendingFiles().some((p) => p.path === 'src/locked.ts'), ); @@ -317,6 +373,35 @@ describe('FileWatcher', () => { watcher.stop(); }); + + it('should disable auto-sync after prolonged LockUnavailableError contention', async () => { + const syncFn = vi.fn().mockRejectedValue(new LockUnavailableError()); + const onSyncComplete = vi.fn(); + const onSyncError = vi.fn(); + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + const watcher = newWatcher(syncFn, { + debounceMs: 25, + onSyncComplete, + onSyncError, + }); + watcher.start(); + await watcher.waitUntilReady(); + + __emitWatchEventForTests(testDir, 'src/long-lock.ts'); + + await waitFor(() => !watcher.isActive(), 8000, 20); + + expect(syncFn.mock.calls.length).toBeGreaterThanOrEqual(6); + expect(watcher.getPendingFiles()).toEqual([]); + expect(onSyncComplete).not.toHaveBeenCalled(); + expect(onSyncError).not.toHaveBeenCalled(); + const disableCalls = warnSpy.mock.calls.filter( + (call) => typeof call[0] === 'string' && String(call[0]).includes('File watcher disabled') + ); + expect(disableCalls).toHaveLength(1); + + warnSpy.mockRestore(); + }); }); describe('callbacks', () => { diff --git a/src/sync/watcher.ts b/src/sync/watcher.ts index 401fbd721..f8bc74340 100644 --- a/src/sync/watcher.ts +++ b/src/sync/watcher.ts @@ -39,6 +39,16 @@ import { normalizePath } from '../utils'; import { isCodeGraphDataDir } from '../directory'; import { watchDisabledReason } from './watch-policy'; +const MAX_LOCK_RETRIES = 5; +const MAX_LOCK_RETRY_DELAY_MS = 30000; + +function isWatchResourceExhaustion(err: unknown): boolean { + const e = err as NodeJS.ErrnoException | undefined; + if (e?.code === 'EMFILE' || e?.code === 'ENFILE') return true; + const msg = e?.message ?? String(err ?? ''); + return /EMFILE|ENFILE|too many open files/i.test(msg); +} + /** * Native recursive `fs.watch` is only reliable on macOS and Windows; on Linux * (and AIX) it throws `ERR_FEATURE_UNAVAILABLE_ON_PLATFORM`. We branch on this @@ -48,6 +58,14 @@ function supportsRecursiveWatch(): boolean { return process.platform === 'darwin' || process.platform === 'win32'; } +type WatchFn = typeof fs.watch; +let watchImpl: WatchFn = fs.watch; + +/** @internal Test-only seam to inject a fake fs.watch implementation. */ +export function __setFsWatchForTests(fn: WatchFn | null): void { + watchImpl = fn ?? fs.watch; +} + /** * Upper bound on simultaneously-watched directories on the Linux per-directory * path. Each is one inotify watch; the kernel's `fs.inotify.max_user_watches` @@ -164,6 +182,10 @@ export class FileWatcher { private dirWatchers = new Map(); /** Set once the per-directory watch cap is hit, so we log only once. */ private dirCapWarned = false; + /** One-way marker that live watching has been disabled due to runtime degradation. */ + private degradedReason: string | null = null; + /** Consecutive lock-contention retries for watcher-triggered syncs. */ + private lockRetryCount = 0; /** Test-only inert mode: started, but with no OS watcher installed. */ private inert = false; private debounceTimer: ReturnType | null = null; @@ -233,6 +255,8 @@ export class FileWatcher { start(): boolean { if (this.recursiveWatcher || this.dirWatchers.size > 0 || this.inert) return true; // Already watching this.stopped = false; + this.degradedReason = null; + this.lockRetryCount = 0; // Some environments make filesystem watching unusable — most notably // WSL2 /mnt/ drives, where the underlying fs.watch calls block long @@ -275,8 +299,15 @@ export class FileWatcher { return true; } catch (err) { // Watcher setup failed (e.g., permission denied, missing directory). - logWarn('Could not start file watcher', { error: String(err) }); - this.stop(); + if (isWatchResourceExhaustion(err)) { + this.degrade( + 'OS watch/file limit exhausted; auto-sync disabled. Run `codegraph sync` (or install git sync hooks) to refresh the graph after changes.', + { error: String(err) } + ); + } else { + logWarn('Could not start file watcher', { error: String(err) }); + this.stop(); + } return false; } } @@ -287,7 +318,7 @@ export class FileWatcher { * it maps straight to a project-relative path. */ private startRecursive(): void { - this.recursiveWatcher = fs.watch( + this.recursiveWatcher = watchImpl( this.projectRoot, { recursive: true, persistent: true }, (_event, filename) => { @@ -296,6 +327,13 @@ export class FileWatcher { } ); this.recursiveWatcher.on('error', (err: unknown) => { + if (isWatchResourceExhaustion(err)) { + this.degrade( + 'OS watch/file limit exhausted; auto-sync disabled. Run `codegraph sync` (or install git sync hooks) to refresh the graph after changes.', + { error: String(err) } + ); + return; + } logWarn('File watcher error', { error: String(err) }); }); } @@ -319,6 +357,7 @@ export class FileWatcher { * sync owns the baseline). */ private watchTree(dir: string, markExisting: boolean): void { + if (this.stopped || this.degradedReason) return; if (this.dirWatchers.has(dir)) return; if (this.dirWatchers.size >= maxDirWatches()) { if (!this.dirCapWarned) { @@ -332,14 +371,29 @@ export class FileWatcher { let w: fs.FSWatcher; try { - w = fs.watch(dir, { persistent: true }, (_event, filename) => + w = watchImpl(dir, { persistent: true }, (_event, filename) => this.handleDirEvent(dir, filename) ); - } catch { - // ENOENT / EACCES / too-many-open-files — skip this directory quietly. + } catch (err) { + if (isWatchResourceExhaustion(err)) { + this.degrade( + 'OS watch/file limit exhausted; auto-sync disabled. Run `codegraph sync` (or install git sync hooks) to refresh the graph after changes.', + { error: String(err), dir } + ); + } + // ENOENT / EACCES / too-many-open-files on one dir — skip non-fatal cases quietly. return; } - w.on('error', () => this.unwatchDir(dir)); + w.on('error', (err: unknown) => { + if (isWatchResourceExhaustion(err)) { + this.degrade( + 'OS watch/file limit exhausted; auto-sync disabled. Run `codegraph sync` (or install git sync hooks) to refresh the graph after changes.', + { error: String(err), dir } + ); + return; + } + this.unwatchDir(dir); + }); this.dirWatchers.set(dir, w); let entries: fs.Dirent[]; @@ -450,6 +504,14 @@ export class FileWatcher { return this.ignoreMatcher.ignores(rel + '/'); } + /** Disable live watching after a terminal runtime failure. */ + private degrade(reason: string, context: Record = {}): void { + if (this.degradedReason) return; + this.degradedReason = reason; + logWarn('File watcher disabled', { projectRoot: this.projectRoot, reason, ...context }); + this.stop(); + } + /** * Stop watching for file changes. */ @@ -478,6 +540,7 @@ export class FileWatcher { } this.dirWatchers.clear(); this.dirCapWarned = false; + this.lockRetryCount = 0; this.inert = false; this.pendingFiles.clear(); @@ -530,14 +593,14 @@ export class FileWatcher { /** * Schedule a debounced sync. */ - private scheduleSync(): void { + private scheduleSync(delayMs: number = this.debounceMs): void { if (this.debounceTimer) { clearTimeout(this.debounceTimer); } this.debounceTimer = setTimeout(() => { this.debounceTimer = null; this.flush(); - }, this.debounceMs); + }, delayMs); } /** @@ -561,6 +624,7 @@ export class FileWatcher { try { const result = await this.syncFn(); + this.lockRetryCount = 0; // Remove entries whose most recent event predates this sync — those // edits are now in the DB. Entries with lastSeenMs > syncStartedMs // arrived mid-sync; whether the in-flight sync captured them depends @@ -576,13 +640,22 @@ export class FileWatcher { this.onSyncComplete?.(result); } catch (err) { if (err instanceof LockUnavailableError) { + this.lockRetryCount += 1; // Lock-failure no-op (another writer holds the lock). pendingFiles - // stays intact and the `finally` block reschedules. Debug-only — - // a long external index would otherwise spam stderr every cycle. + // stays intact. Keep short contention quiet, but stop retrying forever + // at the normal debounce cadence when another writer is long-lived. logDebug('Watch sync skipped: file lock unavailable', { pendingFiles: this.pendingFiles.size, + retryCount: this.lockRetryCount, }); + if (this.lockRetryCount > MAX_LOCK_RETRIES) { + this.degrade( + 'File lock unavailable for too long; auto-sync disabled. Run `codegraph sync` after the writer finishes (or install git sync hooks) to refresh the graph.', + { pendingFiles: this.pendingFiles.size, retryCount: this.lockRetryCount } + ); + } } else { + this.lockRetryCount = 0; const error = err instanceof Error ? err : new Error(String(err)); logWarn('Watch sync failed', { error: error.message }); this.onSyncError?.(error); @@ -595,7 +668,10 @@ export class FileWatcher { // If pending files remain (mid-sync events, or this sync failed), // schedule another pass. if (this.pendingFiles.size > 0 && !this.stopped) { - this.scheduleSync(); + const delayMs = this.lockRetryCount > 0 + ? Math.min(this.debounceMs * (2 ** Math.max(0, this.lockRetryCount - 1)), MAX_LOCK_RETRY_DELAY_MS) + : this.debounceMs; + this.scheduleSync(delayMs); } } } From 07a22c1e0011aa65e2b8a476edb5e5e3ccf9521b Mon Sep 17 00:00:00 2001 From: chenyuxuan <458254969@qq.com> Date: Sun, 14 Jun 2026 16:05:41 +0800 Subject: [PATCH 2/2] fix(watcher): degrade cleanly on exhaustion and prolonged lock contention --- __tests__/watcher.test.ts | 13 ++++++++++-- src/sync/watcher.ts | 44 ++++++++++++++++++++++++++++++++------- 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/__tests__/watcher.test.ts b/__tests__/watcher.test.ts index 9e4451bfa..ee8e6179e 100644 --- a/__tests__/watcher.test.ts +++ b/__tests__/watcher.test.ts @@ -93,7 +93,8 @@ describe('FileWatcher', () => { it('should not start when fs.watch setup exhausts watch/file resources', () => { const syncFn = vi.fn().mockResolvedValue({ filesChanged: 0, durationMs: 0 }); - const watcher = new FileWatcher(testDir, syncFn, { debounceMs: 100 }); + const onDegraded = vi.fn(); + const watcher = new FileWatcher(testDir, syncFn, { debounceMs: 100, onDegraded }); const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); __setFsWatchForTests(() => { const err = new Error('too many open files') as NodeJS.ErrnoException; @@ -103,6 +104,8 @@ describe('FileWatcher', () => { expect(watcher.start()).toBe(false); expect(watcher.isActive()).toBe(false); + expect(onDegraded).toHaveBeenCalledTimes(1); + expect(onDegraded).toHaveBeenCalledWith(expect.stringContaining('auto-sync disabled')); expect(warnSpy).toHaveBeenCalledWith( expect.stringContaining('File watcher disabled'), expect.objectContaining({ @@ -113,6 +116,7 @@ describe('FileWatcher', () => { it('should degrade once when the recursive watcher emits EMFILE at runtime', async () => { const syncFn = vi.fn().mockResolvedValue({ filesChanged: 0, durationMs: 0 }); + const onDegraded = vi.fn(); const handlers = new Map void>>(); const fakeWatcher = { on: vi.fn((event: string, handler: (arg?: unknown) => void) => { @@ -125,7 +129,7 @@ describe('FileWatcher', () => { } as unknown as fs.FSWatcher & EventEmitter; __setFsWatchForTests(() => fakeWatcher); const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); - const watcher = new FileWatcher(testDir, syncFn, { debounceMs: 100 }); + const watcher = new FileWatcher(testDir, syncFn, { debounceMs: 100, onDegraded }); expect(watcher.start()).toBe(true); expect(watcher.isActive()).toBe(true); @@ -136,6 +140,7 @@ describe('FileWatcher', () => { for (const handler of handlers.get('error') ?? []) handler(err); expect(watcher.isActive()).toBe(false); + expect(onDegraded).toHaveBeenCalledTimes(1); expect(fakeWatcher.close).toHaveBeenCalledTimes(1); const disableCalls = warnSpy.mock.calls.filter( (call) => typeof call[0] === 'string' && String(call[0]).includes('File watcher disabled') @@ -378,11 +383,13 @@ describe('FileWatcher', () => { const syncFn = vi.fn().mockRejectedValue(new LockUnavailableError()); const onSyncComplete = vi.fn(); const onSyncError = vi.fn(); + const onDegraded = vi.fn(); const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); const watcher = newWatcher(syncFn, { debounceMs: 25, onSyncComplete, onSyncError, + onDegraded, }); watcher.start(); await watcher.waitUntilReady(); @@ -395,6 +402,8 @@ describe('FileWatcher', () => { expect(watcher.getPendingFiles()).toEqual([]); expect(onSyncComplete).not.toHaveBeenCalled(); expect(onSyncError).not.toHaveBeenCalled(); + expect(onDegraded).toHaveBeenCalledTimes(1); + expect(onDegraded).toHaveBeenCalledWith(expect.stringContaining('auto-sync disabled')); const disableCalls = warnSpy.mock.calls.filter( (call) => typeof call[0] === 'string' && String(call[0]).includes('File watcher disabled') ); diff --git a/src/sync/watcher.ts b/src/sync/watcher.ts index f8bc74340..b6bc12030 100644 --- a/src/sync/watcher.ts +++ b/src/sync/watcher.ts @@ -45,8 +45,10 @@ const MAX_LOCK_RETRY_DELAY_MS = 30000; function isWatchResourceExhaustion(err: unknown): boolean { const e = err as NodeJS.ErrnoException | undefined; if (e?.code === 'EMFILE' || e?.code === 'ENFILE') return true; - const msg = e?.message ?? String(err ?? ''); - return /EMFILE|ENFILE|too many open files/i.test(msg); + if (!e?.code && e?.message) { + return /EMFILE|ENFILE|too many open files/i.test(e.message); + } + return false; } /** @@ -116,6 +118,11 @@ export interface WatchOptions { */ onSyncError?: (error: Error) => void; + /** + * Callback when live watching degrades permanently and auto-sync is disabled. + */ + onDegraded?: (reason: string) => void; + /** * Test-only. When true, `start()` installs NO OS-level fs.watch — the * watcher is "inert" and only the {@link __emitWatchEventForTests} / @@ -233,6 +240,7 @@ export class FileWatcher { private readonly syncFn: () => Promise<{ filesChanged: number; durationMs: number }>; private readonly onSyncComplete?: WatchOptions['onSyncComplete']; private readonly onSyncError?: WatchOptions['onSyncError']; + private readonly onDegraded?: WatchOptions['onDegraded']; private readonly inertForTests: boolean; constructor( @@ -245,6 +253,7 @@ export class FileWatcher { this.debounceMs = options.debounceMs ?? 2000; this.onSyncComplete = options.onSyncComplete; this.onSyncError = options.onSyncError; + this.onDegraded = options.onDegraded; this.inertForTests = options.inertForTests ?? false; } @@ -509,6 +518,7 @@ export class FileWatcher { if (this.degradedReason) return; this.degradedReason = reason; logWarn('File watcher disabled', { projectRoot: this.projectRoot, reason, ...context }); + this.onDegraded?.(reason); this.stop(); } @@ -591,9 +601,22 @@ export class FileWatcher { } /** - * Schedule a debounced sync. + * Schedule a normal debounced sync after a source edit. + */ + private scheduleSync(): void { + if (this.debounceTimer) { + clearTimeout(this.debounceTimer); + } + this.debounceTimer = setTimeout(() => { + this.debounceTimer = null; + this.flush(); + }, this.debounceMs); + } + + /** + * Schedule a retry sync after a recoverable failure such as lock contention. */ - private scheduleSync(delayMs: number = this.debounceMs): void { + private scheduleRetrySync(delayMs: number): void { if (this.debounceTimer) { clearTimeout(this.debounceTimer); } @@ -668,10 +691,15 @@ export class FileWatcher { // If pending files remain (mid-sync events, or this sync failed), // schedule another pass. if (this.pendingFiles.size > 0 && !this.stopped) { - const delayMs = this.lockRetryCount > 0 - ? Math.min(this.debounceMs * (2 ** Math.max(0, this.lockRetryCount - 1)), MAX_LOCK_RETRY_DELAY_MS) - : this.debounceMs; - this.scheduleSync(delayMs); + if (this.lockRetryCount > 0) { + const retryDelayMs = Math.min( + this.debounceMs * (2 ** Math.max(0, this.lockRetryCount - 1)), + MAX_LOCK_RETRY_DELAY_MS, + ); + this.scheduleRetrySync(retryDelayMs); + } else { + this.scheduleSync(); + } } } }