130 lines
4.2 KiB
C#
130 lines
4.2 KiB
C#
using LiteDB;
|
||
using LehrerApp.Sync.Models;
|
||
|
||
namespace LehrerApp.Sync;
|
||
|
||
/// <summary>
|
||
/// Lokale Event-Queue in LiteDB.
|
||
/// Jede Datenänderung erzeugt einen Event – dieser wird hier gepuffert
|
||
/// bis er erfolgreich zum Server gepusht wurde.
|
||
/// </summary>
|
||
public class EventQueue : IDisposable
|
||
{
|
||
private readonly LiteDatabase _db;
|
||
private readonly ILiteCollection<SyncEvent> _queue;
|
||
private readonly ILiteCollection<SyncMeta> _meta;
|
||
private readonly ILiteCollection<ConflictEntry> _conflicts;
|
||
private long _currentSequenceNr;
|
||
|
||
public EventQueue(string queueDbPath)
|
||
{
|
||
_db = new LiteDatabase(queueDbPath);
|
||
_queue = _db.GetCollection<SyncEvent>("queue");
|
||
_meta = _db.GetCollection<SyncMeta>("meta");
|
||
_conflicts = _db.GetCollection<ConflictEntry>("conflicts");
|
||
|
||
_queue.EnsureIndex(x => x.SequenceNr);
|
||
_queue.EnsureIndex(x => x.Timestamp);
|
||
|
||
// Letzte SequenceNr wiederherstellen
|
||
var meta = _meta.FindById("seq");
|
||
_currentSequenceNr = meta?.Value ?? 0;
|
||
}
|
||
|
||
// ── Events einreihen ──────────────────────────────────────────────────────
|
||
|
||
public SyncEvent Enqueue(
|
||
string entityType,
|
||
string entityId,
|
||
string operation,
|
||
string encryptedPayload,
|
||
string deviceId,
|
||
DeviceType deviceType)
|
||
{
|
||
var evt = new SyncEvent
|
||
{
|
||
DeviceId = deviceId,
|
||
DeviceType = deviceType,
|
||
Timestamp = DateTime.UtcNow,
|
||
SequenceNr = ++_currentSequenceNr,
|
||
EntityType = entityType,
|
||
EntityId = entityId,
|
||
Operation = operation,
|
||
Payload = encryptedPayload,
|
||
};
|
||
|
||
_queue.Insert(evt);
|
||
_meta.Upsert(new SyncMeta { Id = "seq", Value = _currentSequenceNr });
|
||
|
||
return evt;
|
||
}
|
||
|
||
// ── Ausstehende Events ────────────────────────────────────────────────────
|
||
|
||
public List<SyncEvent> GetPending(int maxBatch = 100) =>
|
||
_queue.Find(Query.All(nameof(SyncEvent.SequenceNr)))
|
||
.Take(maxBatch)
|
||
.ToList();
|
||
|
||
public int PendingCount() => _queue.Count();
|
||
|
||
// ── Bestätigung nach erfolgreichem Push ───────────────────────────────────
|
||
|
||
public void Acknowledge(IEnumerable<Guid> eventIds)
|
||
{
|
||
foreach (var id in eventIds)
|
||
_queue.Delete(id);
|
||
}
|
||
|
||
// ── Letzte Sync-Metadaten ─────────────────────────────────────────────────
|
||
|
||
public long GetLastServerSequenceNr() =>
|
||
_meta.FindById("serverSeq")?.Value ?? 0;
|
||
|
||
public void SetLastServerSequenceNr(long nr) =>
|
||
_meta.Upsert(new SyncMeta { Id = "serverSeq", Value = nr });
|
||
|
||
public DateTime? GetLastSyncAt()
|
||
{
|
||
var meta = _meta.FindById("lastSync");
|
||
return meta?.Timestamp;
|
||
}
|
||
|
||
public void SetLastSyncAt(DateTime timestamp) =>
|
||
_meta.Upsert(new SyncMeta
|
||
{
|
||
Id = "lastSync",
|
||
Value = 0,
|
||
Timestamp = timestamp
|
||
});
|
||
|
||
// ── Konflikte ─────────────────────────────────────────────────────────────
|
||
|
||
public void AddConflict(ConflictEntry conflict) =>
|
||
_conflicts.Insert(conflict);
|
||
|
||
public List<ConflictEntry> GetUnreviewedConflicts() =>
|
||
_conflicts.Find(c => !c.Reviewed).ToList();
|
||
|
||
public int ConflictCount() =>
|
||
_conflicts.Count(c => !c.Reviewed);
|
||
|
||
public void MarkConflictReviewed(Guid id)
|
||
{
|
||
var conflict = _conflicts.FindById(id);
|
||
if (conflict is null) return;
|
||
conflict.Reviewed = true;
|
||
_conflicts.Update(conflict);
|
||
}
|
||
|
||
public void Dispose() => _db.Dispose();
|
||
}
|
||
|
||
// Internes Hilfsdokument für Metadaten
|
||
internal class SyncMeta
|
||
{
|
||
public string Id { get; set; } = "";
|
||
public long Value { get; set; }
|
||
public DateTime? Timestamp { get; set; }
|
||
}
|