Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 81 additions & 15 deletions mac/Sources/CodeBurnMenubar/Data/DataClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import Foundation
// 20 MiB. runProcess throws DataClientError.outputTooLarge once the collected
// output reaches this many bytes.
private let maxPayloadBytes = 20 * 1024 * 1024
// 256 KiB. NOTE(review): presumably the matching cap on captured stderr; its use
// is not visible in this part of the file — confirm.
private let maxStderrBytes = 256 * 1024
// Timeout, in seconds, that runCLI passes to runProcess for each CLI spawn.
private let spawnTimeoutSeconds: UInt64 = 45
// Initial permit count for DataClient.spawnLimiter, i.e. how many CLI processes
// may be in flight at the same time.
private let maxConcurrentSpawns = 6

enum DataClientError: Error {
case spawn(String)
Expand Down Expand Up @@ -55,7 +56,13 @@ struct DataClient {
let exitCode: Int32
}

/// Caps concurrent CLI spawns so a wake-burst of refreshes can't fan out into
/// dozens of node processes at once.
private static let spawnLimiter = AsyncSemaphore(maxConcurrentSpawns)

private static func runCLI(subcommand: [String]) async throws -> ProcessResult {
await spawnLimiter.acquire()
defer { Task { await spawnLimiter.release() } }
let process = CodeburnCLI.makeProcess(subcommand: subcommand)
return try await runProcess(process,
timeoutSeconds: spawnTimeoutSeconds,
Expand All @@ -65,15 +72,15 @@ struct DataClient {
/// Runs an already-configured process to completion, draining its output and
/// enforcing a hard timeout.
///
/// CRITICAL: neither the timeout nor the exit wait may run on Swift's
/// cooperative thread pool. `process.waitUntilExit()` is a blocking syscall;
/// on a 16-core machine, 16 concurrent slow CLIs would pin all 16 cooperative
/// threads inside waitUntilExit, exhausting the pool. A timeout living on that
/// same pool could then never be scheduled to kill the hung processes — the
/// menubar deadlocks on "Loading…" forever (confirmed via sample: 16/16
/// cooperative threads parked in waitUntilExit). So the timeout is a
/// DispatchSource on a global queue, and the exit wait is bridged through a
/// global (overcommit) queue instead of blocking the caller's executor.
/// CRITICAL: nothing here may block a worker thread waiting for the process.
/// `process.waitUntilExit()` is a blocking syscall. An earlier fix moved it
/// onto a global(qos:.utility) queue with the timeout on that SAME queue — but
/// under sustained load every utility worker ended up blocked in waitUntilExit,
/// so the timeout could never be scheduled to kill them and the menubar wedged
/// on "Loading…" forever (confirmed via sample: threads parked in
/// waitUntilExit, timeout never firing). Instead we await
/// `process.terminationHandler`, which fires on a Foundation-managed queue and
/// blocks nothing, so the timeout always has a free thread to fire on.
static func runProcess(_ process: Process,
timeoutSeconds: UInt64,
label: String) async throws -> ProcessResult {
Expand All @@ -82,6 +89,11 @@ struct DataClient {
process.standardOutput = outPipe
process.standardError = errPipe

// Bridge the process exit to an async signal set up BEFORE run(), so the
// exit can never be missed and the wait never blocks a worker thread.
let exitSignal = ProcessExitSignal()
process.terminationHandler = { _ in exitSignal.fulfill() }

do {
try process.run()
} catch {
Expand Down Expand Up @@ -111,12 +123,9 @@ struct DataClient {
}
try? outHandle.close()
try? errHandle.close()
await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
DispatchQueue.global(qos: .utility).async {
process.waitUntilExit()
continuation.resume()
}
}
// Wait for exit via terminationHandler, never by parking a worker thread
// in waitUntilExit (see the doc comment above for why that wedged).
await exitSignal.wait()

if out.count >= maxPayloadBytes {
throw DataClientError.outputTooLarge
Expand Down Expand Up @@ -168,3 +177,60 @@ struct DataClient {
return buffer
}
}

/// One-shot async signal that bridges `Process.terminationHandler` (invoked on a
/// Foundation-internal queue) to awaiting tasks without blocking a worker thread.
///
/// The signal can be fulfilled at most once; later `fulfill()` calls are no-ops.
/// It is safe against fulfill-before-wait (the waiter returns immediately) and
/// against more than one task waiting: every registered waiter is resumed when
/// the signal fires, so no continuation is ever dropped.
///
/// `@unchecked Sendable` is justified because all mutable state is only touched
/// while `lock` is held.
final class ProcessExitSignal: @unchecked Sendable {
    private let lock = NSLock()
    private var fulfilled = false
    /// Continuations of tasks that called `wait()` before the signal fired.
    private var waiters: [CheckedContinuation<Void, Never>] = []

    /// Marks the signal as fulfilled and resumes every pending waiter.
    /// Idempotent: only the first call has any effect.
    func fulfill() {
        lock.lock()
        guard !fulfilled else {
            lock.unlock()
            return
        }
        fulfilled = true
        let pending = waiters
        waiters.removeAll()
        lock.unlock()
        // Resume outside the lock so resumed tasks never contend with it.
        for waiter in pending {
            waiter.resume()
        }
    }

    /// Suspends until `fulfill()` has been called; returns immediately if it
    /// already was. Any number of tasks may wait on the same signal.
    func wait() async {
        await withCheckedContinuation { (cont: CheckedContinuation<Void, Never>) in
            lock.lock()
            let alreadyFulfilled = fulfilled
            if !alreadyFulfilled {
                waiters.append(cont)
            }
            lock.unlock()
            if alreadyFulfilled {
                cont.resume()
            }
        }
    }
}

/// Small counting semaphore built on an actor. Callers that find no free permit
/// are suspended (not blocked on a thread, as `DispatchSemaphore.wait()` would)
/// and are resumed in arrival order as permits are handed back.
actor AsyncSemaphore {
    /// Permits currently free to hand out.
    private var permits: Int
    /// Suspended callers of `acquire()`, oldest first.
    private var queue: [CheckedContinuation<Void, Never>] = []

    /// Creates a semaphore holding `count` permits.
    init(_ count: Int) {
        permits = count
    }

    /// Takes a permit, suspending until one is released if none is free.
    func acquire() async {
        guard permits <= 0 else {
            permits -= 1
            return
        }
        await withCheckedContinuation { continuation in
            queue.append(continuation)
        }
    }

    /// Gives a permit back. If a caller is waiting, the permit goes straight to
    /// the oldest waiter instead of being returned to the free count.
    func release() {
        guard !queue.isEmpty else {
            permits += 1
            return
        }
        let next = queue.removeFirst()
        next.resume()
    }
}
51 changes: 51 additions & 0 deletions mac/Tests/CodeBurnMenubarTests/DataClientProcessTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,55 @@ final class DataClientProcessTests: XCTestCase {
XCTAssertEqual(result.exitCode, 0)
XCTAssertEqual(String(data: result.stdout, encoding: .utf8), "hello\n")
}

/// Fires off a burst of normally-exiting `/bin/echo` processes at once and
/// requires every one of them to come back through `DataClient.runProcess`
/// with exit code 0. Protects the terminationHandler-based wait path against
/// leaking or wedging under concurrency (the production bug was the wait and
/// its timeout sharing one queue that saturated under sustained load).
func testManyNormalProcessesAllComplete() async {
    let count = 50
    let codes: [Int32?] = await withTaskGroup(of: Int32?.self) { group in
        for _ in 0..<count {
            group.addTask {
                let echo = Process()
                echo.executableURL = URL(fileURLWithPath: "/bin/echo")
                echo.arguments = ["ok"]
                // A thrown error collapses to nil, which fails the exit-code check below.
                let result = try? await DataClient.runProcess(echo, timeoutSeconds: 5, label: "echo ok")
                return result?.exitCode
            }
        }
        var collected: [Int32?] = []
        for await code in group {
            collected.append(code)
        }
        return collected
    }
    XCTAssertEqual(codes.count, count)
    XCTAssertTrue(codes.allSatisfy { $0 == 0 },
                  "every concurrent process should exit 0 via the terminationHandler wait path")
}

/// Twelve tasks contend for a two-permit `AsyncSemaphore`; the highest number
/// seen inside the guarded section at once must stay within the permit count.
func testAsyncSemaphoreCapsConcurrency() async {
    let limiter = AsyncSemaphore(2)
    let tracker = PeakCounter()
    await withTaskGroup(of: Void.self) { group in
        for _ in 1...12 {
            group.addTask {
                await limiter.acquire()
                await tracker.enter()
                // Hold the permit briefly so the tasks actually overlap.
                try? await Task.sleep(nanoseconds: 8_000_000)
                await tracker.leave()
                await limiter.release()
            }
        }
    }
    let observed = await tracker.peak
    XCTAssertLessThanOrEqual(observed, 2, "semaphore should cap concurrency at 2, saw \(observed)")
    XCTAssertGreaterThan(observed, 0)
}
}

/// Test helper that counts how many callers are currently between `enter()`
/// and `leave()`, and remembers the highest such count seen so far.
private actor PeakCounter {
    /// Callers that have entered and not yet left.
    private var inFlight = 0
    /// Largest value `inFlight` has reached.
    private(set) var peak = 0

    /// Registers one more active caller and updates the high-water mark.
    func enter() {
        inFlight += 1
        if inFlight > peak {
            peak = inFlight
        }
    }

    /// Registers that one active caller has finished.
    func leave() {
        inFlight -= 1
    }
}
Loading