using System.Threading.Channels;

namespace Manager.App.Extensions;

/// <summary>
/// Helpers for composing <see cref="IAsyncEnumerable{T}"/> sequences.
/// </summary>
public static class AsyncEnumerableExtensions
{
    /// <summary>
    /// Merges several asynchronous sequences into a single sequence by enumerating
    /// all of them concurrently and yielding items as they are produced.
    /// </summary>
    /// <typeparam name="T">The element type of the sequences.</typeparam>
    /// <param name="sources">The sequences to merge. Each one is enumerated on its own task.</param>
    /// <param name="cancellationToken">
    /// Token that stops the merge. It is forwarded to every source through
    /// <c>WithCancellation</c>.
    /// </param>
    /// <returns>
    /// A sequence containing the items of all sources. Items of one source keep their
    /// relative order; no ordering is guaranteed between different sources.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="sources"/> is <see langword="null"/>. Because this method is an
    /// iterator, the check runs when enumeration starts.
    /// </exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled before the merge finished.
    /// </exception>
    /// <remarks>
    /// If any source throws, the items already buffered are still yielded, then that
    /// exception is rethrown to the consumer and the remaining sources are cancelled.
    /// When the consumer stops enumerating early, all sources are cancelled and the
    /// enumerator's disposal waits for them to finish, so a source that ignores
    /// cancellation delays disposal.
    /// </remarks>
    public static async IAsyncEnumerable<T> Merge<T>(
        IEnumerable<IAsyncEnumerable<T>> sources,
        [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(sources);

        var channel = Channel.CreateUnbounded<T>(
            new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });

        // Linked source: cancelled by the caller's token, by a failing producer, or by
        // the consumer abandoning the enumeration (see the finally block below).
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        var producerToken = linkedCts.Token;
        var producers = new List<Task>();

        // Copies one source into the channel. Never throws: failures are reported
        // through the channel so the consumer observes them.
        async Task PumpAsync(IAsyncEnumerable<T> source)
        {
            try
            {
                await foreach (var item in source.WithCancellation(producerToken))
                {
                    await channel.Writer.WriteAsync(item, producerToken);
                }
            }
            catch (OperationCanceledException) when (producerToken.IsCancellationRequested)
            {
                // Expected shutdown path; the consumer side reports the cancellation.
            }
            catch (Exception ex)
            {
                // The first failure wins; later TryComplete calls are no-ops.
                channel.Writer.TryComplete(ex);

                // Stop the sibling producers instead of letting them run until their next write.
                linkedCts.Cancel();
            }
        }

        // Closes the channel once every producer is done so the reader loop can end.
        async Task CompleteWhenAllFinishAsync()
        {
            await Task.WhenAll(producers);
            channel.Writer.TryComplete();
        }

        try
        {
            foreach (var source in sources)
            {
                // Task.Run keeps a source that works synchronously before its first
                // await from blocking the start of the others.
                producers.Add(Task.Run(() => PumpAsync(source), CancellationToken.None));
            }

            _ = CompleteWhenAllFinishAsync();

            // Read with the caller's token (not the linked one) so that a source failure
            // surfaces as that failure rather than as a cancellation.
            await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
            {
                yield return item;
            }

            // Producers swallow cancellation, so a cancelled merge could otherwise look
            // like a normally completed (but truncated) sequence.
            cancellationToken.ThrowIfCancellationRequested();
        }
        finally
        {
            // Runs on normal completion, on failure and on early disposal: make sure no
            // producer outlives the enumeration or touches the disposed token source.
            linkedCts.Cancel();
            await Task.WhenAll(producers);
        }
    }
}