Skip to content

Commit 414a449

Browse files
committed
Refactoring
1 parent 0b8dd52 commit 414a449

18 files changed

+282
-309
lines changed

Libraries/src/Amazon.Lambda.RuntimeSupport/Bootstrap/LambdaBootstrap.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -363,9 +363,8 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul
363363
var runtimeApiClient = Client as RuntimeApiClient;
364364
if (runtimeApiClient != null)
365365
{
366-
ResponseStreamFactory.InitializeInvocation(
366+
LambdaResponseStreamFactory.InitializeInvocation(
367367
invocation.LambdaContext.AwsRequestId,
368-
StreamingConstants.MaxResponseSize,
369368
isMultiConcurrency,
370369
runtimeApiClient,
371370
cancellationToken);
@@ -386,7 +385,7 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul
386385
{
387386
WriteUnhandledExceptionToLog(exception);
388387

389-
var streamIfCreated = ResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency);
388+
var streamIfCreated = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency);
390389
if (streamIfCreated != null && streamIfCreated.BytesWritten > 0)
391390
{
392391
// Midstream error — report via trailers on the already-open HTTP connection
@@ -404,10 +403,10 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul
404403
}
405404

406405
// If streaming was started, await the HTTP send task to ensure it completes
407-
var sendTask = ResponseStreamFactory.GetSendTask(isMultiConcurrency);
406+
var sendTask = LambdaResponseStreamFactory.GetSendTask(isMultiConcurrency);
408407
if (sendTask != null)
409408
{
410-
var stream = ResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency);
409+
var stream = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency);
411410
if (stream != null && !stream.IsCompleted && !stream.HasError)
412411
{
413412
// Handler returned successfully — signal stream completion
@@ -454,7 +453,7 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul
454453
{
455454
if (runtimeApiClient != null)
456455
{
457-
ResponseStreamFactory.CleanupInvocation(isMultiConcurrency);
456+
LambdaResponseStreamFactory.CleanupInvocation(isMultiConcurrency);
458457
}
459458
invocation.Dispose();
460459
}

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/IResponseStream.cs renamed to Libraries/src/Amazon.Lambda.RuntimeSupport/Client/ILambdaResponseStream.cs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ namespace Amazon.Lambda.RuntimeSupport
2323
/// Interface for writing streaming responses in AWS Lambda functions.
2424
/// Obtained by calling ResponseStreamFactory.CreateStream() within a handler.
2525
/// </summary>
26-
public interface IResponseStream : IDisposable
26+
public interface ILambdaResponseStream : IDisposable
2727
{
2828
/// <summary>
2929
/// Asynchronously writes a byte array to the response stream.
@@ -32,7 +32,6 @@ public interface IResponseStream : IDisposable
3232
/// <param name="cancellationToken">Optional cancellation token.</param>
3333
/// <returns>A task representing the asynchronous operation.</returns>
3434
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
35-
/// <exception cref="InvalidOperationException">Thrown if writing would exceed the 20 MiB limit.</exception>
3635
Task WriteAsync(byte[] buffer, CancellationToken cancellationToken = default);
3736

3837
/// <summary>
@@ -44,19 +43,8 @@ public interface IResponseStream : IDisposable
4443
/// <param name="cancellationToken">Optional cancellation token.</param>
4544
/// <returns>A task representing the asynchronous operation.</returns>
4645
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
47-
/// <exception cref="InvalidOperationException">Thrown if writing would exceed the 20 MiB limit.</exception>
4846
Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default);
4947

50-
/// <summary>
51-
/// Asynchronously writes a memory buffer to the response stream.
52-
/// </summary>
53-
/// <param name="buffer">The memory buffer to write.</param>
54-
/// <param name="cancellationToken">Optional cancellation token.</param>
55-
/// <returns>A task representing the asynchronous operation.</returns>
56-
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
57-
/// <exception cref="InvalidOperationException">Thrown if writing would exceed the 20 MiB limit.</exception>
58-
Task WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default);
59-
6048
/// <summary>
6149
/// Reports an error that occurred during streaming.
6250
/// This will send error information via HTTP trailing headers.

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/InvocationResponse.cs

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,6 @@ public class InvocationResponse
3434
/// </summary>
3535
public bool DisposeOutputStream { get; private set; } = true;
3636

37-
/// <summary>
38-
/// Indicates whether this response uses streaming mode.
39-
/// Set internally by the runtime when ResponseStreamFactory.CreateStream() is called.
40-
/// </summary>
41-
internal bool IsStreaming { get; set; }
42-
43-
/// <summary>
44-
/// The ResponseStream instance if streaming mode is used.
45-
/// Set internally by the runtime.
46-
/// </summary>
47-
internal ResponseStream ResponseStream { get; set; }
48-
4937
/// <summary>
5038
/// Construct a InvocationResponse with an output stream that will be disposed by the Lambda Runtime Client.
5139
/// </summary>
@@ -64,20 +52,6 @@ public InvocationResponse(Stream outputStream, bool disposeOutputStream)
6452
{
6553
OutputStream = outputStream ?? throw new ArgumentNullException(nameof(outputStream));
6654
DisposeOutputStream = disposeOutputStream;
67-
IsStreaming = false;
68-
}
69-
70-
/// <summary>
71-
/// Creates an InvocationResponse for a streaming response.
72-
/// Used internally by the runtime.
73-
/// </summary>
74-
internal static InvocationResponse CreateStreamingResponse(ResponseStream responseStream)
75-
{
76-
return new InvocationResponse(Stream.Null, false)
77-
{
78-
IsStreaming = true,
79-
ResponseStream = responseStream
80-
};
8155
}
8256
}
8357
}

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/ResponseStream.cs renamed to Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStream.ILambdaResponseStream.cs

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,13 @@
2222
namespace Amazon.Lambda.RuntimeSupport
2323
{
2424
/// <summary>
25-
/// Internal implementation of IResponseStream with true streaming.
26-
/// Writes data directly to the HTTP output stream as chunked transfer encoding.
25+
/// A write-only, non-seekable <see cref="Stream"/> subclass that streams response data
26+
/// to the Lambda Runtime API. Returned by <see cref="LambdaResponseStreamFactory.CreateStream"/>.
2727
/// </summary>
28-
internal class ResponseStream : IResponseStream
28+
public partial class LambdaResponseStream : Stream, ILambdaResponseStream
2929
{
3030
private static readonly byte[] CrlfBytes = Encoding.ASCII.GetBytes("\r\n");
3131

32-
private readonly long _maxResponseSize;
3332
private long _bytesWritten;
3433
private bool _isCompleted;
3534
private bool _hasError;
@@ -41,14 +40,26 @@ internal class ResponseStream : IResponseStream
4140
private readonly SemaphoreSlim _httpStreamReady = new SemaphoreSlim(0, 1);
4241
private readonly SemaphoreSlim _completionSignal = new SemaphoreSlim(0, 1);
4342

43+
/// <summary>
44+
/// The number of bytes written to the Lambda response stream so far.
45+
/// </summary>
4446
public long BytesWritten => _bytesWritten;
47+
48+
/// <summary>
49+
/// Gets a value indicating whether the operation has completed.
50+
/// </summary>
4551
public bool IsCompleted => _isCompleted;
52+
53+
/// <summary>
54+
/// Gets a value indicating whether an error has occurred.
55+
/// </summary>
4656
public bool HasError => _hasError;
57+
58+
4759
internal Exception ReportedError => _reportedError;
4860

49-
public ResponseStream(long maxResponseSize)
61+
internal LambdaResponseStream()
5062
{
51-
_maxResponseSize = maxResponseSize;
5263
}
5364

5465
/// <summary>
@@ -69,6 +80,13 @@ internal async Task WaitForCompletionAsync()
6980
await _completionSignal.WaitAsync();
7081
}
7182

83+
/// <summary>
84+
/// Asynchronously writes a byte array to the response stream.
85+
/// </summary>
86+
/// <param name="buffer">The byte array to write.</param>
87+
/// <param name="cancellationToken">Optional cancellation token.</param>
88+
/// <returns>A task representing the asynchronous operation.</returns>
89+
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
7290
public async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken = default)
7391
{
7492
if (buffer == null)
@@ -77,7 +95,16 @@ public async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken
7795
await WriteAsync(buffer, 0, buffer.Length, cancellationToken);
7896
}
7997

80-
public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
98+
/// <summary>
99+
/// Asynchronously writes a portion of a byte array to the response stream.
100+
/// </summary>
101+
/// <param name="buffer">The byte array containing data to write.</param>
102+
/// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes.</param>
103+
/// <param name="count">The number of bytes to write.</param>
104+
/// <param name="cancellationToken">Optional cancellation token.</param>
105+
/// <returns>A task representing the asynchronous operation.</returns>
106+
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
107+
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
81108
{
82109
if (buffer == null)
83110
throw new ArgumentNullException(nameof(buffer));
@@ -93,14 +120,6 @@ public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationT
93120
lock (_lock)
94121
{
95122
ThrowIfCompletedOrError();
96-
97-
if (_bytesWritten + count > _maxResponseSize)
98-
{
99-
throw new InvalidOperationException(
100-
$"Writing {count} bytes would exceed the maximum response size of {_maxResponseSize} bytes (20 MiB). " +
101-
$"Current size: {_bytesWritten} bytes.");
102-
}
103-
104123
_bytesWritten += count;
105124
}
106125

@@ -120,13 +139,14 @@ public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationT
120139
}
121140
}
122141

123-
public async Task WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
124-
{
125-
// Convert to array and delegate — small overhead but keeps the API simple
126-
var array = buffer.ToArray();
127-
await WriteAsync(array, 0, array.Length, cancellationToken);
128-
}
129-
142+
/// <summary>
143+
/// Reports an error that occurred during streaming.
144+
/// This will send error information via HTTP trailing headers.
145+
/// </summary>
146+
/// <param name="exception">The exception to report.</param>
147+
/// <param name="cancellationToken">Optional cancellation token.</param>
148+
/// <returns>A task representing the asynchronous operation.</returns>
149+
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has already been reported.</exception>
130150
public Task ReportErrorAsync(Exception exception, CancellationToken cancellationToken = default)
131151
{
132152
if (exception == null)
@@ -145,6 +165,7 @@ public Task ReportErrorAsync(Exception exception, CancellationToken cancellation
145165

146166
// Signal completion so StreamingHttpContent can write error trailers and finish
147167
_completionSignal.Release();
168+
148169
return Task.CompletedTask;
149170
}
150171

@@ -166,10 +187,17 @@ private void ThrowIfCompletedOrError()
166187
throw new InvalidOperationException("Cannot write to a stream after an error has been reported.");
167188
}
168189

169-
public void Dispose()
190+
// ── Dispose ──────────────────────────────────────────────────────────
191+
192+
/// <inheritdoc/>
193+
protected override void Dispose(bool disposing)
170194
{
171-
// Ensure completion is signaled if not already
172-
try { _completionSignal.Release(); } catch (SemaphoreFullException) { }
195+
if (disposing)
196+
{
197+
try { _completionSignal.Release(); } catch (SemaphoreFullException) { }
198+
}
199+
200+
base.Dispose(disposing);
173201
}
174202
}
175203
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
using System;
17+
using System.IO;
18+
using System.Threading;
19+
using System.Threading.Tasks;
20+
21+
namespace Amazon.Lambda.RuntimeSupport
22+
{
23+
/// <summary>
24+
/// A write-only, non-seekable <see cref="Stream"/> subclass that streams response data
25+
/// to the Lambda Runtime API. Returned by <see cref="LambdaResponseStreamFactory.CreateStream"/>.
26+
/// Integrates with standard .NET stream consumers such as <see cref="System.IO.StreamWriter"/>.
27+
/// </summary>
28+
public partial class LambdaResponseStream : Stream, ILambdaResponseStream
29+
{
30+
// ── System.IO.Stream — capabilities ─────────────────────────────────
31+
32+
/// <summary>Gets a value indicating whether the stream supports reading. Always <c>false</c>.</summary>
33+
public override bool CanRead => false;
34+
35+
/// <summary>Gets a value indicating whether the stream supports seeking. Always <c>false</c>.</summary>
36+
public override bool CanSeek => false;
37+
38+
/// <summary>Gets a value indicating whether the stream supports writing. Always <c>true</c>.</summary>
39+
public override bool CanWrite => true;
40+
41+
// ── System.IO.Stream — Length / Position ────────────────────────────
42+
43+
/// <summary>
44+
/// Gets the total number of bytes written to the stream so far.
45+
/// Equivalent to <see cref="BytesWritten"/>.
46+
/// </summary>
47+
public override long Length => BytesWritten;
48+
49+
/// <summary>
50+
/// Getting or setting the position is not supported.
51+
/// </summary>
52+
/// <exception cref="NotSupportedException">Always thrown.</exception>
53+
public override long Position
54+
{
55+
get => throw new NotSupportedException("LambdaResponseStream does not support seeking.");
56+
set => throw new NotSupportedException("LambdaResponseStream does not support seeking.");
57+
}
58+
59+
// ── System.IO.Stream — seek / read (not supported) ──────────────────
60+
61+
/// <summary>Not supported.</summary>
62+
/// <exception cref="NotImplementedException">Always thrown.</exception>
63+
public override long Seek(long offset, SeekOrigin origin)
64+
=> throw new NotImplementedException("LambdaResponseStream does not support seeking.");
65+
66+
/// <summary>Not supported.</summary>
67+
/// <exception cref="NotImplementedException">Always thrown.</exception>
68+
public override int Read(byte[] buffer, int offset, int count)
69+
=> throw new NotImplementedException("LambdaResponseStream does not support reading.");
70+
71+
/// <summary>Not supported.</summary>
72+
/// <exception cref="NotImplementedException">Always thrown.</exception>
73+
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
74+
=> throw new NotImplementedException("LambdaResponseStream does not support reading.");
75+
76+
// ── System.IO.Stream — write ─────────────────────────────────────────
77+
78+
/// <summary>
79+
/// Writes a sequence of bytes to the stream. Delegates to the async path synchronously.
80+
/// Prefer <see cref="WriteAsync(byte[], int, int, CancellationToken)"/> to avoid blocking.
81+
/// </summary>
82+
public override void Write(byte[] buffer, int offset, int count)
83+
=> WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
84+
85+
// ── System.IO.Stream — flush / set length ────────────────────────────
86+
87+
/// <summary>
88+
/// Flush is a no-op; data is sent to the Runtime API immediately on each write.
89+
/// </summary>
90+
public override void Flush() { }
91+
92+
/// <summary>Not supported.</summary>
93+
/// <exception cref="NotSupportedException">Always thrown.</exception>
94+
public override void SetLength(long value)
95+
=> throw new NotSupportedException("LambdaResponseStream does not support SetLength.");
96+
}
97+
}

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/ResponseStreamContext.cs renamed to Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStreamContext.cs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,13 @@ namespace Amazon.Lambda.RuntimeSupport
2121
/// <summary>
2222
/// Internal context class used by ResponseStreamFactory to track per-invocation streaming state.
2323
/// </summary>
24-
internal class ResponseStreamContext
24+
internal class LambdaResponseStreamContext
2525
{
2626
/// <summary>
2727
/// The AWS request ID for the current invocation.
2828
/// </summary>
2929
public string AwsRequestId { get; set; }
3030

31-
/// <summary>
32-
/// Maximum allowed response size in bytes (20 MiB).
33-
/// </summary>
34-
public long MaxResponseSize { get; set; }
35-
3631
/// <summary>
3732
/// Whether CreateStream() has been called for this invocation.
3833
/// </summary>
@@ -41,7 +36,7 @@ internal class ResponseStreamContext
4136
/// <summary>
4237
/// The ResponseStream instance if created.
4338
/// </summary>
44-
public ResponseStream Stream { get; set; }
39+
public LambdaResponseStream Stream { get; set; }
4540

4641
/// <summary>
4742
/// The RuntimeApiClient used to start the streaming HTTP POST.

0 commit comments

Comments
 (0)