@@ -54,6 +54,8 @@ public final class NettyConnectionClient extends AbstractNettyConnectionClient {
5454
5555 private Bootstrap bootstrap ;
5656
57+ private AtomicReference <Promise <Void >> channelInitializedPromiseRef ;
58+
5759 private AtomicReference <Promise <Void >> connectionPrefaceReceivedPromiseRef ;
5860
5961 public NettyConnectionClient (URL url , ChannelHandler handler ) throws RemotingException {
@@ -69,6 +71,7 @@ protected void initConnectionClient() {
6971 }
7072
7173 protected void initBootstrap () {
74+ channelInitializedPromiseRef = new AtomicReference <>();
7275 Bootstrap bootstrap = new Bootstrap ();
7376 bootstrap
7477 .group (NettyEventLoopFactory .NIO_EVENT_LOOP_GROUP .get ())
@@ -103,8 +106,16 @@ protected void initChannel(SocketChannel ch) {
103106 protocol .configClientPipeline (getUrl (), operator , nettySslContextOperator );
104107
105108 ChannelHandlerContext http2FrameCodecHandlerCtx = pipeline .context (Http2FrameCodec .class );
106- if (http2FrameCodecHandlerCtx != null ) {
107- connectionPrefaceReceivedPromiseRef = new AtomicReference <>();
109+ if (http2FrameCodecHandlerCtx == null ) {
110+ // set connection preface received promise to null.
111+ connectionPrefaceReceivedPromiseRef = null ;
112+ } else {
113+ // create connection preface received promise if necessary.
114+ if (connectionPrefaceReceivedPromiseRef == null ) {
115+ connectionPrefaceReceivedPromiseRef = new AtomicReference <>();
116+ }
117+ connectionPrefaceReceivedPromiseRef .compareAndSet (
118+ null , new DefaultPromise <>(GlobalEventExecutor .INSTANCE ));
108119 pipeline .addAfter (
109120 http2FrameCodecHandlerCtx .name (),
110121 "client-connection-preface-handler" ,
@@ -114,22 +125,28 @@ protected void initChannel(SocketChannel ch) {
114125 // set null but do not close this client, it will be reconnecting in the future
115126 ch .closeFuture ().addListener (channelFuture -> clearNettyChannel ());
116127 // TODO support Socks5
128+
129+ // set channel initialized promise to success if necessary.
130+ Promise <Void > channelInitializedPromise = channelInitializedPromiseRef .get ();
131+ if (channelInitializedPromise != null ) {
132+ channelInitializedPromise .trySuccess (null );
133+ }
117134 }
118135 });
119136 this .bootstrap = bootstrap ;
120137 }
121138
122139 @ Override
123140 protected ChannelFuture performConnect () {
124- if (connectionPrefaceReceivedPromiseRef != null ) {
125- connectionPrefaceReceivedPromiseRef .compareAndSet (null , new DefaultPromise <>(GlobalEventExecutor .INSTANCE ));
126- }
141+ // ChannelInitializer#initChannel will be invoked by Netty client work thread.
127142 return bootstrap .connect ();
128143 }
129144
130145 @ Override
131146 protected void doConnect () throws RemotingException {
132147 long start = System .currentTimeMillis ();
148+ // re-create channel initialized promise if necessary.
149+ channelInitializedPromiseRef .compareAndSet (null , new DefaultPromise <>(GlobalEventExecutor .INSTANCE ));
133150 super .doConnect ();
134151 waitConnectionPreface (start );
135152 }
@@ -154,38 +171,60 @@ protected void doConnect() throws RemotingException {
154171 * @param start start time of doConnect in milliseconds.
155172 */
156173 private void waitConnectionPreface (long start ) throws RemotingException {
174+ // await channel initialization to ensure connection preface received promise had been created when necessary.
175+ Promise <Void > channelInitializedPromise = channelInitializedPromiseRef .get ();
176+ long retainedTimeout = getConnectTimeout () - System .currentTimeMillis () + start ;
177+ boolean ret = channelInitializedPromise .awaitUninterruptibly (retainedTimeout , TimeUnit .MILLISECONDS );
178+ // destroy channel initialized promise after used.
179+ channelInitializedPromiseRef .set (null );
180+ if (!ret || !channelInitializedPromise .isSuccess ()) {
181+ // 6-2 Client-side channel initialization timeout
182+ RemotingException remotingException = new RemotingException (
183+ this ,
184+ "client(url: " + getUrl () + ") failed to connect to server " + getConnectAddress ()
185+ + " client-side channel initialization timeout " + getConnectTimeout () + "ms (elapsed: "
186+ + (System .currentTimeMillis () - start ) + "ms) from netty client "
187+ + NetUtils .getLocalHost () + " using dubbo version " + Version .getVersion ());
188+
189+ logger .error (
190+ TRANSPORT_CLIENT_CONNECT_TIMEOUT ,
191+ "provider crash" ,
192+ "" ,
193+ "Client-side channel initialization timeout" ,
194+ remotingException );
195+
196+ throw remotingException ;
197+ }
198+
199+ // await if connection preface received promise is not null.
157200 if (connectionPrefaceReceivedPromiseRef == null ) {
158201 return ;
159202 }
160203 Promise <Void > connectionPrefaceReceivedPromise = connectionPrefaceReceivedPromiseRef .get ();
161- if (connectionPrefaceReceivedPromise != null ) {
162- long retainedTimeout = getConnectTimeout () - System .currentTimeMillis () + start ;
163- boolean ret = connectionPrefaceReceivedPromise .awaitUninterruptibly (retainedTimeout , TimeUnit .MILLISECONDS );
164- // Only process once: destroy connectionPrefaceReceivedPromise after used
165- synchronized (this ) {
166- connectionPrefaceReceivedPromiseRef .set (null );
167- }
168- if (!ret || !connectionPrefaceReceivedPromise .isSuccess ()) {
169- // 6-2 Client-side connection preface timeout
170- RemotingException remotingException = new RemotingException (
171- this ,
172- "client(url: " + getUrl () + ") failed to connect to server " + getConnectAddress ()
173- + " client-side connection preface timeout " + getConnectTimeout ()
174- + "ms (elapsed: "
175- + (System .currentTimeMillis () - start ) + "ms) from netty client "
176- + NetUtils .getLocalHost ()
177- + " using dubbo version "
178- + Version .getVersion ());
179-
180- logger .error (
181- TRANSPORT_CLIENT_CONNECT_TIMEOUT ,
182- "provider crash" ,
183- "" ,
184- "Client-side connection preface timeout" ,
185- remotingException );
186-
187- throw remotingException ;
188- }
204+ retainedTimeout = getConnectTimeout () - System .currentTimeMillis () + start ;
205+ ret = connectionPrefaceReceivedPromise .awaitUninterruptibly (retainedTimeout , TimeUnit .MILLISECONDS );
206+ // destroy connection preface received promise after used.
207+ connectionPrefaceReceivedPromiseRef .set (null );
208+ if (!ret || !connectionPrefaceReceivedPromise .isSuccess ()) {
209+ // 6-2 Client-side connection preface timeout
210+ RemotingException remotingException = new RemotingException (
211+ this ,
212+ "client(url: " + getUrl () + ") failed to connect to server " + getConnectAddress ()
213+ + " client-side connection preface timeout " + getConnectTimeout ()
214+ + "ms (elapsed: "
215+ + (System .currentTimeMillis () - start ) + "ms) from netty client "
216+ + NetUtils .getLocalHost ()
217+ + " using dubbo version "
218+ + Version .getVersion ());
219+
220+ logger .error (
221+ TRANSPORT_CLIENT_CONNECT_TIMEOUT ,
222+ "provider crash" ,
223+ "" ,
224+ "Client-side connection preface timeout" ,
225+ remotingException );
226+
227+ throw remotingException ;
189228 }
190229 }
191230}
0 commit comments