2323import org .apache .dubbo .rpc .protocol .tri .observer .CallStreamObserver ;
2424import org .apache .dubbo .rpc .stub .StubInvocationUtil ;
2525
26+ import java .time .Duration ;
2627import java .util .List ;
2728import java .util .concurrent .CountDownLatch ;
2829import java .util .concurrent .TimeUnit ;
3132
3233import io .smallrye .mutiny .Multi ;
3334import io .smallrye .mutiny .Uni ;
35+ import io .smallrye .mutiny .helpers .test .AssertSubscriber ;
3436import org .junit .jupiter .api .Assertions ;
3537import org .junit .jupiter .api .Test ;
3638import org .mockito .MockedStatic ;
@@ -91,6 +93,7 @@ void testOneToManyReturnsMultiAndEmitsItems() {
9193
9294 try (MockedStatic <StubInvocationUtil > mocked = Mockito .mockStatic (StubInvocationUtil .class )) {
9395 AtomicBoolean stubCalled = new AtomicBoolean (false );
96+ CountDownLatch subscribed = new CountDownLatch (1 );
9497
9598 mocked .when (() -> StubInvocationUtil .serverStreamCall (
9699 Mockito .eq (invoker ), Mockito .eq (method ), Mockito .eq ("testRequest" ), Mockito .any ()))
@@ -100,7 +103,9 @@ void testOneToManyReturnsMultiAndEmitsItems() {
100103
101104 CallStreamObserver <String > fakeSubscription = new CallStreamObserver <>() {
102105 @ Override
103- public void request (int n ) {}
106+ public void request (int n ) {
107+ /* no-op */
108+ }
104109
105110 @ Override
106111 public void setCompression (String compression ) {}
@@ -109,8 +114,8 @@ public void setCompression(String compression) {}
109114 public void disableAutoFlowControl () {}
110115
111116 @ Override
112- public void onNext (String value ) {
113- publisher .onNext (value );
117+ public void onNext (String v ) {
118+ publisher .onNext (v );
114119 }
115120
116121 @ Override
@@ -123,29 +128,47 @@ public void onCompleted() {
123128 publisher .onCompleted ();
124129 }
125130 };
126-
127131 publisher .onSubscribe (fakeSubscription );
128132
133+ // Wait for downstream subscription to complete before emitting data
129134 new Thread (() -> {
130- publisher .onNext ("item1" );
131- publisher .onNext ("item2" );
132- publisher .onCompleted ();
135+ try {
136+ if (subscribed .await (5 , TimeUnit .SECONDS )) {
137+ publisher .onNext ("item1" );
138+ publisher .onNext ("item2" );
139+ publisher .onCompleted ();
140+ } else {
141+ publisher .onError (
142+ new IllegalStateException ("Downstream subscription timeout" ));
143+ }
144+ } catch (InterruptedException e ) {
145+ Thread .currentThread ().interrupt ();
146+ publisher .onError (e );
147+ }
133148 })
134149 .start ();
135150
136151 return null ;
137152 });
138153
139154 Uni <String > uniRequest = Uni .createFrom ().item ("testRequest" );
140-
141155 Multi <String > multiResponse = MutinyClientCalls .oneToMany (invoker , uniRequest , method );
142156
143- List <String > collectedItems =
144- multiResponse .collect ().asList ().await ().indefinitely ();
157+ // Use AssertSubscriber to ensure proper subscription timing
158+ AssertSubscriber <String > subscriber = AssertSubscriber .create (Long .MAX_VALUE );
159+ multiResponse .subscribe ().withSubscriber (subscriber );
160+
161+ // Wait for subscription to be established
162+ subscriber .awaitSubscription ();
163+ subscribed .countDown (); // Signal that data emission can begin
164+
165+ // Wait for completion
166+ subscriber .awaitCompletion (Duration .ofSeconds (5 ));
145167
168+ // Verify results
146169 Assertions .assertTrue (stubCalled .get (), "StubInvocationUtil.serverStreamCall should be called" );
147- Assertions .assertEquals (2 , collectedItems . size ());
148- Assertions . assertEquals ( List . of ( "item1" , "item2" ), collectedItems );
170+ Assertions .assertEquals (List . of ( "item1" , "item2" ), subscriber . getItems ());
171+ subscriber . assertCompleted ( );
149172 }
150173 }
151174
0 commit comments