Table of Contents

Interface IStreamPipelineBehavior<TRequest, TItem>

Namespace
SimpleMediator
Assembly
SimpleMediator.dll

Intercepts streaming handler execution to apply cross-cutting logic on each item.

public interface IStreamPipelineBehavior<TRequest, TItem> where TRequest : IStreamRequest<TItem>

Type Parameters

TRequest

Stream request type traversing the pipeline.

TItem

Type of each item yielded by the stream.

Examples

public sealed class StreamLoggingBehavior<TRequest, TItem> : IStreamPipelineBehavior<TRequest, TItem>
    where TRequest : IStreamRequest<TItem>
{
    public async IAsyncEnumerable<Either<MediatorError, TItem>> Handle(
        TRequest request,
        IRequestContext context,
        StreamHandlerCallback<TItem> nextStep,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        _logger.LogInformation("Stream started for {Request} (correlation: {CorrelationId})",
            typeof(TRequest).Name, context.CorrelationId);

        var count = 0;
        var errorCount = 0;

        await foreach (var item in nextStep().WithCancellation(cancellationToken))
        {
            item.Match(
                Left: _ => errorCount++,
                Right: _ => count++);

            yield return item;
        }

        _logger.LogInformation("Stream completed for {Request}: {Count} items, {ErrorCount} errors",
            typeof(TRequest).Name, count, errorCount);
    }
}

Remarks

Stream behaviors wrap the async enumerable returned by handlers or downstream behaviors, allowing inspection, transformation, or enrichment of each yielded item.

Unlike IPipelineBehavior<TRequest, TResponse>, stream behaviors operate on sequences rather than single values. Common use cases include logging item counts, applying rate limiting, enriching items with additional data, or filtering based on runtime conditions.

Behaviors are chained in reverse registration order. Each one can transform the stream from the next step before yielding items to the caller.

Methods

Handle(TRequest, IRequestContext, StreamHandlerCallback<TItem>, CancellationToken)

Executes the behavior logic around the next pipeline element in the stream.

IAsyncEnumerable<Either<MediatorError, TItem>> Handle(TRequest request, IRequestContext context, StreamHandlerCallback<TItem> nextStep, CancellationToken cancellationToken)

Parameters

request TRequest

Stream request being processed.

context IRequestContext

Ambient context with correlation ID, user info, tenant info, etc.

nextStep StreamHandlerCallback<TItem>

Callback to the next behavior or handler stream.

cancellationToken CancellationToken

Token to cancel stream iteration.

Returns

IAsyncEnumerable<Either<MediatorError, TItem>>

Async enumerable that yields items from the next step, potentially transformed or enriched.

Remarks

The nextStep callback returns the stream from the next behavior or handler. You must enumerate this stream (using await foreach) and yield its items to the caller.

Behaviors can:

  • Transform items: yield return item.Map(transform)
  • Filter items: only yield some items based on conditions
  • Enrich items: add metadata or context to each item
  • Log/monitor: track item counts, errors, or performance
  • Rate limit: introduce delays between items

Always use .WithCancellation(cancellationToken) when enumerating nextStep() to ensure proper cancellation propagation.

When implementing this interface, decorate the cancellation token parameter with [EnumeratorCancellation] attribute to ensure proper cancellation when iteration stops early.