Skip to content

Commit 872c8cb

Browse files
authored
remove Future from FUTURES after timeout and add RejectedExecution test fix (#15695)
1 parent 50099c1 commit 872c8cb

File tree

2 files changed

+47
-1
lines changed

2 files changed

+47
-1
lines changed

dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.concurrent.CompletableFuture;
3939
import java.util.concurrent.ConcurrentHashMap;
4040
import java.util.concurrent.ExecutorService;
41+
import java.util.concurrent.RejectedExecutionException;
4142
import java.util.concurrent.TimeUnit;
4243

4344
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
@@ -324,7 +325,12 @@ public void run(Timeout timeout) {
324325

325326
ExecutorService executor = future.getExecutor();
326327
if (executor != null && !executor.isShutdown()) {
327-
executor.execute(() -> notifyTimeout(future));
328+
try {
329+
executor.execute(() -> notifyTimeout(future));
330+
} catch (RejectedExecutionException e) {
331+
notifyTimeout(future);
332+
throw e;
333+
}
328334
} else {
329335
notifyTimeout(future);
330336
}

dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@
2727

2828
import java.time.LocalDateTime;
2929
import java.time.format.DateTimeFormatter;
30+
import java.util.concurrent.ArrayBlockingQueue;
3031
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.ThreadPoolExecutor;
33+
import java.util.concurrent.TimeUnit;
3134
import java.util.concurrent.atomic.AtomicInteger;
3235

3336
import org.junit.jupiter.api.Assertions;
@@ -224,6 +227,43 @@ void testClose1() {
224227
Assertions.assertFalse(executor.isTerminated());
225228
}
226229

230+
@Test
231+
void testTimeoutWithRejectedExecution() throws Exception {
232+
// Create a ThreadPoolExecutor with a queue capacity of 1
233+
ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
234+
1, // corePoolSize
235+
1, // maxPoolSize
236+
60L,
237+
TimeUnit.SECONDS,
238+
new ArrayBlockingQueue<>(1), // queue capacity is 1
239+
new ThreadPoolExecutor.AbortPolicy() // default rejection policy: throws exception
240+
);
241+
// Submit two tasks to occupy the thread and the queue
242+
customExecutor.submit(() -> {
243+
try {
244+
Thread.sleep(500); // occupy the thread for a while
245+
} catch (InterruptedException ignored) {
246+
}
247+
});
248+
customExecutor.submit(() -> {
249+
try {
250+
Thread.sleep(500); // occupy the queue
251+
} catch (InterruptedException ignored) {
252+
}
253+
});
254+
// Create a Dubbo Mock Channel and a request
255+
Channel channel = new MockedChannel();
256+
Request request = new Request(999);
257+
// Use Dubbo's newFuture and pass in the custom thread pool
258+
DefaultFuture future = DefaultFuture.newFuture(channel, request, 100, customExecutor);
259+
// Mark the request as sent
260+
DefaultFuture.sent(channel, request);
261+
// Wait for the timeout task to trigger
262+
Thread.sleep(300);
263+
Assertions.assertNull(DefaultFuture.getFuture(999), "Future should be removed from FUTURES after timeout");
264+
customExecutor.shutdown();
265+
}
266+
227267
@Test
228268
void testClose2() {
229269
Channel channel = new MockedChannel();

0 commit comments

Comments
 (0)