Skip to content

Commit 603612d

Browse files
committed
Task 5
1 parent 5e29c21 commit 603612d

File tree

4 files changed

+238
-23
lines changed

4 files changed

+238
-23
lines changed

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/ResponseStreamFactory.cs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
using System;
1717
using System.Threading;
18+
using System.Threading.Tasks;
1819

1920
namespace Amazon.Lambda.RuntimeSupport
2021
{
@@ -57,19 +58,29 @@ public static IResponseStream CreateStream()
5758
context.Stream = stream;
5859
context.StreamCreated = true;
5960

61+
// Start the HTTP POST to the Runtime API.
62+
// This runs concurrently — SerializeToStreamAsync will block
63+
// until the handler finishes writing or reports an error.
64+
context.SendTask = context.RuntimeApiClient.StartStreamingResponseAsync(
65+
context.AwsRequestId, stream, context.CancellationToken);
66+
6067
return stream;
6168
}
6269

6370
// Internal methods for LambdaBootstrap to manage state
6471

65-
internal static void InitializeInvocation(string awsRequestId, long maxResponseSize, bool isMultiConcurrency)
72+
internal static void InitializeInvocation(
73+
string awsRequestId, long maxResponseSize, bool isMultiConcurrency,
74+
RuntimeApiClient runtimeApiClient, CancellationToken cancellationToken)
6675
{
6776
var context = new ResponseStreamContext
6877
{
6978
AwsRequestId = awsRequestId,
7079
MaxResponseSize = maxResponseSize,
7180
StreamCreated = false,
72-
Stream = null
81+
Stream = null,
82+
RuntimeApiClient = runtimeApiClient,
83+
CancellationToken = cancellationToken
7384
};
7485

7586
if (isMultiConcurrency)
@@ -88,6 +99,16 @@ internal static ResponseStream GetStreamIfCreated(bool isMultiConcurrency)
8899
return context?.Stream;
89100
}
90101

102+
/// <summary>
103+
/// Returns the Task for the in-flight HTTP send, or null if streaming wasn't started.
104+
/// LambdaBootstrap awaits this after the handler returns to ensure the HTTP request completes.
105+
/// </summary>
106+
internal static Task GetSendTask(bool isMultiConcurrency)
107+
{
108+
var context = isMultiConcurrency ? _asyncLocalContext.Value : _onDemandContext;
109+
return context?.SendTask;
110+
}
111+
91112
internal static void CleanupInvocation(bool isMultiConcurrency)
92113
{
93114
if (isMultiConcurrency)

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/RuntimeApiClient.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,47 @@ public Task ReportRestoreErrorAsync(Exception exception, String errorType = null
177177
#endif
178178

179179

180+
/// <summary>
181+
/// Start sending a streaming response to the Runtime API.
182+
/// This initiates the HTTP POST with streaming headers. The actual data
183+
/// is written by the handler via ResponseStream.WriteAsync, which flows
184+
/// through StreamingHttpContent to the HTTP connection.
185+
/// This Task completes when the stream is finalized (MarkCompleted or error).
186+
/// </summary>
187+
/// <param name="awsRequestId">The ID of the function request being responded to.</param>
188+
/// <param name="responseStream">The ResponseStream that will provide the streaming data.</param>
189+
/// <param name="cancellationToken">The optional cancellation token to use.</param>
190+
/// <returns>A Task representing the in-flight HTTP POST.</returns>
191+
internal virtual async Task StartStreamingResponseAsync(
192+
string awsRequestId, ResponseStream responseStream, CancellationToken cancellationToken = default)
193+
{
194+
if (awsRequestId == null) throw new ArgumentNullException(nameof(awsRequestId));
195+
if (responseStream == null) throw new ArgumentNullException(nameof(responseStream));
196+
197+
var url = $"http://{LambdaEnvironment.RuntimeServerHostAndPort}/2018-06-01/runtime/invocation/{awsRequestId}/response";
198+
199+
using (var request = new HttpRequestMessage(HttpMethod.Post, url))
200+
{
201+
request.Headers.Add(StreamingConstants.ResponseModeHeader, StreamingConstants.StreamingResponseMode);
202+
request.Headers.TransferEncodingChunked = true;
203+
204+
// Declare trailers upfront — we always declare them since we don't know
205+
// at request start time whether an error will occur mid-stream.
206+
request.Headers.Add("Trailer",
207+
$"{StreamingConstants.ErrorTypeTrailer}, {StreamingConstants.ErrorBodyTrailer}");
208+
209+
request.Content = new StreamingHttpContent(responseStream);
210+
211+
// SendAsync calls SerializeToStreamAsync, which blocks until the handler
212+
// finishes writing. This is why this method runs concurrently with the handler.
213+
var response = await _httpClient.SendAsync(
214+
request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
215+
response.EnsureSuccessStatusCode();
216+
}
217+
218+
responseStream.MarkCompleted();
219+
}
220+
180221
/// <summary>
181222
/// Send a response to a function invocation to the Runtime API as an asynchronous operation.
182223
/// </summary>

Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/ResponseStreamFactoryTests.cs

Lines changed: 114 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
*/
1515

1616
using System;
17+
using System.Threading;
1718
using System.Threading.Tasks;
1819
using Xunit;
1920

@@ -30,7 +31,40 @@ public void Dispose()
3031
ResponseStreamFactory.CleanupInvocation(isMultiConcurrency: true);
3132
}
3233

33-
// --- Task 3.3: CreateStream tests ---
34+
/// <summary>
35+
/// A minimal RuntimeApiClient subclass for testing that overrides StartStreamingResponseAsync
36+
/// to avoid real HTTP calls while tracking invocations.
37+
/// </summary>
38+
private class MockStreamingRuntimeApiClient : RuntimeApiClient
39+
{
40+
public bool StartStreamingCalled { get; private set; }
41+
public string LastAwsRequestId { get; private set; }
42+
public ResponseStream LastResponseStream { get; private set; }
43+
public TaskCompletionSource<bool> SendTaskCompletion { get; } = new TaskCompletionSource<bool>();
44+
45+
public MockStreamingRuntimeApiClient()
46+
: base(new TestEnvironmentVariables(), new TestHelpers.NoOpInternalRuntimeApiClient())
47+
{
48+
}
49+
50+
internal override async Task StartStreamingResponseAsync(
51+
string awsRequestId, ResponseStream responseStream, CancellationToken cancellationToken = default)
52+
{
53+
StartStreamingCalled = true;
54+
LastAwsRequestId = awsRequestId;
55+
LastResponseStream = responseStream;
56+
await SendTaskCompletion.Task;
57+
}
58+
}
59+
60+
private void InitializeWithMock(string requestId, bool isMultiConcurrency, MockStreamingRuntimeApiClient mockClient)
61+
{
62+
ResponseStreamFactory.InitializeInvocation(
63+
requestId, MaxResponseSize, isMultiConcurrency,
64+
mockClient, CancellationToken.None);
65+
}
66+
67+
// --- Property 1: CreateStream Returns Valid Stream ---
3468

3569
/// <summary>
3670
/// Property 1: CreateStream Returns Valid Stream - on-demand mode.
@@ -39,7 +73,8 @@ public void Dispose()
3973
[Fact]
4074
public void CreateStream_OnDemandMode_ReturnsValidStream()
4175
{
42-
ResponseStreamFactory.InitializeInvocation("req-1", MaxResponseSize, isMultiConcurrency: false);
76+
var mock = new MockStreamingRuntimeApiClient();
77+
InitializeWithMock("req-1", isMultiConcurrency: false, mock);
4378

4479
var stream = ResponseStreamFactory.CreateStream();
4580

@@ -54,22 +89,26 @@ public void CreateStream_OnDemandMode_ReturnsValidStream()
5489
[Fact]
5590
public void CreateStream_MultiConcurrencyMode_ReturnsValidStream()
5691
{
57-
ResponseStreamFactory.InitializeInvocation("req-2", MaxResponseSize, isMultiConcurrency: true);
92+
var mock = new MockStreamingRuntimeApiClient();
93+
InitializeWithMock("req-2", isMultiConcurrency: true, mock);
5894

5995
var stream = ResponseStreamFactory.CreateStream();
6096

6197
Assert.NotNull(stream);
6298
Assert.IsAssignableFrom<IResponseStream>(stream);
6399
}
64100

101+
// --- Property 4: Single Stream Per Invocation ---
102+
65103
/// <summary>
66104
/// Property 4: Single Stream Per Invocation - calling CreateStream twice throws.
67105
/// Validates: Requirements 2.5, 2.6
68106
/// </summary>
69107
[Fact]
70108
public void CreateStream_CalledTwice_ThrowsInvalidOperationException()
71109
{
72-
ResponseStreamFactory.InitializeInvocation("req-3", MaxResponseSize, isMultiConcurrency: false);
110+
var mock = new MockStreamingRuntimeApiClient();
111+
InitializeWithMock("req-3", isMultiConcurrency: false, mock);
73112
ResponseStreamFactory.CreateStream();
74113

75114
Assert.Throws<InvalidOperationException>(() => ResponseStreamFactory.CreateStream());
@@ -82,25 +121,78 @@ public void CreateStream_OutsideInvocationContext_ThrowsInvalidOperationExceptio
82121
Assert.Throws<InvalidOperationException>(() => ResponseStreamFactory.CreateStream());
83122
}
84123

85-
// --- Task 3.5: Internal methods tests ---
124+
// --- CreateStream starts HTTP POST ---
125+
126+
/// <summary>
127+
/// Validates that CreateStream calls StartStreamingResponseAsync on the RuntimeApiClient.
128+
/// Validates: Requirements 1.3, 1.4, 2.2, 2.3, 2.4
129+
/// </summary>
130+
[Fact]
131+
public void CreateStream_CallsStartStreamingResponseAsync()
132+
{
133+
var mock = new MockStreamingRuntimeApiClient();
134+
InitializeWithMock("req-start", isMultiConcurrency: false, mock);
135+
136+
ResponseStreamFactory.CreateStream();
137+
138+
Assert.True(mock.StartStreamingCalled);
139+
Assert.Equal("req-start", mock.LastAwsRequestId);
140+
Assert.NotNull(mock.LastResponseStream);
141+
}
142+
143+
// --- GetSendTask ---
144+
145+
/// <summary>
146+
/// Validates that GetSendTask returns the task from the HTTP POST.
147+
/// Validates: Requirements 5.1, 7.3
148+
/// </summary>
149+
[Fact]
150+
public void GetSendTask_AfterCreateStream_ReturnsNonNullTask()
151+
{
152+
var mock = new MockStreamingRuntimeApiClient();
153+
InitializeWithMock("req-send", isMultiConcurrency: false, mock);
154+
155+
ResponseStreamFactory.CreateStream();
156+
157+
var sendTask = ResponseStreamFactory.GetSendTask(isMultiConcurrency: false);
158+
Assert.NotNull(sendTask);
159+
}
160+
161+
[Fact]
162+
public void GetSendTask_BeforeCreateStream_ReturnsNull()
163+
{
164+
var mock = new MockStreamingRuntimeApiClient();
165+
InitializeWithMock("req-nosend", isMultiConcurrency: false, mock);
166+
167+
var sendTask = ResponseStreamFactory.GetSendTask(isMultiConcurrency: false);
168+
Assert.Null(sendTask);
169+
}
170+
171+
[Fact]
172+
public void GetSendTask_NoContext_ReturnsNull()
173+
{
174+
Assert.Null(ResponseStreamFactory.GetSendTask(isMultiConcurrency: false));
175+
}
176+
177+
// --- Internal methods ---
86178

87179
[Fact]
88180
public void InitializeInvocation_OnDemand_SetsUpContext()
89181
{
90-
ResponseStreamFactory.InitializeInvocation("req-4", MaxResponseSize, isMultiConcurrency: false);
182+
var mock = new MockStreamingRuntimeApiClient();
183+
InitializeWithMock("req-4", isMultiConcurrency: false, mock);
91184

92-
// GetStreamIfCreated should return null since CreateStream hasn't been called
93185
Assert.Null(ResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: false));
94186

95-
// But CreateStream should work (proving context was set up)
96187
var stream = ResponseStreamFactory.CreateStream();
97188
Assert.NotNull(stream);
98189
}
99190

100191
[Fact]
101192
public void InitializeInvocation_MultiConcurrency_SetsUpContext()
102193
{
103-
ResponseStreamFactory.InitializeInvocation("req-5", MaxResponseSize, isMultiConcurrency: true);
194+
var mock = new MockStreamingRuntimeApiClient();
195+
InitializeWithMock("req-5", isMultiConcurrency: true, mock);
104196

105197
Assert.Null(ResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: true));
106198

@@ -111,11 +203,11 @@ public void InitializeInvocation_MultiConcurrency_SetsUpContext()
111203
[Fact]
112204
public void GetStreamIfCreated_AfterCreateStream_ReturnsStream()
113205
{
114-
ResponseStreamFactory.InitializeInvocation("req-6", MaxResponseSize, isMultiConcurrency: false);
115-
var created = ResponseStreamFactory.CreateStream();
206+
var mock = new MockStreamingRuntimeApiClient();
207+
InitializeWithMock("req-6", isMultiConcurrency: false, mock);
208+
ResponseStreamFactory.CreateStream();
116209

117210
var retrieved = ResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: false);
118-
119211
Assert.NotNull(retrieved);
120212
}
121213

@@ -128,7 +220,8 @@ public void GetStreamIfCreated_NoContext_ReturnsNull()
128220
[Fact]
129221
public void CleanupInvocation_ClearsState()
130222
{
131-
ResponseStreamFactory.InitializeInvocation("req-7", MaxResponseSize, isMultiConcurrency: false);
223+
var mock = new MockStreamingRuntimeApiClient();
224+
InitializeWithMock("req-7", isMultiConcurrency: false, mock);
132225
ResponseStreamFactory.CreateStream();
133226

134227
ResponseStreamFactory.CleanupInvocation(isMultiConcurrency: false);
@@ -137,24 +230,27 @@ public void CleanupInvocation_ClearsState()
137230
Assert.Throws<InvalidOperationException>(() => ResponseStreamFactory.CreateStream());
138231
}
139232

233+
// --- Property 16: State Isolation Between Invocations ---
234+
140235
/// <summary>
141236
/// Property 16: State Isolation Between Invocations - state from one invocation doesn't leak to the next.
142237
/// Validates: Requirements 6.5, 8.9
143238
/// </summary>
144239
[Fact]
145240
public void StateIsolation_SequentialInvocations_NoLeakage()
146241
{
242+
var mock = new MockStreamingRuntimeApiClient();
243+
147244
// First invocation - streaming
148-
ResponseStreamFactory.InitializeInvocation("req-8a", MaxResponseSize, isMultiConcurrency: false);
245+
InitializeWithMock("req-8a", isMultiConcurrency: false, mock);
149246
var stream1 = ResponseStreamFactory.CreateStream();
150247
Assert.NotNull(stream1);
151248
ResponseStreamFactory.CleanupInvocation(isMultiConcurrency: false);
152249

153250
// Second invocation - should start fresh
154-
ResponseStreamFactory.InitializeInvocation("req-8b", MaxResponseSize, isMultiConcurrency: false);
251+
InitializeWithMock("req-8b", isMultiConcurrency: false, mock);
155252
Assert.Null(ResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: false));
156253

157-
// Should be able to create a new stream
158254
var stream2 = ResponseStreamFactory.CreateStream();
159255
Assert.NotNull(stream2);
160256
ResponseStreamFactory.CleanupInvocation(isMultiConcurrency: false);
@@ -167,17 +263,14 @@ public void StateIsolation_SequentialInvocations_NoLeakage()
167263
[Fact]
168264
public async Task StateIsolation_MultiConcurrency_UsesAsyncLocal()
169265
{
170-
// Initialize in multi-concurrency mode on main thread
171-
ResponseStreamFactory.InitializeInvocation("req-9", MaxResponseSize, isMultiConcurrency: true);
266+
var mock = new MockStreamingRuntimeApiClient();
267+
InitializeWithMock("req-9", isMultiConcurrency: true, mock);
172268
var stream = ResponseStreamFactory.CreateStream();
173269
Assert.NotNull(stream);
174270

175-
// A separate task should not see the main thread's context
176-
// (AsyncLocal flows to child tasks, but a fresh Task.Run with new initialization should override)
177271
bool childSawNull = false;
178272
await Task.Run(() =>
179273
{
180-
// Clean up the flowed context first
181274
ResponseStreamFactory.CleanupInvocation(isMultiConcurrency: true);
182275
childSawNull = ResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: true) == null;
183276
});

0 commit comments

Comments
 (0)