Combine操作符Operator
Combine操作符Operator

Combine操作符Operator

Operator是Publisher(发布者)的方法,用于对数据流进行变换、过滤、合并等操作,并返回一个新的Publisher。

操作符可以理解为加工函数,对发布者发布的值进行加工,只有订阅(如sink)才会启动整个流。

常用操作符

1、转换(Transform)

1、map:对每个值进行转换。

let publisher = Just(5)

publisher
    .map { $0 * 2 }  // 乘以 2
    .sink { value in
        print("Mapped value: \(value)")  // 输出:Mapped value: 10
    }

2、tryMap:可以抛出错误的map。

Just("123")
    .tryMap { string in
        guard let n = Int(string) else {
            throw MyError.invalid
        }
        return n
    }

3、flatMap:将值映射为另一个Publisher。

message
    .flatMap { flatMsg in
        Just(flatMsg + "abc")
    }
    .sink { print($0) }

flatMap会创建并返回一个新的Publsiher,flatMap会获取Publisher的值并组合成一个新的Publisher。

2、过滤(Filter)

1、filter:筛选满足条件的数据。

[1, 2, 3, 4, 5].publisher
    .filter { $0 % 2 == 0 }  // 只要偶数
    .sink { print($0) }  // 输出:2, 4

2、removeDuplicates:比较“相邻的值”,不会完全去重。

[1,1,2,2,1].publisher
    .removeDuplicates() // 输出:1, 2, 1

相邻值进行去重,适用于数据筛选,防止相邻重复的问题。

3、组合(Combine)

1、merge:合流两个Publisher。

publisherA
    .merge(with: publisherB)

合流可以合并多个事件源,例如当菜单点击、按钮点击都会触发同一个行为时,合流可以把多个行为合并成同一行为:

keyboardPastePublisher
    .merge(with: menuPastePublisher, buttonPastePublisher)
    .sink { paste() }

merge的限制为Output类型和Failure类型必须相同。

2、zip:一对一配对。

publisherA
    .zip(publisherB)    // ("pubA","pubB")

同步两个请求的结果。

3、combineLatest:合并多个Publisher的最新值。

usernamePublisher
    .combineLatest(passwordPublisher)

保持最新值合并发送,例如A发布了a1,B没有发布,则等待配对。

当B发布b1后,合并最新值为(a1,b1),当A发布a2后,合并最新值为(a2,b1),类似CurrentValueSubject,会保留最新值。

时间 →
A:  a1      a2
B:      b1          b2
combineLatest:
    (a1,b1) (a2,b1) (a2,b2)

4、延迟 / 调度(Scheduling)

控制事件在哪个线程执行。

1、subscribe(on:):控制订阅在哪个线程执行。

publisher
    .subscribe(on: DispatchQueue.global())  // 订阅在后台线程
    .sink { value in
        // 在主线程更新 UI
    }

subscribe表示订阅行为本身,例如网络请求发布等场景。

例如,请求网络:

URLSession.shared.dataTaskPublisher(for: url)
    .subscribe(on: DispatchQueue.global())
    .sink { ... }

这表示在后台线程发起网络请求和读取数据。

2、receive(on:):指定在哪个线程/调度器(Scheduler)上接收事件。

publisher
    .subscribe(on: DispatchQueue.global())  // 订阅在后台线程
    .receive(on: DispatchQueue.main)  // 接收在主线程
    .sink { value in
        // 在主线程更新 UI
    }

例如,更新UI:

.receive(on: DispatchQueue.main)
.sink { value in
    self.label.text = value
}

表示无论上有在哪个线程,UI需要在主线程更新。

5、错误处理

1、catch:发布错误时,用新的Publisher替代上游Publisher。

let pub1 = Fail<Int, Error>(error: URLError(.badServerResponse))

let pub2 = pub1.catch { error in
    Just(100) // 出错后,用新 Publisher 继续发送值
}

pub2.sink { value in
    print(value)
}
// 输出:100

返回新的Publisher,适合错误恢复或者切换备用数据源。

2、retry:当Publisher失败后,重新订阅Publisher。

URLSession.shared.dataTaskPublisher(for: url)
    .map(\.data)
    .decode(type: MyModel.self, decoder: JSONDecoder())
    .retry(3)
    .sink(
        receiveCompletion: { completion in
            print(completion)
        },
        receiveValue: { model in
            print(model)
        }
    )

retry只会在Publisher自动发出错误时重新订阅。只处理失败,不处理完成。

手动调用不会重试,常用于网络重试。

3、replaceError:使用默认值替代错误。

let pub = Fail<Int, Error>(error: URLError(.badServerResponse))
    .replaceError(with: 42)

pub.sink { value in
    print(value)
}
// 输出:42

当发生错误时,会输出默认值。

6、其他

1、debounce:延迟发送(防抖)。

input
    .debounce(for: .seconds(0.5), scheduler: RunLoop.main)
    .sink { print("debounce:", $0) }

在指定时间内,返回最后一个值。例如当input在0.5秒内发布多个值(a,b,c),debounce只会返回最后一个值(c)。

2、handleEvents:添加副作用,如打印日志。

input
    .handleEvents(
        receiveSubscription: { _ in print("订阅开始") },
        receiveOutput: { value in print("即将输出:", value) },
        receiveCompletion: { completion in print("完成:", completion) },
        receiveCancel: { print("被取消") }
    )
    .sink { print("sink 收到:", $0) }

不会改变值、仅用于观察状态。

例如:

订阅开始
即将输出: abc
sink 收到: abc
完成: failure(Error)

4、throttle:限速,只允许第一个通过。

input
    .throttle(for: .seconds(0.5),
              scheduler: RunLoop.main,
              latest: false)
    .sink { print("throttle:", $0) }

可以理解为,当发布多个值时(aaa,bbb,ccc,ddd)后,会先收到第一个值(aaa),在限定的时间过后,收到第二个值(bbb或者ddd,由latest决定)。

5、decode:解码Publisher发布的Data数据。

struct User: Decodable {
    let id: Int
    let name: String
}

URLSession.shared.dataTaskPublisher(for: url)
    .map(\.data)   // DataTask 输出的是 (data, response)
    .decode(type: User.self, decoder: JSONDecoder())
    .sink(
        receiveCompletion: { print($0) },
        receiveValue: { user in print(user) }
    )

当DataTask返回data数据后,decode可以将data数据转换为对应的类型。

常用调度器

Combine很多操作符(debounce、delay)都需要指定scheduler,因为它们的计时器必须挂在某个RunLoop/Queue上。

以下是常用的调度器:

1、DispatchQueue:使用GCD的调度队列。

DispatchQueue.main  // 主线程
DispatchQueue.global(qos: .background)  // 后台线程,(其他选项 .utility, .userInitiated 等)
DispatchQueue(label: "xxx")

2、RunLoop:主线程(UI线程),适用于UI更新。

RunLoop.main

3、OperationQueue:更加细粒度的调度器控制,适用于复杂任务依赖或并发控制。

OperationQueue.main
OperationQueue()

使用频率较低,适用于需要依赖关系、老项目、UIKit项目。

总结

操作符是Publisher的加工函数,本身不会发出事件,只会返回新的Publsiher。

事件流必须被订阅(sink/assign)才能激活。

相关文章

1、Combine发布者Publisher

2、Combine订阅者Subscriber

   

如果您认为这篇文章给您带来了帮助,您可以在此通过支付宝或者微信打赏网站开发者。

欢迎加入我们的 微信交流群QQ交流群,交流更多精彩内容!
微信交流群二维码 QQ交流群二维码

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注