RxSwift的核心非常简单,无非就是以下四点:
可观察序列
Observalbe
观察者
Observer
调度者
Scheduler
销毁者
Dispose
可观察序列、观察者,在《RxSwift核心源码探索》中有讲,下面就来看看调度者Scheduler
具体做了哪些事情。调度者scheduler
主要用于控制任务在哪个线程或队列运行,而scheduler
是对GCD
的封装,GCD
我们很熟悉,通过GCD
创建队列,开启线程,开发中所有动作都等价于任务+队列
。
任务:
异步任务
同步任务
队列:
主队列
全局队列
并行队列
串行队列
参见:《GCD部分总结》
在scheduler
中调度队列如下:
MainScheduler
主线程,与UI相关的任务均在该线程下执行SerialDispatchQueueScheduler
相当于GCD
对应的串行队列ConcurrentDispatchQueueScheduler
相当于GCD
并行队列OperationQueueScheduler
相当于NSOperationQueue
管理者可以设置并发数CurrentThreadScheduler
-当前线程
以上几种类型,通过代码能够发现实际上就是对GCD
队列创建的封装,以及Operation
的封装。
1、MainScheduler
表示为主线程。开发中需要执行一些和UI相关的任务,则需要我们切换到该Scheduler
上执行。点击进入MainScheduler
类,如下:
public final class MainScheduler : SerialDispatchQueueScheduler { private let _mainQueue: DispatchQueue var numberEnqueued = AtomicInt(0) /// Initializes new instance of `MainScheduler`. public init() { self._mainQueue = DispatchQueue.main super.init(serialQueue: self._mainQueue) } }复制代码
MainScheduler
继承了SerialDispatchQueueScheduler
串行队列类,当然这里不难理解,因为主队列就是一个特殊的串行队列。在该类中,能够清楚的看到,在初始化对象时,确定了队列类型为主队列self._mainQueue = DispatchQueue.main
。
2、SerialDispatchQueueScheduler
串行队列,有需要串行执行的任务,都需要切换至该scheduler
下。
public class SerialDispatchQueueScheduler : SchedulerType { public typealias TimeInterval = Foundation.TimeInterval public typealias Time = Date /// - returns: Current time. public var now : Date {return Date() }let configuration: DispatchQueueConfiguration /** Constructs new `SerialDispatchQueueScheduler` that wraps `serialQueue`. - parameter serialQueue: Target dispatch queue. - parameter leeway: The amount of time, in nanoseconds, that the system will defer the timer. */ init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) { self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway) } public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((DispatchQueue) -> Void)? = nil, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {let queue = DispatchQueue(label: internalSerialQueueName, attributes: []) serialQueueConfiguration?(queue) self.init(serialQueue: queue, leeway: leeway) } }复制代码
同样初始化的时候,通过DispatchQueue对attributes
属性置空操作设定了队列为串行队列。
3、ConcurrentDispatchQueueScheduler
并行队列,如下载任务,我们需要多个任务同时进行,则需要切换到当前scheduler
。
public class ConcurrentDispatchQueueScheduler: SchedulerType { public typealias TimeInterval = Foundation.TimeInterval public typealias Time = Date public var now : Date {return Date() }let configuration: DispatchQueueConfiguration public init(queue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) { self.configuration = DispatchQueueConfiguration(queue: queue, leeway: leeway) } @available(iOS 8, OSX 10.10, *) public convenience init(qos: DispatchQoS, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) { self.init(queue: DispatchQueue( label: "rxswift.queue./(qos)", qos: qos, attributes: [DispatchQueue.Attributes.concurrent], target: nil), leeway: leeway ) } }复制代码
和我们的串行队列的配置方法参数,一样,不同的是对队列类型attributes
的设置,该处置空为串行,concurrent
为并行。
4、OperationQueueScheduler
用来获取当前线程,看到名称我们应该就能猜到该类就是对OperationQueue
的封装。
public class OperationQueueScheduler: ImmediateSchedulerType { public let operationQueue: OperationQueue public let queuePriority: Operation.QueuePriority public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) { self.operationQueue = operationQueue self.queuePriority = queuePriority } }复制代码
5、CurrentThreadScheduler
表示当前线程,默认就在当前调度上。
public class CurrentThreadScheduler : ImmediateSchedulerType { typealias ScheduleQueue = RxMutableBox/// The singleton instance of the current thread scheduler. public static let instance = CurrentThreadScheduler() private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t inlet key = UnsafeMutablePointer.allocate(capacity: 1) defer {#if swift(>=4.1)key.deallocate()#elsekey.deallocate(capacity: 1)#endif} static var queue : ScheduleQueue? { get {return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance) }set { Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance) } } /// Gets a value that indicates whether the caller must call a `schedule` method. public static fileprivate(set) var isScheduleRequired: Bool { get {return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil }set(isScheduleRequired) {if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 { rxFatalError("pthread_setspecific failed") } } } }复制代码
isScheduleRequired
:用来表示是否必须调用schedule
方法通过
queue
方法的set
方法将当前你线程绑定到相应的key
上,get
方法通过key
获取当前线程,queue
是在schedule
中调用的
schedule
方法:
public func schedule(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {if CurrentThreadScheduler.isScheduleRequired { CurrentThreadScheduler.isScheduleRequired = falselet disposable = action(state) defer { CurrentThreadScheduler.isScheduleRequired = trueCurrentThreadScheduler.queue = nil } guard let queue = CurrentThreadScheduler.queue else {return disposable }while let latest = queue.value.dequeue() {if latest.isDisposed {continue} latest.invoke() }return disposable }let existingQueue = CurrentThreadScheduler.queuelet queue: RxMutableBoxif let existingQueue = existingQueue { queue = existingQueue }else { queue = RxMutableBox(Queue(capacity: 1)) CurrentThreadScheduler.queue = queue }let scheduledItem = ScheduledItem(action: action, state: state) queue.value.enqueue(scheduledItem)return scheduledItem }复制代码
通过Thread.setThreadLocalStorageValue
方法看一下内部做了哪些工作,代码如下:
extension Thread { static func setThreadLocalStorageValue(_ value: T?, forKey key: NSCopying) {let currentThread = Thread.currentlet threadDictionary = currentThread.threadDictionaryif let newValue = value { threadDictionary[key] = newValue }else { threadDictionary[key] = nil } } static func getThreadLocalStorageValueForKey(_ key: NSCopying) -> T? {let currentThread = Thread.currentlet threadDictionary = currentThread.threadDictionary return threadDictionary[key] as? T } }复制代码
threadDictionary
一个可变类型的字典,setThreadLocalStorageValue
绑定当前线程到key
上getThreadLocalStorageValueForKey
通过key
获取绑定的线程
调度者scheduler
使用
模拟一个异步线程处理数据,完成后在主线程展示。
GCD
实现:
DispatchQueue.init(label: "label",qos: .default,attributes:.concurrent).async { var num = 0for i in 0...100{ num += I } DispatchQueue.main.sync {print("num:/(num) /(Thread.current)") } }复制代码
DispatchQueue.init
:初始化一个队列label
:队列标识qos
:设置队列优先级DispatchQueue.main.sync
:返回主线程attributes
:设置队列类型,不设置默认为串行
RxSwift
实现:
Observable.create { (observer) -> Disposable invar num = 0for i in 0...100{ num += I } observer.onNext(num)return Disposables.create() } .subscribeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "yahibo")) .observeOn(MainScheduler.instance) .subscribe(onNext: { (val) inprint(val) }).disposed(by: disposeBag)复制代码
创建一个序列,在内部处理耗时操作
subscribeOn
:决定序列的构造函数在哪个Scheduler
上运行,使用SerialDispatchQueueScheduler
设置为串行队列ObserverOn
:决定在哪个Scheduler
上监听序列,使用MainScheduler
设置为主线程队列中观察
还是我们熟悉的编码方式,通过点语法来设置序列所在线程。
无论是对序列元素的观察,
UI
绑定,还是多线程,在RxSwift
中,统一处理成这种链式的形式,函数与函数之间没有强依赖性,使用灵活,降低了编码的复杂度。