Skip to content

Commit f23bf69

Browse files
committed
Refactored HandleOpenLog to use channels instead of run jobs and semaphore to throttle threads for when multi logs are opened
1 parent 192f471 commit f23bf69

File tree

1 file changed

+68
-19
lines changed

1 file changed

+68
-19
lines changed

src/EventLogExpert.UI/Store/EventLog/EventLogEffects.cs

Lines changed: 68 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
using EventLogExpert.UI.Store.StatusBar;
1313
using Fluxor;
1414
using Microsoft.Extensions.DependencyInjection;
15-
using System.Collections.Concurrent;
1615
using System.Collections.Immutable;
16+
using System.Threading.Channels;
1717
using IDispatcher = Fluxor.IDispatcher;
1818

1919
namespace EventLogExpert.UI.Store.EventLog;
@@ -26,6 +26,9 @@ public sealed class EventLogEffects(
2626
IEventResolverCache resolverCache,
2727
IServiceScopeFactory serviceScopeFactory)
2828
{
29+
private static readonly int s_maxGlobalConcurrency = Math.Max(1, Environment.ProcessorCount - 1);
30+
private static readonly SemaphoreSlim s_resolutionThrottle = new(s_maxGlobalConcurrency, s_maxGlobalConcurrency);
31+
2932
private readonly IState<EventLogState> _eventLogState = eventLogState;
3033
private readonly IFilterService _filterService = filterService;
3134
private readonly ITraceLogger _logger = logger;
@@ -135,56 +138,103 @@ public async Task HandleOpenLog(EventLogAction.OpenLog action, IDispatcher dispa
135138
var activityId = Guid.NewGuid();
136139
string? lastEvent;
137140
int failed = 0;
141+
int resolved = 0;
138142

139143
dispatcher.Dispatch(new EventTableAction.AddTable(logData));
140144

141-
ConcurrentQueue<DisplayEventModel> events = new();
145+
var channel = Channel.CreateBounded<EventRecord[]>(new BoundedChannelOptions(s_maxGlobalConcurrency * 2)
146+
{
147+
SingleWriter = true,
148+
FullMode = BoundedChannelFullMode.Wait
149+
});
150+
151+
List<DisplayEventModel> events = [];
142152

143153
await using Timer timer = new(
144-
_ => { dispatcher.Dispatch(new StatusBarAction.SetEventsLoading(activityId, events.Count, failed)); },
154+
_ => { dispatcher.Dispatch(new StatusBarAction.SetEventsLoading(activityId, Volatile.Read(ref resolved), failed)); },
145155
null,
146156
TimeSpan.Zero,
147157
TimeSpan.FromSeconds(3));
148158

149159
using var reader = new EventLogReader(action.LogName, action.PathType, filterState?.Value.IsXmlEnabled ?? false);
150160

161+
// Producer: single thread reads batches from EventLogReader
162+
var producerTask = Task.Run(async () =>
163+
{
164+
try
165+
{
166+
while (reader.TryGetEvents(out EventRecord[] batch))
167+
{
168+
action.Token.ThrowIfCancellationRequested();
169+
170+
if (batch.Length == 0) { continue; }
171+
172+
await channel.Writer.WriteAsync(batch, action.Token);
173+
}
174+
}
175+
catch (Exception ex)
176+
{
177+
channel.Writer.Complete(ex);
178+
179+
throw;
180+
}
181+
182+
channel.Writer.Complete();
183+
}, action.Token);
184+
151185
try
152186
{
187+
// Consumers: parallel resolution of event batches from the channel.
188+
// The global semaphore limits total concurrent resolution threads across
189+
// all HandleOpenLog calls, preventing CPU saturation when loading multiple logs.
153190
await Parallel.ForEachAsync(
154-
Enumerable.Range(1, 8),
155-
action.Token,
156-
(_, token) =>
191+
channel.Reader.ReadAllAsync(action.Token),
192+
new ParallelOptions
157193
{
158-
while (reader.TryGetEvents(out EventRecord[]? eventRecords))
159-
{
160-
token.ThrowIfCancellationRequested();
161-
162-
if (eventRecords.Length == 0) { continue; }
194+
CancellationToken = action.Token,
195+
MaxDegreeOfParallelism = s_maxGlobalConcurrency
196+
},
197+
async (batch, token) =>
198+
{
199+
await s_resolutionThrottle.WaitAsync(token);
163200

164-
foreach (var @event in eventRecords)
201+
try
202+
{
203+
foreach (var @event in batch)
165204
{
205+
token.ThrowIfCancellationRequested();
206+
166207
try
167208
{
168-
if (!@event.IsSuccess) {
209+
if (!@event.IsSuccess)
210+
{
169211
Interlocked.Increment(ref failed);
170212

171213
_logger?.Warn($"{@event.PathName}: Bad Event: {@event.Error}");
172214

173215
continue;
174216
}
175217

176-
events.Enqueue(eventResolver.ResolveEvent(@event));
218+
var resolvedEvent = eventResolver.ResolveEvent(@event);
219+
220+
lock (events) { events.Add(resolvedEvent); }
221+
222+
Interlocked.Increment(ref resolved);
177223
}
178224
catch (Exception ex)
179225
{
180226
_logger?.Warn($"Failed to resolve RecordId: {@event.RecordId}, {ex.Message}");
181227
}
182228
}
183229
}
184-
185-
return ValueTask.CompletedTask;
230+
finally
231+
{
232+
s_resolutionThrottle.Release();
233+
}
186234
});
187235

236+
await producerTask;
237+
188238
lastEvent = reader.LastBookmark;
189239
}
190240
catch (TaskCanceledException)
@@ -195,10 +245,9 @@ await Parallel.ForEachAsync(
195245
return;
196246
}
197247

198-
var sortedEvents = events.ToList();
199-
sortedEvents.Sort((a, b) => Comparer<long?>.Default.Compare(b.RecordId, a.RecordId));
248+
events.Sort((a, b) => Comparer<long?>.Default.Compare(b.RecordId, a.RecordId));
200249

201-
dispatcher.Dispatch(new EventLogAction.LoadEvents(logData, sortedEvents));
250+
dispatcher.Dispatch(new EventLogAction.LoadEvents(logData, events));
202251

203252
dispatcher.Dispatch(new StatusBarAction.SetEventsLoading(activityId, 0, 0));
204253

0 commit comments

Comments
 (0)