| | | 1 | | using System.Collections.ObjectModel; |
| | | 2 | | using System.Text.Json; |
| | | 3 | | using AsiBackbone.Core.Emissions; |
| | | 4 | | using AsiBackbone.Core.Entities; |
| | | 5 | | using AsiBackbone.Core.Outbox; |
| | | 6 | | using AsiBackbone.EntityFrameworkCore.Persistence; |
| | | 7 | | using Microsoft.EntityFrameworkCore; |
| | | 8 | | |
| | | 9 | | namespace AsiBackbone.EntityFrameworkCore.Outbox; |
| | | 10 | | |
| | | 11 | | /// <summary> |
| | | 12 | | /// Entity Framework Core-backed governance outbox store that persists provider-neutral emission envelopes through a hos |
| | | 13 | | /// </summary> |
| | | 14 | | /// <remarks> |
| | | 15 | | /// This store provides durable local storage only. Provider delivery, telemetry export, SIEM routing, and cloud emissio |
| | | 16 | | /// </remarks> |
| | | 17 | | public sealed class EfCoreGovernanceOutboxStore : IAsiBackboneGovernanceOutboxStore |
| | | 18 | | { |
| | 2 | 19 | | private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web); |
| | | 20 | | |
| | | 21 | | private readonly DbContext dbContext; |
| | | 22 | | |
| | | 23 | | /// <summary> |
| | | 24 | | /// Initializes a new instance of the <see cref="EfCoreGovernanceOutboxStore" /> class. |
| | | 25 | | /// </summary> |
| | | 26 | | /// <param name="dbContext">The host-owned database context.</param> |
| | 54 | 27 | | public EfCoreGovernanceOutboxStore(DbContext dbContext) |
| | | 28 | | { |
| | 54 | 29 | | ArgumentNullException.ThrowIfNull(dbContext); |
| | | 30 | | |
| | 54 | 31 | | this.dbContext = dbContext; |
| | 54 | 32 | | } |
| | | 33 | | |
| | | 34 | | /// <inheritdoc /> |
| | | 35 | | public async ValueTask<GovernanceOutboxEntry> EnqueueAsync( |
| | | 36 | | GovernanceEmissionEnvelope envelope, |
| | | 37 | | CancellationToken cancellationToken = default) |
| | | 38 | | { |
| | 54 | 39 | | ArgumentNullException.ThrowIfNull(envelope); |
| | 54 | 40 | | cancellationToken.ThrowIfCancellationRequested(); |
| | | 41 | | |
| | 54 | 42 | | var entry = GovernanceOutboxEntry.Create(envelope); |
| | | 43 | | |
| | 54 | 44 | | _ = await dbContext |
| | 54 | 45 | | .Set<AsiBackboneGovernanceOutboxEntryEntity>() |
| | 54 | 46 | | .AddAsync(ToEntity(entry), cancellationToken) |
| | 54 | 47 | | .ConfigureAwait(false); |
| | | 48 | | |
| | 54 | 49 | | _ = await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false); |
| | | 50 | | |
| | 54 | 51 | | return entry; |
| | 54 | 52 | | } |
| | | 53 | | |
| | | 54 | | /// <inheritdoc /> |
| | | 55 | | public async ValueTask<GovernanceOutboxEntry> SaveAsync( |
| | | 56 | | GovernanceOutboxEntry entry, |
| | | 57 | | CancellationToken cancellationToken = default) |
| | | 58 | | { |
| | 22 | 59 | | ArgumentNullException.ThrowIfNull(entry); |
| | 22 | 60 | | cancellationToken.ThrowIfCancellationRequested(); |
| | | 61 | | |
| | 22 | 62 | | AsiBackboneGovernanceOutboxEntryEntity persistedEntity = ToEntity(entry); |
| | 22 | 63 | | AsiBackboneGovernanceOutboxEntryEntity? existingEntity = await dbContext |
| | 22 | 64 | | .Set<AsiBackboneGovernanceOutboxEntryEntity>() |
| | 22 | 65 | | .SingleOrDefaultAsync(entity => entity.OutboxEntryId == entry.OutboxEntryId, cancellationToken) |
| | 22 | 66 | | .ConfigureAwait(false); |
| | | 67 | | |
| | 22 | 68 | | if (existingEntity is null) |
| | | 69 | | { |
| | 0 | 70 | | _ = await dbContext |
| | 0 | 71 | | .Set<AsiBackboneGovernanceOutboxEntryEntity>() |
| | 0 | 72 | | .AddAsync(persistedEntity, cancellationToken) |
| | 0 | 73 | | .ConfigureAwait(false); |
| | | 74 | | } |
| | | 75 | | else |
| | | 76 | | { |
| | 22 | 77 | | persistedEntity.Id = existingEntity.Id; |
| | 22 | 78 | | persistedEntity.ConcurrencyStamp = AsiBackboneEntity.NewConcurrencyStamp(); |
| | 22 | 79 | | dbContext.Entry(existingEntity).CurrentValues.SetValues(persistedEntity); |
| | | 80 | | } |
| | | 81 | | |
| | 22 | 82 | | _ = await dbContext.SaveChangesAsync(cancellationToken).ConfigureAwait(false); |
| | | 83 | | |
| | 20 | 84 | | return entry; |
| | 20 | 85 | | } |
| | | 86 | | |
| | | 87 | | /// <inheritdoc /> |
| | | 88 | | public async ValueTask<GovernanceOutboxEntry?> FindByOutboxEntryIdAsync( |
| | | 89 | | string outboxEntryId, |
| | | 90 | | CancellationToken cancellationToken = default) |
| | | 91 | | { |
| | 28 | 92 | | ArgumentException.ThrowIfNullOrWhiteSpace(outboxEntryId); |
| | | 93 | | |
| | 28 | 94 | | string normalizedOutboxEntryId = outboxEntryId.Trim(); |
| | | 95 | | |
| | 28 | 96 | | AsiBackboneGovernanceOutboxEntryEntity? entity = await OutboxEntries() |
| | 28 | 97 | | .Where(outboxEntry => outboxEntry.OutboxEntryId == normalizedOutboxEntryId) |
| | 28 | 98 | | .SingleOrDefaultAsync(cancellationToken) |
| | 28 | 99 | | .ConfigureAwait(false); |
| | | 100 | | |
| | 28 | 101 | | return entity is null ? null : ToEntry(entity); |
| | 28 | 102 | | } |
| | | 103 | | |
| | | 104 | | /// <inheritdoc /> |
| | | 105 | | public async ValueTask<IReadOnlyList<GovernanceOutboxEntry>> FindPendingAsync( |
| | | 106 | | int maxCount = 100, |
| | | 107 | | CancellationToken cancellationToken = default) |
| | | 108 | | { |
| | 12 | 109 | | int normalizedMaxCount = NormalizeMaxCount(maxCount); |
| | | 110 | | |
| | 12 | 111 | | List<AsiBackboneGovernanceOutboxEntryEntity> entities = await OutboxEntries() |
| | 12 | 112 | | .Where(outboxEntry => outboxEntry.Status == GovernanceEmissionStatus.Pending) |
| | 12 | 113 | | .ToListAsync(cancellationToken) |
| | 12 | 114 | | .ConfigureAwait(false); |
| | | 115 | | |
| | 12 | 116 | | return [.. ToEntries(entities) |
| | 46 | 117 | | .OrderBy(entry => entry.CreatedUtc) |
| | 46 | 118 | | .ThenBy(entry => entry.OutboxEntryId, StringComparer.Ordinal) |
| | 12 | 119 | | .Take(normalizedMaxCount)]; |
| | 12 | 120 | | } |
| | | 121 | | |
| | | 122 | | /// <inheritdoc /> |
| | | 123 | | public async ValueTask<IReadOnlyList<GovernanceOutboxEntry>> FindRetryReadyAsync( |
| | | 124 | | DateTimeOffset utcNow, |
| | | 125 | | int maxCount = 100, |
| | | 126 | | CancellationToken cancellationToken = default) |
| | | 127 | | { |
| | 6 | 128 | | int normalizedMaxCount = NormalizeMaxCount(maxCount); |
| | 6 | 129 | | DateTimeOffset normalizedUtcNow = utcNow.ToUniversalTime(); |
| | | 130 | | |
| | 6 | 131 | | List<AsiBackboneGovernanceOutboxEntryEntity> entities = await OutboxEntries() |
| | 6 | 132 | | .Where(outboxEntry => |
| | 6 | 133 | | outboxEntry.Status == GovernanceEmissionStatus.Deferred || |
| | 6 | 134 | | outboxEntry.Status == GovernanceEmissionStatus.Failed || |
| | 6 | 135 | | outboxEntry.Status == GovernanceEmissionStatus.RetryableFailure) |
| | 6 | 136 | | .ToListAsync(cancellationToken) |
| | 6 | 137 | | .ConfigureAwait(false); |
| | | 138 | | |
| | 6 | 139 | | return [.. ToEntries(entities) |
| | 22 | 140 | | .Where(entry => entry.IsRetryReady(normalizedUtcNow)) |
| | 14 | 141 | | .OrderBy(entry => entry.NextRetryUtc ?? entry.UpdatedUtc) |
| | 14 | 142 | | .ThenBy(entry => entry.OutboxEntryId, StringComparer.Ordinal) |
| | 6 | 143 | | .Take(normalizedMaxCount)]; |
| | 6 | 144 | | } |
| | | 145 | | |
| | | 146 | | /// <inheritdoc /> |
| | | 147 | | public async ValueTask<GovernanceOutboxEntry> MarkDeliveredAsync( |
| | | 148 | | string outboxEntryId, |
| | | 149 | | GovernanceEmissionResult result, |
| | | 150 | | CancellationToken cancellationToken = default) |
| | | 151 | | { |
| | 6 | 152 | | ArgumentException.ThrowIfNullOrWhiteSpace(outboxEntryId); |
| | 6 | 153 | | ArgumentNullException.ThrowIfNull(result); |
| | | 154 | | |
| | 6 | 155 | | GovernanceOutboxEntry entry = await RequireEntryAsync(outboxEntryId, cancellationToken).ConfigureAwait(false); |
| | 6 | 156 | | GovernanceOutboxEntry updatedEntry = entry.MarkDelivered(result); |
| | | 157 | | |
| | 6 | 158 | | return await SaveAsync(updatedEntry, cancellationToken).ConfigureAwait(false); |
| | 4 | 159 | | } |
| | | 160 | | |
| | | 161 | | /// <inheritdoc /> |
| | | 162 | | public async ValueTask<GovernanceOutboxEntry> MarkFailedAsync( |
| | | 163 | | string outboxEntryId, |
| | | 164 | | GovernanceEmissionError governanceEmissionError, |
| | | 165 | | DateTimeOffset? nextRetryUtc = null, |
| | | 166 | | CancellationToken cancellationToken = default) |
| | | 167 | | { |
| | 12 | 168 | | ArgumentException.ThrowIfNullOrWhiteSpace(outboxEntryId); |
| | 12 | 169 | | ArgumentNullException.ThrowIfNull(governanceEmissionError); |
| | | 170 | | |
| | 12 | 171 | | GovernanceOutboxEntry entry = await RequireEntryAsync(outboxEntryId, cancellationToken).ConfigureAwait(false); |
| | 12 | 172 | | GovernanceOutboxEntry updatedEntry = entry.MarkFailed(governanceEmissionError, nextRetryUtc); |
| | | 173 | | |
| | 12 | 174 | | return await SaveAsync(updatedEntry, cancellationToken).ConfigureAwait(false); |
| | 12 | 175 | | } |
| | | 176 | | |
| | | 177 | | /// <inheritdoc /> |
| | | 178 | | public async ValueTask<GovernanceOutboxEntry> MarkDeadLetteredAsync( |
| | | 179 | | string outboxEntryId, |
| | | 180 | | GovernanceEmissionError governanceEmissionError, |
| | | 181 | | string? deadLetterReason = null, |
| | | 182 | | CancellationToken cancellationToken = default) |
| | | 183 | | { |
| | 2 | 184 | | ArgumentException.ThrowIfNullOrWhiteSpace(outboxEntryId); |
| | 2 | 185 | | ArgumentNullException.ThrowIfNull(governanceEmissionError); |
| | | 186 | | |
| | 2 | 187 | | GovernanceOutboxEntry entry = await RequireEntryAsync(outboxEntryId, cancellationToken).ConfigureAwait(false); |
| | 2 | 188 | | GovernanceOutboxEntry updatedEntry = entry.MarkDeadLettered(governanceEmissionError, deadLetterReason); |
| | | 189 | | |
| | 2 | 190 | | return await SaveAsync(updatedEntry, cancellationToken).ConfigureAwait(false); |
| | 2 | 191 | | } |
| | | 192 | | |
| | | 193 | | private async ValueTask<GovernanceOutboxEntry> RequireEntryAsync( |
| | | 194 | | string outboxEntryId, |
| | | 195 | | CancellationToken cancellationToken) |
| | | 196 | | { |
| | 20 | 197 | | GovernanceOutboxEntry? entry = await FindByOutboxEntryIdAsync(outboxEntryId, cancellationToken).ConfigureAwait(f |
| | | 198 | | |
| | 20 | 199 | | return entry ?? throw new InvalidOperationException($"Outbox entry '{outboxEntryId.Trim()}' was not found."); |
| | 20 | 200 | | } |
| | | 201 | | |
| | | 202 | | private IQueryable<AsiBackboneGovernanceOutboxEntryEntity> OutboxEntries() |
| | | 203 | | { |
| | 46 | 204 | | return dbContext.Set<AsiBackboneGovernanceOutboxEntryEntity>().AsNoTracking(); |
| | | 205 | | } |
| | | 206 | | |
| | | 207 | | private static AsiBackboneGovernanceOutboxEntryEntity ToEntity(GovernanceOutboxEntry entry) |
| | | 208 | | { |
| | 76 | 209 | | GovernanceEmissionEnvelope envelope = entry.Envelope; |
| | 76 | 210 | | GovernanceEmissionPayload? payload = envelope.Payload; |
| | 76 | 211 | | GovernanceEmissionError? lastError = entry.LastError; |
| | | 212 | | |
| | 76 | 213 | | return new AsiBackboneGovernanceOutboxEntryEntity |
| | 76 | 214 | | { |
| | 76 | 215 | | OutboxEntryId = entry.OutboxEntryId, |
| | 76 | 216 | | Status = entry.Status, |
| | 76 | 217 | | CreatedUtc = entry.CreatedUtc, |
| | 76 | 218 | | UpdatedUtc = entry.UpdatedUtc, |
| | 76 | 219 | | DeliveredUtc = entry.Status is GovernanceEmissionStatus.Delivered ? entry.UpdatedUtc : null, |
| | 76 | 220 | | RetryCount = entry.RetryCount, |
| | 76 | 221 | | MaxRetryCount = entry.MaxRetryCount, |
| | 76 | 222 | | NextRetryUtc = entry.NextRetryUtc, |
| | 76 | 223 | | ProviderName = entry.ProviderName, |
| | 76 | 224 | | ProviderRecordId = entry.ProviderRecordId, |
| | 76 | 225 | | DeadLetterReason = entry.DeadLetterReason, |
| | 76 | 226 | | LastErrorCode = lastError?.Code, |
| | 76 | 227 | | LastErrorMessage = lastError?.Message, |
| | 76 | 228 | | LastErrorIsRetryable = lastError?.IsRetryable, |
| | 76 | 229 | | LastErrorProviderName = lastError?.ProviderName, |
| | 76 | 230 | | LastErrorProviderErrorCode = lastError?.ProviderErrorCode, |
| | 76 | 231 | | MetadataJson = JsonSerializer.Serialize(entry.Metadata, JsonOptions), |
| | 76 | 232 | | EnvelopeId = envelope.EnvelopeId, |
| | 76 | 233 | | EnvelopeSchemaVersion = envelope.SchemaVersion, |
| | 76 | 234 | | EnvelopeEventType = envelope.EventType, |
| | 76 | 235 | | EnvelopeEventId = envelope.EventId, |
| | 76 | 236 | | EnvelopeOccurredUtc = envelope.OccurredUtc, |
| | 76 | 237 | | EnvelopeCreatedUtc = envelope.CreatedUtc, |
| | 76 | 238 | | EnvelopeCorrelationId = envelope.CorrelationId, |
| | 76 | 239 | | EnvelopeAuditResidueId = envelope.AuditResidueId, |
| | 76 | 240 | | EnvelopeLifecycleStage = envelope.LifecycleStage, |
| | 76 | 241 | | EnvelopeLifecycleStageSequence = envelope.LifecycleStageSequence, |
| | 76 | 242 | | EnvelopePolicyVersion = envelope.PolicyVersion, |
| | 76 | 243 | | EnvelopePolicyHash = envelope.PolicyHash, |
| | 76 | 244 | | EnvelopeTraceId = envelope.TraceId, |
| | 76 | 245 | | EnvelopeSpanId = envelope.SpanId, |
| | 76 | 246 | | EnvelopeParentSpanId = envelope.ParentSpanId, |
| | 76 | 247 | | EnvelopeOperationName = envelope.OperationName, |
| | 76 | 248 | | EnvelopeOutcome = envelope.Outcome, |
| | 76 | 249 | | EnvelopeActorId = envelope.ActorId, |
| | 76 | 250 | | EnvelopeEmitterStatus = envelope.EmitterStatus, |
| | 76 | 251 | | EnvelopeEmitterProvider = envelope.EmitterProvider, |
| | 76 | 252 | | EnvelopeOutboxSequence = envelope.OutboxSequence, |
| | 76 | 253 | | EnvelopeGatewayExecutionId = envelope.GatewayExecutionId, |
| | 76 | 254 | | EnvelopeDecisionStage = envelope.DecisionStage, |
| | 76 | 255 | | EnvelopeMetadataJson = JsonSerializer.Serialize(envelope.Metadata, JsonOptions), |
| | 76 | 256 | | EnvelopePayloadType = payload?.PayloadType, |
| | 76 | 257 | | EnvelopePayloadSchemaVersion = payload?.SchemaVersion, |
| | 76 | 258 | | EnvelopePayloadContentType = payload?.ContentType, |
| | 76 | 259 | | EnvelopePayloadContentHash = payload?.ContentHash, |
| | 76 | 260 | | EnvelopePayloadSizeBytes = payload?.SizeBytes, |
| | 76 | 261 | | EnvelopePayloadMetadataJson = JsonSerializer.Serialize(payload?.Metadata ?? EmptyMetadata(), JsonOptions) |
| | 76 | 262 | | }; |
| | | 263 | | } |
| | | 264 | | |
| | | 265 | | private static GovernanceOutboxEntry[] ToEntries(IEnumerable<AsiBackboneGovernanceOutboxEntryEntity> entities) |
| | | 266 | | { |
| | 18 | 267 | | return [.. entities.Select(ToEntry)]; |
| | | 268 | | } |
| | | 269 | | |
| | | 270 | | private static GovernanceOutboxEntry ToEntry(AsiBackboneGovernanceOutboxEntryEntity entity) |
| | | 271 | | { |
| | 96 | 272 | | GovernanceEmissionPayload? payload = string.IsNullOrWhiteSpace(entity.EnvelopePayloadType) |
| | 96 | 273 | | ? null |
| | 96 | 274 | | : GovernanceEmissionPayload.Create( |
| | 96 | 275 | | entity.EnvelopePayloadType, |
| | 96 | 276 | | entity.EnvelopePayloadSchemaVersion, |
| | 96 | 277 | | entity.EnvelopePayloadContentType, |
| | 96 | 278 | | entity.EnvelopePayloadContentHash, |
| | 96 | 279 | | entity.EnvelopePayloadSizeBytes, |
| | 96 | 280 | | DeserializeMetadata(entity.EnvelopePayloadMetadataJson)); |
| | | 281 | | |
| | 96 | 282 | | var envelope = GovernanceEmissionEnvelope.Create( |
| | 96 | 283 | | entity.EnvelopeEventType, |
| | 96 | 284 | | entity.EnvelopeEventId, |
| | 96 | 285 | | entity.EnvelopeOccurredUtc, |
| | 96 | 286 | | entity.EnvelopeId, |
| | 96 | 287 | | entity.EnvelopeCreatedUtc, |
| | 96 | 288 | | entity.EnvelopeSchemaVersion, |
| | 96 | 289 | | entity.EnvelopeCorrelationId, |
| | 96 | 290 | | entity.EnvelopeAuditResidueId, |
| | 96 | 291 | | entity.EnvelopeLifecycleStage, |
| | 96 | 292 | | entity.EnvelopePolicyVersion, |
| | 96 | 293 | | entity.EnvelopePolicyHash, |
| | 96 | 294 | | entity.EnvelopeTraceId, |
| | 96 | 295 | | entity.EnvelopeSpanId, |
| | 96 | 296 | | entity.EnvelopeParentSpanId, |
| | 96 | 297 | | entity.EnvelopeOperationName, |
| | 96 | 298 | | entity.EnvelopeOutcome, |
| | 96 | 299 | | entity.EnvelopeActorId, |
| | 96 | 300 | | entity.EnvelopeEmitterStatus, |
| | 96 | 301 | | entity.EnvelopeEmitterProvider, |
| | 96 | 302 | | entity.EnvelopeOutboxSequence, |
| | 96 | 303 | | entity.EnvelopeGatewayExecutionId, |
| | 96 | 304 | | entity.EnvelopeDecisionStage, |
| | 96 | 305 | | payload, |
| | 96 | 306 | | DeserializeMetadata(entity.EnvelopeMetadataJson)); |
| | | 307 | | |
| | 96 | 308 | | GovernanceEmissionError? lastError = string.IsNullOrWhiteSpace(entity.LastErrorCode) || string.IsNullOrWhiteSpac |
| | 96 | 309 | | ? null |
| | 96 | 310 | | : GovernanceEmissionError.Create( |
| | 96 | 311 | | entity.LastErrorCode, |
| | 96 | 312 | | entity.LastErrorMessage, |
| | 96 | 313 | | entity.LastErrorIsRetryable ?? false, |
| | 96 | 314 | | entity.LastErrorProviderName, |
| | 96 | 315 | | entity.LastErrorProviderErrorCode); |
| | | 316 | | |
| | 96 | 317 | | return GovernanceOutboxEntry.Restore( |
| | 96 | 318 | | envelope, |
| | 96 | 319 | | entity.Status, |
| | 96 | 320 | | entity.OutboxEntryId, |
| | 96 | 321 | | entity.CreatedUtc, |
| | 96 | 322 | | entity.UpdatedUtc, |
| | 96 | 323 | | entity.RetryCount, |
| | 96 | 324 | | entity.MaxRetryCount, |
| | 96 | 325 | | entity.NextRetryUtc, |
| | 96 | 326 | | lastError, |
| | 96 | 327 | | entity.ProviderName, |
| | 96 | 328 | | entity.ProviderRecordId, |
| | 96 | 329 | | entity.DeadLetterReason, |
| | 96 | 330 | | DeserializeMetadata(entity.MetadataJson)); |
| | | 331 | | } |
| | | 332 | | |
| | | 333 | | private static ReadOnlyDictionary<string, string> DeserializeMetadata(string? json) |
| | | 334 | | { |
| | 214 | 335 | | if (string.IsNullOrWhiteSpace(json)) |
| | | 336 | | { |
| | 0 | 337 | | return EmptyMetadata(); |
| | | 338 | | } |
| | | 339 | | |
| | 214 | 340 | | Dictionary<string, string>? metadata = JsonSerializer.Deserialize<Dictionary<string, string>>(json, JsonOptions) |
| | | 341 | | |
| | 214 | 342 | | return metadata is null || metadata.Count == 0 |
| | 214 | 343 | | ? EmptyMetadata() |
| | 214 | 344 | | : new ReadOnlyDictionary<string, string>(new Dictionary<string, string>(metadata, StringComparer.Ordinal)); |
| | | 345 | | } |
| | | 346 | | |
| | | 347 | | private static ReadOnlyDictionary<string, string> EmptyMetadata() |
| | | 348 | | { |
| | 146 | 349 | | return new ReadOnlyDictionary<string, string>(new Dictionary<string, string>(StringComparer.Ordinal)); |
| | | 350 | | } |
| | | 351 | | |
| | | 352 | | private static int NormalizeMaxCount(int maxCount) |
| | | 353 | | { |
| | 18 | 354 | | return maxCount <= 0 |
| | 18 | 355 | | ? throw new ArgumentOutOfRangeException(nameof(maxCount), maxCount, "Maximum count must be greater than zero |
| | 18 | 356 | | : maxCount; |
| | | 357 | | } |
| | | 358 | | } |