Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/SIL.Harmony.Core/HarmonyProgress.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace SIL.Harmony.Core;

public record struct HarmonyProgress(SyncStage Stage, int? Current, int? Total);
13 changes: 13 additions & 0 deletions src/SIL.Harmony.Core/SyncStage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace SIL.Harmony.Core;

public enum SyncStage
{
FetchingChanges,
FetchingChangesFinished,
ApplyingChanges,
ApplyingChangesFinished,
UploadingResources,
UploadingResourcesFinished,
UploadingChanges,
UploadingChangesFinished
}
103 changes: 103 additions & 0 deletions src/SIL.Harmony.Tests/ProgressTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using System.Collections.Concurrent;
using SIL.Harmony.Changes;
using SIL.Harmony.Sample;
using SIL.Harmony.Sample.Changes;
using Microsoft.Extensions.DependencyInjection;
using Xunit;

namespace SIL.Harmony.Tests;

public class ProgressTests : DataModelTestBase
{
private readonly ITestOutputHelper _output;

public ProgressTests(ITestOutputHelper output)
{
_output = output;
}

[Fact]
public async Task AddManyChanges_ReportsProgress()
{
var clientId = Guid.NewGuid();
var changes = Enumerable.Range(0, 10).Select(i => new SetTagChange(Guid.NewGuid(), $"Tag {i}")).ToArray();
var progressReports = new ConcurrentQueue<HarmonyProgress>();
var progress = new Progress<HarmonyProgress>(p =>
{
_output.WriteLine($"Progress: {p.Current}/{p.Total} - {p.Stage}");
progressReports.Enqueue(p);
});
var reporter = new HarmonyProgressReporter(progress);

await DataModel.AddManyChanges(clientId, changes, () => new CommitMetadata(), 2, reporter);

// Wait a bit for Progress to report (it's often asynchronous)
await Task.Delay(100);

Check warning on line 35 in src/SIL.Harmony.Tests/ProgressTests.cs

View workflow job for this annotation

GitHub Actions / build

Calls to methods which accept CancellationToken should use TestContext.Current.CancellationToken to allow test cancellation to be more responsive. (https://xunit.net/xunit.analyzers/rules/xUnit1051)

Check warning on line 35 in src/SIL.Harmony.Tests/ProgressTests.cs

View workflow job for this annotation

GitHub Actions / build

Calls to methods which accept CancellationToken should use TestContext.Current.CancellationToken to allow test cancellation to be more responsive. (https://xunit.net/xunit.analyzers/rules/xUnit1051)

Assert.NotEmpty(progressReports);
Assert.Contains(progressReports, p => p.Current == 10 && p.Total == 10 && p.Stage == SyncStage.ApplyingChanges);
Assert.Contains(progressReports, p => p.Stage == SyncStage.ApplyingChangesFinished);
}

[Fact]
public async Task AddManyChanges_ReportsDetailedProgress()
{
var clientId = Guid.NewGuid();
var changes = Enumerable.Range(0, 10).Select(i => new SetTagChange(Guid.NewGuid(), $"Tag {i}")).ToArray();
var progressReports = new ConcurrentQueue<HarmonyDetailedProgress>();
var progress = new Progress<HarmonyDetailedProgress>(p =>
{
_output.WriteLine($"Detailed Progress: {p.Current}/{p.Total} - {p.Status} @ {p.DateTime}");
progressReports.Enqueue(p);
});
var reporter = new HarmonyProgressReporter(progress);

await DataModel.AddManyChanges(clientId, changes, () => new CommitMetadata(), 2, reporter);

await Task.Delay(100);

Check warning on line 57 in src/SIL.Harmony.Tests/ProgressTests.cs

View workflow job for this annotation

GitHub Actions / build

Calls to methods which accept CancellationToken should use TestContext.Current.CancellationToken to allow test cancellation to be more responsive. (https://xunit.net/xunit.analyzers/rules/xUnit1051)

Check warning on line 57 in src/SIL.Harmony.Tests/ProgressTests.cs

View workflow job for this annotation

GitHub Actions / build

Calls to methods which accept CancellationToken should use TestContext.Current.CancellationToken to allow test cancellation to be more responsive. (https://xunit.net/xunit.analyzers/rules/xUnit1051)

Assert.NotEmpty(progressReports);
Assert.Contains(progressReports, p => p.Current == 10 && p.Total == 10 && p.Change is SetTagChange);
Assert.Contains(progressReports, p => p.Stage == SyncStage.ApplyingChangesFinished && p.Status == "Finished applying changes.");
Assert.All(progressReports, p => Assert.NotEmpty(p.Status));
}

[Fact]
public async Task SyncWith_ReportsProgress()
{
var model1 = DataModel;
var testBase2 = new DataModelTestBase();
var model2 = testBase2.DataModel;

var clientId = Guid.NewGuid();
// remote changes to be downloaded by model1
await model2.AddChanges(clientId, [new SetTagChange(Guid.NewGuid(), "Tag 1"), new SetTagChange(Guid.NewGuid(), "Tag 2")]);
// local changes to be uploaded to model2
await model1.AddChanges(clientId, [new SetTagChange(Guid.NewGuid(), "Tag 3")]);

var progressReports = new ConcurrentQueue<HarmonyDetailedProgress>();
var progress = new Progress<HarmonyDetailedProgress>(p =>
{
_output.WriteLine($"Sync Progress: {p.Current}/{p.Total} - {p.Status} ({p.Stage})");
progressReports.Enqueue(p);
});
var reporter = new HarmonyProgressReporter(progress);

await model1.SyncWith(model2, reporter);

await Task.Delay(500);

Check warning on line 88 in src/SIL.Harmony.Tests/ProgressTests.cs

View workflow job for this annotation

GitHub Actions / build

Calls to methods which accept CancellationToken should use TestContext.Current.CancellationToken to allow test cancellation to be more responsive. (https://xunit.net/xunit.analyzers/rules/xUnit1051)

Check warning on line 88 in src/SIL.Harmony.Tests/ProgressTests.cs

View workflow job for this annotation

GitHub Actions / build

Calls to methods which accept CancellationToken should use TestContext.Current.CancellationToken to allow test cancellation to be more responsive. (https://xunit.net/xunit.analyzers/rules/xUnit1051)

_output.WriteLine($"Reports count: {progressReports.Count}");
foreach(var report in progressReports)
{
_output.WriteLine($"- {report.Stage}: {report.Current}/{report.Total} {report.Status}");
}

Assert.NotEmpty(progressReports);
Assert.Contains(progressReports, p => p.Stage == SyncStage.FetchingChanges);
Assert.Contains(progressReports, p => p.Stage == SyncStage.FetchingChangesFinished);
// We expect UploadingChanges happens when sending local changes to remote
Assert.Contains(progressReports, p => p.Stage == SyncStage.UploadingChanges && p.Total == 1 && p.Status.Contains("Uploading 1 changes"));
Assert.Contains(progressReports, p => p.Stage == SyncStage.UploadingChangesFinished);
}
}
21 changes: 11 additions & 10 deletions src/SIL.Harmony/DataModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public async Task<Commit> AddChange(
public async Task AddManyChanges(Guid clientId,
IEnumerable<IChange> changes,
Func<CommitMetadata?> commitMetadata,
int changesPerCommitMax = 100)
int changesPerCommitMax = 100,
HarmonyProgressReporter? progress = null)
{
await using var repo = await _crdtRepositoryFactory.CreateRepository();
var commits = changes
Expand All @@ -82,7 +83,7 @@ public async Task AddManyChanges(Guid clientId,

await using var transaction = await repo.BeginTransactionAsync();
var updatedCommits = await repo.AddCommits(commits);
await UpdateSnapshots(repo, updatedCommits);
await UpdateSnapshots(repo, updatedCommits, progress);
await ValidateCommits(repo);
await transaction.CommitAsync();
}
Expand Down Expand Up @@ -141,7 +142,7 @@ private static ChangeEntity<IChange> ToChangeEntity(IChange change, int index)
};
}

async Task ISyncable.AddRangeFromSync(IEnumerable<Commit> commits)
async Task ISyncable.AddRangeFromSync(IEnumerable<Commit> commits, HarmonyProgressReporter? progress)
{
commits = commits.ToArray();
try
Expand All @@ -156,7 +157,7 @@ async Task ISyncable.AddRangeFromSync(IEnumerable<Commit> commits)

await using var transaction = await repo.BeginTransactionAsync();
var updatedCommits = await repo.AddCommits(newCommits);
await UpdateSnapshots(repo, updatedCommits);
await UpdateSnapshots(repo, updatedCommits, progress);
await ValidateCommits(repo);
await transaction.CommitAsync();
}
Expand Down Expand Up @@ -193,7 +194,7 @@ ValueTask<bool> ISyncable.ShouldSync()
return ValueTask.FromResult(true);
}

private async Task UpdateSnapshots(CrdtRepository repo, SortedSet<Commit> commitsToApply)
private async Task UpdateSnapshots(CrdtRepository repo, SortedSet<Commit> commitsToApply, HarmonyProgressReporter? progress = null)
{
if (commitsToApply.Count == 0) return;
var oldestAddedCommit = commitsToApply.First();
Expand All @@ -213,7 +214,7 @@ private async Task UpdateSnapshots(CrdtRepository repo, SortedSet<Commit> commit
snapshotLookup = [];
}

var snapshotWorker = new SnapshotWorker(snapshotLookup, repo, _crdtConfig.Value);
var snapshotWorker = new SnapshotWorker(snapshotLookup, repo, _crdtConfig.Value, progress);
await snapshotWorker.UpdateSnapshots(commitsToApply);
}

Expand Down Expand Up @@ -372,13 +373,13 @@ public async Task<ChangesResult<Commit>> GetChanges(SyncState remoteState)
return await repo.GetChanges(remoteState);
}

public async Task<SyncResults> SyncWith(ISyncable remoteModel)
public async Task<SyncResults> SyncWith(ISyncable remoteModel, HarmonyProgressReporter? progress = null)
{
return await SyncHelper.SyncWith(this, remoteModel, _serializerOptions);
return await SyncHelper.SyncWith(this, remoteModel, _serializerOptions, progress);
}

public async Task SyncMany(ISyncable[] remotes)
public async Task SyncMany(ISyncable[] remotes, HarmonyProgressReporter? progress = null)
{
await SyncHelper.SyncMany(this, remotes, _serializerOptions);
await SyncHelper.SyncMany(this, remotes, _serializerOptions, progress);
}
}
5 changes: 5 additions & 0 deletions src/SIL.Harmony/HarmonyDetailedProgress.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
using SIL.Harmony.Changes;

namespace SIL.Harmony;

public record struct HarmonyDetailedProgress(SyncStage Stage, int? Current, int? Total, IChange? Change, string Status, DateTimeOffset DateTime);
71 changes: 71 additions & 0 deletions src/SIL.Harmony/HarmonyProgressReporter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using SIL.Harmony.Changes;

namespace SIL.Harmony;

public class HarmonyProgressReporter
{
private readonly IProgress<HarmonyProgress>? _progress;
private readonly IProgress<HarmonyDetailedProgress>? _detailedProgress;
private int? _totalChanges;

public HarmonyProgressReporter(IProgress<HarmonyProgress> progress)
{
_progress = progress;
}

public HarmonyProgressReporter(IProgress<HarmonyDetailedProgress> detailedProgress)
{
_detailedProgress = detailedProgress;
}

public void ReportFetchingChanges() => Report(SyncStage.FetchingChanges);
public void ReportFetchingChangesFinished() => Report(SyncStage.FetchingChangesFinished);
public void ReportUploadingResources() => Report(SyncStage.UploadingResources);
public void ReportUploadingResourcesFinished() => Report(SyncStage.UploadingResourcesFinished);
public void ReportUploadingChanges(int? count = null) => Report(SyncStage.UploadingChanges, total: count);
public void ReportUploadingChangesFinished() => Report(SyncStage.UploadingChangesFinished);

public void ReportStartApplyingChanges(IEnumerable<Commit> commits)
{
if (_progress is null && _detailedProgress is null) return;
_totalChanges = commits.Sum(c => c.ChangeEntities.Count);
Report(SyncStage.ApplyingChanges, 0, _totalChanges);
}

public void ReportApplyingChange(int current, IChange change)
{
Report(SyncStage.ApplyingChanges, current, _totalChanges, change);
}

public void ReportApplyingChangesFinished() => Report(SyncStage.ApplyingChangesFinished, _totalChanges, _totalChanges);

private void Report(SyncStage stage, int? current = null, int? total = null, IChange? change = null)
{
if (_progress is not null)
{
_progress.Report(new HarmonyProgress(stage, current, total));
}
else if (_detailedProgress is not null)
{
var status = GetStatus(stage, change, total);
_detailedProgress.Report(new HarmonyDetailedProgress(stage, current, total, change, status, DateTimeOffset.Now));
}
}

private static string GetStatus(SyncStage stage, IChange? change, int? total)
{
if (change != null) return $"Applying {change.GetType().Name}";
return stage switch
{
SyncStage.FetchingChanges => "Fetching changes...",
SyncStage.FetchingChangesFinished => "Finished fetching changes.",
SyncStage.ApplyingChanges => "Applying changes...",
SyncStage.ApplyingChangesFinished => "Finished applying changes.",
SyncStage.UploadingResources => "Uploading resources...",
SyncStage.UploadingResourcesFinished => "Finished uploading resources.",
SyncStage.UploadingChanges => total.HasValue ? $"Uploading {total} changes to remote..." : "Uploading changes to remote...",
SyncStage.UploadingChangesFinished => "Finished uploading changes.",
_ => stage.ToString()
};
}
}
12 changes: 6 additions & 6 deletions src/SIL.Harmony/ISyncable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ namespace SIL.Harmony;

public interface ISyncable
{
Task AddRangeFromSync(IEnumerable<Commit> commits);
Task AddRangeFromSync(IEnumerable<Commit> commits, HarmonyProgressReporter? progress = null);
Task<SyncState> GetSyncState();
Task<ChangesResult<Commit>> GetChanges(SyncState otherHeads);
Task<SyncResults> SyncWith(ISyncable remoteModel);
Task SyncMany(ISyncable[] remotes);
Task<SyncResults> SyncWith(ISyncable remoteModel, HarmonyProgressReporter? progress = null);
Task SyncMany(ISyncable[] remotes, HarmonyProgressReporter? progress = null);
ValueTask<bool> ShouldSync();
}

public class NullSyncable : ISyncable
{
public static readonly ISyncable Instance = new NullSyncable();

public Task AddRangeFromSync(IEnumerable<Commit> commits)
public Task AddRangeFromSync(IEnumerable<Commit> commits, HarmonyProgressReporter? progress = null)
{
return Task.CompletedTask;
}
Expand All @@ -29,12 +29,12 @@ public Task<ChangesResult<Commit>> GetChanges(SyncState otherHeads)
return Task.FromResult(ChangesResult<Commit>.Empty);
}

public Task<SyncResults> SyncWith(ISyncable remoteModel)
public Task<SyncResults> SyncWith(ISyncable remoteModel, HarmonyProgressReporter? progress = null)
{
return Task.FromResult(new SyncResults([], [], false));
}

public Task SyncMany(ISyncable[] remotes)
public Task SyncMany(ISyncable[] remotes, HarmonyProgressReporter? progress = null)
{
return Task.CompletedTask;
}
Expand Down
19 changes: 15 additions & 4 deletions src/SIL.Harmony/SnapshotWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,42 @@ internal class SnapshotWorker
private readonly Dictionary<Guid, ObjectSnapshot> _pendingSnapshots = [];
private readonly Dictionary<Guid, ObjectSnapshot> _rootSnapshots = [];
private readonly List<ObjectSnapshot> _newIntermediateSnapshots = [];
private readonly HarmonyProgressReporter? _progress;

private SnapshotWorker(Dictionary<Guid, ObjectSnapshot> snapshots,
Dictionary<Guid, Guid?> snapshotLookup,
CrdtRepository crdtRepository,
CrdtConfig crdtConfig)
CrdtConfig crdtConfig,
HarmonyProgressReporter? progress = null)
{
_pendingSnapshots = snapshots;
_crdtRepository = crdtRepository;
_snapshotLookup = snapshotLookup;
_crdtConfig = crdtConfig;
_progress = progress;
}

internal static async Task<Dictionary<Guid, ObjectSnapshot>> ApplyCommitsToSnapshots(
Dictionary<Guid, ObjectSnapshot> snapshots,
CrdtRepository crdtRepository,
SortedSet<Commit> commits,
CrdtConfig crdtConfig)
CrdtConfig crdtConfig,
HarmonyProgressReporter? progress = null)
{
//we need to pass in the snapshots because we expect it to be modified, this is intended.
//if the constructor makes a copy in the future this will need to be updated
await new SnapshotWorker(snapshots, [], crdtRepository, crdtConfig).ApplyCommitChanges(commits);
await new SnapshotWorker(snapshots, [], crdtRepository, crdtConfig, progress).ApplyCommitChanges(commits);
return snapshots;
}

/// <param name="snapshotLookup">a dictionary of entity id to latest snapshot id</param>
/// <param name="crdtRepository"></param>
/// <param name="crdtConfig"></param>
/// <param name="progress"></param>
internal SnapshotWorker(Dictionary<Guid, Guid?> snapshotLookup,
CrdtRepository crdtRepository,
CrdtConfig crdtConfig): this([], snapshotLookup, crdtRepository, crdtConfig)
CrdtConfig crdtConfig,
HarmonyProgressReporter? progress = null): this([], snapshotLookup, crdtRepository, crdtConfig, progress)
{
}

Expand All @@ -63,11 +69,15 @@ private async ValueTask ApplyCommitChanges(SortedSet<Commit> commits)
{
var intermediateSnapshots = new Dictionary<Guid, ObjectSnapshot>();
var commitIndex = 0;
_progress?.ReportStartApplyingChanges(commits);
var currentChange = 0;
foreach (var commit in commits)
{
commitIndex++;
foreach (var commitChange in commit.ChangeEntities.OrderBy(c => c.Index))
{
currentChange++;
_progress?.ReportApplyingChange(currentChange, commitChange.Change);
IObjectBase entity;
var prevSnapshot = await GetSnapshot(commitChange.EntityId);
var changeContext = new ChangeContext(commit, commitIndex, intermediateSnapshots, this, _crdtConfig);
Expand Down Expand Up @@ -107,6 +117,7 @@ private async ValueTask ApplyCommitChanges(SortedSet<Commit> commits)
_newIntermediateSnapshots.AddRange(intermediateSnapshots.Values);
intermediateSnapshots.Clear();
}
_progress?.ReportApplyingChangesFinished();
}

/// <summary>
Expand Down
Loading
Loading