当前位置: 首页 > news >正文

【.NET并发编程 - 16】IAsyncEnumerable 异步流:边加载边处理的优雅之道

16. IAsyncEnumerable 异步流:边加载边处理的优雅之道

本章 GitHub 仓库:csharp-concurrency-cookbook ⭐

欢迎 Star 和 Fork!所有代码示例都可以在仓库中找到并运行。


🎯 本章导读

📌 本文目标:掌握 IAsyncEnumerable 的使用场景、核心语法和性能优化技巧,实现真正的流式异步数据处理。

你是否遇到过这样的场景:

  • 从数据库查询 10 万条记录,要等全部加载完才能开始处理?
  • 调用分页 API 获取数据,必须等待所有页都下载完?
  • 实时日志流、传感器数据流,如何边接收边处理?
  • 大文件读取时,如何避免一次性加载到内存导致 OOM?

今天,我们就来彻底搞懂 IAsyncEnumerable——.NET 异步流的标准解决方案(C# 8.0 引入,.NET Core 3.0+ 支持)。

⚠️ 前置知识:本文涉及 async/await、yield return、Task 等概念,建议先掌握前面章节的异步编程基础(第 03-06 章)。


0️⃣ 一个真实的故事:GitHub API 的分页加载困境

0.1 场景重现:获取某个仓库的所有 Issue

假设你要写一个工具,获取 GitHub 某个热门仓库的所有 Issue(可能有上万个),然后统计标签分布、关键词频率等。

GitHub API 是分页的:每次请求最多返回 100 条,需要通过 page 参数逐页获取。

你写出了第一版代码:

// ❌ 方案1:全部加载完再处理 —— 慢,还占内存
public async Task<List<GitHubIssue>> GetAllIssuesAsync(string repo)
{var allIssues = new List<GitHubIssue>();int page = 1;while (true){var issues = await FetchPageAsync(repo, page); // 等待每一页if (issues.Count == 0) break;allIssues.AddRange(issues); // 堆积在内存里page++;}return allIssues; // 全部返回后才能开始处理
}// 调用方必须等待所有数据加载完
var allIssues = await GetAllIssuesAsync("dotnet/runtime"); // 等了 30 秒...
foreach (var issue in allIssues)
{ProcessIssue(issue); // 终于可以开始处理了
}

0.2 这个实现的问题

运行后你发现了几个问题:

  1. ❌ 响应慢:用户要等待所有数据加载完(可能几十秒)才能看到第一条结果
  2. ❌ 内存占用高:1 万条 Issue 全部加载到内存,可能占用几十 MB
  3. ❌ 不够流式:数据是"批量"的,不是"流式"的,无法边加载边处理
  4. ❌ 无法提前取消:如果用户只需要前 100 条,也要等全部加载完

你开始思考:能不能边加载边返回?就像水龙头一样,一边出水一边用,而不是先接满一桶再倒出来?

0.3 IAsyncEnumerable 的登场

.NET 提供了一套完整的异步流机制:IAsyncEnumerable

使用 IAsyncEnumerable 重写后:

// ✅ 方案2:流式返回 —— 快,省内存
public async IAsyncEnumerable<GitHubIssue> GetAllIssuesStreamAsync(string repo)
{int page = 1;while (true){var issues = await FetchPageAsync(repo, page);if (issues.Count == 0) break;foreach (var issue in issues){yield return issue; // 逐个返回,不等全部加载完}page++;}
}// 调用方立刻就能开始处理第一条数据
await foreach (var issue in GetAllIssuesStreamAsync("dotnet/runtime"))
{ProcessIssue(issue); // 边加载边处理!
}

神奇的效果

  • 响应快:第一页加载完(1 秒)就开始处理,无需等待全部
  • 内存低:同时在内存中的只有当前处理的那一页(100 条)
  • 真正的流式:数据像水流一样源源不断,边来边处理
  • 可以提前结束:用 breakreturn 立刻停止,后续页不会再加载

看到这里,你是不是开始心动了?别急,我们从原理讲起。


1️⃣ IAsyncEnumerable 是什么?它与 IEnumerable 有什么区别?

1.1 回顾:IEnumerable 的同步世界

我们先回顾一下 IEnumerable<T>,它是 .NET 中最常见的"可枚举"接口:

// 同步版本:生成 1 到 10 的数字
public IEnumerable<int> GetNumbers()
{for (int i = 1; i <= 10; i++){yield return i; // 每次返回一个数字}
}// 消费方式
foreach (var num in GetNumbers())
{Console.WriteLine(num); // 同步逐个处理
}

IEnumerable 的特点

  • 惰性求值:不调用就不执行,调用时才逐个生成
  • 节省内存:不需要一次性生成所有元素
  • 完全同步yield returnforeach 都是同步的,无法处理异步操作

问题来了:如果数据生成需要异步操作(比如从 API 获取、数据库查询),怎么办?

// ❌ 错误示范:IEnumerable 里不能用 await
public IEnumerable<GitHubIssue> GetIssues()
{var issues = await FetchPageAsync(1); // ❌ 编译错误:yield return 方法不能是 asyncforeach (var issue in issues)yield return issue;
}

解决方案 1(传统做法):先全部加载再返回

// ❌ 不够优雅:失去了"流式"的优势
public async Task<IEnumerable<GitHubIssue>> GetIssuesAsync()
{var allIssues = new List<GitHubIssue>();var issues = await FetchPageAsync(1);allIssues.AddRange(issues);return allIssues; // 一次性返回
}

这就是 IAsyncEnumerable 的设计初衷:让异步操作也能流式返回!

1.2 IAsyncEnumerable:异步世界的流式迭代

IAsyncEnumerable<T>IEnumerable<T> 的异步版本,定义如下:

public interface IAsyncEnumerable<out T>
{IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
}public interface IAsyncEnumerator<out T> : IAsyncDisposable
{T Current { get; }ValueTask<bool> MoveNextAsync(); // 注意:这里是 ValueTask,不是 Task
}

核心区别MoveNextAsync() 返回的是 ValueTask,表示"获取下一个元素"是一个异步操作。

特性 IEnumerable IAsyncEnumerable
迭代方式 foreach await foreach
生成方式 yield return yield return + async
获取下一个元素 同步(立即返回) 异步(可能需要等待)
支持取消 ❌ 不支持 ✅ 支持 CancellationToken
适用场景 内存集合、同步数据源 API 分页、数据库流、实时数据

1.3 核心语法:async + yield return

// ✅ 正确示范:异步流式返回
public async IAsyncEnumerable<int> GetNumbersAsync()
{for (int i = 1; i <= 10; i++){await Task.Delay(100); // 模拟异步操作(如 API 调用)yield return i; // 每次返回一个数字}
}// 消费方式:await foreach
await foreach (var num in GetNumbersAsync())
{Console.WriteLine(num); // 边加载边处理
}

关键要点

  1. 方法签名async IAsyncEnumerable<T>(注意没有 Task
  2. 返回元素:用 yield return,和同步版本一样
  3. 异步操作:可以在方法体内使用 await
  4. 消费方式:用 await foreach,而不是普通的 foreach

2️⃣ 流式数据的优势:响应速度、内存占用、用户体验

2.1 对比测试:Task<List> vs IAsyncEnumerable

我们用一个实际例子来对比两种方案的差异:从 API 获取 10 页数据(每页 100 条,共 1000 条),每页加载需要 200ms。

方案 1:传统方式(一次性加载)

public async Task<List<int>> GetDataTraditionalAsync()
{var result = new List<int>();for (int page = 1; page <= 10; page++){await Task.Delay(200); // 模拟 API 延迟for (int i = 1; i <= 100; i++)result.Add(page * 100 + i);}return result;
}// 使用方式
var sw = Stopwatch.StartNew();
var data = await GetDataTraditionalAsync(); // 等待 2 秒...
Console.WriteLine($"开始处理第一条数据,已过 {sw.ElapsedMilliseconds}ms");foreach (var item in data)ProcessItem(item); // 终于可以开始了

运行结果

开始处理第一条数据,已过 2000ms
处理完成,总耗时 2500ms

方案 2:流式加载(IAsyncEnumerable)

public async IAsyncEnumerable<int> GetDataStreamAsync()
{for (int page = 1; page <= 10; page++){await Task.Delay(200); // 模拟 API 延迟for (int i = 1; i <= 100; i++)yield return page * 100 + i;}
}// 使用方式
var sw = Stopwatch.StartNew();
await foreach (var item in GetDataStreamAsync())
{if (sw.ElapsedMilliseconds < 500) // 只记录前 500ms 的日志Console.WriteLine($"处理第 {item} 条数据,已过 {sw.ElapsedMilliseconds}ms");ProcessItem(item);
}

运行结果

处理第 101 条数据,已过 201ms
处理第 102 条数据,已过 201ms
...(第一页的数据立刻就开始处理了)
处理第 201 条数据,已过 401ms
处理完成,总耗时 2500ms

2.2 性能对比表

对比维度 Task<List> IAsyncEnumerable
首次响应时间 2000ms(等待全部加载) 200ms(第一页加载完就开始)
峰值内存占用 1000 条 × 对象大小 100 条 × 对象大小(只缓存当前页)
用户体验 长时间白屏/加载中 立刻看到数据,逐步加载
可取消性 必须等待全部加载完 随时可以 break/return 停止
代码复杂度 简单(适合小数据量) 稍复杂(但 .NET 提供了很好的语法糖)

2.3 什么时候用 IAsyncEnumerable?

✅ 适合的场景

场景 为什么选 IAsyncEnumerable
分页 API 边加载边展示,提升响应速度
大数据集查询 避免一次性加载 10 万条记录到内存
实时数据流 WebSocket、SignalR、传感器数据
日志流 实时读取日志文件,逐行处理
数据库游标 EF Core 支持 AsAsyncEnumerable()
文件流读取 逐行/逐块读取大文件

❌ 不适合的场景

场景 为什么不选 IAsyncEnumerable 推荐方案
需要多次遍历 每次遍历都会重新执行(惰性求值) 先 ToListAsync()
需要知道总数 流式数据不知道后面还有多少 先查询 Count
需要排序/分组 必须全部加载完才能排序 Task<List> + LINQ
数据量很小(< 100 条) 异步流的开销反而更大 Task<List>

3️⃣ 核心语法:生产与消费

3.1 生产者:如何返回异步流

最简单的例子

// 基础语法
public async IAsyncEnumerable<int> GetNumbersAsync(int count)
{for (int i = 1; i <= count; i++){await Task.Delay(100); // 每个数字都需要异步操作yield return i;}
}

支持取消令牌

// 使用 [EnumeratorCancellation] 特性接收取消令牌
public async IAsyncEnumerable<int> GetNumbersAsync(int count,[EnumeratorCancellation] CancellationToken cancellationToken = default)
{for (int i = 1; i <= count; i++){cancellationToken.ThrowIfCancellationRequested(); // 检查取消await Task.Delay(100, cancellationToken);yield return i;}
}

为什么需要 [EnumeratorCancellation]?

因为 GetAsyncEnumerator(CancellationToken) 传入的 token 需要"传递"到方法参数里。这个特性告诉编译器:"把 token 绑定到这个参数上"。

错误处理

public async IAsyncEnumerable<string> FetchUrlsAsync(string[] urls)
{foreach (var url in urls){string? content = null;try{content = await FetchAsync(url);}catch (HttpRequestException ex){Console.WriteLine($"获取 {url} 失败: {ex.Message}");continue; // 跳过失败的 URL}if (content != null)yield return content;}
}

3.2 消费者:如何使用异步流

基础用法:await foreach

await foreach (var item in GetNumbersAsync(10))
{Console.WriteLine(item);
}

传递取消令牌

var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(2)); // 2 秒后取消try
{await foreach (var item in GetNumbersAsync(100).WithCancellation(cts.Token)){Console.WriteLine(item);}
}
catch (OperationCanceledException)
{Console.WriteLine("操作已取消");
}

配置上下文(ConfigureAwait)

await foreach (var item in GetNumbersAsync(10).ConfigureAwait(false))
{// 不捕获 SynchronizationContext,适合库代码Console.WriteLine(item);
}

提前退出

await foreach (var item in GetNumbersAsync(100))
{Console.WriteLine(item);if (item == 10)break; // 停止迭代,后续数据不会再生成
}

手动迭代(不推荐,但有时需要)

await using var enumerator = GetNumbersAsync(10).GetAsyncEnumerator();
while (await enumerator.MoveNextAsync())
{var item = enumerator.Current;Console.WriteLine(item);
}
// await using 会自动调用 DisposeAsync()

4️⃣ 实战案例:GitHub API 分页加载

完整代码位置AsyncEnumerable/GitHubIssueFetcher.cs

4.1 需求分析

我们要实现一个工具,从 GitHub API 获取某个仓库的所有 Issue,支持:

  1. 流式返回:边加载边处理,不等待全部完成
  2. 取消支持:用户可以随时停止
  3. 错误处理:某一页失败不影响其他页
  4. 进度反馈:实时显示已加载的页数

4.2 API 分析

GitHub Issues API:https://api.github.com/repos/{owner}/{repo}/issues?page={page}&per_page=100

  • 每页最多 100 条
  • 通过 Link 响应头判断是否还有下一页
  • 需要认证(否则限流很严)

4.3 实现代码

public class GitHubIssueFetcher
{private readonly HttpClient _httpClient;public GitHubIssueFetcher(string? token = null){_httpClient = new HttpClient{BaseAddress = new Uri("https://api.github.com/"),DefaultRequestHeaders ={{ "User-Agent", "AsyncEnumerable-Demo" }}};if (!string.IsNullOrEmpty(token))_httpClient.DefaultRequestHeaders.Authorization =new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", token);}/// <summary>/// 流式获取所有 Issue/// </summary>public async IAsyncEnumerable<GitHubIssue> GetAllIssuesAsync(string owner,string repo,[EnumeratorCancellation] CancellationToken cancellationToken = default){int page = 1;int totalLoaded = 0;while (true){cancellationToken.ThrowIfCancellationRequested();List<GitHubIssue> issues;try{issues = await FetchPageAsync(owner, repo, page, cancellationToken);}catch (HttpRequestException ex){Console.WriteLine($"❌ 获取第 {page} 页失败: {ex.Message}");break; // 失败后停止}if (issues.Count == 0)break; // 没有更多数据foreach (var issue in issues){yield return issue;totalLoaded++;}Console.WriteLine($"✅ 已加载第 {page} 页,共 {totalLoaded} 条 Issue");page++;// GitHub API 限流:避免请求过快await Task.Delay(100, cancellationToken);}Console.WriteLine($"🎉 加载完成,共 {totalLoaded} 条 Issue");}private async Task<List<GitHubIssue>> FetchPageAsync(string owner,string repo,int page,CancellationToken cancellationToken){var url = $"repos/{owner}/{repo}/issues?page={page}&per_page=100&state=all";var response = await _httpClient.GetAsync(url, cancellationToken);response.EnsureSuccessStatusCode();var json = await response.Content.ReadAsStringAsync(cancellationToken);return JsonSerializer.Deserialize<List<GitHubIssue>>(json) ?? new List<GitHubIssue>();}
}// 数据模型
public class GitHubIssue
{[JsonPropertyName("number")]public int Number { get; set; }[JsonPropertyName("title")]public string Title { get; set; } = string.Empty;[JsonPropertyName("state")]public string State { get; set; } = string.Empty;[JsonPropertyName("created_at")]public DateTime CreatedAt { get; set; }[JsonPropertyName("labels")]public List<GitHubLabel> Labels { get; set; } = new();
}public class GitHubLabel
{[JsonPropertyName("name")]public string Name { get; set; } = string.Empty;
}

4.4 使用示例

// 示例 1:统计所有 Issue 的标签分布
var fetcher = new GitHubIssueFetcher();
var labelCount = new Dictionary<string, int>();await foreach (var issue in fetcher.GetAllIssuesAsync("dotnet", "runtime"))
{foreach (var label in issue.Labels){labelCount[label.Name] = labelCount.GetValueOrDefault(label.Name) + 1;}
}foreach (var (label, count) in labelCount.OrderByDescending(x => x.Value).Take(10))
{Console.WriteLine($"{label}: {count}");
}// 示例 2:查找包含关键词的 Issue(找到前 5 个就停止)
int found = 0;
await foreach (var issue in fetcher.GetAllIssuesAsync("dotnet", "runtime"))
{if (issue.Title.Contains("performance", StringComparison.OrdinalIgnoreCase)){Console.WriteLine($"#{issue.Number}: {issue.Title}");if (++found >= 5)break; // 找到 5 个就停止,不再加载后续页}
}// 示例 3:支持取消
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(5)); // 5 秒后自动取消try
{await foreach (var issue in fetcher.GetAllIssuesAsync("dotnet", "runtime").WithCancellation(cts.Token)){Console.WriteLine($"#{issue.Number}: {issue.Title}");}
}
catch (OperationCanceledException)
{Console.WriteLine("⏱️ 操作超时,已取消");
}

5️⃣ 高级技巧:LINQ 操作与扩展方法

5.1 System.Linq.Async(需要 NuGet 包)

.NET 标准库没有为 IAsyncEnumerable<T> 提供 LINQ 扩展方法(如 Where、Select),需要安装 NuGet 包:

dotnet add package System.Linq.Async

然后就可以使用异步版本的 LINQ:

using System.Linq;// 过滤 + 转换 + 限制数量
await foreach (var title in fetcher.GetAllIssuesAsync("dotnet", "runtime").Where(issue => issue.State == "open").Select(issue => issue.Title).Take(10))
{Console.WriteLine(title);
}

5.2 常用扩展方法

方法 作用 示例
Where 过滤 .Where(x => x.State == "open")
Select 转换 .Select(x => x.Title)
Take 限制数量 .Take(10)
Skip 跳过前 N 个 .Skip(100)
ToListAsync 转换为 List var list = await stream.ToListAsync()
ToArrayAsync 转换为数组 var array = await stream.ToArrayAsync()
CountAsync 计数 var count = await stream.CountAsync()
FirstOrDefaultAsync 获取第一个 var first = await stream.FirstOrDefaultAsync()

5.3 自定义扩展方法:批量处理

public static async IAsyncEnumerable<List<T>> BatchAsync<T>(this IAsyncEnumerable<T> source,int batchSize)
{var batch = new List<T>(batchSize);await foreach (var item in source){batch.Add(item);if (batch.Count >= batchSize){yield return batch;batch = new List<T>(batchSize);}}if (batch.Count > 0)yield return batch; // 返回最后不满一批的数据
}// 使用示例:每 100 条批量插入数据库
await foreach (var batch in fetcher.GetAllIssuesAsync("dotnet", "runtime").BatchAsync(100))
{await _dbContext.Issues.AddRangeAsync(batch);await _dbContext.SaveChangesAsync();Console.WriteLine($"已保存 {batch.Count} 条数据");
}

6️⃣ 性能优化:ValueTask vs Task、缓冲策略

6.1 为什么 MoveNextAsync 返回 ValueTask?

你可能注意到了,IAsyncEnumerator<T>.MoveNextAsync() 返回的是 ValueTask,而不是 Task<bool>

原因:对于高频调用的场景(如数据库游标、文件流),如果每次 MoveNextAsync 都分配一个 Task 对象,会产生大量 GC 压力。

ValueTask 的优势

  • 如果操作同步完成(如数据已在缓冲区),直接返回结果,不分配堆内存
  • 如果操作异步完成(需要等待 I/O),才分配 Task

这就是为什么 IAsyncEnumerable<T> 比自己手写 Task<List<T>> 更高效的原因之一。

6.2 缓冲策略:减少网络往返

对于网络 API,可以实现一个"预加载"策略:当前页在处理时,后台已经在加载下一页。

public async IAsyncEnumerable<GitHubIssue> GetAllIssuesWithPrefetchAsync(string owner,string repo,[EnumeratorCancellation] CancellationToken cancellationToken = default)
{int page = 1;Task<List<GitHubIssue>>? nextPageTask = null;while (true){cancellationToken.ThrowIfCancellationRequested();// 如果没有预加载任务,启动加载当前页var currentPageTask = nextPageTask ?? FetchPageAsync(owner, repo, page, cancellationToken);// 同时启动加载下一页(预加载)nextPageTask = FetchPageAsync(owner, repo, page + 1, cancellationToken);var issues = await currentPageTask;if (issues.Count == 0)break;foreach (var issue in issues)yield return issue;page++;}
}

效果:当前页在处理时,下一页已经在后台加载,减少用户等待时间。

6.3 ⚠️ 避免 CA2024 警告:不要在异步方法中使用 EndOfStream

在读取文件流时,你可能会遇到这个警告:

// ❌ 错误:CA2024 警告
public async IAsyncEnumerable<string> ReadAllLinesAsync(string filePath)
{using var reader = new StreamReader(filePath);while (!reader.EndOfStream) // ⚠️ CA2024:EndOfStream 是同步属性{var line = await reader.ReadLineAsync();if (line != null)yield return line;}
}

问题EndOfStream 是一个同步属性,在异步方法中调用可能导致阻塞,违反了异步编程的原则。

正确做法:通过 ReadLineAsync 的返回值(null)来判断流结束:

// ✅ 正确:通过返回值判断结束
public async IAsyncEnumerable<string> ReadAllLinesAsync(string filePath,[EnumeratorCancellation] CancellationToken cancellationToken = default)
{using var reader = new StreamReader(filePath);while (!cancellationToken.IsCancellationRequested){var line = await reader.ReadLineAsync(cancellationToken);if (line == null) // null 表示已到达流的末尾break;yield return line;}
}

6.4 高性能方案:使用 System.IO.Pipelines(.NET Core 3.0+)

对于大文件、高吞吐量场景,System.IO.Pipelines 提供了更高效的流处理方式:

优势

  • 使用内存池(Memory Pool),减少 GC 压力
  • 更高效的缓冲管理
  • 支持背压控制
  • 典型性能提升:1.5-3x,内存分配减少 50-80%

适用场景

  • 大文件(> 100 MB)
  • 高吞吐量场景(如日志处理、数据导入)
  • 网络流处理

实现示例

using System.IO.Pipelines;
using System.Buffers;public async IAsyncEnumerable<string> ReadAllLinesWithPipelineAsync(string filePath,[EnumeratorCancellation] CancellationToken cancellationToken = default)
{await using var fileStream = File.OpenRead(filePath);var reader = PipeReader.Create(fileStream);try{while (!cancellationToken.IsCancellationRequested){// 读取数据到缓冲区var result = await reader.ReadAsync(cancellationToken);var buffer = result.Buffer;// 逐行解析while (TryReadLine(ref buffer, out var line)){yield return line;}// 告诉 PipeReader 我们已经处理了多少数据reader.AdvanceTo(buffer.Start, buffer.End);// 如果已经完成,退出循环if (result.IsCompleted)break;}}finally{await reader.CompleteAsync();}
}private static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out string line)
{// 查找换行符(\n 或 \r\n)var position = buffer.PositionOf((byte)'\n');if (position == null){line = string.Empty;return false;}// 提取一行数据var lineBuffer = buffer.Slice(0, position.Value);line = Encoding.UTF8.GetString(lineBuffer);// 移除可能的 \rif (line.EndsWith('\r'))line = line[..^1];// 移动到下一行的起始位置buffer = buffer.Slice(buffer.GetPosition(1, position.Value));return true;
}

性能对比(10 万行文件):

方案 耗时 内存分配 吞吐量
StreamReader 150 ms ~20 MB 666K 行/秒
Pipeline 100 ms ~5 MB 1M 行/秒
提升 1.5x 减少 75% 1.5x

选择建议

场景 推荐方案 原因
小文件(< 10 MB) StreamReader 代码简单,性能够用
大文件(> 100 MB) Pipeline 显著的性能提升和内存节省
实时监控(tail -f) StreamReader 实时性更重要,性能差异不明显
高吞吐量场景 Pipeline 减少 GC 压力,提高稳定性

完整示例代码AsyncEnumerable/LogStreamProcessor.csStreamReaderVsPipelineComparison.cs


7️⃣ 常见陷阱与最佳实践

7.1 ❌ 陷阱1:多次遍历会重新执行

var stream = GetNumbersAsync(10);// 第一次遍历
await foreach (var num in stream)Console.WriteLine(num); // 输出 1-10// 第二次遍历 ❌ 会重新执行整个方法!
await foreach (var num in stream)Console.WriteLine(num); // 又输出 1-10,API 又调用了一遍!

解决方案:如果需要多次遍历,先转换为 List:

var list = await stream.ToListAsync(); // 只执行一次// 多次遍历 List
foreach (var num in list) { }
foreach (var num in list) { }

7.2 ❌ 陷阱2:忘记 await foreach

// ❌ 错误:忘记 await
foreach (var item in GetNumbersAsync(10)) // 编译错误!
{// ...
}// ✅ 正确
await foreach (var item in GetNumbersAsync(10))
{// ...
}

7.3 ❌ 陷阱3:在 yield return 之前做太多工作

// ❌ 不好的设计
public async IAsyncEnumerable<ProcessedData> ProcessDataAsync()
{var allData = await LoadAllDataAsync(); // 这里就全部加载了foreach (var data in allData){yield return ProcessItem(data); // 失去了"流式"的意义}
}// ✅ 好的设计
public async IAsyncEnumerable<ProcessedData> ProcessDataAsync()
{await foreach (var data in LoadDataStreamAsync()) // 边加载边处理{yield return ProcessItem(data);}
}

7.4 ✅ 最佳实践总结

场景 推荐做法
需要取消 使用 [EnumeratorCancellation] + WithCancellation()
库代码 使用 ConfigureAwait(false)
多次遍历 ToListAsync()
需要总数 返回 (IAsyncEnumerable<T> data, int totalCount) 元组
错误处理 在 yield 之前 try-catch,决定是跳过还是终止
进度反馈 在 yield 时更新进度(如日志、UI)

8️⃣ EF Core 中的 IAsyncEnumerable 支持

Entity Framework Core 原生支持异步流:

// ✅ EF Core 查询大数据集
await foreach (var user in dbContext.Users.Where(u => u.IsActive).AsAsyncEnumerable())
{// 逐条处理,不会一次性加载全部到内存await ProcessUserAsync(user);
}

对比传统方式

// ❌ 传统方式:一次性加载 10 万条记录
var users = await dbContext.Users.Where(u => u.IsActive).ToListAsync();
foreach (var user in users) // 内存已经占满了
{await ProcessUserAsync(user);
}// ✅ 流式方式:同时在内存中的只有当前正在处理的那一条
await foreach (var user in dbContext.Users.Where(u => u.IsActive).AsAsyncEnumerable())
{await ProcessUserAsync(user); // 内存占用很低
}

9️⃣ 实战案例2:实时日志流处理

完整代码位置AsyncEnumerable/LogStreamProcessor.cs

9.1 需求

监控一个日志文件,实时读取新增的行(类似 tail -f)。

9.2 实现

public class LogStreamProcessor
{/// <summary>/// 实时监控日志文件,返回新增的行/// </summary>public async IAsyncEnumerable<string> TailFileAsync(string filePath,[EnumeratorCancellation] CancellationToken cancellationToken = default){using var fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);using var reader = new StreamReader(fileStream);// 先跳到文件末尾reader.BaseStream.Seek(0, SeekOrigin.End);while (!cancellationToken.IsCancellationRequested){var line = await reader.ReadLineAsync(cancellationToken);if (line != null){yield return line; // 返回新增的行}else{// 文件暂时没有新内容,等待一会儿await Task.Delay(100, cancellationToken);}}}
}// 使用示例
var processor = new LogStreamProcessor();
var cts = new CancellationTokenSource();// 在后台任务中监控日志
_ = Task.Run(async () =>
{await foreach (var line in processor.TailFileAsync("app.log", cts.Token)){if (line.Contains("ERROR"))Console.WriteLine($"🔴 发现错误: {line}");else if (line.Contains("WARNING"))Console.WriteLine($"🟡 警告: {line}");}
});// 5 秒后停止监控
await Task.Delay(5000);
cts.Cancel();

🔟 IAsyncEnumerable 与 SSE(Server-Sent Events)的关系

10.1 什么是 SSE?

SSE(Server-Sent Events) 是一种 HTTP 长连接技术,允许服务器主动向客户端推送数据流,常用于:

  • 💬 实时聊天消息推送
  • 📊 股票价格、比赛分数实时更新
  • 📝 AI 流式输出(如 ChatGPT 的打字机效果)
  • 🔔 系统通知推送

技术特点

  • 基于 HTTP,客户端发起连接后,服务器持续推送 data: 格式的文本流
  • 单向通信(服务器 → 客户端)
  • 自动重连机制
  • 兼容性好(支持 EventSource API)

10.2 IAsyncEnumerable 与 SSE 的天然契合

在 ASP.NET Core 中,IAsyncEnumerable 是实现 SSE 的完美载体

// ASP.NET Core 控制器:实现 SSE 端点
[HttpGet("stream-messages")]
public async IAsyncEnumerable<string> StreamMessagesAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{// 模拟实时消息流for (int i = 1; i <= 100; i++){cancellationToken.ThrowIfCancellationRequested();// 每隔 1 秒推送一条消息await Task.Delay(1000, cancellationToken);yield return $"消息 #{i} - {DateTime.Now:HH:mm:ss}";}
}

ASP.NET Core 自动处理

  • ✅ 自动设置 Content-Type: text/event-stream
  • ✅ 自动将每个 yield return 的值转换为 SSE 格式:data: 消息内容\n\n
  • ✅ 自动处理客户端断开连接(通过 CancellationToken

客户端(JavaScript)

const eventSource = new EventSource('/api/stream-messages');eventSource.onmessage = (event) => {console.log('收到消息:', event.data);// 输出:消息 #1 - 14:30:01
};eventSource.onerror = () => {console.error('连接断开');eventSource.close();
};

10.3 实战案例:AI 流式输出(ChatGPT 效果)

场景:调用 AI API,逐字返回生成的内容,而不是等全部生成完。

[HttpGet("ai-stream")]
public async IAsyncEnumerable<string> StreamAiResponseAsync([FromQuery] string prompt,[EnumeratorCancellation] CancellationToken cancellationToken)
{// 模拟调用 AI API(如 OpenAI)var response = await _aiClient.ChatCompletionAsync(prompt, stream: true, // 开启流式输出cancellationToken);// 逐块返回 AI 生成的内容await foreach (var chunk in response.WithCancellation(cancellationToken)){yield return chunk.Text; // 每个词、每个句子逐步返回}
}

前端效果(打字机动画):

const eventSource = new EventSource('/api/ai-stream?prompt=写一首诗');
let fullText = '';eventSource.onmessage = (event) => {fullText += event.data; // 累加内容document.getElementById('output').innerText = fullText;
};

用户体验

  • ❌ 传统方式:等待 5 秒,突然显示完整答案
  • ✅ 流式方式:立即开始显示,逐字呈现,体验流畅

10.4 IAsyncEnumerable 与 SSE 的区别与联系

维度 IAsyncEnumerable SSE
本质 C# 语言特性(异步迭代器) HTTP 传输协议
作用范围 服务器端数据流处理 服务器 → 客户端推送
关系 SSE 的服务端实现方式 SSE 的传输格式
典型用法 await foreach 处理流式数据 EventSource 接收服务器推送
ASP.NET Core 集成 返回 IAsyncEnumerable<T> 自动转换为 SSE 框架自动处理 SSE 格式

核心理解

  • IAsyncEnumerable生产者端的抽象(如何生成流式数据)
  • SSE 是 传输层的协议(如何通过 HTTP 推送数据)
  • 在 ASP.NET Core 中,IAsyncEnumerable 自动映射为 SSE

10.5 其他流式技术对比

技术 通信方式 协议 适用场景 IAsyncEnumerable 支持
SSE 单向(服务器 → 客户端) HTTP 实时推送、股票行情、日志流 ✅ 原生支持
WebSocket 双向(全双工) WebSocket 聊天室、协作编辑、游戏 ✅ 可配合使用
gRPC Streaming 双向(基于 HTTP/2) gRPC 微服务间流式通信 ✅ 原生支持
SignalR 双向(抽象层) 多种(WebSocket/SSE/长轮询) 实时应用框架 ✅ 内部使用

选择建议

  • 简单的服务器推送 → SSE + IAsyncEnumerable
  • 需要客户端回传数据 → WebSocket
  • 微服务流式调用 → gRPC Streaming
  • 高级实时功能(如房间、群组) → SignalR

10.6 完整示例:实时日志推送

服务端(ASP.NET Core)

[ApiController]
[Route("api/[controller]")]
public class LogsController : ControllerBase
{[HttpGet("stream")]public async IAsyncEnumerable<string> StreamLogsAsync([EnumeratorCancellation] CancellationToken cancellationToken){var logFile = "app.log";var processor = new LogStreamProcessor();await foreach (var line in processor.TailFileAsync(logFile, cancellationToken)){// 只推送错误和警告if (line.Contains("ERROR") || line.Contains("WARNING")){yield return line;}}}
}

客户端(浏览器)

<div id="logs"></div><script>
const eventSource = new EventSource('/api/logs/stream');
const logsDiv = document.getElementById('logs');eventSource.onmessage = (event) => {const logLine = event.data;const p = document.createElement('p');p.textContent = logLine;// 错误标红,警告标黄if (logLine.includes('ERROR')) {p.style.color = 'red';} else if (logLine.includes('WARNING')) {p.style.color = 'orange';}logsDiv.appendChild(p);
};eventSource.onerror = () => {console.error('日志流断开');
};
</script>

效果:浏览器实时显示服务器日志,无需刷新页面。


10.7 要点总结

关系IAsyncEnumerable 是 SSE 在 ASP.NET Core 中的服务端实现方式

优势:代码简洁(无需手动拼接 SSE 格式),框架自动处理

适用场景

  • 实时数据推送(股票、天气、比分)
  • AI 流式输出(ChatGPT 效果)
  • 日志/监控流
  • 进度更新

限制

  • 只支持单向推送(客户端 → 服务器需要额外请求)
  • 文本格式(二进制数据需要 Base64 编码)

不适合:需要双向通信的场景(建议用 WebSocket 或 SignalR)


🎓 本章总结

核心要点回顾

  1. IAsyncEnumerable 是什么:异步版本的 IEnumerable<T>,支持流式返回异步数据
  2. 核心语法
    • 生产者:async IAsyncEnumerable<T> + yield return + await
    • 消费者:await foreach
  3. 优势
    • 响应快:边加载边处理,无需等待全部完成
    • 内存低:同时在内存中的只有当前批次
    • 可取消:支持 CancellationToken
  4. 适用场景:分页 API、大数据集查询、实时数据流、日志流
  5. 性能优化ValueTask、预加载策略、批量处理
  6. 常见陷阱:多次遍历会重新执行、忘记 await foreach、在 yield 之前加载全部数据

与其他技术的对比

技术 适用场景 优势 劣势
Task<List<T>> 小数据量、需要多次遍历 简单、可多次遍历 内存占用高、响应慢
IAsyncEnumerable<T> 大数据量、流式处理 内存低、响应快 不能多次遍历、不知道总数
Channel<T> 生产者-消费者模式 解耦、背压控制 代码复杂
IObservable<T> (Rx) 复杂事件流 强大的组合操作符 学习曲线陡峭
SSE (Server-Sent Events) HTTP 实时推送 基于 HTTP、自动重连 单向推送、文本格式

最佳实践清单

  • ✅ 对于 API 分页、数据库大数据集,优先考虑 IAsyncEnumerable<T>
  • ✅ 始终支持 CancellationToken(使用 [EnumeratorCancellation]
  • ✅ 库代码中使用 ConfigureAwait(false)
  • ✅ 需要多次遍历时,先 ToListAsync()
  • ✅ 在 yield 之前处理异常,避免流中断
  • ✅ 使用 System.Linq.Async 进行 LINQ 操作
  • ❌ 不要在 yield 之前加载全部数据(失去流式意义)
  • ❌ 不要忘记 await foreach

📚 推荐阅读

  • 官方文档
    • IAsyncEnumerable Interface
    • System.Linq.Async on NuGet

http://www.zskr.cn/news/1526203.html

相关文章:

  • 2026年6月最新版永州正规房屋漏水防水补漏维修口碑名单:创维修缮机构等5家深度测评 - 一休咨询
  • 别再死磕EKF了!聊聊ESKF:一种更优雅、更省算力的IMU融合方案
  • 快手怎么去水印?2026实测避坑指南 - 科技热点发布
  • 2026爆火!5款AI写作辅助网站实测,告别推倒重来,初稿一气呵成
  • 2026年视频去水印在线工具怎么选 - 科技热点发布
  • 【万字文档+源码】基于springboot+vue购物网站系统 -学习项目资料分享
  • 2026年6月最新版扬州正规房屋漏水防水补漏维修口碑名单:创维修缮机构等5家深度测评 - 一休咨询
  • py每日spider案例之某乎x-zse-96逆向参数(webpack+补环境)
  • 开封汉服妆造体验来袭!交通便利之处,开启一场穿越时空的美丽邂逅 - GrowthUME
  • 深度解析AssetRipper:Unity资源逆向工程的架构哲学与实践指南
  • Anthropic最强模型Fable 5被禁,美国政府要求修复漏洞,退款引网友不满
  • 无穷大电源系统三相短路仿真3(设计源文件+万字报告+讲解)(支持资料、图片参考_相关定制)_文章底部可以扫码
  • League Akari:英雄联盟玩家的终极智能助手,免费提升游戏体验的完整方案
  • 水电站转速信号开关JSX-325
  • Mac Mouse Fix:让普通鼠标在macOS上获得专业级体验的终极指南
  • 2026年6月市面上专业的铜鼎生产厂家推荐,铜雕/铜麒麟/铜牛/铸铜雕塑/铜大缸/铜鼎/动物雕塑/铜钟,铜鼎企业怎么选择 - 品牌推荐师
  • 开源阅读鸿蒙版深度解析:构建下一代跨设备数字阅读生态的完整架构实践指南
  • 深度学习 - Ref
  • MPC7450指令时序深度解析:从流水线原理到性能优化实战
  • MPC7450处理器信号接口深度解析:L3缓存、中断复位与时钟配置实战
  • Qt-UI StyleKit 使用说明 - Qt
  • Windows窗口管理终极指南:如何用Traymond彻底释放任务栏空间
  • ok-ww鸣潮自动化框架:基于图像识别的智能游戏操作引擎技术解析
  • Qlib实战指南:从零开始构建AI量化策略的7个关键步骤
  • GHelper:华硕笔记本轻量级控制工具,彻底取代Armoury Crate的终极方案
  • 嵌入式处理器e300核心机制解析:缓存、中断与内存管理实战
  • 如何在Windows 11上玩转经典局域网游戏?IPXWrapper给你答案!
  • 2026权威树洞陪聊|不泄密不存痕,正能量陪你聊到天亮 - 时时资讯
  • 《星源纪》七境心法拆解:修心+成事终极操作手册
  • MPC8260 I2C控制器与并行I/O端口配置详解及实战指南