Interface IStreamPipelineBehavior<TRequest, TItem>
Namespace: SimpleMediator
Assembly: SimpleMediator.dll
Intercepts streaming handler execution to apply cross-cutting logic on each item.
public interface IStreamPipelineBehavior<TRequest, TItem> where TRequest : IStreamRequest<TItem>
Type Parameters
TRequest: Stream request type traversing the pipeline.
TItem: Type of each item yielded by the stream.
Examples
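using System.Collections.Generic;
using System.Runtime.CompilerServices; // [EnumeratorCancellation]
using System.Threading;
using Microsoft.Extensions.Logging;    // ILogger<T>; assumed from the _logger usage below

public sealed class StreamLoggingBehavior<TRequest, TItem> : IStreamPipelineBehavior<TRequest, TItem>
    where TRequest : IStreamRequest<TItem>
{
    private readonly ILogger<StreamLoggingBehavior<TRequest, TItem>> _logger;

    public StreamLoggingBehavior(ILogger<StreamLoggingBehavior<TRequest, TItem>> logger)
        => _logger = logger;

    public async IAsyncEnumerable<Either<MediatorError, TItem>> Handle(
        TRequest request,
        IRequestContext context,
        StreamHandlerCallback<TItem> nextStep,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        _logger.LogInformation("Stream started for {Request} (correlation: {CorrelationId})",
            typeof(TRequest).Name, context.CorrelationId);

        var count = 0;
        var errorCount = 0;

        await foreach (var item in nextStep().WithCancellation(cancellationToken))
        {
            // Count errors (Left) and successful items (Right) without altering the stream.
            item.Match(
                Left: _ => errorCount++,
                Right: _ => count++);

            yield return item;
        }

        // Reached only when the stream is enumerated to completion.
        _logger.LogInformation("Stream completed for {Request}: {Count} items, {ErrorCount} errors",
            typeof(TRequest).Name, count, errorCount);
    }
}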
Remarks
Stream behaviors wrap the async enumerable returned by handlers or downstream behaviors, allowing inspection, transformation, or enrichment of each yielded item.
Unlike IPipelineBehavior<TRequest, TResponse>, stream behaviors operate on sequences rather than single values. Common use cases include logging item counts, applying rate limiting, enriching items with additional data, or filtering based on runtime conditions.
Behaviors are chained in reverse registration order. Each one can transform the stream from the next step before yielding items to the caller.
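The chaining described above can be illustrated with a small stand-alone sketch. It does not use SimpleMediator itself: `StreamCallback`, `Behavior`, `ChainSketch`, and `Tag` are hypothetical stand-ins, and the assumption that the first-registered behavior ends up outermost is one common reading of "reverse registration order", not something this page confirms.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for StreamHandlerCallback<TItem>.
public delegate IAsyncEnumerable<string> StreamCallback();

// Hypothetical stand-in for a behavior's Handle method (request and context omitted).
public delegate IAsyncEnumerable<string> Behavior(StreamCallback next, CancellationToken ct);

public static class ChainSketch
{
    // Stand-in handler: yields a single item.
    public static async IAsyncEnumerable<string> Handler()
    {
        await Task.Yield();
        yield return "x";
    }

    // A behavior that wraps every item it sees in its own name.
    public static Behavior Tag(string name) => (next, ct) => TagItems(name, next, ct);

    private static async IAsyncEnumerable<string> TagItems(
        string name,
        StreamCallback next,
        [EnumeratorCancellation] CancellationToken ct)
    {
        await foreach (var item in next().WithCancellation(ct))
        {
            yield return $"{name}({item})";
        }
    }

    // Walk the registrations backwards so that each behavior wraps the one
    // registered after it; the first registration becomes the outermost layer.
    public static StreamCallback Compose(
        IReadOnlyList<Behavior> behaviors,
        StreamCallback handler,
        CancellationToken ct)
    {
        var next = handler;
        for (var i = behaviors.Count - 1; i >= 0; i--)
        {
            var behavior = behaviors[i];
            var inner = next;
            next = () => behavior(inner, ct);
        }
        return next;
    }
}

public static class ChainDemo
{
    public static async Task Main()
    {
        var behaviors = new List<Behavior> { ChainSketch.Tag("A"), ChainSketch.Tag("B") };
        var pipeline = ChainSketch.Compose(behaviors, ChainSketch.Handler, CancellationToken.None);

        await foreach (var item in pipeline())
        {
            Console.WriteLine(item); // prints A(B(x))
        }
    }
}
```

Under this assumption, the item passes through B first (closest to the handler) and A last (closest to the caller), which is why A's tag appears on the outside.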
Methods
Handle(TRequest, IRequestContext, StreamHandlerCallback<TItem>, CancellationToken)
Executes the behavior logic around the next pipeline element in the stream.
IAsyncEnumerable<Either<MediatorError, TItem>> Handle(TRequest request, IRequestContext context, StreamHandlerCallback<TItem> nextStep, CancellationToken cancellationToken)
Parameters
request (TRequest): Stream request being processed.
context (IRequestContext): Ambient context with correlation ID, user info, tenant info, etc.
nextStep (StreamHandlerCallback<TItem>): Callback to the next behavior or handler stream.
cancellationToken (CancellationToken): Token to cancel stream iteration.
Returns
- IAsyncEnumerable<Either<MediatorError, TItem>>
Async enumerable that yields items from the next step, potentially transformed or enriched.
Remarks
The nextStep callback returns the stream from the next behavior or handler.
You must enumerate this stream (using await foreach) and yield its items to the caller.
Behaviors can:
- Transform items: yield return item.Map(transform)
- Filter items: only yield some items based on conditions
- Enrich items: add metadata or context to each item
- Log/monitor: track item counts, errors, or performance
- Rate limit: introduce delays between items
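The transform, filter, and rate-limit shapes listed above can be sketched with plain `IAsyncEnumerable<T>` wrappers so the example runs without SimpleMediator. Everything here (`StreamShapes`, `Source`, `Transform`, `Filter`, `Throttle`, `Collect`) is a hypothetical stand-in: in a real behavior the upstream would be `nextStep()` and the items would be `Either<MediatorError, TItem>` values rather than integers.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class StreamShapes
{
    // Stand-in for nextStep(): yields the integers 1 through 5.
    public static async IAsyncEnumerable<int> Source(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 1; i <= 5; i++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await Task.Yield();
            yield return i;
        }
    }

    // Transform: map every item before yielding it.
    public static async IAsyncEnumerable<TOut> Transform<TIn, TOut>(
        IAsyncEnumerable<TIn> upstream,
        Func<TIn, TOut> map,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in upstream.WithCancellation(cancellationToken))
        {
            yield return map(item);
        }
    }

    // Filter: yield only the items that satisfy the predicate.
    public static async IAsyncEnumerable<T> Filter<T>(
        IAsyncEnumerable<T> upstream,
        Func<T, bool> keep,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in upstream.WithCancellation(cancellationToken))
        {
            if (keep(item))
            {
                yield return item;
            }
        }
    }

    // Rate limit: wait for the given delay between consecutive items.
    public static async IAsyncEnumerable<T> Throttle<T>(
        IAsyncEnumerable<T> upstream,
        TimeSpan delay,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var first = true;
        await foreach (var item in upstream.WithCancellation(cancellationToken))
        {
            if (!first)
            {
                await Task.Delay(delay, cancellationToken);
            }
            first = false;
            yield return item;
        }
    }

    // Helper that drains a stream into a list.
    public static async Task<List<T>> Collect<T>(IAsyncEnumerable<T> stream)
    {
        var items = new List<T>();
        await foreach (var item in stream)
        {
            items.Add(item);
        }
        return items;
    }
}

public static class ShapesDemo
{
    public static async Task Main()
    {
        var evens = StreamShapes.Filter(StreamShapes.Source(), n => n % 2 == 0);
        var labelled = StreamShapes.Transform(evens, n => $"item-{n}");
        var paced = StreamShapes.Throttle(labelled, TimeSpan.FromMilliseconds(10));

        foreach (var line in await StreamShapes.Collect(paced))
        {
            Console.WriteLine(line); // prints item-2, then item-4
        }
    }
}
```

Each wrapper enumerates its upstream lazily, so items flow through one at a time rather than being buffered between stages.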
Always use .WithCancellation(cancellationToken) when enumerating nextStep()
to ensure proper cancellation propagation.
When implementing this interface, decorate the cancellation token parameter with
the [EnumeratorCancellation] attribute to ensure proper cancellation when iteration stops early.
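The two cancellation rules above can be seen working together in a stand-alone sketch that uses only the .NET base class library. `CancellationSketch`, `Ticks`, `PassThrough`, and `ConsumeUntilCancelled` are hypothetical names; `PassThrough` plays the role of a behavior and `Ticks` the role of the handler stream.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Endless stand-in handler stream that checks its token before each item.
    public static async IAsyncEnumerable<int> Ticks(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var i = 0;
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await Task.Yield();
            yield return i++;
        }
    }

    // Behavior-shaped wrapper: [EnumeratorCancellation] receives the token the
    // consumer passes to WithCancellation, and the inner WithCancellation call
    // forwards that token to the next stream.
    public static async IAsyncEnumerable<int> PassThrough(
        Func<IAsyncEnumerable<int>> nextStep,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in nextStep().WithCancellation(cancellationToken))
        {
            yield return item;
        }
    }

    // Consumes the wrapped stream and cancels after 'limit' items.
    public static async Task<(int Seen, bool Cancelled)> ConsumeUntilCancelled(int limit)
    {
        using var cts = new CancellationTokenSource();
        var seen = 0;
        try
        {
            await foreach (var item in PassThrough(() => Ticks()).WithCancellation(cts.Token))
            {
                seen++;
                if (seen == limit)
                {
                    cts.Cancel();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // The token travelled consumer -> PassThrough -> Ticks and stopped the source.
            return (seen, true);
        }
        return (seen, false);
    }
}

public static class CancellationDemo
{
    public static async Task Main()
    {
        var (seen, cancelled) = await CancellationSketch.ConsumeUntilCancelled(3);
        Console.WriteLine($"seen={seen} cancelled={cancelled}"); // prints seen=3 cancelled=True
    }
}
```

If either the attribute on `PassThrough` or the inner `WithCancellation` call were removed, the consumer's token would no longer reach `Ticks`, and in this sketch the endless source would keep producing items.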