< Summary

Information
Class: AsiBackbone.Core.Outbox.AsiBackboneGovernanceOutboxDrain
Assembly: AsiBackbone.Core
File(s): /home/runner/work/AsiBackbone/AsiBackbone/src/AsiBackbone.Core/Outbox/AsiBackboneGovernanceOutboxDrain.cs
Line coverage
100%
Covered lines: 91
Uncovered lines: 0
Coverable lines: 91
Total lines: 177
Line coverage: 100%
Branch coverage
91%
Covered branches: 33
Total branches: 36
Branch coverage: 91.6%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%44100%
DrainAsync()91.66%1212100%
DrainEntryAsync()100%11100%
ApplyEmissionResultAsync()90%2020100%

File(s)

/home/runner/work/AsiBackbone/AsiBackbone/src/AsiBackbone.Core/Outbox/AsiBackboneGovernanceOutboxDrain.cs

#LineLine coverage
 1using AsiBackbone.Core.Emissions;
 2
 3namespace AsiBackbone.Core.Outbox;
 4
 5/// <summary>
 6/// Drains provider-neutral governance outbox entries through a configured governance emitter.
 7/// </summary>
 8/// <remarks>
 9/// This drain path is provider-neutral. It is suitable for tests, samples, local validation, and host-owned workers tha
 10/// </remarks>
 11/// <remarks>
 12/// Initializes a new instance of the <see cref="AsiBackboneGovernanceOutboxDrain" /> class.
 13/// </remarks>
 14/// <param name="outboxStore">The provider-neutral outbox store.</param>
 15/// <param name="emitter">The provider-neutral governance emitter.</param>
 2416public sealed class AsiBackboneGovernanceOutboxDrain(
 2417    IAsiBackboneGovernanceOutboxStore outboxStore,
 2418    IAsiBackboneGovernanceEmitter emitter)
 19{
 2620    private readonly IAsiBackboneGovernanceOutboxStore outboxStore = outboxStore ?? throw new ArgumentNullException(name
 2521    private readonly IAsiBackboneGovernanceEmitter emitter = emitter ?? throw new ArgumentNullException(nameof(emitter))
 22
 23    /// <summary>
 24    /// Drains pending and retry-ready outbox entries through the configured emitter.
 25    /// </summary>
 26    /// <param name="utcNow">The UTC timestamp used for retry-ready checks.</param>
 27    /// <param name="maxCount">The maximum number of entries to drain.</param>
 28    /// <param name="cancellationToken">A cancellation token.</param>
 29    /// <returns>The updated outbox entries that were attempted by the drain.</returns>
 30    public async ValueTask<IReadOnlyList<GovernanceOutboxEntry>> DrainAsync(
 31        DateTimeOffset? utcNow = null,
 32        int maxCount = 100,
 33        CancellationToken cancellationToken = default)
 34    {
 2435        if (maxCount <= 0)
 36        {
 137            throw new ArgumentOutOfRangeException(nameof(maxCount), maxCount, "Maximum count must be greater than zero."
 38        }
 39
 2340        cancellationToken.ThrowIfCancellationRequested();
 41
 2342        DateTimeOffset drainUtc = (utcNow ?? DateTimeOffset.UtcNow).ToUniversalTime();
 2343        IReadOnlyList<GovernanceOutboxEntry> pendingEntries = await outboxStore
 2344            .FindPendingAsync(maxCount, cancellationToken)
 2345            .ConfigureAwait(false);
 46
 2347        List<GovernanceOutboxEntry> entriesToDrain = [.. pendingEntries];
 48
 2349        if (entriesToDrain.Count < maxCount)
 50        {
 1251            IReadOnlyList<GovernanceOutboxEntry> retryReadyEntries = await outboxStore
 1252                .FindRetryReadyAsync(drainUtc, maxCount - entriesToDrain.Count, cancellationToken)
 1253                .ConfigureAwait(false);
 54
 1255            HashSet<string> existingEntryIds = new(
 1056                entriesToDrain.Select(entry => entry.OutboxEntryId),
 1257                StringComparer.Ordinal);
 58
 2859            foreach (GovernanceOutboxEntry retryReadyEntry in retryReadyEntries)
 60            {
 261                if (existingEntryIds.Add(retryReadyEntry.OutboxEntryId))
 62                {
 163                    entriesToDrain.Add(retryReadyEntry);
 64                }
 65            }
 66        }
 67
 2368        List<GovernanceOutboxEntry> updatedEntries = new(entriesToDrain.Count);
 69
 10170        foreach (GovernanceOutboxEntry entry in entriesToDrain)
 71        {
 2972            cancellationToken.ThrowIfCancellationRequested();
 2973            GovernanceOutboxEntry updatedEntry = await DrainEntryAsync(entry, drainUtc, cancellationToken).ConfigureAwai
 2674            updatedEntries.Add(updatedEntry);
 75        }
 76
 2077        return updatedEntries;
 2078    }
 79
 80    private async ValueTask<GovernanceOutboxEntry> DrainEntryAsync(
 81        GovernanceOutboxEntry entry,
 82        DateTimeOffset drainUtc,
 83        CancellationToken cancellationToken)
 84    {
 85        GovernanceEmissionResult result;
 86
 87        try
 88        {
 2989            result = await emitter.EmitAsync(entry.Envelope, cancellationToken).ConfigureAwait(false);
 2590        }
 191        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 92        {
 193            throw;
 94        }
 395        catch (Exception ex)
 96        {
 397            var governanceEmissionError = GovernanceEmissionError.Create(
 398                "emission.exception",
 399                $"Governance emission threw {ex.GetType().Name} during outbox drain.",
 3100                isRetryable: true,
 3101                providerErrorCode: ex.GetType().FullName);
 102
 3103            return await outboxStore.MarkFailedAsync(
 3104                entry.OutboxEntryId,
 3105                governanceEmissionError,
 3106                nextRetryUtc: drainUtc.AddMinutes(1),
 3107                cancellationToken)
 3108                .ConfigureAwait(false);
 109        }
 110
 25111        return await ApplyEmissionResultAsync(entry, result, drainUtc, cancellationToken).ConfigureAwait(false);
 26112    }
 113
 114    private async ValueTask<GovernanceOutboxEntry> ApplyEmissionResultAsync(
 115        GovernanceOutboxEntry entry,
 116        GovernanceEmissionResult result,
 117        DateTimeOffset drainUtc,
 118        CancellationToken cancellationToken)
 119    {
 25120        ArgumentNullException.ThrowIfNull(result);
 121
 25122        if (result.IsSuccess)
 123        {
 13124            return await outboxStore.MarkDeliveredAsync(
 13125                entry.OutboxEntryId,
 13126                result,
 13127                cancellationToken)
 13128                .ConfigureAwait(false);
 129        }
 130
 12131        if (result.Status is GovernanceEmissionStatus.DeadLettered)
 132        {
 1133            GovernanceEmissionError governanceEmissionError = result.Error ?? GovernanceEmissionError.Create(
 1134                "emission.deadlettered",
 1135                "Governance emission returned a dead-lettered result.",
 1136                providerName: result.ProviderName);
 137
 1138            return await outboxStore.MarkDeadLetteredAsync(
 1139                entry.OutboxEntryId,
 1140                governanceEmissionError,
 1141                governanceEmissionError.Message,
 1142                cancellationToken)
 1143                .ConfigureAwait(false);
 144        }
 145
 11146        if (result.Status is GovernanceEmissionStatus.Deferred or GovernanceEmissionStatus.Pending)
 147        {
 2148            GovernanceEmissionError? governanceEmissionError = result.Error ?? (result.Status is GovernanceEmissionStatu
 2149                ? GovernanceEmissionError.Create(
 2150                    "emission.pending",
 2151                    "Governance emission remained pending after the outbox drain attempt.",
 2152                    isRetryable: true,
 2153                    providerName: result.ProviderName)
 2154                : null);
 155
 2156            GovernanceOutboxEntry deferredEntry = entry.MarkDeferred(
 2157                governanceEmissionError,
 2158                result.RetryAfterUtc ?? drainUtc.AddMinutes(1),
 2159                drainUtc);
 160
 2161            return await outboxStore.SaveAsync(deferredEntry, cancellationToken).ConfigureAwait(false);
 162        }
 163
 9164        GovernanceEmissionError failure = result.Error ?? GovernanceEmissionError.Create(
 9165            "emission.failed",
 9166            "Governance emission returned a failed result without provider-neutral error details.",
 9167            isRetryable: result.ShouldRetry,
 9168            providerName: result.ProviderName);
 169
 9170        return await outboxStore.MarkFailedAsync(
 9171            entry.OutboxEntryId,
 9172            failure,
 9173            result.RetryAfterUtc,
 9174            cancellationToken)
 9175            .ConfigureAwait(false);
 23176    }
 177}