Skip to content

Commit f0285ad

Browse files
authored
fix(mothership): fix superagent credentials (#4185)
* Fix * Fix ajv csp issue * Lint
1 parent a39dc15 commit f0285ad

File tree

9 files changed

+479
-57
lines changed

9 files changed

+479
-57
lines changed

apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts

Lines changed: 82 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,16 @@ import {
5858
WorkspaceFile,
5959
WorkspaceFileOperation,
6060
} from '@/lib/copilot/generated/tool-catalog-v1'
61-
import { parsePersistedStreamEventEnvelopeJson } from '@/lib/copilot/request/session/contract'
61+
import {
62+
type ParseStreamEventEnvelopeFailure,
63+
parsePersistedStreamEventEnvelope,
64+
parsePersistedStreamEventEnvelopeJson,
65+
} from '@/lib/copilot/request/session/contract'
6266
import {
6367
type FilePreviewSession,
6468
isFilePreviewSession,
6569
} from '@/lib/copilot/request/session/file-preview-session-contract'
66-
import { isStreamBatchEvent, type StreamBatchEvent } from '@/lib/copilot/request/session/types'
70+
import type { StreamBatchEvent } from '@/lib/copilot/request/session/types'
6771
import {
6872
extractResourcesFromToolResult,
6973
isResourceToolName,
@@ -509,27 +513,75 @@ function isRecord(value: unknown): value is Record<string, unknown> {
509513
return Boolean(value) && typeof value === 'object' && !Array.isArray(value)
510514
}
511515

516+
const STREAM_SCHEMA_ENFORCEMENT_PREFIX = 'Client stream schema enforcement failed.'
517+
518+
class StreamSchemaValidationError extends Error {
519+
constructor(message: string) {
520+
super(message)
521+
this.name = 'StreamSchemaValidationError'
522+
}
523+
}
524+
525+
function createStreamSchemaValidationError(
526+
failure: ParseStreamEventEnvelopeFailure,
527+
context?: string
528+
): StreamSchemaValidationError {
529+
const details = failure.errors?.filter(Boolean).join('; ')
530+
return new StreamSchemaValidationError(
531+
[STREAM_SCHEMA_ENFORCEMENT_PREFIX, context, failure.message, details].filter(Boolean).join(' ')
532+
)
533+
}
534+
535+
function createBatchSchemaValidationError(message: string): StreamSchemaValidationError {
536+
return new StreamSchemaValidationError([STREAM_SCHEMA_ENFORCEMENT_PREFIX, message].join(' '))
537+
}
538+
539+
function isStreamSchemaValidationError(error: unknown): error is StreamSchemaValidationError {
540+
return error instanceof StreamSchemaValidationError
541+
}
542+
512543
function parseStreamBatchResponse(value: unknown): StreamBatchResponse {
513544
if (!isRecord(value)) {
514545
throw new Error('Invalid stream batch response')
515546
}
516547

517548
const rawEvents = Array.isArray(value.events) ? value.events : []
518549
const events: StreamBatchEvent[] = []
519-
for (const entry of rawEvents) {
520-
if (!isStreamBatchEvent(entry)) {
521-
throw new Error('Invalid stream batch event')
550+
for (const [index, entry] of rawEvents.entries()) {
551+
if (!isRecord(entry)) {
552+
throw createBatchSchemaValidationError(`Reconnect batch event ${index + 1} is not an object.`)
553+
}
554+
if (
555+
typeof entry.eventId !== 'number' ||
556+
!Number.isFinite(entry.eventId) ||
557+
typeof entry.streamId !== 'string'
558+
) {
559+
throw createBatchSchemaValidationError(
560+
`Reconnect batch event ${index + 1} is missing required metadata.`
561+
)
522562
}
523-
events.push(entry)
563+
564+
const parsedEvent = parsePersistedStreamEventEnvelope(entry.event)
565+
if (!parsedEvent.ok) {
566+
throw createStreamSchemaValidationError(parsedEvent, `Reconnect batch event ${index + 1}.`)
567+
}
568+
569+
events.push({
570+
eventId: entry.eventId,
571+
streamId: entry.streamId,
572+
event: parsedEvent.event,
573+
})
524574
}
525575

526576
const rawPreviewSessions = Array.isArray(value.previewSessions)
527577
? value.previewSessions
528578
: undefined
529579
const previewSessions =
530-
rawPreviewSessions?.map((session) => {
580+
rawPreviewSessions?.map((session, index) => {
531581
if (!isFilePreviewSession(session)) {
532-
throw new Error('Invalid stream preview session')
582+
throw createBatchSchemaValidationError(
583+
`Reconnect preview session ${index + 1} failed validation.`
584+
)
533585
}
534586
return session
535587
}) ?? undefined
@@ -1579,12 +1631,14 @@ export function useChat(
15791631

15801632
const parsedResult = parsePersistedStreamEventEnvelopeJson(raw)
15811633
if (!parsedResult.ok) {
1582-
logger.warn('Failed to parse chat SSE event', {
1634+
const error = createStreamSchemaValidationError(parsedResult, 'Live SSE event.')
1635+
logger.error('Rejected chat SSE event due to client-side schema enforcement', {
15831636
reason: parsedResult.reason,
15841637
message: parsedResult.message,
15851638
errors: parsedResult.errors,
1639+
error: error.message,
15861640
})
1587-
continue
1641+
throw error
15881642
}
15891643
const parsed = parsedResult.event
15901644

@@ -2533,6 +2587,17 @@ export function useChat(
25332587
}
25342588
return true
25352589
}
2590+
if (isStreamSchemaValidationError(err)) {
2591+
logger.error('Reconnect halted by client-side stream schema enforcement', {
2592+
streamId,
2593+
attempt: attempt + 1,
2594+
error: err.message,
2595+
})
2596+
if (streamGenRef.current === gen) {
2597+
setError(err.message)
2598+
}
2599+
return false
2600+
}
25362601
logger.warn('Reconnect attempt failed', {
25372602
streamId,
25382603
attempt: attempt + 1,
@@ -2892,6 +2957,13 @@ export function useChat(
28922957
}
28932958
} catch (err) {
28942959
if (err instanceof Error && err.name === 'AbortError') return consumedByTranscript
2960+
if (isStreamSchemaValidationError(err)) {
2961+
setError(err.message)
2962+
if (streamGenRef.current === gen) {
2963+
finalize({ error: true })
2964+
}
2965+
return consumedByTranscript
2966+
}
28952967

28962968
const activeStreamId = streamIdRef.current
28972969
if (activeStreamId && streamGenRef.current === gen) {

apps/sim/lib/copilot/request/session/contract.test.ts

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,92 @@ describe('stream session contract parser', () => {
4343
})
4444
})
4545

46+
it('accepts contract session chat events', () => {
47+
const event = {
48+
...BASE_ENVELOPE,
49+
type: 'session' as const,
50+
payload: { kind: 'chat' as const, chatId: 'chat-1' },
51+
}
52+
53+
expect(isContractStreamEventEnvelope(event)).toBe(true)
54+
expect(parsePersistedStreamEventEnvelope(event).ok).toBe(true)
55+
})
56+
57+
it('accepts contract complete events', () => {
58+
const event = {
59+
...BASE_ENVELOPE,
60+
type: 'complete' as const,
61+
payload: { status: 'complete' as const },
62+
}
63+
64+
expect(isContractStreamEventEnvelope(event)).toBe(true)
65+
expect(parsePersistedStreamEventEnvelope(event).ok).toBe(true)
66+
})
67+
68+
it('accepts contract error events', () => {
69+
const event = {
70+
...BASE_ENVELOPE,
71+
type: 'error' as const,
72+
payload: { message: 'something went wrong' },
73+
}
74+
75+
expect(isContractStreamEventEnvelope(event)).toBe(true)
76+
expect(parsePersistedStreamEventEnvelope(event).ok).toBe(true)
77+
})
78+
79+
it('accepts contract tool call events', () => {
80+
const event = {
81+
...BASE_ENVELOPE,
82+
type: 'tool' as const,
83+
payload: {
84+
toolCallId: 'tc-1',
85+
toolName: 'read',
86+
phase: 'call' as const,
87+
executor: 'sim' as const,
88+
mode: 'sync' as const,
89+
},
90+
}
91+
92+
expect(isContractStreamEventEnvelope(event)).toBe(true)
93+
expect(parsePersistedStreamEventEnvelope(event).ok).toBe(true)
94+
})
95+
96+
it('accepts contract span events', () => {
97+
const event = {
98+
...BASE_ENVELOPE,
99+
type: 'span' as const,
100+
payload: { kind: 'subagent' as const, event: 'start' as const, agent: 'file' },
101+
}
102+
103+
expect(isContractStreamEventEnvelope(event)).toBe(true)
104+
expect(parsePersistedStreamEventEnvelope(event).ok).toBe(true)
105+
})
106+
107+
it('accepts contract resource events', () => {
108+
const event = {
109+
...BASE_ENVELOPE,
110+
type: 'resource' as const,
111+
payload: {
112+
op: 'upsert' as const,
113+
resource: { id: 'r-1', type: 'file', title: 'test.md' },
114+
},
115+
}
116+
117+
expect(isContractStreamEventEnvelope(event)).toBe(true)
118+
expect(parsePersistedStreamEventEnvelope(event).ok).toBe(true)
119+
})
120+
121+
it('accepts contract run events', () => {
122+
const event = {
123+
...BASE_ENVELOPE,
124+
type: 'run' as const,
125+
payload: { kind: 'compaction_start' as const },
126+
}
127+
128+
expect(isContractStreamEventEnvelope(event)).toBe(true)
129+
expect(parsePersistedStreamEventEnvelope(event).ok).toBe(true)
130+
})
131+
46132
it('accepts synthetic file preview events', () => {
47133
const event = {
48134
...BASE_ENVELOPE,
@@ -82,7 +168,32 @@ describe('stream session contract parser', () => {
82168
throw new Error('expected invalid result')
83169
}
84170
expect(parsed.reason).toBe('invalid_stream_event')
85-
expect(parsed.errors?.length).toBeGreaterThan(0)
171+
})
172+
173+
it('rejects unknown event types', () => {
174+
const parsed = parsePersistedStreamEventEnvelope({
175+
...BASE_ENVELOPE,
176+
type: 'unknown_type',
177+
payload: {},
178+
})
179+
180+
expect(parsed.ok).toBe(false)
181+
if (parsed.ok) {
182+
throw new Error('expected invalid result')
183+
}
184+
expect(parsed.reason).toBe('invalid_stream_event')
185+
expect(parsed.errors).toContain('unknown type="unknown_type"')
186+
})
187+
188+
it('rejects non-object values', () => {
189+
const parsed = parsePersistedStreamEventEnvelope('not an object')
190+
191+
expect(parsed.ok).toBe(false)
192+
if (parsed.ok) {
193+
throw new Error('expected invalid result')
194+
}
195+
expect(parsed.reason).toBe('invalid_stream_event')
196+
expect(parsed.errors).toContain('value is not an object')
86197
})
87198

88199
it('reports invalid JSON separately from schema failures', () => {

0 commit comments

Comments
 (0)