Skip to content

Commit 8e00c39

Browse files
ergunsh authored and OrKoN committed
chore: Implement ClearcutSender HTTP transport for telemetry
Implements the telemetry transport layer per the design doc:
- Ring buffer with overflow handling (1000 events max)
- HTTP transport using native fetch API with 30s timeout
- Daisy-chain flush scheduling (15min default interval)
- Transient vs permanent error classification (5xx/429 retry, 4xx drop)
- Server-side rate limiting via next_request_wait_millis
- Shutdown handling with 5s timeout for final flush
1 parent def02bd commit 8e00c39

5 files changed

Lines changed: 667 additions & 126 deletions

File tree

src/telemetry/types.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ export interface LogRequest {
4747
}>;
4848
}
4949

50+
/** Shape of the JSON body returned by the logging endpoint. */
export interface LogResponse {
  /**
   * Optional back-off hint: when set, the client has to hold off for this
   * many milliseconds before sending its next HTTP request.
   */
  next_request_wait_millis?: number;
}
57+
5058
// Enums
5159
export enum OsType {
5260
OS_TYPE_UNSPECIFIED = 0,

src/telemetry/watchdog/clearcut-sender.ts

Lines changed: 197 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,42 @@
77
import crypto from 'node:crypto';
88

99
import {logger} from '../../logger.js';
10-
import type {ChromeDevToolsMcpExtension, OsType} from '../types.js';
10+
import type {
11+
ChromeDevToolsMcpExtension,
12+
LogRequest,
13+
LogResponse,
14+
OsType,
15+
} from '../types.js';
1116

17+
/**
 * Reads a strictly positive integer from an environment variable.
 *
 * Unset, non-numeric, zero and negative values all yield `fallback`, so a
 * misconfigured environment can never produce a negative buffer size or a
 * negative timer delay.
 *
 * @param name - Name of the environment variable to read.
 * @param fallback - Value returned when the variable is missing or invalid.
 * @returns The parsed positive integer, or `fallback`.
 */
function readPositiveIntEnv(name: string, fallback: number): number {
  const parsed = Number.parseInt(process.env[name] ?? '', 10);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

/** When true, each upload carries an `X-Watchdog-Pid` header (used by E2E tests). */
const INCLUDE_PID_HEADER = process.env.INCLUDE_PID_HEADER === 'true';
/** Maximum number of events kept in memory before the oldest is dropped. */
const MAX_BUFFER_SIZE = readPositiveIntEnv('MAX_BUFFER_SIZE', 1000);
/** Default delay between two scheduled flushes (15 minutes unless overridden). */
const FLUSH_INTERVAL_MS = readPositiveIntEnv(
  'FORCE_FLUSH_INTERVAL',
  15 * 60 * 1000,
);
/** URL that batches are POSTed to; overridable through the environment. */
const CLEARCUT_ENDPOINT =
  process.env.CLEARCUT_ENDPOINT ??
  'https://play.googleapis.com/log?format=json_proto';

// Identifiers sent with every request as `log_source` and
// `client_info.client_type`.
// NOTE(review): the meaning of these numeric values is defined by the
// backend and is not visible from this file.
const LOG_SOURCE = 2839;
const CLIENT_TYPE = 47;
/** Lower bound applied to a server-provided wait before the next request. */
const MIN_RATE_LIMIT_WAIT_MS = 30_000;
/** A single HTTP request is aborted after this long. */
const REQUEST_TIMEOUT_MS = 30_000;
/** Upper bound on how long shutdown waits for the final flush. */
const SHUTDOWN_TIMEOUT_MS = 5_000;
/** A new session id is generated once the current one is older than this. */
const SESSION_ROTATION_INTERVAL_MS = 24 * 60 * 60 * 1000;
1331

32+
/** One queued telemetry event together with the time it was buffered. */
interface BufferedEvent {
  /** Value of `Date.now()` at the moment the event entered the buffer. */
  timestamp: number;
  /** The event payload that will be serialized into the upload. */
  event: ChromeDevToolsMcpExtension;
}
36+
1437
export class ClearcutSender {
1538
#appVersion: string;
1639
#osType: OsType;
1740
#sessionId: string;
1841
#sessionCreated: number;
42+
#buffer: BufferedEvent[] = [];
43+
#flushTimer: ReturnType<typeof setTimeout> | null = null;
44+
#isFlushing = false;
45+
#timerStarted = false;
1946

2047
constructor(appVersion: string, osType: OsType) {
2148
this.#appVersion = appVersion;
@@ -24,36 +51,187 @@ export class ClearcutSender {
2451
this.#sessionCreated = Date.now();
2552
}
2653

27-
async send(event: ChromeDevToolsMcpExtension): Promise<void> {
28-
this.#rotateSessionIfNeeded();
29-
const enrichedEvent = this.#enrichEvent(event);
30-
this.transport(enrichedEvent);
31-
}
54+
/**
 * Queues an event for a later batched upload.
 *
 * The session id is regenerated first when the current session is older
 * than `SESSION_ROTATION_INTERVAL_MS`. The event is then stamped with the
 * session id, app version and OS type and appended to the buffer. The first
 * call also starts the periodic flush timer.
 *
 * @param event - Event payload to enrich and buffer.
 */
enqueueEvent(event: ChromeDevToolsMcpExtension): void {
  const sessionAgeMs = Date.now() - this.#sessionCreated;
  if (sessionAgeMs > SESSION_ROTATION_INTERVAL_MS) {
    this.#sessionId = crypto.randomUUID();
    this.#sessionCreated = Date.now();
  }

  const enriched: ChromeDevToolsMcpExtension = {
    ...event,
    session_id: this.#sessionId,
    app_version: this.#appVersion,
    os_type: this.#osType,
  };
  this.#addToBuffer(enriched);

  // The flush chain only has to be started once.
  if (this.#timerStarted) {
    return;
  }
  this.#timerStarted = true;
  this.#scheduleFlush(FLUSH_INTERVAL_MS);
}
3672

3773
/**
 * Queues a `server_shutdown` event and makes one last attempt to upload
 * everything still buffered, waiting at most `SHUTDOWN_TIMEOUT_MS`.
 *
 * Never rejects: a failing final flush is logged and otherwise ignored.
 */
async sendShutdownEvent(): Promise<void> {
  const shutdownEvent: ChromeDevToolsMcpExtension = {
    server_shutdown: {},
  };
  this.enqueueEvent(shutdownEvent);

  // Cancel the periodic flush only AFTER enqueueing: enqueueEvent arms the
  // timer when it has not been started yet, so clearing it beforehand would
  // leave a fresh timer pending after shutdown.
  if (this.#flushTimer) {
    clearTimeout(this.#flushTimer);
    this.#flushTimer = null;
  }

  let timeoutId: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<void>(resolve => {
    timeoutId = setTimeout(resolve, SHUTDOWN_TIMEOUT_MS);
  });

  try {
    await Promise.race([this.#finalFlush(), timeout]);
  } catch (error) {
    logger('Final flush failed:', error);
  } finally {
    // Without this the timeout timer stays pending for the full
    // SHUTDOWN_TIMEOUT_MS even when the flush finishes early.
    clearTimeout(timeoutId);
  }
}
4393

44-
#rotateSessionIfNeeded(): void {
45-
if (Date.now() - this.#sessionCreated > SESSION_ROTATION_INTERVAL_MS) {
46-
this.#sessionId = crypto.randomUUID();
47-
this.#sessionCreated = Date.now();
94+
/**
 * Uploads the currently buffered events as one batch and schedules the next
 * flush.
 *
 * - Does nothing when another flush is already running.
 * - With an empty buffer it only re-arms the timer.
 * - On success the next delay honours the server-provided wait, but never
 *   less than `MIN_RATE_LIMIT_WAIT_MS`.
 * - On a permanent error the batch is dropped.
 * - On a transient or unexpected error the batch is put back at the front of
 *   the buffer; the buffer is then trimmed to `MAX_BUFFER_SIZE`, dropping the
 *   oldest events, so repeated failures cannot grow it without bound.
 */
async #flush(): Promise<void> {
  if (this.#isFlushing) {
    return;
  }

  if (this.#buffer.length === 0) {
    this.#scheduleFlush(FLUSH_INTERVAL_MS);
    return;
  }

  this.#isFlushing = true;
  let nextDelayMs = FLUSH_INTERVAL_MS;

  // Optimistically remove events from buffer before sending.
  // This prevents race conditions where a simultaneous #finalFlush would
  // include these same events.
  const eventsToSend = [...this.#buffer];
  this.#buffer = [];

  // Puts the unsent batch back in front of whatever arrived meanwhile,
  // keeping order, and enforces the buffer capacity.
  const requeue = (): void => {
    const merged = [...eventsToSend, ...this.#buffer];
    const overflow = merged.length - MAX_BUFFER_SIZE;
    if (overflow > 0) {
      logger('Telemetry buffer overflow: dropped', overflow, 'oldest events');
      this.#buffer = merged.slice(overflow);
    } else {
      this.#buffer = merged;
    }
  };

  try {
    const result = await this.#sendBatch(eventsToSend);

    if (result.success) {
      // Ignore a missing or non-numeric hint; a NaN delay would make the
      // timer fire almost immediately.
      const waitMs = Number(result.nextRequestWaitMs);
      if (result.nextRequestWaitMs !== undefined && Number.isFinite(waitMs)) {
        nextDelayMs = Math.max(waitMs, MIN_RATE_LIMIT_WAIT_MS);
      }
    } else if (result.isPermanentError) {
      logger(
        'Permanent error, dropped batch of',
        eventsToSend.length,
        'events',
      );
    } else {
      // Transient error: retry these events on a later flush.
      requeue();
    }
  } catch (error) {
    // Safety catch for unexpected errors, requeue events
    requeue();
    logger('Flush failed unexpectedly:', error);
  } finally {
    this.#isFlushing = false;
    this.#scheduleFlush(nextDelayMs);
  }
}
50142

51-
#enrichEvent(event: ChromeDevToolsMcpExtension): ChromeDevToolsMcpExtension {
52-
return {
53-
...event,
54-
session_id: this.#sessionId,
55-
app_version: this.#appVersion,
56-
os_type: this.#osType,
143+
/**
 * Appends an event to the in-memory buffer, stamping it with the current
 * time. When the buffer already holds `MAX_BUFFER_SIZE` entries (or more),
 * the oldest entry is evicted first and the eviction is logged.
 *
 * @param event - Already enriched event to store.
 */
#addToBuffer(event: ChromeDevToolsMcpExtension): void {
  const isFull = this.#buffer.length >= MAX_BUFFER_SIZE;
  if (isFull) {
    this.#buffer.shift();
    logger('Telemetry buffer overflow: dropped oldest event');
  }

  const entry: BufferedEvent = {event, timestamp: Date.now()};
  this.#buffer.push(entry);
}
153+
154+
/**
 * (Re)arms the single flush timer so that `#flush` runs after `delayMs`.
 * Any timer that is still pending is cancelled first, so at most one is
 * outstanding. A rejection from `#flush` is logged rather than propagated.
 *
 * @param delayMs - Delay in milliseconds before the next flush attempt.
 */
#scheduleFlush(delayMs: number): void {
  const pending = this.#flushTimer;
  if (pending) {
    clearTimeout(pending);
  }

  const runFlush = (): void => {
    this.#flush().catch(err => {
      logger('Flush error:', err);
    });
  };
  this.#flushTimer = setTimeout(runFlush, delayMs);
}
164+
165+
/**
 * POSTs one batch of buffered events to `CLEARCUT_ENDPOINT`.
 *
 * The request is aborted after `REQUEST_TIMEOUT_MS`. This method never
 * rejects; the outcome is encoded in the returned object:
 *
 * - 2xx: `{success: true}`, plus `nextRequestWaitMs` when the response body
 *   carries a usable `next_request_wait_millis`. An empty or unparseable
 *   body on a 2xx response still counts as success, because the server has
 *   accepted the batch and resending it would duplicate events.
 * - 5xx or 429: `{success: false}` (transient; the caller may retry).
 * - Any other status: `{success: false, isPermanentError: true}`.
 * - Network failure or timeout: `{success: false}` (transient).
 *
 * @param events - Events to upload, each with the time it was buffered.
 * @returns The classification of the request outcome described above.
 */
async #sendBatch(events: BufferedEvent[]): Promise<{
  success: boolean;
  isPermanentError?: boolean;
  nextRequestWaitMs?: number;
}> {
  const requestBody: LogRequest = {
    log_source: LOG_SOURCE,
    request_time_ms: Date.now().toString(),
    client_info: {
      client_type: CLIENT_TYPE,
    },
    log_event: events.map(({event, timestamp}) => ({
      event_time_ms: timestamp.toString(),
      source_extension_json: JSON.stringify(event),
    })),
  };

  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), REQUEST_TIMEOUT_MS);
  try {
    const response = await fetch(CLEARCUT_ENDPOINT, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        // Used in E2E tests to confirm that the watchdog process is killed
        ...(INCLUDE_PID_HEADER
          ? {'X-Watchdog-Pid': process.pid.toString()}
          : {}),
      },
      body: JSON.stringify(requestBody),
      signal: controller.signal,
    });

    if (response.ok) {
      let nextRequestWaitMs: number | undefined;
      try {
        const data = (await response.json()) as LogResponse | null;
        const rawWait = data?.next_request_wait_millis;
        // NOTE(review): the endpoint may encode this value as a string
        // (JSON mapping of 64-bit integers) - confirm. Number() accepts
        // both a number and a numeric string.
        const waitMs = Number(rawWait);
        if (rawWait != null && Number.isFinite(waitMs)) {
          nextRequestWaitMs = waitMs;
        }
      } catch (error) {
        // The batch was accepted; only the optional hint is lost.
        logger('Telemetry response body could not be parsed:', error);
      }
      return {
        success: true,
        nextRequestWaitMs,
      };
    }

    const status = response.status;
    if (status >= 500 || status === 429) {
      return {success: false};
    }

    logger('Telemetry permanent error:', status);
    return {success: false, isPermanentError: true};
  } catch (error) {
    // Network error or abort by the timeout: treated as transient.
    logger('Telemetry request failed:', error);
    return {success: false};
  } finally {
    // Always release the timer. Clearing it here also keeps the timeout in
    // force while the response body is being read.
    clearTimeout(timeoutId);
  }
}
217+
218+
/**
 * Sends a snapshot of everything currently buffered in a single batch.
 * Resolves immediately when the buffer is empty. The result of the upload
 * is not inspected and the buffer itself is left untouched.
 */
async #finalFlush(): Promise<void> {
  const pending = this.#buffer.slice();
  if (pending.length > 0) {
    await this.#sendBatch(pending);
  }
}
225+
226+
/**
 * Test helper: cancels a pending flush timer, if any, and resets the
 * "timer started" flag so that the next `enqueueEvent` call arms a new one.
 */
stopForTesting(): void {
  const pending = this.#flushTimer;
  if (pending) {
    clearTimeout(pending);
    this.#flushTimer = null;
  }
  this.#timerStarted = false;
}
233+
234+
/** Test helper: number of events currently held in the buffer. */
get bufferSizeForTesting(): number {
  const {length} = this.#buffer;
  return length;
}
59237
}

src/telemetry/watchdog/main.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,7 @@ function main() {
107107

108108
const msg = JSON.parse(line);
109109
if (msg.type === WatchdogMessageType.LOG_EVENT && msg.payload) {
110-
sender.send(msg.payload).catch(err => {
111-
logger('Error sending event', err);
112-
});
110+
sender.enqueueEvent(msg.payload);
113111
}
114112
} catch (err) {
115113
logger('Failed to parse IPC message', err);

0 commit comments

Comments
 (0)