Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions framework.tck/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<version>1.17.5</version>
<version>1.18.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -132,12 +132,6 @@
<version>1.12.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-launcher</artifactId>
<version>1.12.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.test.cases.framework</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions framework.tck/tck.bndrun
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
junit-platform-engine;version='[1.12.1,1.12.2)',\
org.opentest4j;version='[1.3.0,1.3.1)',\
junit-platform-launcher;version='[1.12.1,1.12.2)',\
assertj-core;version='[3.27.3,3.27.4)',\
biz.aQute.junit;version='[6.4.1,6.4.2)',\
junit-vintage-engine;version='[5.7.1,5.7.2)',\
net.bytebuddy.byte-buddy;version='[1.17.5,1.17.6)'
assertj-core;version='[3.27.7,3.27.8)',\
net.bytebuddy.byte-buddy;version='[1.18.0,1.18.1)'
147 changes: 137 additions & 10 deletions framework/src/main/java/org/apache/felix/framework/EventDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,22 @@ public class EventDispatcher
private final static String m_threadLock = "thread lock";
private static int m_references = 0;
private static volatile boolean m_stopping = false;
private static int m_totalRestarts = 0;
private static final int MAX_RESTARTS = 3;
private static long m_lastRestartTime = 0;
private static final long RESTART_COOLDOWN_MS = 10000; // 10 seconds
private static final long RESTART_BACKOFF_MS = 1000; // 1 second backoff
private static boolean m_limitLogged = false;

// List of requests.
private static final List<Request> m_requestList = new ArrayList<>();
// Pooled requests to avoid memory allocation.
private static final List<Request> m_requestPool = new ArrayList<>();

// Track listener failures and blocklist after threshold is reached.
private static final Map<EventListener, Integer> m_badListeners = new java.util.concurrent.ConcurrentHashMap<>();
private static final int MAX_LISTENER_FAILURES = 3;

private static final SecureAction m_secureAction = new SecureAction();

public EventDispatcher(Logger logger, ServiceRegistry registry)
Expand Down Expand Up @@ -763,16 +773,107 @@ else if (eh instanceof org.osgi.framework.hooks.bundle.EventHook)
return whitelist;
}

private static void triggerRecovery(EventDispatcher dispatcher)
{
synchronized (m_threadLock)
{
// Recheck state inside lock
if (m_stopping || (m_thread != null && m_thread.isAlive()))
{
return;
}

long now = System.currentTimeMillis();
if (m_totalRestarts >= MAX_RESTARTS)
{
if (!m_limitLogged)
{
dispatcher.m_logger.log((org.osgi.framework.Bundle) null, Logger.LOG_ERROR,
"EventDispatcher: FelixDispatchQueue thread has crashed repeatedly. Maximum restart limit (" + MAX_RESTARTS + ") reached. Auto-recovery disabled.");
m_limitLogged = true;
}
return;
}

if (now - m_lastRestartTime < RESTART_COOLDOWN_MS)
{
// Within cooldown period, do not trigger restart to prevent loops
return;
}

// Update state immediately to prevent other threads from triggering a restart
m_totalRestarts++;
m_lastRestartTime = now;
}

// Perform backoff sleep OUTSIDE the lock to avoid holding m_threadLock
try
{
Thread.sleep(RESTART_BACKOFF_MS);
}
catch (InterruptedException ex)
{
Thread.currentThread().interrupt();
}

synchronized (m_threadLock)
{
// Re-verify after sleep
if (!m_stopping && (m_thread == null || !m_thread.isAlive()))
{
dispatcher.m_logger.log((org.osgi.framework.Bundle) null, Logger.LOG_WARNING,
"EventDispatcher: FelixDispatchQueue thread died unexpectedly. Restarting (Attempt " + m_totalRestarts + "/" + MAX_RESTARTS + ")...");

m_stopping = false;
m_thread = new Thread(new Runnable() {
@Override
public void run()
{
try
{
EventDispatcher.run();
}
finally
{
synchronized (m_threadLock)
{
m_thread = null;
m_stopping = false;
m_references = 0;
m_threadLock.notifyAll();
}
}
}
}, "FelixDispatchQueue");
m_thread.start();

// Ensure reference count is at least 1 since we are actively dispatching events
if (m_references <= 0)
{
m_references = 1;
}
}
}
}

private static void fireEventAsynchronously(
EventDispatcher dispatcher, int type,
Map<BundleContext, List<ListenerInfo>> listeners,
EventObject event)
{
//TODO: should possibly check this within thread lock, seems to be ok though without
// If dispatch thread is stopped, then ignore dispatch request.
if (m_stopping || m_thread == null)
// Check if the thread is dead unexpectedly
if (!m_stopping && (m_thread == null || !m_thread.isAlive()))
{
return;
triggerRecovery(dispatcher);
}

synchronized (m_threadLock)
{
// If the dispatch thread is legitimately stopped/stopping, ignore the request.
if (m_stopping || m_thread == null)
{
return;
}
}

// First get a request from the pool or create one if necessary.
Expand Down Expand Up @@ -822,6 +923,12 @@ private static void fireEventImmediately(
Filter filter = info.getParsedFilter();
Object acc = info.getSecurityContext();

// Skip if the listener has been blocklisted
if (m_badListeners.getOrDefault(l, 0) >= MAX_LISTENER_FAILURES)
{
continue;
}

try
{
if (type == Request.FRAMEWORK_EVENT)
Expand All @@ -840,6 +947,17 @@ else if (type == Request.SERVICE_EVENT)
}
catch (Throwable th)
{
// Track failure and blocklist if needed
int failures = m_badListeners.getOrDefault(l, 0) + 1;
m_badListeners.put(l, failures);
if (failures >= MAX_LISTENER_FAILURES)
{
dispatcher.m_logger.log(bundle,
Logger.LOG_ERROR,
"EventDispatcher: Listener " + l.getClass().getName() +
" has failed " + failures + " times. Blacklisting it from further events.", th);
}

if ((type != Request.FRAMEWORK_EVENT)
|| (((FrameworkEvent) event).getType() != FrameworkEvent.ERROR))
{
Expand Down Expand Up @@ -1140,12 +1258,21 @@ private static void run()
// Deliver event outside of synchronized block
// so that we don't block other requests from being
// queued during event processing.
// NOTE: We don't catch any exceptions here, because
// the invoked method shields us from exceptions by
// catching Throwables when it invokes callbacks.
fireEventImmediately(
req.m_dispatcher, req.m_type, req.m_listeners,
req.m_event, null);
try
{
fireEventImmediately(
req.m_dispatcher, req.m_type, req.m_listeners,
req.m_event, null);
}
catch (Throwable th)
{
if (req.m_dispatcher != null)
{
req.m_dispatcher.m_logger.log((org.osgi.framework.Bundle) null,
Logger.LOG_ERROR,
"EventDispatcher: Unexpected error in dispatch loop.", th);
}
}

// Put dispatch request in cache.
synchronized (m_requestPool)
Expand Down