< Summary

Information
Class: AsiBackbone.Storage.InMemory.Outbox.InMemoryGovernanceOutboxStore
Assembly: AsiBackbone.Storage.InMemory
File(s): /home/runner/work/AsiBackbone/AsiBackbone/src/AsiBackbone.Storage.InMemory/Outbox/InMemoryGovernanceOutboxStore.cs
Line coverage
92%
Covered lines: 70
Uncovered lines: 6
Coverable lines: 76
Total lines: 222
Line coverage: 92.1%
Branch coverage
53%
Covered branches: 15
Total branches: 28
Branch coverage: 53.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor()100%11100%
EnqueueAsync(...)50%22100%
SaveAsync(...)100%11100%
FindByOutboxEntryIdAsync(...)100%11100%
FindPendingAsync(...)100%11100%
FindRetryReadyAsync(...)50%22100%
MarkDeliveredAsync(...)50%22100%
MarkFailedAsync(...)50%22100%
MarkDeadLetteredAsync(...)50%22100%
SaveEntry(...)50%16850%
UpdateExistingEntry(...)50%6677.77%
IsTerminal(...)100%22100%
NormalizeMaxCount(...)50%22100%

File(s)

/home/runner/work/AsiBackbone/AsiBackbone/src/AsiBackbone.Storage.InMemory/Outbox/InMemoryGovernanceOutboxStore.cs

#LineLine coverage
 1using System.Collections.Concurrent;
 2using AsiBackbone.Core.Emissions;
 3using AsiBackbone.Core.Outbox;
 4
 5namespace AsiBackbone.Storage.InMemory.Outbox;
 6
 7/// <summary>
 8/// In-memory governance outbox store for tests, samples, and development hosts.
 9/// </summary>
 10/// <remarks>
 11/// This store is not durable across process restarts. Production hosts should use a durable provider such as EF Core or
 12/// Same-entry status transitions use single-process compare-and-swap updates so tests and local validation do not accid
 13/// </remarks>
 14public sealed class InMemoryGovernanceOutboxStore : IAsiBackboneGovernanceOutboxStore
 15{
 1016    private readonly ConcurrentDictionary<string, GovernanceOutboxEntry> entries = new(StringComparer.Ordinal);
 17
 18    /// <inheritdoc />
 19    public ValueTask<GovernanceOutboxEntry> EnqueueAsync(
 20        GovernanceEmissionEnvelope envelope,
 21        CancellationToken cancellationToken = default)
 22    {
 1023        ArgumentNullException.ThrowIfNull(envelope);
 1024        cancellationToken.ThrowIfCancellationRequested();
 25
 1026        var entry = GovernanceOutboxEntry.Create(envelope);
 27
 1028        return !entries.TryAdd(entry.OutboxEntryId, entry)
 1029            ? throw new InvalidOperationException($"Outbox entry '{entry.OutboxEntryId}' already exists.")
 1030            : ValueTask.FromResult(entry);
 31    }
 32
 33    /// <inheritdoc />
 34    public ValueTask<GovernanceOutboxEntry> SaveAsync(
 35        GovernanceOutboxEntry entry,
 36        CancellationToken cancellationToken = default)
 37    {
 138        ArgumentNullException.ThrowIfNull(entry);
 139        cancellationToken.ThrowIfCancellationRequested();
 40
 141        return ValueTask.FromResult(SaveEntry(entry, cancellationToken));
 42    }
 43
 44    /// <inheritdoc />
 45    public ValueTask<GovernanceOutboxEntry?> FindByOutboxEntryIdAsync(
 46        string outboxEntryId,
 47        CancellationToken cancellationToken = default)
 48    {
 649        ArgumentException.ThrowIfNullOrWhiteSpace(outboxEntryId);
 650        cancellationToken.ThrowIfCancellationRequested();
 51
 652        _ = entries.TryGetValue(outboxEntryId.Trim(), out GovernanceOutboxEntry? entry);
 53
 654        return ValueTask.FromResult(entry);
 55    }
 56
 57    /// <inheritdoc />
 58    public ValueTask<IReadOnlyList<GovernanceOutboxEntry>> FindPendingAsync(
 59        int maxCount = 100,
 60        CancellationToken cancellationToken = default)
 61    {
 562        cancellationToken.ThrowIfCancellationRequested();
 63
 564        int normalizedMaxCount = NormalizeMaxCount(maxCount);
 65
 566        IReadOnlyList<GovernanceOutboxEntry> matches = [.. entries.Values
 567            .Where(entry => entry.Status is GovernanceEmissionStatus.Pending)
 368            .OrderBy(entry => entry.CreatedUtc)
 369            .ThenBy(entry => entry.OutboxEntryId)
 570            .Take(normalizedMaxCount)];
 71
 572        return ValueTask.FromResult(matches);
 73    }
 74
 75    /// <inheritdoc />
 76    public ValueTask<IReadOnlyList<GovernanceOutboxEntry>> FindRetryReadyAsync(
 77        DateTimeOffset utcNow,
 78        int maxCount = 100,
 79        CancellationToken cancellationToken = default)
 80    {
 781        cancellationToken.ThrowIfCancellationRequested();
 82
 783        int normalizedMaxCount = NormalizeMaxCount(maxCount);
 784        DateTimeOffset normalizedUtcNow = utcNow.ToUniversalTime();
 85
 786        IReadOnlyList<GovernanceOutboxEntry> matches = [.. entries.Values
 787            .Where(entry => entry.IsRetryReady(normalizedUtcNow))
 288            .OrderBy(entry => entry.NextRetryUtc ?? entry.UpdatedUtc)
 289            .ThenBy(entry => entry.OutboxEntryId)
 790            .Take(normalizedMaxCount)];
 91
 792        return ValueTask.FromResult(matches);
 93    }
 94
 95    /// <inheritdoc />
 96    public ValueTask<GovernanceOutboxEntry> MarkDeliveredAsync(
 97        string outboxEntryId,
 98        GovernanceEmissionResult result,
 99        CancellationToken cancellationToken = default)
 100    {
 4101        ArgumentException.ThrowIfNullOrWhiteSpace(outboxEntryId);
 4102        ArgumentNullException.ThrowIfNull(result);
 4103        cancellationToken.ThrowIfCancellationRequested();
 104
 4105        GovernanceOutboxEntry updatedEntry = UpdateExistingEntry(
 4106            outboxEntryId,
 4107            entry => IsTerminal(entry) ? entry : entry.MarkDelivered(result),
 4108            cancellationToken);
 109
 4110        return ValueTask.FromResult(updatedEntry);
 111    }
 112
 113    /// <inheritdoc />
 114    public ValueTask<GovernanceOutboxEntry> MarkFailedAsync(
 115        string outboxEntryId,
 116        GovernanceEmissionError governanceEmissionError,
 117        DateTimeOffset? nextRetryUtc = null,
 118        CancellationToken cancellationToken = default)
 119    {
 8120        ArgumentException.ThrowIfNullOrWhiteSpace(outboxEntryId);
 8121        ArgumentNullException.ThrowIfNull(governanceEmissionError);
 8122        cancellationToken.ThrowIfCancellationRequested();
 123
 8124        GovernanceOutboxEntry updatedEntry = UpdateExistingEntry(
 8125            outboxEntryId,
 8126            entry => IsTerminal(entry) ? entry : entry.MarkFailed(governanceEmissionError, nextRetryUtc),
 8127            cancellationToken);
 128
 8129        return ValueTask.FromResult(updatedEntry);
 130    }
 131
 132    /// <inheritdoc />
 133    public ValueTask<GovernanceOutboxEntry> MarkDeadLetteredAsync(
 134        string outboxEntryId,
 135        GovernanceEmissionError governanceEmissionError,
 136        string? deadLetterReason = null,
 137        CancellationToken cancellationToken = default)
 138    {
 1139        ArgumentException.ThrowIfNullOrWhiteSpace(outboxEntryId);
 1140        ArgumentNullException.ThrowIfNull(governanceEmissionError);
 1141        cancellationToken.ThrowIfCancellationRequested();
 142
 1143        GovernanceOutboxEntry updatedEntry = UpdateExistingEntry(
 1144            outboxEntryId,
 1145            entry => IsTerminal(entry) ? entry : entry.MarkDeadLettered(governanceEmissionError, deadLetterReason),
 1146            cancellationToken);
 147
 1148        return ValueTask.FromResult(updatedEntry);
 149    }
 150
 151    private GovernanceOutboxEntry SaveEntry(
 152        GovernanceOutboxEntry entry,
 153        CancellationToken cancellationToken)
 154    {
 155        while (true)
 156        {
 1157            cancellationToken.ThrowIfCancellationRequested();
 158
 1159            if (!entries.TryGetValue(entry.OutboxEntryId, out GovernanceOutboxEntry? currentEntry))
 160            {
 0161                if (entries.TryAdd(entry.OutboxEntryId, entry))
 162                {
 0163                    return entry;
 164                }
 165
 166                continue;
 167            }
 168
 1169            if (IsTerminal(currentEntry))
 170            {
 1171                return currentEntry;
 172            }
 173
 0174            if (entries.TryUpdate(entry.OutboxEntryId, entry, currentEntry))
 175            {
 0176                return entry;
 177            }
 178        }
 179    }
 180
 181    private GovernanceOutboxEntry UpdateExistingEntry(
 182        string outboxEntryId,
 183        Func<GovernanceOutboxEntry, GovernanceOutboxEntry> updateEntry,
 184        CancellationToken cancellationToken)
 185    {
 13186        string normalizedOutboxEntryId = outboxEntryId.Trim();
 187
 188        while (true)
 189        {
 13190            cancellationToken.ThrowIfCancellationRequested();
 191
 13192            if (!entries.TryGetValue(normalizedOutboxEntryId, out GovernanceOutboxEntry? currentEntry))
 193            {
 0194                throw new InvalidOperationException($"Outbox entry '{normalizedOutboxEntryId}' was not found.");
 195            }
 196
 13197            GovernanceOutboxEntry updatedEntry = updateEntry(currentEntry);
 198
 13199            if (ReferenceEquals(currentEntry, updatedEntry))
 200            {
 0201                return currentEntry;
 202            }
 203
 13204            if (entries.TryUpdate(normalizedOutboxEntryId, updatedEntry, currentEntry))
 205            {
 13206                return updatedEntry;
 207            }
 208        }
 209    }
 210
 211    private static bool IsTerminal(GovernanceOutboxEntry entry)
 212    {
 14213        return entry.IsDelivered || entry.IsDeadLettered;
 214    }
 215
 216    private static int NormalizeMaxCount(int maxCount)
 217    {
 12218        return maxCount <= 0
 12219            ? throw new ArgumentOutOfRangeException(nameof(maxCount), maxCount, "Maximum count must be greater than zero
 12220            : maxCount;
 221    }
 222}