Operator是Publisher(发布者)的方法,用于对数据流进行变换、过滤、合并等操作,并返回一个新的Publisher。
操作符可以理解为加工函数,对发布者发布的值进行加工,只有订阅(如sink)才会启动整个流。
常用操作符
1、转换(Transform)
1、map:对每个值进行转换。
let publisher = Just(5)
publisher
.map { $0 * 2 } // 乘以 2
.sink { value in
print("Mapped value: \(value)") // 输出:Mapped value: 10
}
2、tryMap:可以抛出错误的map。
Just("123")
.tryMap { string in
guard let n = Int(string) else {
throw MyError.invalid
}
return n
}
3、flatMap:将值映射为另一个Publisher。
message
.flatMap { flatMsg in
Just(flatMsg + "abc")
}
.sink { print($0) }
flatMap会创建并返回一个新的Publsiher,flatMap会获取Publisher的值并组合成一个新的Publisher。
2、过滤(Filter)
1、filter:筛选满足条件的数据。
[1, 2, 3, 4, 5].publisher
.filter { $0 % 2 == 0 } // 只要偶数
.sink { print($0) } // 输出:2, 4
2、removeDuplicates:比较“相邻的值”,不会完全去重。
[1,1,2,2,1].publisher
.removeDuplicates() // 输出:1, 2, 1
相邻值进行去重,适用于数据筛选,防止相邻重复的问题。
3、组合(Combine)
1、merge:合流两个Publisher。
publisherA
.merge(with: publisherB)
合流可以合并多个事件源,例如当菜单点击、按钮点击都会触发同一个行为时,合流可以把多个行为合并成同一行为:
keyboardPastePublisher
.merge(with: menuPastePublisher, buttonPastePublisher)
.sink { paste() }
merge的限制为Output类型和Failure类型必须相同。
2、zip:一对一配对。
publisherA
.zip(publisherB) // ("pubA","pubB")
同步两个请求的结果。
3、combineLatest:合并多个Publisher的最新值。
usernamePublisher
.combineLatest(passwordPublisher)
保持最新值合并发送,例如A发布了a1,B没有发布,则等待配对。
当B发布b1后,合并最新值为(a1,b1),当A发布a2后,合并最新值为(a2,b1),类似CurrentValueSubject,会保留最新值。
时间 →
A: a1 a2
B: b1 b2
combineLatest:
(a1,b1) (a2,b1) (a2,b2)
4、延迟 / 调度(Scheduling)
控制事件在哪个线程执行。
1、subscribe(on:):控制订阅在哪个线程执行。
publisher
.subscribe(on: DispatchQueue.global()) // 订阅在后台线程
.sink { value in
// 在主线程更新 UI
}
subscribe表示订阅行为本身,例如网络请求发布等场景。
例如,请求网络:
URLSession.shared.dataTaskPublisher(for: url)
.subscribe(on: DispatchQueue.global())
.sink { ... }
这表示在后台线程发起网络请求和读取数据。
2、receive(on:):指定在哪个线程/调度器(Scheduler)上接收事件。
publisher
.subscribe(on: DispatchQueue.global()) // 订阅在后台线程
.receive(on: DispatchQueue.main) // 接收在主线程
.sink { value in
// 在主线程更新 UI
}
例如,更新UI:
.receive(on: DispatchQueue.main)
.sink { value in
self.label.text = value
}
表示无论上有在哪个线程,UI需要在主线程更新。
5、错误处理
1、catch:发布错误时,用新的Publisher替代上游Publisher。
let pub1 = Fail<Int, Error>(error: URLError(.badServerResponse))
let pub2 = pub1.catch { error in
Just(100) // 出错后,用新 Publisher 继续发送值
}
pub2.sink { value in
print(value)
}
// 输出:100
返回新的Publisher,适合错误恢复或者切换备用数据源。
2、retry:当Publisher失败后,重新订阅Publisher。
URLSession.shared.dataTaskPublisher(for: url)
.map(\.data)
.decode(type: MyModel.self, decoder: JSONDecoder())
.retry(3)
.sink(
receiveCompletion: { completion in
print(completion)
},
receiveValue: { model in
print(model)
}
)
retry只会在Publisher自动发出错误时重新订阅。只处理失败,不处理完成。
手动调用不会重试,常用于网络重试。
3、replaceError:使用默认值替代错误。
let pub = Fail<Int, Error>(error: URLError(.badServerResponse))
.replaceError(with: 42)
pub.sink { value in
print(value)
}
// 输出:42
当发生错误时,会输出默认值。
6、其他
1、debounce:延迟发送(防抖)。
input
.debounce(for: .seconds(0.5), scheduler: RunLoop.main)
.sink { print("debounce:", $0) }
在指定时间内,返回最后一个值。例如当input在0.5秒内发布多个值(a,b,c),debounce只会返回最后一个值(c)。
2、handleEvents:添加副作用,如打印日志。
input
.handleEvents(
receiveSubscription: { _ in print("订阅开始") },
receiveOutput: { value in print("即将输出:", value) },
receiveCompletion: { completion in print("完成:", completion) },
receiveCancel: { print("被取消") }
)
.sink { print("sink 收到:", $0) }
不会改变值、仅用于观察状态。
例如:
订阅开始
即将输出: abc
sink 收到: abc
完成: failure(Error)
4、throttle:限速,只允许第一个通过。
input
.throttle(for: .seconds(0.5),
scheduler: RunLoop.main,
latest: false)
.sink { print("throttle:", $0) }
可以理解为,当发布多个值时(aaa,bbb,ccc,ddd)后,会先收到第一个值(aaa),在限定的时间过后,收到第二个值(bbb或者ddd,由latest决定)。
5、decode:解码Publisher发布的Data数据。
struct User: Decodable {
let id: Int
let name: String
}
URLSession.shared.dataTaskPublisher(for: url)
.map(\.data) // DataTask 输出的是 (data, response)
.decode(type: User.self, decoder: JSONDecoder())
.sink(
receiveCompletion: { print($0) },
receiveValue: { user in print(user) }
)
当DataTask返回data数据后,decode可以将data数据转换为对应的类型。
常用调度器
Combine很多操作符(debounce、delay)都需要指定scheduler,因为它们的计时器必须挂在某个RunLoop/Queue上。
以下是常用的调度器:
1、DispatchQueue:使用GCD的调度队列。
DispatchQueue.main // 主线程
DispatchQueue.global(qos: .background) // 后台线程,(其他选项 .utility, .userInitiated 等)
DispatchQueue(label: "xxx")
2、RunLoop:主线程(UI线程),适用于UI更新。
RunLoop.main
3、OperationQueue:更加细粒度的调度器控制,适用于复杂任务依赖或并发控制。
OperationQueue.main
OperationQueue()
使用频率较低,适用于需要依赖关系、老项目、UIKit项目。
总结
操作符是Publisher的加工函数,本身不会发出事件,只会返回新的Publsiher。
事件流必须被订阅(sink/assign)才能激活。
