AsyncStream 是 Swift 并发模型中提供的一种类型,用于把“逐步产生的异步事件”包装成一个符合 AsyncSequence 协议的异步序列,从而可以通过 for-await-in 的方式按顺序消费这些事件。
它常用于把基于回调、通知、delegate、定时器、socket 数据流等“推送式数据源”转换为结构化的异步流。
基本用法
示例:
let stream = AsyncStream(Int.self) { continuation in
// 生产数据
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
// 结束流
continuation.finish()
}
for await value in stream {
print("received:", value)
}
1、生产数据:
continuation.yield(1)
生产端按需向流中插入数据。
2、消费端:
for await value in stream {
print("received:", value)
}
消费端通过for await逐项读取。
3、结束流:
continuation.finish()
当结束时调用finish方法。
如果需要抛出错误,则可以在finish中抛出:
let stream = AsyncThrowingStream(Int.self) { continuation in
continuation.yield(1)
continuation.finish(throwing: MyError())
}
4、释放资源:
continuation.onTermination = { termination in
switch termination {
case .cancelled: break
case .finished: break
}
}
当调用finish方法取消任务后,onTermination会被调用,用来释放资源。
使用场景
把回调接口转为异步流:
func notificationsStream() -> AsyncStream<String> {
AsyncStream { continuation in
let id = NotificationCenter.default.addObserver(
forName: .myEvent, object: nil, queue: nil
) { note in
continuation.yield("event")
}
// 取消时清理资源
continuation.onTermination = { _ in
NotificationCenter.default.removeObserver(id)
}
}
}
NotificationCenter接收到消息时,会调用 AsyncStream 的 yield 方法,生产数据,并由消费端接收并处理。
当任务主动取消时(调用finish方法),AsyncStream会触发onTermination方法,移除NotificationCenter监听。
缓冲与背压
“缓冲(buffer)”与“背压(backpressure)”,是描述生产者产生数据的速度与消费者处理数据的速度之间如何协调的问题。AsyncStream 为了解决这一类异步事件流常见问题,提供了可配置的缓冲策略。
AsyncStream 默认有缓冲(64 条)。可以自定义:
AsyncStream(Int.self, bufferingPolicy: .bufferingOldest(64)) { continuation in
...
}
为什么需要缓冲?
在事件流中,通常存在两个角色:
1、生产者:负责向流中 yield 数据(例如通知回调、socket 数据、定时器事件)。
2、消费者:负责 for-await 逐条读取数据。
现实中,二者速度往往不一致,例如:
生产者 1 秒产生 100 条数据。
消费者 1 秒只能处理 10 条。
如果没有缓冲,多余的数据要么丢失,要么导致系统崩溃。
因此 AsyncStream 默认提供一个容量为 64 条的缓冲区,用来临时存放“尚未被消费的事件”。
如果当前未消费的数据少于64条,则放入缓冲队列,超过64条,则根据测罗决定丢弃旧数据或新数据(背压行为)。
背压(backpressure):指的是当消费者处理不过来时,系统如何应对生产者继续产出的数据。当缓冲存满后,根据bufferingPolicy自行制定策略。
可配置的缓冲策略:
1、保留最新数据(丢弃旧的数据)
AsyncStream(Int.self, bufferingPolicy: .bufferingNewest(10)) { continuation in
...
}
最多缓存10条,如果继续产出第11条,最旧的那条数据会被丢弃。
适合实时场景,例如位置更新、手势轨迹、传感器读数。
2、保留最旧的数据(丢弃新的数据)
AsyncStream(Int.self, bufferingPolicy: .bufferingOldest(10)) { continuation in
...
}
超过10条后,新的数据被丢弃。
适合任务队列、日志上传等需保证顺序的业务。
3、无限缓冲(不建议随意使用)
AsyncStream(Int.self, bufferingPolicy: .unbounded) { continuation in
...
}
所有数据都会缓存,直到内存耗尽。
通常只在确保生产者速率受控时使用。
总结
AsyncStream可以将事件驱动式异步代码,转换为结构化、可迭代、可取消的异步序列,使并发逻辑更安全、更刻度。
适合delegate / callback数据源、时间流(通知、手势、网络流、传感器),定时器、生产者和消费者模型。
不适合只返回一个结果(应该使用async/await),需求随机访问或索引的集合。
