源码

RxSwift调度者scheduler

RxSwift的核心非常简单,无非就是以下四点:

  • 可观察序列 Observalbe

  • 观察者 Observer

  • 调度者 Scheduler

  • 销毁者 Dispose

可观察序列、观察者,在《RxSwift核心源码探索》中有讲,下面就来看看调度者Scheduler具体做了哪些事情。调度者scheduler主要用于控制任务在哪个线程或队列运行,而scheduler是对GCD的封装,GCD我们很熟悉,通过GCD创建队列,开启线程,开发中所有动作都等价于任务+队列

任务:

  • 异步任务

  • 同步任务

队列:

  • 主队列

  • 全局队列

  • 并行队列

  • 串行队列

参见:《GCD部分总结》

scheduler中调度队列如下:

  • MainScheduler主线程,与UI相关的任务均在该线程下执行

  • SerialDispatchQueueScheduler相当于GCD对应的串行队列

  • ConcurrentDispatchQueueScheduler相当于GCD并行队列

  • OperationQueueScheduler相当于NSOperationQueue管理者可以设置并发数

  • CurrentThreadScheduler-当前线程

scheduler.png

以上几种类型,通过代码能够发现实际上就是对GCD队列创建的封装,以及Operation的封装。

1、MainScheduler

表示为主线程。开发中需要执行一些和UI相关的任务,则需要我们切换到该Scheduler上执行。点击进入MainScheduler类,如下:

public final class MainScheduler : SerialDispatchQueueScheduler {

    private let _mainQueue: DispatchQueue

    var numberEnqueued = AtomicInt(0)

    /// Initializes new instance of `MainScheduler`.
    public init() {
        self._mainQueue = DispatchQueue.main
        super.init(serialQueue: self._mainQueue)
    }
}复制代码

MainScheduler继承了SerialDispatchQueueScheduler串行队列类,当然这里不难理解,因为主队列就是一个特殊的串行队列。在该类中,能够清楚的看到,在初始化对象时,确定了队列类型为主队列self._mainQueue = DispatchQueue.main

2、SerialDispatchQueueScheduler

串行队列,有需要串行执行的任务,都需要切换至该scheduler下。

public class SerialDispatchQueueScheduler : SchedulerType {
    public typealias TimeInterval = Foundation.TimeInterval
    public typealias Time = Date
    
    /// - returns: Current time.
    public var now : Date {return Date()
    }let configuration: DispatchQueueConfiguration
    
    /**
    Constructs new `SerialDispatchQueueScheduler` that wraps `serialQueue`.

    - parameter serialQueue: Target dispatch queue.
    - parameter leeway: The amount of time, in nanoseconds, that the system will defer the timer.
    */
    init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
    }

    public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((DispatchQueue) -> Void)? = nil, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
        serialQueueConfiguration?(queue)
        self.init(serialQueue: queue, leeway: leeway)
    }
}复制代码

同样初始化的时候,通过DispatchQueue对attributes属性置空操作设定了队列为串行队列。

3、ConcurrentDispatchQueueScheduler

并行队列,如下载任务,我们需要多个任务同时进行,则需要切换到当前scheduler

public class ConcurrentDispatchQueueScheduler: SchedulerType {
    public typealias TimeInterval = Foundation.TimeInterval
    public typealias Time = Date
    
    public var now : Date {return Date()
    }let configuration: DispatchQueueConfiguration
     
    public init(queue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.configuration = DispatchQueueConfiguration(queue: queue, leeway: leeway)
    }
    @available(iOS 8, OSX 10.10, *)
    public convenience init(qos: DispatchQoS, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.init(queue: DispatchQueue(
            label: "rxswift.queue./(qos)",
            qos: qos,
            attributes: [DispatchQueue.Attributes.concurrent],
            target: nil),
            leeway: leeway
        )
    }
}复制代码

和我们的串行队列的配置方法参数,一样,不同的是对队列类型attributes的设置,该处置空为串行,concurrent为并行。

4、OperationQueueScheduler

用来获取当前线程,看到名称我们应该就能猜到该类就是对OperationQueue的封装。

public class OperationQueueScheduler: ImmediateSchedulerType {
    public let operationQueue: OperationQueue
    public let queuePriority: Operation.QueuePriority

    public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
        self.operationQueue = operationQueue
        self.queuePriority = queuePriority
    }
}复制代码

5、CurrentThreadScheduler

表示当前线程,默认就在当前调度上。

public class CurrentThreadScheduler : ImmediateSchedulerType {
    typealias ScheduleQueue = RxMutableBox/// The singleton instance of the current thread scheduler.
    public static let instance = CurrentThreadScheduler()

    private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t inlet key = UnsafeMutablePointer.allocate(capacity: 1)
        defer {#if swift(>=4.1)key.deallocate()#elsekey.deallocate(capacity: 1)#endif}

    static var queue : ScheduleQueue? {
        get {return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
        }set {
            Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
        }
    }

    /// Gets a value that indicates whether the caller must call a `schedule` method.
    public static fileprivate(set) var isScheduleRequired: Bool {
        get {return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
        }set(isScheduleRequired) {if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
                rxFatalError("pthread_setspecific failed")
            }
        }
    }
}复制代码
  • isScheduleRequired:用来表示是否必须调用schedule方法

  • 通过queue方法的set方法将当前你线程绑定到相应的key上,get方法通过key获取当前线程,queue是在schedule中调用的

schedule方法:

public func schedule(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {if CurrentThreadScheduler.isScheduleRequired {
        CurrentThreadScheduler.isScheduleRequired = falselet disposable = action(state)

        defer {
            CurrentThreadScheduler.isScheduleRequired = trueCurrentThreadScheduler.queue = nil
        }

        guard let queue = CurrentThreadScheduler.queue else {return disposable
        }while let latest = queue.value.dequeue() {if latest.isDisposed {continue}
            latest.invoke()
        }return disposable
    }let existingQueue = CurrentThreadScheduler.queuelet queue: RxMutableBoxif let existingQueue = existingQueue {
        queue = existingQueue
    }else {
        queue = RxMutableBox(Queue(capacity: 1))
        CurrentThreadScheduler.queue = queue
    }let scheduledItem = ScheduledItem(action: action, state: state)
    queue.value.enqueue(scheduledItem)return scheduledItem
}复制代码

通过Thread.setThreadLocalStorageValue方法看一下内部做了哪些工作,代码如下:

extension Thread {
    static func setThreadLocalStorageValue(_ value: T?, forKey key: NSCopying) {let currentThread = Thread.currentlet threadDictionary = currentThread.threadDictionaryif let newValue = value {
            threadDictionary[key] = newValue
        }else {
            threadDictionary[key] = nil
        }
    }

    static func getThreadLocalStorageValueForKey(_ key: NSCopying) -> T? {let currentThread = Thread.currentlet threadDictionary = currentThread.threadDictionary        return threadDictionary[key] as? T
    }
}复制代码
  • threadDictionary一个可变类型的字典,

  • setThreadLocalStorageValue绑定当前线程到key

  • getThreadLocalStorageValueForKey通过key获取绑定的线程

调度者scheduler使用

模拟一个异步线程处理数据,完成后在主线程展示。

GCD实现:

DispatchQueue.init(label: "label",qos: .default,attributes:.concurrent).async {
    var num = 0for i in 0...100{
        num += I
    }
    DispatchQueue.main.sync {print("num:/(num)  /(Thread.current)")
    }
}复制代码
  • DispatchQueue.init:初始化一个队列

  • label:队列标识

  • qos:设置队列优先级

  • DispatchQueue.main.sync:返回主线程

  • attributes:设置队列类型,不设置默认为串行

RxSwift实现:

Observable.create { (observer) -> Disposable invar num = 0for i in 0...100{
            num += I
        }
        observer.onNext(num)return Disposables.create()
    }
    .subscribeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "yahibo"))
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { (val) inprint(val)
    }).disposed(by: disposeBag)复制代码
  • 创建一个序列,在内部处理耗时操作

  • subscribeOn:决定序列的构造函数在哪个Scheduler上运行,使用SerialDispatchQueueScheduler设置为串行队列

  • ObserverOn:决定在哪个Scheduler上监听序列,使用MainScheduler设置为主线程队列中观察

还是我们熟悉的编码方式,通过点语法来设置序列所在线程。

无论是对序列元素的观察,UI绑定,还是多线程,在RxSwift中,统一处理成这种链式的形式,函数与函数之间没有强依赖性,使用灵活,降低了编码的复杂度。

(0)

本文由 投稿者 创作,文章地址:https://blog.isoyu.com/archives/rxswiftdiaoduzhescheduler.html
采用知识共享署名4.0 国际许可协议进行许可。除注明转载/出处外,均为本站原创或翻译,转载前请务必署名。最后编辑时间为:8 月 16, 2019 at 04:01 下午

热评文章