Skip to content

Commit d2c0478

Browse files
authored
Close channel handler context after the channel written operation is completed and Remove overflow check process(#15518)
* Close channel handler context when promise is done * Just log warning message when window size of connectionState is zero * Log debug message for Http2Headers and Data packets * Close channel handler context after pongHeader written operation is completed * Close channel handler context after goAwayFrame written operation is completed * Shutdown QUIC stream channel output after headers frame written operation is completed * Remove overflow check process to align with Netty http2 remote flow controller * Fix Http3ClientFrameCodec write method to set shutdownOutput promise for Http2HeadersFrame * Fix promise setting codes of Http3ClientFrameCodec and NettyHttp3FrameCodec
1 parent 4e14015 commit d2c0478

5 files changed

Lines changed: 79 additions & 43 deletions

File tree

dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030

3131
import io.netty.buffer.ByteBufInputStream;
3232
import io.netty.buffer.ByteBufOutputStream;
33-
import io.netty.buffer.Unpooled;
33+
import io.netty.channel.ChannelFuture;
34+
import io.netty.channel.ChannelFutureListener;
3435
import io.netty.channel.ChannelHandler.Sharable;
3536
import io.netty.channel.ChannelHandlerContext;
3637
import io.netty.channel.ChannelOutboundHandler;
@@ -67,8 +68,12 @@ private void pingReceived(ChannelHandlerContext ctx) {
6768
Http3Headers pongHeader = new DefaultHttp3Headers(false);
6869
pongHeader.set(TRI_PING, "0");
6970
pongHeader.set(PseudoHeaderName.STATUS.value(), HttpStatus.OK.getStatusString());
70-
ctx.write(new DefaultHttp3HeadersFrame(pongHeader));
71-
ctx.close();
71+
ChannelFuture future = ctx.write(new DefaultHttp3HeadersFrame(pongHeader), ctx.newPromise());
72+
if (future.isDone()) {
73+
ctx.close();
74+
} else {
75+
future.addListener((ChannelFutureListener) f -> ctx.close());
76+
}
7277
}
7378

7479
@Override
@@ -91,32 +96,43 @@ protected void channelInputClosed(ChannelHandlerContext ctx) {
9196
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
9297
if (msg instanceof Http2Header) {
9398
Http2Header headers = (Http2Header) msg;
99+
if (headers.isEndStream()) {
100+
ChannelFuture future = ctx.write(
101+
new DefaultHttp3HeadersFrame(((NettyHttpHeaders<Http3Headers>) headers.headers()).getHeaders()),
102+
ctx.newPromise());
103+
if (future.isDone()) {
104+
ctx.close(promise);
105+
} else {
106+
future.addListener((ChannelFutureListener) f -> ctx.close(promise));
107+
}
108+
return;
109+
}
94110
ctx.write(
95111
new DefaultHttp3HeadersFrame(((NettyHttpHeaders<Http3Headers>) headers.headers()).getHeaders()),
96112
promise);
97-
if (headers.isEndStream()) {
98-
ctx.close();
99-
}
100113
} else if (msg instanceof Http2OutputMessage) {
101114
Http2OutputMessage message = (Http2OutputMessage) msg;
102-
try {
103-
OutputStream body = message.getBody();
115+
OutputStream body = message.getBody();
116+
assert body instanceof ByteBufOutputStream || body == null;
117+
if (message.isEndStream()) {
104118
if (body == null) {
105-
Http3DataFrame frame = new DefaultHttp3DataFrame(Unpooled.EMPTY_BUFFER);
106-
ctx.write(frame, promise);
119+
ctx.close(promise);
107120
return;
108121
}
109-
if (body instanceof ByteBufOutputStream) {
110-
Http3DataFrame frame = new DefaultHttp3DataFrame(((ByteBufOutputStream) body).buffer());
111-
ctx.write(frame, promise);
112-
return;
113-
}
114-
} finally {
115-
if (message.isEndStream()) {
116-
ctx.close();
122+
ChannelFuture future =
123+
ctx.write(new DefaultHttp3DataFrame(((ByteBufOutputStream) body).buffer()), ctx.newPromise());
124+
if (future.isDone()) {
125+
ctx.close(promise);
126+
} else {
127+
future.addListener((ChannelFutureListener) f -> ctx.close(promise));
117128
}
129+
return;
130+
}
131+
if (body == null) {
132+
promise.trySuccess();
133+
return;
118134
}
119-
throw new IllegalArgumentException("Http2OutputMessage body must be ByteBufOutputStream");
135+
ctx.write(new DefaultHttp3DataFrame(((ByteBufOutputStream) body).buffer()), promise);
120136
} else {
121137
ctx.write(msg, promise);
122138
}

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2RemoteFlowController.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -407,10 +407,8 @@ int writeAllocatedBytes(int allocated) {
407407
if (cancelled) {
408408
cancel(INTERNAL_ERROR, cause);
409409
}
410-
if (monitor.isOverFlowControl()) {
411-
cause = new Throwable();
412-
cancel(FLOW_CONTROL_ERROR, cause);
413-
}
410+
411+
// does not check overflow anymore: Let receiver continue receiving the pending bytes.
414412
}
415413
return writtenBytes;
416414
}
@@ -670,14 +668,6 @@ public boolean visit(Http2Stream stream) throws Http2Exception {
670668
final boolean isWritableConnection() {
671669
return connectionState.windowSize() - totalPendingBytes > 0 && isChannelWritable();
672670
}
673-
674-
final boolean isOverFlowControl() {
675-
if (connectionState.windowSize() == 0) {
676-
return true;
677-
} else {
678-
return false;
679-
}
680-
}
681671
}
682672

683673
/**
@@ -777,13 +767,8 @@ private void checkConnectionThenStreamWritabilityChanged(FlowState state) throws
777767
checkAllWritabilityChanged();
778768
} else if (isWritable(state) != state.markedWritability()) {
779769
notifyWritabilityChanged(state);
780-
} else if (isOverFlowControl()) {
781-
throw streamError(
782-
state.stream().id(),
783-
FLOW_CONTROL_ERROR,
784-
"TotalPendingBytes size overflow for stream: %d",
785-
state.stream().id());
786770
}
771+
// does not check overflow anymore: Let receiver continue receiving the pending bytes.
787772
}
788773

789774
private void checkAllWritabilityChanged() throws Http2Exception {

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626

2727
import io.netty.buffer.Unpooled;
2828
import io.netty.channel.ChannelDuplexHandler;
29+
import io.netty.channel.ChannelFuture;
30+
import io.netty.channel.ChannelFutureListener;
2931
import io.netty.channel.ChannelHandler.Sharable;
3032
import io.netty.channel.ChannelHandlerContext;
3133
import io.netty.channel.ChannelPipeline;
@@ -99,14 +101,36 @@ public void channelReadComplete(ChannelHandlerContext ctx) {
99101
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
100102
if (msg instanceof Http2HeadersFrame) {
101103
Http2HeadersFrame frame = (Http2HeadersFrame) msg;
102-
ctx.write(new DefaultHttp3HeadersFrame(new Http3HeadersAdapter(frame.headers())), promise);
103104
if (frame.isEndStream()) {
104-
((QuicStreamChannel) ctx.channel()).shutdownOutput(promise);
105+
ChannelFuture future = ctx.write(
106+
new DefaultHttp3HeadersFrame(new Http3HeadersAdapter(frame.headers())), ctx.newPromise());
107+
if (future.isDone()) {
108+
((QuicStreamChannel) ctx.channel()).shutdownOutput(promise);
109+
} else {
110+
future.addListener(
111+
(ChannelFutureListener) f -> ((QuicStreamChannel) ctx.channel()).shutdownOutput(promise));
112+
}
113+
return;
105114
}
115+
ctx.write(new DefaultHttp3HeadersFrame(new Http3HeadersAdapter(frame.headers())), promise);
106116
} else if (msg instanceof Http2DataFrame) {
107117
Http2DataFrame frame = (Http2DataFrame) msg;
108118
if (frame.isEndStream()) {
109-
((QuicStreamChannel) ctx.channel()).shutdownOutput(promise);
119+
if (Unpooled.EMPTY_BUFFER.equals(frame.content())) {
120+
((QuicStreamChannel) ctx.channel()).shutdownOutput(promise);
121+
return;
122+
}
123+
ChannelFuture future = ctx.write(new DefaultHttp3DataFrame(frame.content()), ctx.newPromise());
124+
if (future.isDone()) {
125+
((QuicStreamChannel) ctx.channel()).shutdownOutput(promise);
126+
} else {
127+
future.addListener(
128+
(ChannelFutureListener) f -> ((QuicStreamChannel) ctx.channel()).shutdownOutput(promise));
129+
}
130+
return;
131+
}
132+
if (Unpooled.EMPTY_BUFFER.equals(frame.content())) {
133+
promise.trySuccess();
110134
return;
111135
}
112136
ctx.write(new DefaultHttp3DataFrame(frame.content()), promise);

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,9 @@ private Map<Class<?>, Object> tranFromStatusDetails(List<Any> detailList) {
409409

410410
@Override
411411
public void onHeader(Http2Headers headers, boolean endStream) {
412+
if (LOGGER.isDebugEnabled()) {
413+
LOGGER.debug("endStream: {} HEADERS: {}", endStream, headers);
414+
}
412415
executor.execute(() -> {
413416
if (endStream) {
414417
if (!halfClosed) {
@@ -428,6 +431,9 @@ public void onHeader(Http2Headers headers, boolean endStream) {
428431

429432
@Override
430433
public void onData(ByteBuf data, boolean endStream) {
434+
if (LOGGER.isDebugEnabled()) {
435+
LOGGER.debug("endStream: {} DATA: {}", endStream, data.toString(StandardCharsets.UTF_8));
436+
}
431437
try {
432438
executor.execute(() -> doOnData(data, endStream));
433439
} catch (Throwable t) {

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.util.concurrent.TimeUnit;
2020

2121
import io.netty.buffer.ByteBufUtil;
22+
import io.netty.channel.ChannelFuture;
23+
import io.netty.channel.ChannelFutureListener;
2224
import io.netty.channel.ChannelHandlerContext;
2325
import io.netty.channel.ChannelPromise;
2426
import io.netty.handler.codec.http2.DefaultHttp2GoAwayFrame;
@@ -66,9 +68,12 @@ void secondGoAwayAndClose(ChannelHandlerContext ctx) {
6668
try {
6769
Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(
6870
Http2Error.NO_ERROR, ByteBufUtil.writeAscii(this.ctx.alloc(), this.goAwayMessage));
69-
ctx.writeAndFlush(goAwayFrame);
70-
// TODO support customize graceful shutdown timeout mills
71-
ctx.close(originPromise);
71+
ChannelFuture future = ctx.writeAndFlush(goAwayFrame, ctx.newPromise());
72+
if (future.isDone()) {
73+
ctx.close(originPromise);
74+
} else {
75+
future.addListener((ChannelFutureListener) f -> ctx.close(originPromise));
76+
}
7277
} catch (Exception e) {
7378
ctx.fireExceptionCaught(e);
7479
}

0 commit comments

Comments
 (0)