diff --git a/src/SIL.Harmony.Core/HarmonyProgress.cs b/src/SIL.Harmony.Core/HarmonyProgress.cs new file mode 100644 index 0000000..63c88d2 --- /dev/null +++ b/src/SIL.Harmony.Core/HarmonyProgress.cs @@ -0,0 +1,3 @@ +namespace SIL.Harmony.Core; + +public record struct HarmonyProgress(SyncStage Stage, int? Current, int? Total); diff --git a/src/SIL.Harmony.Core/SyncStage.cs b/src/SIL.Harmony.Core/SyncStage.cs new file mode 100644 index 0000000..7d55c03 --- /dev/null +++ b/src/SIL.Harmony.Core/SyncStage.cs @@ -0,0 +1,13 @@ +namespace SIL.Harmony.Core; + +public enum SyncStage +{ + FetchingChanges, + FetchingChangesFinished, + ApplyingChanges, + ApplyingChangesFinished, + UploadingResources, + UploadingResourcesFinished, + UploadingChanges, + UploadingChangesFinished +} diff --git a/src/SIL.Harmony.Tests/ProgressTests.cs b/src/SIL.Harmony.Tests/ProgressTests.cs new file mode 100644 index 0000000..968a77d --- /dev/null +++ b/src/SIL.Harmony.Tests/ProgressTests.cs @@ -0,0 +1,103 @@ +using System.Collections.Concurrent; +using SIL.Harmony.Changes; +using SIL.Harmony.Sample; +using SIL.Harmony.Sample.Changes; +using Microsoft.Extensions.DependencyInjection; +using Xunit; + +namespace SIL.Harmony.Tests; + +public class ProgressTests : DataModelTestBase +{ + private readonly ITestOutputHelper _output; + + public ProgressTests(ITestOutputHelper output) + { + _output = output; + } + + [Fact] + public async Task AddManyChanges_ReportsProgress() + { + var clientId = Guid.NewGuid(); + var changes = Enumerable.Range(0, 10).Select(i => new SetTagChange(Guid.NewGuid(), $"Tag {i}")).ToArray(); + var progressReports = new ConcurrentQueue(); + var progress = new Progress(p => + { + _output.WriteLine($"Progress: {p.Current}/{p.Total} - {p.Stage}"); + progressReports.Enqueue(p); + }); + var reporter = new HarmonyProgressReporter(progress); + + await DataModel.AddManyChanges(clientId, changes, () => new CommitMetadata(), 2, reporter); + + // Wait a bit for Progress to report (it's often asynchronous) + await Task.Delay(100); + + Assert.NotEmpty(progressReports); + Assert.Contains(progressReports, p => p.Current == 10 && p.Total == 10 && p.Stage == SyncStage.ApplyingChanges); + Assert.Contains(progressReports, p => p.Stage == SyncStage.ApplyingChangesFinished); + } + + [Fact] + public async Task AddManyChanges_ReportsDetailedProgress() + { + var clientId = Guid.NewGuid(); + var changes = Enumerable.Range(0, 10).Select(i => new SetTagChange(Guid.NewGuid(), $"Tag {i}")).ToArray(); + var progressReports = new ConcurrentQueue(); + var progress = new Progress(p => + { + _output.WriteLine($"Detailed Progress: {p.Current}/{p.Total} - {p.Status} @ {p.DateTime}"); + progressReports.Enqueue(p); + }); + var reporter = new HarmonyProgressReporter(progress); + + await DataModel.AddManyChanges(clientId, changes, () => new CommitMetadata(), 2, reporter); + + await Task.Delay(100); + + Assert.NotEmpty(progressReports); + Assert.Contains(progressReports, p => p.Current == 10 && p.Total == 10 && p.Change is SetTagChange); + Assert.Contains(progressReports, p => p.Stage == SyncStage.ApplyingChangesFinished && p.Status == "Finished applying changes."); + Assert.All(progressReports, p => Assert.NotEmpty(p.Status)); + } + + [Fact] + public async Task SyncWith_ReportsProgress() + { + var model1 = DataModel; + var testBase2 = new DataModelTestBase(); + var model2 = testBase2.DataModel; + + var clientId = Guid.NewGuid(); + // remote changes to be downloaded by model1 + await model2.AddChanges(clientId, [new SetTagChange(Guid.NewGuid(), "Tag 1"), new SetTagChange(Guid.NewGuid(), "Tag 2")]); + // local changes to be uploaded to model2 + await model1.AddChanges(clientId, [new SetTagChange(Guid.NewGuid(), "Tag 3")]); + + var progressReports = new ConcurrentQueue(); + var progress = new Progress(p => + { + _output.WriteLine($"Sync Progress: {p.Current}/{p.Total} - {p.Status} ({p.Stage})"); + progressReports.Enqueue(p); + }); + var reporter = new HarmonyProgressReporter(progress); + + await model1.SyncWith(model2, reporter); + + await Task.Delay(500); + + _output.WriteLine($"Reports count: {progressReports.Count}"); + foreach(var report in progressReports) + { + _output.WriteLine($"- {report.Stage}: {report.Current}/{report.Total} {report.Status}"); + } + + Assert.NotEmpty(progressReports); + Assert.Contains(progressReports, p => p.Stage == SyncStage.FetchingChanges); + Assert.Contains(progressReports, p => p.Stage == SyncStage.FetchingChangesFinished); + // We expect UploadingChanges happens when sending local changes to remote + Assert.Contains(progressReports, p => p.Stage == SyncStage.UploadingChanges && p.Total == 1 && p.Status.Contains("Uploading 1 changes")); + Assert.Contains(progressReports, p => p.Stage == SyncStage.UploadingChangesFinished); + } +} diff --git a/src/SIL.Harmony/DataModel.cs b/src/SIL.Harmony/DataModel.cs index e120e1c..e1fe2df 100644 --- a/src/SIL.Harmony/DataModel.cs +++ b/src/SIL.Harmony/DataModel.cs @@ -69,7 +69,8 @@ public async Task AddChange( public async Task AddManyChanges(Guid clientId, IEnumerable changes, Func commitMetadata, - int changesPerCommitMax = 100) + int changesPerCommitMax = 100, + HarmonyProgressReporter? progress = null) { await using var repo = await _crdtRepositoryFactory.CreateRepository(); var commits = changes @@ -82,7 +83,7 @@ public async Task AddManyChanges(Guid clientId, await using var transaction = await repo.BeginTransactionAsync(); var updatedCommits = await repo.AddCommits(commits); - await UpdateSnapshots(repo, updatedCommits); + await UpdateSnapshots(repo, updatedCommits, progress); await ValidateCommits(repo); await transaction.CommitAsync(); } @@ -141,7 +142,7 @@ private static ChangeEntity ToChangeEntity(IChange change, int index) }; } - async Task ISyncable.AddRangeFromSync(IEnumerable commits) + async Task ISyncable.AddRangeFromSync(IEnumerable commits, HarmonyProgressReporter? progress) { commits = commits.ToArray(); try @@ -156,7 +157,7 @@ async Task ISyncable.AddRangeFromSync(IEnumerable commits) await using var transaction = await repo.BeginTransactionAsync(); var updatedCommits = await repo.AddCommits(newCommits); - await UpdateSnapshots(repo, updatedCommits); + await UpdateSnapshots(repo, updatedCommits, progress); await ValidateCommits(repo); await transaction.CommitAsync(); } @@ -193,7 +194,7 @@ ValueTask ISyncable.ShouldSync() return ValueTask.FromResult(true); } - private async Task UpdateSnapshots(CrdtRepository repo, SortedSet commitsToApply) + private async Task UpdateSnapshots(CrdtRepository repo, SortedSet commitsToApply, HarmonyProgressReporter? progress = null) { if (commitsToApply.Count == 0) return; var oldestAddedCommit = commitsToApply.First(); @@ -213,7 +214,7 @@ private async Task UpdateSnapshots(CrdtRepository repo, SortedSet commit snapshotLookup = []; } - var snapshotWorker = new SnapshotWorker(snapshotLookup, repo, _crdtConfig.Value); + var snapshotWorker = new SnapshotWorker(snapshotLookup, repo, _crdtConfig.Value, progress); await snapshotWorker.UpdateSnapshots(commitsToApply); } @@ -372,13 +373,13 @@ public async Task> GetChanges(SyncState remoteState) return await repo.GetChanges(remoteState); } - public async Task SyncWith(ISyncable remoteModel) + public async Task SyncWith(ISyncable remoteModel, HarmonyProgressReporter? progress = null) { - return await SyncHelper.SyncWith(this, remoteModel, _serializerOptions); + return await SyncHelper.SyncWith(this, remoteModel, _serializerOptions, progress); } - public async Task SyncMany(ISyncable[] remotes) + public async Task SyncMany(ISyncable[] remotes, HarmonyProgressReporter? progress = null) { - await SyncHelper.SyncMany(this, remotes, _serializerOptions); + await SyncHelper.SyncMany(this, remotes, _serializerOptions, progress); } } diff --git a/src/SIL.Harmony/HarmonyDetailedProgress.cs b/src/SIL.Harmony/HarmonyDetailedProgress.cs new file mode 100644 index 0000000..cd9a72b --- /dev/null +++ b/src/SIL.Harmony/HarmonyDetailedProgress.cs @@ -0,0 +1,5 @@ +using SIL.Harmony.Changes; + +namespace SIL.Harmony; + +public record struct HarmonyDetailedProgress(SyncStage Stage, int? Current, int? Total, IChange? Change, string Status, DateTimeOffset DateTime); diff --git a/src/SIL.Harmony/HarmonyProgressReporter.cs b/src/SIL.Harmony/HarmonyProgressReporter.cs new file mode 100644 index 0000000..c18d10d --- /dev/null +++ b/src/SIL.Harmony/HarmonyProgressReporter.cs @@ -0,0 +1,71 @@ +using SIL.Harmony.Changes; + +namespace SIL.Harmony; + +public class HarmonyProgressReporter +{ + private readonly IProgress? _progress; + private readonly IProgress? _detailedProgress; + private int? _totalChanges; + + public HarmonyProgressReporter(IProgress progress) + { + _progress = progress; + } + + public HarmonyProgressReporter(IProgress detailedProgress) + { + _detailedProgress = detailedProgress; + } + + public void ReportFetchingChanges() => Report(SyncStage.FetchingChanges); + public void ReportFetchingChangesFinished() => Report(SyncStage.FetchingChangesFinished); + public void ReportUploadingResources() => Report(SyncStage.UploadingResources); + public void ReportUploadingResourcesFinished() => Report(SyncStage.UploadingResourcesFinished); + public void ReportUploadingChanges(int? count = null) => Report(SyncStage.UploadingChanges, total: count); + public void ReportUploadingChangesFinished() => Report(SyncStage.UploadingChangesFinished); + + public void ReportStartApplyingChanges(IEnumerable commits) + { + if (_progress is null && _detailedProgress is null) return; + _totalChanges = commits.Sum(c => c.ChangeEntities.Count); + Report(SyncStage.ApplyingChanges, 0, _totalChanges); + } + + public void ReportApplyingChange(int current, IChange change) + { + Report(SyncStage.ApplyingChanges, current, _totalChanges, change); + } + + public void ReportApplyingChangesFinished() => Report(SyncStage.ApplyingChangesFinished, _totalChanges, _totalChanges); + + private void Report(SyncStage stage, int? current = null, int? total = null, IChange? change = null) + { + if (_progress is not null) + { + _progress.Report(new HarmonyProgress(stage, current, total)); + } + else if (_detailedProgress is not null) + { + var status = GetStatus(stage, change, total); + _detailedProgress.Report(new HarmonyDetailedProgress(stage, current, total, change, status, DateTimeOffset.Now)); + } + } + + private static string GetStatus(SyncStage stage, IChange? change, int? total) + { + if (change != null) return $"Applying {change.GetType().Name}"; + return stage switch + { + SyncStage.FetchingChanges => "Fetching changes...", + SyncStage.FetchingChangesFinished => "Finished fetching changes.", + SyncStage.ApplyingChanges => "Applying changes...", + SyncStage.ApplyingChangesFinished => "Finished applying changes.", + SyncStage.UploadingResources => "Uploading resources...", + SyncStage.UploadingResourcesFinished => "Finished uploading resources.", + SyncStage.UploadingChanges => total.HasValue ? $"Uploading {total} changes to remote..." : "Uploading changes to remote...", + SyncStage.UploadingChangesFinished => "Finished uploading changes.", + _ => stage.ToString() + }; + } +} diff --git a/src/SIL.Harmony/ISyncable.cs b/src/SIL.Harmony/ISyncable.cs index e97b3ce..96f0460 100644 --- a/src/SIL.Harmony/ISyncable.cs +++ b/src/SIL.Harmony/ISyncable.cs @@ -2,11 +2,11 @@ namespace SIL.Harmony; public interface ISyncable { - Task AddRangeFromSync(IEnumerable commits); + Task AddRangeFromSync(IEnumerable commits, HarmonyProgressReporter? progress = null); Task GetSyncState(); Task> GetChanges(SyncState otherHeads); - Task SyncWith(ISyncable remoteModel); - Task SyncMany(ISyncable[] remotes); + Task SyncWith(ISyncable remoteModel, HarmonyProgressReporter? progress = null); + Task SyncMany(ISyncable[] remotes, HarmonyProgressReporter? progress = null); ValueTask ShouldSync(); } @@ -14,7 +14,7 @@ public class NullSyncable : ISyncable { public static readonly ISyncable Instance = new NullSyncable(); - public Task AddRangeFromSync(IEnumerable commits) + public Task AddRangeFromSync(IEnumerable commits, HarmonyProgressReporter? progress = null) { return Task.CompletedTask; } @@ -29,12 +29,12 @@ public Task> GetChanges(SyncState otherHeads) return Task.FromResult(ChangesResult.Empty); } - public Task SyncWith(ISyncable remoteModel) + public Task SyncWith(ISyncable remoteModel, HarmonyProgressReporter? progress = null) { return Task.FromResult(new SyncResults([], [], false)); } - public Task SyncMany(ISyncable[] remotes) + public Task SyncMany(ISyncable[] remotes, HarmonyProgressReporter? progress = null) { return Task.CompletedTask; } diff --git a/src/SIL.Harmony/SnapshotWorker.cs b/src/SIL.Harmony/SnapshotWorker.cs index c4fe70f..20ebf2e 100644 --- a/src/SIL.Harmony/SnapshotWorker.cs +++ b/src/SIL.Harmony/SnapshotWorker.cs @@ -16,36 +16,42 @@ internal class SnapshotWorker private readonly Dictionary _pendingSnapshots = []; private readonly Dictionary _rootSnapshots = []; private readonly List _newIntermediateSnapshots = []; + private readonly HarmonyProgressReporter? _progress; private SnapshotWorker(Dictionary snapshots, Dictionary snapshotLookup, CrdtRepository crdtRepository, - CrdtConfig crdtConfig) + CrdtConfig crdtConfig, + HarmonyProgressReporter? progress = null) { _pendingSnapshots = snapshots; _crdtRepository = crdtRepository; _snapshotLookup = snapshotLookup; _crdtConfig = crdtConfig; + _progress = progress; } internal static async Task> ApplyCommitsToSnapshots( Dictionary snapshots, CrdtRepository crdtRepository, SortedSet commits, - CrdtConfig crdtConfig) + CrdtConfig crdtConfig, + HarmonyProgressReporter? progress = null) { //we need to pass in the snapshots because we expect it to be modified, this is intended. //if the constructor makes a copy in the future this will need to be updated - await new SnapshotWorker(snapshots, [], crdtRepository, crdtConfig).ApplyCommitChanges(commits); + await new SnapshotWorker(snapshots, [], crdtRepository, crdtConfig, progress).ApplyCommitChanges(commits); return snapshots; } /// a dictionary of entity id to latest snapshot id /// /// + /// internal SnapshotWorker(Dictionary snapshotLookup, CrdtRepository crdtRepository, - CrdtConfig crdtConfig): this([], snapshotLookup, crdtRepository, crdtConfig) + CrdtConfig crdtConfig, + HarmonyProgressReporter? progress = null): this([], snapshotLookup, crdtRepository, crdtConfig, progress) { } @@ -63,11 +69,15 @@ private async ValueTask ApplyCommitChanges(SortedSet commits) { var intermediateSnapshots = new Dictionary(); var commitIndex = 0; + _progress?.ReportStartApplyingChanges(commits); + var currentChange = 0; foreach (var commit in commits) { commitIndex++; foreach (var commitChange in commit.ChangeEntities.OrderBy(c => c.Index)) { + currentChange++; + _progress?.ReportApplyingChange(currentChange, commitChange.Change); IObjectBase entity; var prevSnapshot = await GetSnapshot(commitChange.EntityId); var changeContext = new ChangeContext(commit, commitIndex, intermediateSnapshots, this, _crdtConfig); @@ -107,6 +117,7 @@ private async ValueTask ApplyCommitChanges(SortedSet commits) _newIntermediateSnapshots.AddRange(intermediateSnapshots.Values); intermediateSnapshots.Clear(); } + _progress?.ReportApplyingChangesFinished(); } /// diff --git a/src/SIL.Harmony/SyncHelper.cs b/src/SIL.Harmony/SyncHelper.cs index f9ca2c6..0859c25 100644 --- a/src/SIL.Harmony/SyncHelper.cs +++ b/src/SIL.Harmony/SyncHelper.cs @@ -8,10 +8,13 @@ public static async Task SyncWithResourceUpload(this DataModel loca ISyncable remoteModel, ResourceService resourceService, IRemoteResourceService remoteResourceService, - Guid localClientId) + Guid localClientId, + HarmonyProgressReporter? progress = null) { + progress?.ReportUploadingResources(); await resourceService.UploadPendingResources(localClientId, remoteResourceService); - return await localModel.SyncWith(remoteModel); + progress?.ReportUploadingResourcesFinished(); + return await localModel.SyncWith(remoteModel, progress); } /// @@ -23,14 +26,17 @@ public static async Task SyncWithResourceUpload(this DataModel loca /// internal static async Task SyncWith(ISyncable localModel, ISyncable remoteModel, - JsonSerializerOptions serializerOptions) + JsonSerializerOptions serializerOptions, + HarmonyProgressReporter? progress = null) { if (!await localModel.ShouldSync() || !await remoteModel.ShouldSync()) return new SyncResults([], [], false); + progress?.ReportFetchingChanges(); var localSyncState = await localModel.GetSyncState(); var (missingFromLocal, remoteSyncState) = await remoteModel.GetChanges(localSyncState); //todo abort if local and remote heads are the same var (missingFromRemote, _) = await localModel.GetChanges(remoteSyncState); + progress?.ReportFetchingChangesFinished(); if (localModel is DataModel && remoteModel is DataModel) { //cloning just to simulate the objects going over the wire @@ -39,14 +45,19 @@ internal static async Task SyncWith(ISyncable localModel, } if (missingFromLocal.Length > 0) - await localModel.AddRangeFromSync(missingFromLocal); + await localModel.AddRangeFromSync(missingFromLocal, progress); if (missingFromRemote.Length > 0) + { + progress?.ReportUploadingChanges(missingFromRemote.Sum(c => c.ChangeEntities.Count)); await remoteModel.AddRangeFromSync(missingFromRemote); + progress?.ReportUploadingChangesFinished(); + } return new SyncResults(missingFromLocal, missingFromRemote, true); } - internal static async Task SyncMany(ISyncable localModel, ISyncable[] remotes, JsonSerializerOptions serializerOptions) + internal static async Task SyncMany(ISyncable localModel, ISyncable[] remotes, JsonSerializerOptions serializerOptions, HarmonyProgressReporter? progress = null) { + progress?.ReportFetchingChanges(); var localSyncState = await localModel.GetSyncState(); var remoteSyncStates = new SyncState[remotes.Length]; for (var i = 0; i < remotes.Length; i++) @@ -59,9 +70,10 @@ internal static async Task SyncMany(ISyncable localModel, ISyncable[] remotes, J missingFromLocal = Clone(missingFromLocal, serializerOptions); } remoteSyncStates[i] = remoteSyncState; - await localModel.AddRangeFromSync(missingFromLocal); + await localModel.AddRangeFromSync(missingFromLocal, progress); } + progress?.ReportFetchingChangesFinished(); // Now the localModel has all the changes from all remotes, so all remotes will get the changes from the localModel as well as all other remotes for (var i = 0; i < remotes.Length; i++) { @@ -73,7 +85,12 @@ internal static async Task SyncMany(ISyncable localModel, ISyncable[] remotes, J //cloning just to simulate the objects going over the wire missingFromRemote = Clone(missingFromRemote, serializerOptions); } - await remote.AddRangeFromSync(missingFromRemote); + if (missingFromRemote.Length > 0) + { + progress?.ReportUploadingChanges(missingFromRemote.Sum(c => c.ChangeEntities.Count)); + await remote.AddRangeFromSync(missingFromRemote); + progress?.ReportUploadingChangesFinished(); + } } }