Skip to content

Commit 5e0f810

Browse files
committed
Task 2
1 parent 22d1469 commit 5e0f810

File tree

3 files changed

+373
-3
lines changed

3 files changed

+373
-3
lines changed
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
using System;
17+
using System.Collections.Generic;
18+
using System.Linq;
19+
using System.Threading;
20+
using System.Threading.Tasks;
21+
22+
namespace Amazon.Lambda.RuntimeSupport
23+
{
24+
/// <summary>
25+
/// Internal implementation of IResponseStream.
26+
/// Buffers written data as chunks for HTTP chunked transfer encoding.
27+
/// </summary>
28+
internal class ResponseStream : IResponseStream
29+
{
30+
private readonly long _maxResponseSize;
31+
private readonly List<byte[]> _chunks;
32+
private long _bytesWritten;
33+
private bool _isCompleted;
34+
private bool _hasError;
35+
private Exception _reportedError;
36+
private readonly object _lock = new object();
37+
38+
public long BytesWritten => _bytesWritten;
39+
public bool IsCompleted => _isCompleted;
40+
public bool HasError => _hasError;
41+
42+
internal IReadOnlyList<byte[]> Chunks
43+
{
44+
get
45+
{
46+
lock (_lock)
47+
{
48+
return _chunks.ToList();
49+
}
50+
}
51+
}
52+
53+
internal Exception ReportedError => _reportedError;
54+
55+
public ResponseStream(long maxResponseSize)
56+
{
57+
_maxResponseSize = maxResponseSize;
58+
_chunks = new List<byte[]>();
59+
_bytesWritten = 0;
60+
_isCompleted = false;
61+
_hasError = false;
62+
}
63+
64+
public Task WriteAsync(byte[] buffer, CancellationToken cancellationToken = default)
65+
{
66+
if (buffer == null)
67+
throw new ArgumentNullException(nameof(buffer));
68+
69+
return WriteAsync(buffer, 0, buffer.Length, cancellationToken);
70+
}
71+
72+
public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
73+
{
74+
if (buffer == null)
75+
throw new ArgumentNullException(nameof(buffer));
76+
if (offset < 0 || offset > buffer.Length)
77+
throw new ArgumentOutOfRangeException(nameof(offset));
78+
if (count < 0 || offset + count > buffer.Length)
79+
throw new ArgumentOutOfRangeException(nameof(count));
80+
81+
lock (_lock)
82+
{
83+
ThrowIfCompletedOrError();
84+
85+
if (_bytesWritten + count > _maxResponseSize)
86+
{
87+
throw new InvalidOperationException(
88+
$"Writing {count} bytes would exceed the maximum response size of {_maxResponseSize} bytes (20 MiB). " +
89+
$"Current size: {_bytesWritten} bytes.");
90+
}
91+
92+
var chunk = new byte[count];
93+
Array.Copy(buffer, offset, chunk, 0, count);
94+
_chunks.Add(chunk);
95+
_bytesWritten += count;
96+
}
97+
98+
return Task.CompletedTask;
99+
}
100+
101+
public Task WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
102+
{
103+
lock (_lock)
104+
{
105+
ThrowIfCompletedOrError();
106+
107+
if (_bytesWritten + buffer.Length > _maxResponseSize)
108+
{
109+
throw new InvalidOperationException(
110+
$"Writing {buffer.Length} bytes would exceed the maximum response size of {_maxResponseSize} bytes (20 MiB). " +
111+
$"Current size: {_bytesWritten} bytes.");
112+
}
113+
114+
var chunk = buffer.ToArray();
115+
_chunks.Add(chunk);
116+
_bytesWritten += buffer.Length;
117+
}
118+
119+
return Task.CompletedTask;
120+
}
121+
122+
public Task ReportErrorAsync(Exception exception, CancellationToken cancellationToken = default)
123+
{
124+
if (exception == null)
125+
throw new ArgumentNullException(nameof(exception));
126+
127+
lock (_lock)
128+
{
129+
if (_isCompleted)
130+
throw new InvalidOperationException("Cannot report an error after the stream has been completed.");
131+
if (_hasError)
132+
throw new InvalidOperationException("An error has already been reported for this stream.");
133+
134+
_hasError = true;
135+
_reportedError = exception;
136+
}
137+
138+
return Task.CompletedTask;
139+
}
140+
141+
internal void MarkCompleted()
142+
{
143+
lock (_lock)
144+
{
145+
_isCompleted = true;
146+
}
147+
}
148+
149+
private void ThrowIfCompletedOrError()
150+
{
151+
if (_isCompleted)
152+
throw new InvalidOperationException("Cannot write to a completed stream.");
153+
if (_hasError)
154+
throw new InvalidOperationException("Cannot write to a stream after an error has been reported.");
155+
}
156+
157+
public void Dispose()
158+
{
159+
// Nothing to dispose - all data is in managed memory
160+
}
161+
}
162+
}

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/ResponseStreamContext.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,8 @@ internal class ResponseStreamContext
3636
public bool StreamCreated { get; set; }
3737

3838
/// <summary>
39-
/// The IResponseStream instance if created. Typed as IResponseStream for now;
40-
/// will be used with the concrete ResponseStream internally.
39+
/// The ResponseStream instance if created.
4140
/// </summary>
42-
public IResponseStream Stream { get; set; }
41+
public ResponseStream Stream { get; set; }
4342
}
4443
}
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
/*
2+
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
using System;
17+
using System.Threading.Tasks;
18+
using Xunit;
19+
20+
namespace Amazon.Lambda.RuntimeSupport.UnitTests
21+
{
22+
public class ResponseStreamTests
23+
{
24+
private const long MaxResponseSize = 20 * 1024 * 1024; // 20 MiB
25+
26+
[Fact]
27+
public void Constructor_InitializesStateCorrectly()
28+
{
29+
var stream = new ResponseStream(MaxResponseSize);
30+
31+
Assert.Equal(0, stream.BytesWritten);
32+
Assert.False(stream.IsCompleted);
33+
Assert.False(stream.HasError);
34+
Assert.Empty(stream.Chunks);
35+
Assert.Null(stream.ReportedError);
36+
}
37+
38+
[Fact]
39+
public async Task WriteAsync_ByteArray_BuffersDataCorrectly()
40+
{
41+
var stream = new ResponseStream(MaxResponseSize);
42+
var data = new byte[] { 1, 2, 3, 4, 5 };
43+
44+
await stream.WriteAsync(data);
45+
46+
Assert.Equal(5, stream.BytesWritten);
47+
Assert.Single(stream.Chunks);
48+
Assert.Equal(data, stream.Chunks[0]);
49+
}
50+
51+
[Fact]
52+
public async Task WriteAsync_WithOffset_BuffersCorrectSlice()
53+
{
54+
var stream = new ResponseStream(MaxResponseSize);
55+
var data = new byte[] { 0, 1, 2, 3, 0 };
56+
57+
await stream.WriteAsync(data, 1, 3);
58+
59+
Assert.Equal(3, stream.BytesWritten);
60+
Assert.Equal(new byte[] { 1, 2, 3 }, stream.Chunks[0]);
61+
}
62+
63+
[Fact]
64+
public async Task WriteAsync_ReadOnlyMemory_BuffersDataCorrectly()
65+
{
66+
var stream = new ResponseStream(MaxResponseSize);
67+
var data = new ReadOnlyMemory<byte>(new byte[] { 10, 20, 30 });
68+
69+
await stream.WriteAsync(data);
70+
71+
Assert.Equal(3, stream.BytesWritten);
72+
Assert.Equal(new byte[] { 10, 20, 30 }, stream.Chunks[0]);
73+
}
74+
75+
[Fact]
76+
public async Task WriteAsync_MultipleWrites_AccumulatesBytesWritten()
77+
{
78+
var stream = new ResponseStream(MaxResponseSize);
79+
80+
await stream.WriteAsync(new byte[100]);
81+
await stream.WriteAsync(new byte[200]);
82+
await stream.WriteAsync(new byte[300]);
83+
84+
Assert.Equal(600, stream.BytesWritten);
85+
Assert.Equal(3, stream.Chunks.Count);
86+
}
87+
88+
[Fact]
89+
public async Task WriteAsync_CopiesData_AvoidingBufferReuseIssues()
90+
{
91+
var stream = new ResponseStream(MaxResponseSize);
92+
var buffer = new byte[] { 1, 2, 3 };
93+
94+
await stream.WriteAsync(buffer);
95+
buffer[0] = 99; // mutate original
96+
97+
Assert.Equal(1, stream.Chunks[0][0]); // chunk should be unaffected
98+
}
99+
100+
/// <summary>
101+
/// Property 6: Size Limit Enforcement - Writing beyond 20 MiB throws InvalidOperationException.
102+
/// Validates: Requirements 3.6, 3.7
103+
/// </summary>
104+
[Theory]
105+
[InlineData(21 * 1024 * 1024)] // Single write exceeding limit
106+
public async Task SizeLimit_SingleWriteExceedingLimit_Throws(int writeSize)
107+
{
108+
var stream = new ResponseStream(MaxResponseSize);
109+
var data = new byte[writeSize];
110+
111+
await Assert.ThrowsAsync<InvalidOperationException>(() => stream.WriteAsync(data));
112+
}
113+
114+
/// <summary>
115+
/// Property 6: Size Limit Enforcement - Multiple writes exceeding 20 MiB throws.
116+
/// Validates: Requirements 3.6, 3.7
117+
/// </summary>
118+
[Fact]
119+
public async Task SizeLimit_MultipleWritesExceedingLimit_Throws()
120+
{
121+
var stream = new ResponseStream(MaxResponseSize);
122+
123+
await stream.WriteAsync(new byte[10 * 1024 * 1024]);
124+
await Assert.ThrowsAsync<InvalidOperationException>(
125+
() => stream.WriteAsync(new byte[11 * 1024 * 1024]));
126+
}
127+
128+
[Fact]
129+
public async Task SizeLimit_ExactlyAtLimit_Succeeds()
130+
{
131+
var stream = new ResponseStream(MaxResponseSize);
132+
var data = new byte[20 * 1024 * 1024];
133+
134+
await stream.WriteAsync(data);
135+
136+
Assert.Equal(MaxResponseSize, stream.BytesWritten);
137+
}
138+
139+
/// <summary>
140+
/// Property 19: Writes After Completion Rejected - Writes after completion throw InvalidOperationException.
141+
/// Validates: Requirements 8.8
142+
/// </summary>
143+
[Fact]
144+
public async Task WriteAsync_AfterMarkCompleted_Throws()
145+
{
146+
var stream = new ResponseStream(MaxResponseSize);
147+
await stream.WriteAsync(new byte[] { 1 });
148+
stream.MarkCompleted();
149+
150+
await Assert.ThrowsAsync<InvalidOperationException>(
151+
() => stream.WriteAsync(new byte[] { 2 }));
152+
}
153+
154+
[Fact]
155+
public async Task WriteAsync_AfterReportError_Throws()
156+
{
157+
var stream = new ResponseStream(MaxResponseSize);
158+
await stream.WriteAsync(new byte[] { 1 });
159+
await stream.ReportErrorAsync(new Exception("test"));
160+
161+
await Assert.ThrowsAsync<InvalidOperationException>(
162+
() => stream.WriteAsync(new byte[] { 2 }));
163+
}
164+
165+
// --- Error handling tests (2.6) ---
166+
167+
[Fact]
168+
public async Task ReportErrorAsync_SetsErrorState()
169+
{
170+
var stream = new ResponseStream(MaxResponseSize);
171+
var exception = new InvalidOperationException("something broke");
172+
173+
await stream.ReportErrorAsync(exception);
174+
175+
Assert.True(stream.HasError);
176+
Assert.Same(exception, stream.ReportedError);
177+
}
178+
179+
[Fact]
180+
public async Task ReportErrorAsync_AfterCompleted_Throws()
181+
{
182+
var stream = new ResponseStream(MaxResponseSize);
183+
stream.MarkCompleted();
184+
185+
await Assert.ThrowsAsync<InvalidOperationException>(
186+
() => stream.ReportErrorAsync(new Exception("test")));
187+
}
188+
189+
[Fact]
190+
public async Task ReportErrorAsync_CalledTwice_Throws()
191+
{
192+
var stream = new ResponseStream(MaxResponseSize);
193+
await stream.ReportErrorAsync(new Exception("first"));
194+
195+
await Assert.ThrowsAsync<InvalidOperationException>(
196+
() => stream.ReportErrorAsync(new Exception("second")));
197+
}
198+
199+
[Fact]
200+
public void MarkCompleted_SetsCompletionState()
201+
{
202+
var stream = new ResponseStream(MaxResponseSize);
203+
204+
stream.MarkCompleted();
205+
206+
Assert.True(stream.IsCompleted);
207+
}
208+
}
209+
}

0 commit comments

Comments
 (0)