Skip to content

Commit 5e29c21

Browse files
committed
Task 4 (after redesign)
1 parent 0aa892b commit 5e29c21

File tree

5 files changed

+546
-251
lines changed

5 files changed

+546
-251
lines changed

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/ResponseStream.cs

Lines changed: 68 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -14,62 +14,70 @@
1414
*/
1515

1616
using System;
17-
using System.Collections.Generic;
18-
using System.Linq;
17+
using System.IO;
18+
using System.Text;
1919
using System.Threading;
2020
using System.Threading.Tasks;
2121

2222
namespace Amazon.Lambda.RuntimeSupport
2323
{
2424
/// <summary>
25-
/// Internal implementation of IResponseStream.
26-
/// Buffers written data as chunks for HTTP chunked transfer encoding.
25+
/// Internal implementation of IResponseStream with true streaming.
26+
/// Writes data directly to the HTTP output stream as chunked transfer encoding.
2727
/// </summary>
2828
internal class ResponseStream : IResponseStream
2929
{
30+
private static readonly byte[] CrlfBytes = Encoding.ASCII.GetBytes("\r\n");
31+
3032
private readonly long _maxResponseSize;
31-
private readonly List<byte[]> _chunks;
3233
private long _bytesWritten;
3334
private bool _isCompleted;
3435
private bool _hasError;
3536
private Exception _reportedError;
3637
private readonly object _lock = new object();
3738

39+
// The live HTTP output stream, set by StreamingHttpContent when SerializeToStreamAsync is called.
40+
private Stream _httpOutputStream;
41+
private readonly SemaphoreSlim _httpStreamReady = new SemaphoreSlim(0, 1);
42+
private readonly SemaphoreSlim _completionSignal = new SemaphoreSlim(0, 1);
43+
3844
public long BytesWritten => _bytesWritten;
3945
public bool IsCompleted => _isCompleted;
4046
public bool HasError => _hasError;
47+
internal Exception ReportedError => _reportedError;
4148

42-
internal IReadOnlyList<byte[]> Chunks
49+
public ResponseStream(long maxResponseSize)
4350
{
44-
get
45-
{
46-
lock (_lock)
47-
{
48-
return _chunks.ToList();
49-
}
50-
}
51+
_maxResponseSize = maxResponseSize;
5152
}
5253

53-
internal Exception ReportedError => _reportedError;
54+
/// <summary>
55+
/// Called by StreamingHttpContent.SerializeToStreamAsync to provide the HTTP output stream.
56+
/// </summary>
57+
internal void SetHttpOutputStream(Stream httpOutputStream)
58+
{
59+
_httpOutputStream = httpOutputStream;
60+
_httpStreamReady.Release();
61+
}
5462

55-
public ResponseStream(long maxResponseSize)
63+
/// <summary>
64+
/// Called by StreamingHttpContent.SerializeToStreamAsync to wait until the handler
65+
/// finishes writing (MarkCompleted or ReportErrorAsync).
66+
/// </summary>
67+
internal async Task WaitForCompletionAsync()
5668
{
57-
_maxResponseSize = maxResponseSize;
58-
_chunks = new List<byte[]>();
59-
_bytesWritten = 0;
60-
_isCompleted = false;
61-
_hasError = false;
69+
await _completionSignal.WaitAsync();
6270
}
6371

64-
public Task WriteAsync(byte[] buffer, CancellationToken cancellationToken = default)
72+
public async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken = default)
6573
{
6674
if (buffer == null)
6775
throw new ArgumentNullException(nameof(buffer));
6876

69-
return WriteAsync(buffer, 0, buffer.Length, cancellationToken);
77+
await WriteAsync(buffer, 0, buffer.Length, cancellationToken);
7078
}
7179

72-
public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
80+
public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
7381
{
7482
if (buffer == null)
7583
throw new ArgumentNullException(nameof(buffer));
@@ -78,45 +86,45 @@ public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken c
7886
if (count < 0 || offset + count > buffer.Length)
7987
throw new ArgumentOutOfRangeException(nameof(count));
8088

81-
lock (_lock)
89+
// Wait for the HTTP stream to be ready (first write only blocks)
90+
await _httpStreamReady.WaitAsync(cancellationToken);
91+
try
8292
{
83-
ThrowIfCompletedOrError();
84-
85-
if (_bytesWritten + count > _maxResponseSize)
93+
lock (_lock)
8694
{
87-
throw new InvalidOperationException(
88-
$"Writing {count} bytes would exceed the maximum response size of {_maxResponseSize} bytes (20 MiB). " +
89-
$"Current size: {_bytesWritten} bytes.");
90-
}
91-
92-
var chunk = new byte[count];
93-
Array.Copy(buffer, offset, chunk, 0, count);
94-
_chunks.Add(chunk);
95-
_bytesWritten += count;
96-
}
95+
ThrowIfCompletedOrError();
9796

98-
return Task.CompletedTask;
99-
}
100-
101-
public Task WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
102-
{
103-
lock (_lock)
104-
{
105-
ThrowIfCompletedOrError();
97+
if (_bytesWritten + count > _maxResponseSize)
98+
{
99+
throw new InvalidOperationException(
100+
$"Writing {count} bytes would exceed the maximum response size of {_maxResponseSize} bytes (20 MiB). " +
101+
$"Current size: {_bytesWritten} bytes.");
102+
}
106103

107-
if (_bytesWritten + buffer.Length > _maxResponseSize)
108-
{
109-
throw new InvalidOperationException(
110-
$"Writing {buffer.Length} bytes would exceed the maximum response size of {_maxResponseSize} bytes (20 MiB). " +
111-
$"Current size: {_bytesWritten} bytes.");
104+
_bytesWritten += count;
112105
}
113106

114-
var chunk = buffer.ToArray();
115-
_chunks.Add(chunk);
116-
_bytesWritten += buffer.Length;
107+
// Write chunk directly to the HTTP stream: size(hex) + CRLF + data + CRLF
108+
var chunkSizeHex = count.ToString("X");
109+
var chunkSizeBytes = Encoding.ASCII.GetBytes(chunkSizeHex);
110+
await _httpOutputStream.WriteAsync(chunkSizeBytes, 0, chunkSizeBytes.Length, cancellationToken);
111+
await _httpOutputStream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, cancellationToken);
112+
await _httpOutputStream.WriteAsync(buffer, offset, count, cancellationToken);
113+
await _httpOutputStream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, cancellationToken);
114+
await _httpOutputStream.FlushAsync(cancellationToken);
115+
}
116+
finally
117+
{
118+
// Re-release so subsequent writes don't block
119+
_httpStreamReady.Release();
117120
}
121+
}
118122

119-
return Task.CompletedTask;
123+
public async Task WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
124+
{
125+
// Convert to array and delegate — small overhead but keeps the API simple
126+
var array = buffer.ToArray();
127+
await WriteAsync(array, 0, array.Length, cancellationToken);
120128
}
121129

122130
public Task ReportErrorAsync(Exception exception, CancellationToken cancellationToken = default)
@@ -135,6 +143,8 @@ public Task ReportErrorAsync(Exception exception, CancellationToken cancellation
135143
_reportedError = exception;
136144
}
137145

146+
// Signal completion so StreamingHttpContent can write error trailers and finish
147+
_completionSignal.Release();
138148
return Task.CompletedTask;
139149
}
140150

@@ -144,6 +154,8 @@ internal void MarkCompleted()
144154
{
145155
_isCompleted = true;
146156
}
157+
// Signal completion so StreamingHttpContent can write the final chunk and finish
158+
_completionSignal.Release();
147159
}
148160

149161
private void ThrowIfCompletedOrError()
@@ -156,7 +168,8 @@ private void ThrowIfCompletedOrError()
156168

157169
public void Dispose()
158170
{
159-
// Nothing to dispose - all data is in managed memory
171+
// Ensure completion is signaled if not already
172+
try { _completionSignal.Release(); } catch (SemaphoreFullException) { }
160173
}
161174
}
162175
}

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/ResponseStreamContext.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16+
using System.Threading;
17+
using System.Threading.Tasks;
18+
1619
namespace Amazon.Lambda.RuntimeSupport
1720
{
1821
/// <summary>
@@ -39,5 +42,21 @@ internal class ResponseStreamContext
3942
/// The ResponseStream instance if created.
4043
/// </summary>
4144
public ResponseStream Stream { get; set; }
45+
46+
/// <summary>
47+
/// The RuntimeApiClient used to start the streaming HTTP POST.
48+
/// </summary>
49+
public RuntimeApiClient RuntimeApiClient { get; set; }
50+
51+
/// <summary>
52+
/// Cancellation token for the current invocation.
53+
/// </summary>
54+
public CancellationToken CancellationToken { get; set; }
55+
56+
/// <summary>
57+
/// The Task representing the in-flight HTTP POST to the Runtime API.
58+
/// Started when CreateStream() is called, completes when the stream is finalized.
59+
/// </summary>
60+
public Task SendTask { get; set; }
4261
}
4362
}

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/StreamingHttpContent.cs

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,24 @@ public StreamingHttpContent(ResponseStream responseStream)
3939

4040
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
4141
{
42-
foreach (var chunk in _responseStream.Chunks)
43-
{
44-
await WriteChunkAsync(stream, chunk);
45-
}
42+
// Hand the HTTP output stream to ResponseStream so WriteAsync calls
43+
// can write chunks directly to it.
44+
_responseStream.SetHttpOutputStream(stream);
4645

47-
await WriteFinalChunkAsync(stream);
46+
// Wait for the handler to finish writing (MarkCompleted or ReportErrorAsync)
47+
await _responseStream.WaitForCompletionAsync();
4848

49+
// Write final chunk
50+
await stream.WriteAsync(FinalChunkBytes, 0, FinalChunkBytes.Length);
51+
52+
// Write error trailers if present
4953
if (_responseStream.HasError)
5054
{
5155
await WriteErrorTrailersAsync(stream, _responseStream.ReportedError);
5256
}
5357

58+
// Write final CRLF to end the chunked message
59+
await stream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length);
5460
await stream.FlushAsync();
5561
}
5662

@@ -60,22 +66,6 @@ protected override bool TryComputeLength(out long length)
6066
return false;
6167
}
6268

63-
private async Task WriteChunkAsync(Stream stream, byte[] data)
64-
{
65-
var chunkSizeHex = data.Length.ToString("X");
66-
var chunkSizeBytes = Encoding.ASCII.GetBytes(chunkSizeHex);
67-
68-
await stream.WriteAsync(chunkSizeBytes, 0, chunkSizeBytes.Length);
69-
await stream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length);
70-
await stream.WriteAsync(data, 0, data.Length);
71-
await stream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length);
72-
}
73-
74-
private async Task WriteFinalChunkAsync(Stream stream)
75-
{
76-
await stream.WriteAsync(FinalChunkBytes, 0, FinalChunkBytes.Length);
77-
}
78-
7969
private async Task WriteErrorTrailersAsync(Stream stream, Exception exception)
8070
{
8171
var exceptionInfo = ExceptionInfo.GetExceptionInfo(exception);
@@ -88,8 +78,6 @@ private async Task WriteErrorTrailersAsync(Stream stream, Exception exception)
8878
var errorBodyHeader = $"{StreamingConstants.ErrorBodyTrailer}: {errorBodyJson}\r\n";
8979
var errorBodyBytes = Encoding.UTF8.GetBytes(errorBodyHeader);
9080
await stream.WriteAsync(errorBodyBytes, 0, errorBodyBytes.Length);
91-
92-
await stream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length);
9381
}
9482
}
9583
}

0 commit comments

Comments
 (0)