欢迎光临散文网 会员登陆 & 注册

Coroutine 学习(四)runBlocking async-await withContext 源码分析

2021-11-29 14:45 作者:房顶上的铝皮水塔  | 我要投稿

上篇文章主要分析了协程中的一个重要的部分,也就是CoroutineContext,以及它派生的子类CoroutineInterceptor和CoroutineDispatcher。提到CoroutineInterceptor就不能不提到Continuation,所以上篇文章也稍微说了一下Continuation。 这篇文章,我想从源码分析的视角,看看我们另外几个常见的构建协程代码块的api,它们分别是runBlocking async-await 和 withContext。 

runBlocking比较特别,方法和launch,async-await不一样,并不是属于CoroutineScope的扩展方法。 这个方法设计的主要目的是作为非suspend风格的代码和suspend代码的桥梁。

它本身的代码非常短,但是有几个注意的要点:

  1. runBlocking 会构建自身的CoroutineDispatcher,也就是稍后会提及的EventLoop。它通过EventLoop做自己的事件调度。

  2. EventLoop这个东西是存放在ThreadLocal里面的,也就是说一个线程一个EventLoop。这个和Android本身的Looper是不是很像?

  3. runBlocking构建了一个特殊的coroutine的实例--BlockingCoroutine,它通过这个实例来支持自身的阻塞特性。


runBlocking的调用链和launch也是有相同的地方的,这种相同之处也体现在其他的coroutine builder中。只不过在上篇文章中,分析的重点是CoroutineInterceptor,所以上篇将createCoroutineUnintercepted() 这条链上出了interecpted()之外的方法全部都忽略了。这次我们来看看这部分的全貌

扩展方法(suspend R.()->T).createCoroutineUnintercepted

Kotlin使用编译器的某种方法 ,将suspend function 和 我们写在协程api中的block,我姑且称为suspend lambda进行转化。这里我有一个不知道是否正确的猜想:虽然suspend function 和 suspend lambda 都会被转化为Continuation,但是suspend lambda生成的对应的类型是BaseContinuationImpl。

不过不论如何,返回的类型都是Continuation,接着会调用intercepted。intercepted具体的实现是从CoroutineContext中找到ContinuationInterceptor,这个类型的实例往往是我们传入的CoroutineDispatcher。

仔细分析返回的DispathcedContinuation

 我们是在分析runBlocking的源码,这个CoroutineDispatcher其实就是EventLoop。 在interecepted方法中,找到ContinuationInterceptor之后,就会调用其中的interceptContinuation。

通过CoroutineDispatcher#interceptContinuation会返回一个DispatchedContinuation对象。

这个DispatchedContinuation的两个构造器参数分别是当前的Dispatcher,另外一个是Continuation。后者往上回溯代码的话就可以发现,其实是通过create操作转换的suspend function/lambda。

在DispatcherContinuation#resumeWith方法中可以看出,具体的dispatch和isDispatchNeeded另外交给传入的Dispatcher的实现负责。(上一篇文章中详细的对这些代码进行分析 Continuation & CoroutineInterceptor & Dispatcher,可以参考一下)


这里之所以要强调上述过程中返回DispatchedContinuation,是因为这个对象在后面的执行中有非常重要的作用。


在CoroutineInterceptor返回的DispatchedContinuation中,调用的dispatcher.dispatch具体实现是EventLoopBaseImpl#dispatch,它将DispatchedContinuation即时Continuation也是Runnable,所以dispatch可以将它加入了队列中:

BlockingCoroutine#joinBlocking

我们现在重头看一下runBlocking的代码:

创建了BlockingCoroutine的实例之后,并执行start,这个过程中并没有进行实质上的阻塞操作,具体的阻塞操作藏在coroutine.joinBlocking中。这个方法将存储在队列中的DispatchedContinuation取出,并且执行它的run方法(再次强调,DispatchedContinuation也是一个Runnable)。

run方法就会取出DispatchedContinuation中的构造器参数。上面已经说过,continuation是包装之后的suspend function/lambda,所以整个run方法就是和continuation#resumeWith有关。


最后我们来看看suspend lambda对应的BaseContinuationImpl#resumeWith到底是如何进行协程执行的:

因为协程体中可以声明子协程, 所以协程本身就具有递归的性质。因此,这个方法主要将递归操作展平。可以理解为树的遍历操作,其中invokeSuspend方法是由编译器生成的状态机。


小结一下runBlocking是如何工作的:

  1. 创建coroutine对象,runBlocking中具体而言创建的是BlockingCoroutine。

  2. 执行coroutine.start方法, 这个方法相当重要,因为它会执行过程中会调用Kotlin编译器生成的一些代码,将我们写的suspend lambda/function转化成continuation。转换成的Continuation的实现是BaseContinuationImpl对象。这个continuation后续会被intercepted拦截,并且被包装成DispatchedContinuation。DispatchedContinuation知道当前的Dispatcher是什么,也持有Continuation对象。

  3. DispatchedContinuation被放入队列中,当取出的时候会调用他的Continuation属性的resumeWith,展开递归。

那async-await和runBlocking有什么差异呢?

Async-await

async 和runBlocking 还有launch也是比较相似的。默认的启动逻辑都是CoroutineStart.DEFAULT,我们可以通过await获取这个协程的返回结果。async会返回一个Deferred对象,Deferred是一个Job,这样必然会遵守Job的生命周期:

同时,在async中会创建一个DeferredCoroutine:

上面async的返回值是一个Deferred类型,因为DeferredCoroutine是Deferred的实现类,所以上述代码直接返回了DeferredCoroutine。DeferredCoroutine通过继承JobSupport中的awaitInternal实现返回Continuation中完成的结果:

最后我们来看看withContext:

withContext会根据withContext中设置的context类型进行判断,假设Context不满足在判断中的两个情况,就会构造DispatchedCoroutine。因为DispatchedCoroutine持有构造好的Continuation对象,所以当withContext中的任务执行完成之后,会调用afterCompletion回调继续在原本的CoroutineContext中运行:


总结:

1. 不难看出 launch runBlocking async-await witContext这些协程API都具有相似的逻辑。在开始都会创建一个AbstractCoroutine,在launch中是StandaloneCoroutine,在runBlocking中是BlockingCoroutine等等。这些Coroutine有着自己特殊的地方,比如BlockingCoroutine像队列一样,子任务会加入queue中等到取出执行;DeferredCoroutine有着可以取出结果的能力。

2. 创建好了coroutine之后会调用createCoroutineUnintercepted,通过Kotlin 编译器实现的某种方式将创建AbstractCoroutine作为参数(completion)作为createCoroutineUnintercepted的返回值,被拦截器捕获。AbstractCoroutine会作为DispatchedContinuation的一部分,被对应的Dispatcher进行处理,什么之后做什么样的处理,交给Dispatcher负责。

3. 在处理完成之后会调用Continuation的resumeWith,之后可能会发生切换到之前的Context(也就是调度器)的操作。这里的实现在BaseContinuationImpl中,会执行由Kotin编译器生成的自动机(执行invokeSuspend)方法。


Coroutine 学习(四)runBlocking async-await withContext 源码分析的评论 (共 条)

分享到微博请遵守国家法律