| | | 1 | | using System.Collections.Concurrent; |
| | | 2 | | using AsiBackbone.Core.Emissions; |
| | | 3 | | using AsiBackbone.Core.Outbox; |
| | | 4 | | |
| | | 5 | | namespace AsiBackbone.Storage.InMemory.Outbox; |
| | | 6 | | |
| | | 7 | | /// <summary> |
| | | 8 | | /// In-memory governance outbox store for tests, samples, and development hosts. |
| | | 9 | | /// </summary> |
| | | 10 | | /// <remarks> |
| | | 11 | | /// This store is not durable across process restarts. Production hosts should use a durable provider such as EF Core or |
| | | 12 | | /// Same-entry status transitions use single-process compare-and-swap updates so tests and local validation do not accid |
| | | 13 | | /// </remarks> |
| | | 14 | | public sealed class InMemoryGovernanceOutboxStore : IAsiBackboneGovernanceOutboxStore |
| | | 15 | | { |
| | 10 | 16 | | private readonly ConcurrentDictionary<string, GovernanceOutboxEntry> entries = new(StringComparer.Ordinal); |
| | | 17 | | |
| | | 18 | | /// <inheritdoc /> |
| | | 19 | | public ValueTask<GovernanceOutboxEntry> EnqueueAsync( |
| | | 20 | | GovernanceEmissionEnvelope envelope, |
| | | 21 | | CancellationToken cancellationToken = default) |
| | | 22 | | { |
| | 10 | 23 | | ArgumentNullException.ThrowIfNull(envelope); |
| | 10 | 24 | | cancellationToken.ThrowIfCancellationRequested(); |
| | | 25 | | |
| | 10 | 26 | | var entry = GovernanceOutboxEntry.Create(envelope); |
| | | 27 | | |
| | 10 | 28 | | return !entries.TryAdd(entry.OutboxEntryId, entry) |
| | 10 | 29 | | ? throw new InvalidOperationException($"Outbox entry '{entry.OutboxEntryId}' already exists.") |
| | 10 | 30 | | : ValueTask.FromResult(entry); |
| | | 31 | | } |
| | | 32 | | |
| | | 33 | | /// <inheritdoc /> |
| | | 34 | | public ValueTask<GovernanceOutboxEntry> SaveAsync( |
| | | 35 | | GovernanceOutboxEntry entry, |
| | | 36 | | CancellationToken cancellationToken = default) |
| | | 37 | | { |
| | 1 | 38 | | ArgumentNullException.ThrowIfNull(entry); |
| | 1 | 39 | | cancellationToken.ThrowIfCancellationRequested(); |
| | | 40 | | |
| | 1 | 41 | | return ValueTask.FromResult(SaveEntry(entry, cancellationToken)); |
| | | 42 | | } |
| | | 43 | | |
| | | 44 | | /// <inheritdoc /> |
| | | 45 | | public ValueTask<GovernanceOutboxEntry?> FindByOutboxEntryIdAsync( |
| | | 46 | | string outboxEntryId, |
| | | 47 | | CancellationToken cancellationToken = default) |
| | | 48 | | { |
| | 6 | 49 | | ArgumentException.ThrowIfNullOrWhiteSpace(outboxEntryId); |
| | 6 | 50 | | cancellationToken.ThrowIfCancellationRequested(); |
| | | 51 | | |
| | 6 | 52 | | _ = entries.TryGetValue(outboxEntryId.Trim(), out GovernanceOutboxEntry? entry); |
| | | 53 | | |
| | 6 | 54 | | return ValueTask.FromResult(entry); |
| | | 55 | | } |
| | | 56 | | |
| | | 57 | | /// <inheritdoc /> |
| | | 58 | | public ValueTask<IReadOnlyList<GovernanceOutboxEntry>> FindPendingAsync( |
| | | 59 | | int maxCount = 100, |
| | | 60 | | CancellationToken cancellationToken = default) |
| | | 61 | | { |
| | 5 | 62 | | cancellationToken.ThrowIfCancellationRequested(); |
| | | 63 | | |
| | 5 | 64 | | int normalizedMaxCount = NormalizeMaxCount(maxCount); |
| | | 65 | | |
| | 5 | 66 | | IReadOnlyList<GovernanceOutboxEntry> matches = [.. entries.Values |
| | 5 | 67 | | .Where(entry => entry.Status is GovernanceEmissionStatus.Pending) |
| | 3 | 68 | | .OrderBy(entry => entry.CreatedUtc) |
| | 3 | 69 | | .ThenBy(entry => entry.OutboxEntryId) |
| | 5 | 70 | | .Take(normalizedMaxCount)]; |
| | | 71 | | |
| | 5 | 72 | | return ValueTask.FromResult(matches); |
| | | 73 | | } |
| | | 74 | | |
| | | 75 | | /// <inheritdoc /> |
| | | 76 | | public ValueTask<IReadOnlyList<GovernanceOutboxEntry>> FindRetryReadyAsync( |
| | | 77 | | DateTimeOffset utcNow, |
| | | 78 | | int maxCount = 100, |
| | | 79 | | CancellationToken cancellationToken = default) |
| | | 80 | | { |
| | 7 | 81 | | cancellationToken.ThrowIfCancellationRequested(); |
| | | 82 | | |
| | 7 | 83 | | int normalizedMaxCount = NormalizeMaxCount(maxCount); |
| | 7 | 84 | | DateTimeOffset normalizedUtcNow = utcNow.ToUniversalTime(); |
| | | 85 | | |
| | 7 | 86 | | IReadOnlyList<GovernanceOutboxEntry> matches = [.. entries.Values |
| | 7 | 87 | | .Where(entry => entry.IsRetryReady(normalizedUtcNow)) |
| | 2 | 88 | | .OrderBy(entry => entry.NextRetryUtc ?? entry.UpdatedUtc) |
| | 2 | 89 | | .ThenBy(entry => entry.OutboxEntryId) |
| | 7 | 90 | | .Take(normalizedMaxCount)]; |
| | | 91 | | |
| | 7 | 92 | | return ValueTask.FromResult(matches); |
| | | 93 | | } |
| | | 94 | | |
| | | 95 | | /// <inheritdoc /> |
| | | 96 | | public ValueTask<GovernanceOutboxEntry> MarkDeliveredAsync( |
| | | 97 | | string outboxEntryId, |
| | | 98 | | GovernanceEmissionResult result, |
| | | 99 | | CancellationToken cancellationToken = default) |
| | | 100 | | { |
| | 4 | 101 | | ArgumentException.ThrowIfNullOrWhiteSpace(outboxEntryId); |
| | 4 | 102 | | ArgumentNullException.ThrowIfNull(result); |
| | 4 | 103 | | cancellationToken.ThrowIfCancellationRequested(); |
| | | 104 | | |
| | 4 | 105 | | GovernanceOutboxEntry updatedEntry = UpdateExistingEntry( |
| | 4 | 106 | | outboxEntryId, |
| | 4 | 107 | | entry => IsTerminal(entry) ? entry : entry.MarkDelivered(result), |
| | 4 | 108 | | cancellationToken); |
| | | 109 | | |
| | 4 | 110 | | return ValueTask.FromResult(updatedEntry); |
| | | 111 | | } |
| | | 112 | | |
| | | 113 | | /// <inheritdoc /> |
| | | 114 | | public ValueTask<GovernanceOutboxEntry> MarkFailedAsync( |
| | | 115 | | string outboxEntryId, |
| | | 116 | | GovernanceEmissionError governanceEmissionError, |
| | | 117 | | DateTimeOffset? nextRetryUtc = null, |
| | | 118 | | CancellationToken cancellationToken = default) |
| | | 119 | | { |
| | 8 | 120 | | ArgumentException.ThrowIfNullOrWhiteSpace(outboxEntryId); |
| | 8 | 121 | | ArgumentNullException.ThrowIfNull(governanceEmissionError); |
| | 8 | 122 | | cancellationToken.ThrowIfCancellationRequested(); |
| | | 123 | | |
| | 8 | 124 | | GovernanceOutboxEntry updatedEntry = UpdateExistingEntry( |
| | 8 | 125 | | outboxEntryId, |
| | 8 | 126 | | entry => IsTerminal(entry) ? entry : entry.MarkFailed(governanceEmissionError, nextRetryUtc), |
| | 8 | 127 | | cancellationToken); |
| | | 128 | | |
| | 8 | 129 | | return ValueTask.FromResult(updatedEntry); |
| | | 130 | | } |
| | | 131 | | |
| | | 132 | | /// <inheritdoc /> |
| | | 133 | | public ValueTask<GovernanceOutboxEntry> MarkDeadLetteredAsync( |
| | | 134 | | string outboxEntryId, |
| | | 135 | | GovernanceEmissionError governanceEmissionError, |
| | | 136 | | string? deadLetterReason = null, |
| | | 137 | | CancellationToken cancellationToken = default) |
| | | 138 | | { |
| | 1 | 139 | | ArgumentException.ThrowIfNullOrWhiteSpace(outboxEntryId); |
| | 1 | 140 | | ArgumentNullException.ThrowIfNull(governanceEmissionError); |
| | 1 | 141 | | cancellationToken.ThrowIfCancellationRequested(); |
| | | 142 | | |
| | 1 | 143 | | GovernanceOutboxEntry updatedEntry = UpdateExistingEntry( |
| | 1 | 144 | | outboxEntryId, |
| | 1 | 145 | | entry => IsTerminal(entry) ? entry : entry.MarkDeadLettered(governanceEmissionError, deadLetterReason), |
| | 1 | 146 | | cancellationToken); |
| | | 147 | | |
| | 1 | 148 | | return ValueTask.FromResult(updatedEntry); |
| | | 149 | | } |
| | | 150 | | |
| | | 151 | | private GovernanceOutboxEntry SaveEntry( |
| | | 152 | | GovernanceOutboxEntry entry, |
| | | 153 | | CancellationToken cancellationToken) |
| | | 154 | | { |
| | | 155 | | while (true) |
| | | 156 | | { |
| | 1 | 157 | | cancellationToken.ThrowIfCancellationRequested(); |
| | | 158 | | |
| | 1 | 159 | | if (!entries.TryGetValue(entry.OutboxEntryId, out GovernanceOutboxEntry? currentEntry)) |
| | | 160 | | { |
| | 0 | 161 | | if (entries.TryAdd(entry.OutboxEntryId, entry)) |
| | | 162 | | { |
| | 0 | 163 | | return entry; |
| | | 164 | | } |
| | | 165 | | |
| | | 166 | | continue; |
| | | 167 | | } |
| | | 168 | | |
| | 1 | 169 | | if (IsTerminal(currentEntry)) |
| | | 170 | | { |
| | 1 | 171 | | return currentEntry; |
| | | 172 | | } |
| | | 173 | | |
| | 0 | 174 | | if (entries.TryUpdate(entry.OutboxEntryId, entry, currentEntry)) |
| | | 175 | | { |
| | 0 | 176 | | return entry; |
| | | 177 | | } |
| | | 178 | | } |
| | | 179 | | } |
| | | 180 | | |
| | | 181 | | private GovernanceOutboxEntry UpdateExistingEntry( |
| | | 182 | | string outboxEntryId, |
| | | 183 | | Func<GovernanceOutboxEntry, GovernanceOutboxEntry> updateEntry, |
| | | 184 | | CancellationToken cancellationToken) |
| | | 185 | | { |
| | 13 | 186 | | string normalizedOutboxEntryId = outboxEntryId.Trim(); |
| | | 187 | | |
| | | 188 | | while (true) |
| | | 189 | | { |
| | 13 | 190 | | cancellationToken.ThrowIfCancellationRequested(); |
| | | 191 | | |
| | 13 | 192 | | if (!entries.TryGetValue(normalizedOutboxEntryId, out GovernanceOutboxEntry? currentEntry)) |
| | | 193 | | { |
| | 0 | 194 | | throw new InvalidOperationException($"Outbox entry '{normalizedOutboxEntryId}' was not found."); |
| | | 195 | | } |
| | | 196 | | |
| | 13 | 197 | | GovernanceOutboxEntry updatedEntry = updateEntry(currentEntry); |
| | | 198 | | |
| | 13 | 199 | | if (ReferenceEquals(currentEntry, updatedEntry)) |
| | | 200 | | { |
| | 0 | 201 | | return currentEntry; |
| | | 202 | | } |
| | | 203 | | |
| | 13 | 204 | | if (entries.TryUpdate(normalizedOutboxEntryId, updatedEntry, currentEntry)) |
| | | 205 | | { |
| | 13 | 206 | | return updatedEntry; |
| | | 207 | | } |
| | | 208 | | } |
| | | 209 | | } |
| | | 210 | | |
| | | 211 | | private static bool IsTerminal(GovernanceOutboxEntry entry) |
| | | 212 | | { |
| | 14 | 213 | | return entry.IsDelivered || entry.IsDeadLettered; |
| | | 214 | | } |
| | | 215 | | |
| | | 216 | | private static int NormalizeMaxCount(int maxCount) |
| | | 217 | | { |
| | 12 | 218 | | return maxCount <= 0 |
| | 12 | 219 | | ? throw new ArgumentOutOfRangeException(nameof(maxCount), maxCount, "Maximum count must be greater than zero |
| | 12 | 220 | | : maxCount; |
| | | 221 | | } |
| | | 222 | | } |