1616 */
1717package org .apache .dubbo .remoting .http12 .netty4 .h2 ;
1818
19+ import org .apache .dubbo .common .logger .ErrorTypeAwareLogger ;
20+ import org .apache .dubbo .common .logger .LoggerFactory ;
1921import org .apache .dubbo .remoting .http12 .h2 .Http2Header ;
2022import org .apache .dubbo .remoting .http12 .h2 .Http2InputMessage ;
2123import org .apache .dubbo .remoting .http12 .h2 .Http2InputMessageFrame ;
2527import org .apache .dubbo .remoting .http12 .netty4 .NettyHttpHeaders ;
2628
2729import java .io .OutputStream ;
30+ import java .util .LinkedList ;
31+ import java .util .List ;
32+ import java .util .concurrent .TimeUnit ;
2833
2934import io .netty .buffer .ByteBuf ;
3035import io .netty .buffer .ByteBufInputStream ;
3742import io .netty .handler .codec .http2 .Http2DataFrame ;
3843import io .netty .handler .codec .http2 .Http2Headers ;
3944import io .netty .handler .codec .http2 .Http2HeadersFrame ;
45+ import io .netty .util .concurrent .ScheduledFuture ;
46+
47+ import static org .apache .dubbo .common .constants .LoggerCodeConstants .PROTOCOL_TIMEOUT_SERVER ;
4048
4149public class NettyHttp2FrameCodec extends ChannelDuplexHandler {
4250
51+ private static final ErrorTypeAwareLogger LOGGER =
52+ LoggerFactory .getErrorTypeAwareLogger (NettyHttp2FrameCodec .class );
53+
54+ private static final long SETTINGS_FRAME_ARRIVAL_TIMEOUT = 3 ;
55+
56+ private final NettyHttp2SettingsHandler nettyHttp2SettingsHandler ;
57+
58+ private final List <CachedMsg > cachedMsgList = new LinkedList <>();
59+
60+ private boolean settingsFrameArrived ;
61+
62+ private ScheduledFuture <?> settingsFrameArrivalTimeoutFuture ;
63+
64+ public NettyHttp2FrameCodec (NettyHttp2SettingsHandler nettyHttp2SettingsHandler ) {
65+ this .nettyHttp2SettingsHandler = nettyHttp2SettingsHandler ;
66+ if (!nettyHttp2SettingsHandler .subscribeSettingsFrameArrival (this )) {
67+ settingsFrameArrived = true ;
68+ }
69+ }
70+
4371 @ Override
4472 public void channelRead (ChannelHandlerContext ctx , Object msg ) throws Exception {
4573 if (msg instanceof Http2HeadersFrame ) {
@@ -53,15 +81,76 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
5381
5482 @ Override
5583 public void write (ChannelHandlerContext ctx , Object msg , ChannelPromise promise ) throws Exception {
56- if (msg instanceof Http2Header ) {
57- super .write (ctx , encodeHttp2HeadersFrame ((Http2Header ) msg ), promise );
58- } else if (msg instanceof Http2OutputMessage ) {
59- super .write (ctx , encodeHttp2DataFrame ((Http2OutputMessage ) msg ), promise );
60- } else {
61- super .write (ctx , msg , promise );
84+ if (settingsFrameArrived ) {
85+ if (msg instanceof Http2Header ) {
86+ super .write (ctx , encodeHttp2HeadersFrame ((Http2Header ) msg ), promise );
87+ } else if (msg instanceof Http2OutputMessage ) {
88+ super .write (ctx , encodeHttp2DataFrame ((Http2OutputMessage ) msg ), promise );
89+ } else {
90+ super .write (ctx , msg , promise );
91+ }
92+ return ;
93+ }
94+
95+ if (LOGGER .isDebugEnabled ()) {
96+ LOGGER .debug ("Cache writing msg before client connection preface arrival: {}" , msg );
97+ }
98+ cachedMsgList .add (new CachedMsg (ctx , msg , promise ));
99+
100+ if (settingsFrameArrivalTimeoutFuture == null ) {
101+ // close ctx and release resources if client connection preface does not arrive in time.
102+ settingsFrameArrivalTimeoutFuture = ctx .executor ()
103+ .schedule (
104+ () -> {
105+ LOGGER .error (
106+ PROTOCOL_TIMEOUT_SERVER ,
107+ "" ,
108+ "" ,
109+ "client connection preface does not arrive in time." );
110+ // send RST_STREAM instead of GO_AWAY by calling close method to avoid client hanging.
111+ ctx .close ();
112+ nettyHttp2SettingsHandler .unsubscribeSettingsFrameArrival (this );
113+ cachedMsgList .clear ();
114+ },
115+ SETTINGS_FRAME_ARRIVAL_TIMEOUT ,
116+ TimeUnit .SECONDS );
62117 }
63118 }
64119
120+ public void notifySettingsFrameArrival () throws Exception {
121+ if (settingsFrameArrived ) {
122+ return ;
123+ }
124+ settingsFrameArrived = true ;
125+
126+ if (settingsFrameArrivalTimeoutFuture != null ) {
127+ settingsFrameArrivalTimeoutFuture .cancel (false );
128+ }
129+
130+ if (LOGGER .isDebugEnabled ()) {
131+ LOGGER .debug ("Begin cached channel msg writing when client connection preface arrived." );
132+ }
133+
134+ for (CachedMsg cached : cachedMsgList ) {
135+ if (LOGGER .isDebugEnabled ()) {
136+ LOGGER .debug ("Cached channel msg writing, ctx: {} msg: {}" , cached .ctx , cached .msg );
137+ }
138+ if (cached .msg instanceof Http2Header ) {
139+ super .write (cached .ctx , encodeHttp2HeadersFrame (((Http2Header ) cached .msg )), cached .promise );
140+ } else if (cached .msg instanceof Http2OutputMessage ) {
141+ super .write (cached .ctx , encodeHttp2DataFrame (((Http2OutputMessage ) cached .msg )), cached .promise );
142+ } else {
143+ super .write (cached .ctx , cached .msg , cached .promise );
144+ }
145+ }
146+
147+ if (LOGGER .isDebugEnabled ()) {
148+ LOGGER .debug ("End cached channel msg writing." );
149+ }
150+
151+ cachedMsgList .clear ();
152+ }
153+
65154 private Http2Header onHttp2HeadersFrame (Http2HeadersFrame headersFrame ) {
66155 return new Http2MetadataFrame (
67156 headersFrame .stream ().id (), new DefaultHttpHeaders (headersFrame .headers ()), headersFrame .isEndStream ());
@@ -89,4 +178,16 @@ private Http2DataFrame encodeHttp2DataFrame(Http2OutputMessage outputMessage) {
89178 }
90179 throw new IllegalArgumentException ("Http2OutputMessage body must be ByteBufOutputStream" );
91180 }
181+
182+ private static class CachedMsg {
183+ private final ChannelHandlerContext ctx ;
184+ private final Object msg ;
185+ private final ChannelPromise promise ;
186+
187+ public CachedMsg (ChannelHandlerContext ctx , Object msg , ChannelPromise promise ) {
188+ this .ctx = ctx ;
189+ this .msg = msg ;
190+ this .promise = promise ;
191+ }
192+ }
92193}
0 commit comments