Kotlin协程原理
协程原理
suspend 原理
suspend 方法
示例代码:
// Minimal suspend function used as the decompilation sample: calls delay(100), then prints a line.
suspend fun susFun() {
delay(100)
println("hello suspend function.")
}
反编译后:
// Decompiler output for susFun in raw register form; the whole body is emitted inside a block comment.
// Flow visible below: if the incoming continuation is a SuspendFunTestKt$susFun$1 whose label has the
// sign bit set, Integer.MIN_VALUE is subtracted from label and it is reused; otherwise a new
// SuspendFunTestKt$susFun$1 wraps it. Then the code switches on label (0 = first entry, 1 = after delay).
public static final Object susFun(Continuation<? super kotlin.Unit> r7) {
/*
r4 = -2147483648(0xffffffff80000000, float:-0.0)
boolean r2 = r7 instanceof me.hacket.coroutine.SuspendFunTestKt.susFun.1 // inner class SuspendFunTestKt$susFun$1
if (r2 == 0) goto L_0x0026
r2 = r7
me.hacket.coroutine.SuspendFunTestKt$susFun$1 r2 = (me.hacket.coroutine.SuspendFunTestKt.susFun.1) r2
int r3 = r2.label
r3 = r3 & r4
if (r3 == 0) goto L_0x0026
int r3 = r2.label
int r3 = r3 - r4
r2.label = r3
L_0x0013:
java.lang.Object r1 = r2.result
java.lang.Object r3 = kotlin.coroutines.intrinsics.IntrinsicsKt.getCOROUTINE_SUSPENDED()
int r4 = r2.label
switch(r4) {
case 0: goto L_0x002d;
case 1: goto L_0x003d;
default: goto L_0x001e;
}
L_0x001e:
java.lang.IllegalStateException r2 = new java.lang.IllegalStateException
java.lang.String r3 = "call to 'resume' before 'invoke' with coroutine"
r2.<init>(r3)
throw r2
L_0x0026:
me.hacket.coroutine.SuspendFunTestKt$susFun$1 r0 = new me.hacket.coroutine.SuspendFunTestKt$susFun$1
r0.<init>(r7)
r2 = r0
goto L_0x0013
L_0x002d:
kotlin.ResultKt.throwOnFailure(r1)
r4 = 100
r6 = 1
r2.label = r6
java.lang.Object r2 = kotlinx.coroutines.DelayKt.delay(r4, r2)
if (r2 != r3) goto L_0x0040
r2 = r3
L_0x003c:
return r2
L_0x003d:
kotlin.ResultKt.throwOnFailure(r1)
L_0x0040:
java.lang.String r2 = "hello suspend function."
java.io.PrintStream r3 = java.lang.System.out
r3.println(r2)
kotlin.Unit r2 = kotlin.Unit.INSTANCE
goto L_0x003c
switch-data {0->0x002d, 1->0x003d, }
*/
}
- 就是 CPS 代码,一堆 switch-case 处理不同的状态
- susFun 方法内部类
SuspendFunTestKt$susFun$1是一个 ContinuationImpl:
// Generated continuation class for susFun: carries the state-machine label and the last resumed result.
final class SuspendFunTestKt$susFun$1 extends ContinuationImpl {
int label;
/* synthetic */ Object result;
SuspendFunTestKt$susFun$1(Continuation continuation) {
super(continuation);
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
// Store the resumed value, set the sign bit of label, then re-enter susFun with this continuation.
this.result = $result;
this.label |= Integer.MIN_VALUE;
return SuspendFunTestKt.susFun(this);
}
}
suspend lambda 是个什么东西?
示例代码:
// Demo: print the runtime class of a suspend lambda and the simple name of its superclass.
fun main() {
val mySuspend1: suspend () -> String = {
delay(1000)
"hehe"
}
val javaClass = mySuspend1.javaClass
println("javaClass=${javaClass.simpleName}") // main$mySuspend1$1
println("javaClass.superclass=${javaClass.superclass.simpleName}") // SuspendLambda
}
我们用 jadx 反编译看看:
// Decompiled main(): the suspend lambda is instantiated as a generated class with a null Continuation,
// then its class name and superclass name are printed.
public final class SuspendTestKt {
public static final void main() {
Class javaClass = new main.mySuspend1.1((Continuation) null).getClass();
System.out.println((Object) ("javaClass=" + javaClass.getSimpleName()));
StringBuilder append = new StringBuilder().append("javaClass.superclass=");
Class<? super Object> superclass = javaClass.getSuperclass();
Intrinsics.checkExpressionValueIsNotNull(superclass, "javaClass.superclass");
System.out.println((Object) append.append(superclass.getSimpleName()).toString());
}
}
// Inner class SuspendTestKt$main$mySuspend1$1: generated for the suspend lambda; it extends
// SuspendLambda and implements Function1.
final class SuspendTestKt$main$mySuspend1$1 extends SuspendLambda implements Function1<Continuation<? super String>, Object> {
int label;
SuspendTestKt$main$mySuspend1$1(Continuation continuation) {
super(1, continuation);
}
// Returns a new instance of this class bound to the given completion.
@NotNull
public final Continuation<Unit> create(@NotNull Continuation<?> continuation) {
Intrinsics.checkParameterIsNotNull(continuation, "completion");
return new SuspendTestKt$main$mySuspend1$1(continuation);
}
// Invoking the lambda = create a fresh instance, then run its invokeSuspend once with Unit.
public final Object invoke(Object obj) {
return create((Continuation) obj).invokeSuspend(Unit.INSTANCE);
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
// label selects the entry point: 0 = first call, 1 = re-entry after delay.
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
this.label = 1;
// If delay reports suspension, propagate the marker to the caller.
if (DelayKt.delay(1000, this) == coroutine_suspended) {
return coroutine_suspended;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return "hehe";
}
}
由反编译可知,可以看到 suspend () -> String 是一个 SuspendLambda 并实现了 Function1 接口。
协程原理
以下面代码为例,分析协程执行原理:
// Demo used for the launch walkthrough: launches a coroutine on Dispatchers.IO that prints the
// current thread name, then sleeps the main thread for 2 seconds.
fun main() {
val scope = MyContextScope(EmptyCoroutineContext)
scope.launch(Dispatchers.IO) {
println("hello world. ${Thread.currentThread().name}")
}
Thread.sleep(2000)
}
输出:
hello world. DefaultDispatcher-worker-1
反编译生成代码
反编译生成的代码,launch 的第三个参数 block 会生成一个 SuspendLambda 内部类:
// Decompiled main(): launch is called through launch$default, passing the generated lambda class
// (constructed with a null Continuation) as the block argument.
public final class HahaKt {
public static final void main() {
BuildersKt.launch$default((CoroutineScope) new MyContextScope(EmptyCoroutineContext.INSTANCE), Dispatchers.getIO(), (CoroutineStart) null, new main.1((Continuation) null), 2, (Object) null);
Thread.sleep(2000);
}
}
// The third parameter of launch (block) is compiled into the inner class HahaKt$main$1.
final class HahaKt$main$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
int label;
private CoroutineScope p$;
HahaKt$main$1(Continuation continuation) {
super(2, continuation);
}
// Returns a new instance bound to the given completion, with the receiver stored in p$.
@NotNull
public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> continuation) {
Intrinsics.checkParameterIsNotNull(continuation, "completion");
HahaKt$main$1 hahaKt$main$1 = new HahaKt$main$1(continuation);
CoroutineScope coroutineScope = (CoroutineScope) value;
hahaKt$main$1.p$ = (CoroutineScope) value;
return hahaKt$main$1;
}
// Invoking the lambda = create a fresh instance, then run its invokeSuspend once with Unit.
public final Object invoke(Object obj, Object obj2) {
return create(obj, (Continuation) obj2).invokeSuspend(Unit.INSTANCE);
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
IntrinsicsKt.getCOROUTINE_SUSPENDED();
// The block calls no suspend function, so label 0 is the only valid state.
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
CoroutineScope coroutineScope = this.p$;
StringBuilder append = new StringBuilder().append("hello world. ");
Thread currentThread = Thread.currentThread();
Intrinsics.checkExpressionValueIsNotNull(currentThread, "Thread.currentThread()");
System.out.println((Object) append.append(currentThread.getName()).toString());
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
}
协程的创建
CoroutineScope.launch
首先看 launch:
// CoroutineScope
// Builds the new context, creates a lazy or eager coroutine object depending on start.isLazy,
// starts it with the given block, and returns it as the Job.
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
// Create a new context from the parent's (it becomes the coroutine's parent context)
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
// The coroutine's actual context is generated with newContext as its parent context
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
// Extension function on CoroutineScope; coroutineContext here is the scope object's own member.
// Combines the scope's context with the passed one, adds a CoroutineId when DEBUG is on, and
// appends Dispatchers.Default when the combined context has no ContinuationInterceptor.
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
- newCoroutineContext 创建一个新的协程上下文,作为生成 XXXCoroutine 的 parentContext
- 根据 CoroutineStart 是否 isLazy,创建 LazyStandaloneCoroutine 或 StandaloneCoroutine(即决定协程是否马上执行)
- coroutine.start 执行协程
AbstractCoroutine.start
现在看看 AbstractCoroutine.start:
// AbstractCoroutine.kt
// Excerpt: base class that is a Job, a Continuation and a CoroutineScope at the same time.
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
init {
// Attach to the Job found in the parent context when requested.
if (initParentJob) initParentJob(parentContext[Job])
}
// receiver: StandaloneCoroutine
// block: suspend StandaloneCoroutine.() -> Unit
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
// initParentJob()
start(block, receiver, this) // equivalent to start.invoke(); note the 3rd argument, completion = this
}
}
调用了 CoroutineStart.invoke() 方法,看看它的参数:
- 参数 1:block,要执行的协程体 suspend StandaloneCoroutine.() -> Unit,其实就是一个 SuspendLambda
- 参数 2:receiver,为一个 AbstractCoroutine,这里为 StandaloneCoroutine
- 参数 3:completion,就是 this,即 StandaloneCoroutine,这个参数很重要,后面要用到
看看 StandaloneCoroutine:
// Coroutine object created by launch for non-lazy starts; always initializes the parent job.
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
// Forwards the exception to handleCoroutineException and reports it as handled.
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}
CoroutineStart.invoke
调用到了 CoroutineStart,CoroutineStart 是一个枚举类,接着看 CoroutineStart#invoke() 方法:
// CoroutineStart
// Excerpt: start-mode enum whose invoke operator dispatches to the matching start routine.
public enum class CoroutineStart {
DEFAULT, LAZY, ATOMIC, UNDISPATCHED;
public val isLazy: Boolean get() = this === LAZY
// block - suspend StandaloneCoroutine.() -> Unit, which is a SuspendLambda
// receiver - StandaloneCoroutine
// completion - StandaloneCoroutine<Unit>
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
// Call a different start routine depending on the start mode
when (this) {
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
CoroutineStart.LAZY -> Unit // will start lazily
}
}
然后调用 startCoroutineCancellable()
(suspend (R) -> T).startCoroutineCancellable 创建 Continuation
这里我们看 CoroutineStart.DEFAULT,然后调用了 block.startCoroutineCancellable(receiver, completion)
// Cancellable.kt
// receiver - StandaloneCoroutine
// completion - StandaloneCoroutine<Unit>
/**
 * Creates the coroutine for this suspend block, intercepts its continuation and resumes it
 * cancellably with Unit. Any exception thrown while doing so is delivered to [completion]
 * by runSafely.
 *
 * @param receiver receiver passed to the block
 * @param completion continuation resumed with the block's outcome (or with the start-up failure)
 * @param onCancellation optional callback forwarded to resumeCancellableWith; defaults to null.
 *        The body already referenced this name, so it has to be a declared parameter.
 */
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R,
    completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) { // on exception, completion.resumeWith(Result.failure(e)) is called
        createCoroutineUnintercepted(receiver, completion)
            .intercepted()
            .resumeCancellableWith(Result.success(Unit), onCancellation)
    }
// Runs block; any Throwable it throws is delivered to completion as a failed Result instead of
// propagating to the caller.
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
try {
block()
} catch (e: Throwable) {
completion.resumeWith(Result.failure(e))
}
}
(suspend (R) -> T).createCoroutineUnintercepted
(suspend (R) -> T).createCoroutineUnintercepted 创建一个 Continuation
现在看看 createCoroutineUnintercepted():
// IntrinsicsJvm.kt (kotlin-stdlib)
// Creates the Continuation for a suspend lambda: compiler-generated lambdas (BaseContinuationImpl
// subclasses) go through create(); anything else is wrapped via createCoroutineFromSuspendFunction.
// NOTE(review): this is the no-receiver overload; the call site shown earlier passes a receiver,
// so the overload actually used there is presumably the receiver one, which would call
// create(receiver, completion) - confirm against the stdlib source.
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T>):Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(probeCompletion)
else
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}
// https://github.com/JetBrains/kotlin/blob/master/libraries/stdlib/jvm/src/kotlin/coroutines/jvm/internal/DebugProbes.kt
// Debug hook: as written here it returns completion unchanged.
internal fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> {
/** implementation of this function is replaced by debugger */
return completion
}
- 通过前面的分析可知道 suspend () -> T 是一个 SuspendLambda,SuspendLambda 间接继承了 BaseContinuationImpl,所以上面会走 create() 方法;由前面的反编译可知 create 方法会创建一个新的 SuspendLambda 实例,参数为 completion。其继承关系为:SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation
- createCoroutineFromSuspendFunction 用来处理 suspending lambda 没有继承 BaseContinuationImpl 的情况,具体源码看 IntrinsicsJvm
接着看 create(completion),create 方法创建的 Continuation 是一个 SuspendLambda 对象。
看看 create 反编译后的代码:
// Decompiled create(): constructs a new instance of the anonymous lambda class bound to completion.
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
接着看 intercepted()
Continuation.intercepted() 返回 DispatchedContinuation
接着回到 startCoroutineCancellable 看 intercepted(),通过 ContinuationInterceptor 拦截当前 Continuation
// IntrinsicsJvm.kt (kotlin-stdlib)
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
// If this is a ContinuationImpl, call its intercepted() method; otherwise return this itself.
// In the example traced here, this is the HahaKt$main$1 instance - a subclass of ContinuationImpl.
(this as? ContinuationImpl)?.intercepted() ?: this
接着看 ContinuationImpl.intercepted():
// ContinuationImpl
// Excerpt showing only the interception cache.
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
private var intercepted: Continuation<Any?>? = null
// Returns the cached intercepted continuation, computing and caching it on first use.
public fun intercepted(): Continuation<Any?> =
// context[ContinuationInterceptor] is a CoroutineDispatcher instance
intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
}
如果 intercepted 不为 null 则直接返回;如果为 null,则取 CoroutineContext 中的 ContinuationInterceptor,调用其 interceptContinuation() 并把结果缓存到 intercepted(上下文中没有 ContinuationInterceptor 时返回自身)
而 CoroutineDispatcher 实现了 ContinuationInterceptor
// CoroutineDispatcher
// Wraps the given continuation in a DispatchedContinuation tied to this dispatcher.
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
所以 intercepted() 分情况:
- 需要线程调度 - 返回 DispatchedContinuation,其 continuation 参数值为 SuspendLambda
- 不需要线程调度 - 返回 SuspendLambda
协程的启动
接下来看看 resumeCancellableWith 是怎么启动协程的,这里还涉及到 Dispatchers 线程调度的逻辑:
DispatchedContinuation
前面 startCoroutineCancellable() 里,如果有线程调度,那么返回的是 DispatchedContinuation;没有的话返回 SuspendLambda
// DispatchedContinuation
// Excerpt: continuation wrapper that either hands itself to the dispatcher or resumes the
// wrapped continuation directly, depending on dispatcher.isDispatchNeeded.
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
override val delegate: Continuation<T>
get() = this
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) { // check whether thread dispatching is needed
_state = state
resumeMode = MODE_ATOMIC
dispatcher.dispatch(context, this) // hand the coroutine's work over to the dispatcher (another thread)
} else {
executeUnconfined(state, MODE_ATOMIC) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
}
}
}
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) { // check whether thread dispatching is needed
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this) // hand the coroutine's work over to the dispatcher (another thread)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) { // no dispatch needed: run the coroutine on the current thread
resumeUndispatchedWith(result)
}
}
}
}
// Resumes the wrapped continuation directly inside withContinuationContext.
inline fun resumeUndispatchedWith(result: Result<T>) {
withContinuationContext(continuation, countOrElement) {
continuation.resumeWith(result)
}
}
}
// Entry point used when starting a coroutine: picks the dispatched path or a plain resumeWith
// based on the runtime type of the continuation.
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
// Go through thread dispatching; this eventually reaches continuation.resumeWith as well
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
// Call continuation.resumeWith directly
else -> resumeWith(result)
}
- 当需要线程调度时,则在调度后会调用 DispatchedContinuation.continuation.resumeWith 来启动协程,其中 continuation 是 SuspendLambda 实例
- 当不需要线程调度时,则直接调用 SuspendLambda.resumeWith 来启动协程
DispatchedContinuation 继承自 DispatchedTask,又继承自 Task,最终实现了 Runnable,那我们看下其 run 方法:
// Excerpt: the task the dispatcher runs. run() takes the stored state and resumes the wrapped
// continuation with a cancellation cause, the stored exception, or the successful value.
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
public final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
val exception = getExceptionalResult(state)
/*
* Check whether continuation was originally resumed with an exception.
* If so, it dominates cancellation, otherwise the original exception
* will be silently lost.
*/
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
continuation.resumeWithException(exception)
} else {
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
// This instead of runCatching to have nicer stacktrace and debug experience
fatalException = e
} finally {
val result = runCatching { taskContext.afterTask() }
handleFatalException(fatalException, result.exceptionOrNull())
}
}
}
- continuation 就是 DispatchedContinuation 构造器中的 continuation,就是 SuspendLambda
- 封装了 Continuation 的 resumeWithException 和 resume 操作逻辑,最终调用的是 SuspendLambda 的 resumeWith 方法
下面看看 SuspendLambda 的类关系:SuspendLambda→ContinuationImpl→BaseContinuationImpl→Continuation
resumeWith 方法调用的是父类 BaseContinuationImpl 中的 resumeWith 方法:
// Abridged excerpt: resumeWith is where invokeSuspend gets called.
internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
// ...
val outcome = invokeSuspend(param)
// ...
}
}
SuspendLambda→ContinuationImpl→BaseContinuationImpl→Continuation
SuspendLambda
由前面可知 suspend () -> T 是一个 SuspendLambda,现在看看 SuspendLambda:
// Suspension lambdas inherit from this class
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
constructor(arity: Int) : this(arity, null)
// With no completion the object is rendered as a lambda; otherwise as a continuation.
public override fun toString(): String =
if (completion == null)
Reflection.renderLambdaToString(this) // this is lambda
else
super.toString() // this is continuation
}
ContinuationImpl
SuspendLambda 继承 ContinuationImpl,接着看看 ContinuationImpl:
// Continuation base that owns a context and caches its intercepted form.
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
// Convenience constructor: takes the context from the completion, if any.
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> = // intercepts the Continuation
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
// Hands a real intercepted wrapper back to the interceptor, then marks this instance completed.
protected override fun releaseIntercepted() {
val intercepted = intercepted
if (intercepted != null && intercepted !== this) {
context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
}
this.intercepted = CompletedContinuation // just in case
}
}
BaseContinuationImpl
ContinuationImpl 又继承 BaseContinuationImpl,接着看 BaseContinuationImpl:
// Root of the generated continuation classes: resumeWith drives invokeSuspend and forwards the
// outcome to completion, looping instead of recursing through nested continuations.
internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) :
// In the launch flow traced in this article, this completion is the AbstractCoroutine
Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) { // infinite loop
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
// Suspended again: leave the loop; nothing is forwarded to completion yet.
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// SuspendLambda is a BaseContinuationImpl
// unrolling recursion via loop
current = completion
param = outcome
} else { // AbstractCoroutine is not a BaseContinuationImpl
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
protected open fun releaseIntercepted() {
// does nothing here, overridden in ContinuationImpl
}
// Implemented by subclasses; returns a Continuation
public open fun create(completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Continuation) has not been overridden")
}
// Implemented by subclasses; returns a Continuation
public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
}
public override fun toString(): String =
"Continuation at ${getStackTraceElement() ?: this::class.java.name}"
// --- CoroutineStackFrame implementation
public override val callerFrame: CoroutineStackFrame?
get() = completion as? CoroutineStackFrame
public override fun getStackTraceElement(): StackTraceElement? =
getStackTraceElementImpl()
}
- create 方法需由子类实现,返回一个 Continuation(类似 SuspendTestKt$main$mySuspend1$1 继承自 SuspendLambda)
- resumeWith 执行的入口
- invokeSuspend 真正的代码逻辑,即你自己写的协程体的代码
最后协程是调用了 AbstractCoroutine 的 resumeWith
// AbstractCoroutine
// Completion entry point: tries to move to the completing state; if children are still being
// waited for it returns, otherwise it calls afterResume with the resulting state.
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
协程启动小结
- 协程的启动是通过 BaseContinuationImpl.resumeWith 方法调用到了子类 SuspendLambda.invokeSuspend 方法,然后通过状态机来控制顺序运行
- 在 BaseContinuationImpl.resumeWith 有个死循环,调用 invokeSuspend 来执行具体的协程代码,碰到 COROUTINE_SUSPENDED 时直接 return(协程挂起),等挂起点恢复后再次通过 resumeWith 进入 invokeSuspend
- Kotlin 中的协程存在着三层包装
第一层包装: launch & async 返回的 Job, Deferred 继承自 AbstractCoroutine, 里面封装了协程的状态,提供了 cancel 等接口;
第二层包装: 编译器生成的 SuspendLambda 子类,封装了协程的真正执行逻辑,其继承关系为 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl, 它的 completion 参数就是第一层包装实例;
第三层包装: DispatchedContinuation, 封装了线程调度逻辑,它的 continuation 参数就是第二层包装实例。

协程状态机
以下面的代码为例解析一下协程启动的状态机流程:
// Sample suspend function: starts an async coroutine on Dispatchers.IO that delays 1s and
// yields "hearing", then awaits it.
private suspend fun getId(): String {
return GlobalScope.async(Dispatchers.IO) {
delay(1000)
"hearing"
}.await()
}
// Sample suspend function: starts an async coroutine on Dispatchers.IO that delays 1s and
// yields "avatar-$id", then awaits it.
private suspend fun getAvatar(id: String): String {
return GlobalScope.async(Dispatchers.IO) {
delay(1000)
"avatar-$id"
}.await()
}
/**
 * Entry point of the state-machine walkthrough: launches a coroutine that calls [getId] and
 * [getAvatar] in sequence and prints the thread name together with both results.
 */
fun main() {
    GlobalScope.launch {
        val id = getId()
        val avatar = getAvatar(id)
        println("${Thread.currentThread().name} - $id - $avatar")
    }
    // Coroutines launched in GlobalScope do not keep the JVM alive. Without this wait main()
    // returns immediately and the process may exit before the two 1s delays finish.
    Thread.sleep(3000)
}
上面 main 方法中,GlobalScope.launch 启动的协程体在执行到 getId 后,协程体会挂起,直到 getId 返回可用结果,才会 resume launch 协程,执行到 getAvatar 也是同样的过程。
协程内部实现使用状态机来处理不同的挂起点,将 GlobalScope.launch 协程体字节码反编译成 Java 代码,大致如下 (有所删减):
// Decompiled getId: builds the async block as an anonymous state machine, starts it through
// async$default, then calls await with the caller's continuation ($completion).
private static final Object getId(Continuation $completion) {
return BuildersKt.async$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
// label: 0 = first entry, 1 = re-entry after delay.
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
this.label = 1;
if (DelayKt.delay(1000L, this) == var2) {
return var2;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return "hearing";
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 2, (Object)null).await($completion);
}
// Decompiled getAvatar: same shape as getId; the anonymous state machine captures id and
// returns "avatar-" + id once the delay has completed.
private static final Object getAvatar(final String id, Continuation $completion) {
return BuildersKt.async$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
// label: 0 = first entry, 1 = re-entry after delay.
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
this.label = 1;
if (DelayKt.delay(1000L, this) == var2) {
return var2;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return "avatar-" + id;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 2, (Object)null).await($completion);
}
// Decompiled main (abridged by the article's author, so some locals and the tail of the
// launch$default call are missing). invokeSuspend is the launch block's state machine:
// label 0 -> call getId, label 1 -> resumed with id, label 2 -> resumed with avatar.
public static final void main() {
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null,
(CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
public final Object invokeSuspend(@NotNull Object $result) {
Object var10000;
String id;
label17: {
CoroutineScope $this$launch;
switch(this.label) {
case 0: // a
ResultKt.throwOnFailure($result);
$this$launch = this.p$;
this.label = 1; // set label to 1
var10000 = getId(this);
if (var10000 == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
// If a result is already available, do not suspend; just break
break;
case 1: // b
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
case 2: // d
id = (String)this.L$1;
ResultKt.throwOnFailure($result);
var10000 = $result;
break label17; // leave label17
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
// c
id = (String)var10000;
this.L$1 = id; // save id into L$1
this.label = 2; // set label to 2
var10000 = getAvatar(id, this);
if (var10000 == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
}
// e
String avatar = (String)var10000;
String var5 = var9.append(var10001.getName()).append(" - ").append(id).append(" - ").append(avatar).toString();
System.out.println(var5);
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkParameterIsNotNull(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
var3.p$ = (CoroutineScope)value;
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}
}
invokeSuspend 方法会在协程体中的 suspend 函数得到结果后被调用;具体的调用发生在 BaseContinuationImpl.resumeWith 中
执行流程:
- a: launch 协程体刚执行到 getId 方法时,getId 方法的返回值将是 COROUTINE_SUSPENDED,此时直接 return,则 launch 协程体中 getId 后面的代码暂时不会执行,即 launch 协程体被挂起(非阻塞,该线程依旧会做其它工作)。这里将 label 置为了 1。而若此时 getId 已经有结果(内部没有调用 delay 之类的 suspend 函数等),则不挂起,而是直接 break。
- b: 若上面 a 中 getId 返回 COROUTINE_SUSPENDED,则当 getId 有可用结果返回后,会重新执行 launch 协程体的 invokeSuspend 方法,根据上面的 label==1,会执行到这里检查一下 result,没问题的话就 break,此时 id 赋值给了 var10000。
- c: 在 a 中若直接 break,或在 b 中得到 getId 的结果然后 break 后,都会执行到这里,得到 id 的值并把 label 置为 2。然后调用 getAvatar 方法,跟 getId 类似,若其返回 COROUTINE_SUSPENDED 则 return,协程被挂起,等到下次 invokeSuspend 被执行;否则离开 label17 接着执行后续逻辑。
- d: 若上面 c 中 getAvatar 返回 COROUTINE_SUSPENDED,则当 getAvatar 有可用结果返回后会重新调用 launch 协程体的 invokeSuspend 方法,此时根据 label==2 来到这里并取得之前的 id 值,检验 result(即 avatar),然后 break label17。
- e: c 中直接返回了可用结果,或 d 中 break label17 后,launch 协程体中的 suspend 函数都执行完毕了,这里会执行剩下的逻辑。
协程的挂起和恢复
Kotlin 编译器会为 协程体 生成继承自 SuspendLambda 的子类,协程的真正运算逻辑都在其 invokeSuspend 方法中。
Kotlin 协程的内部实现使用了 Kotlin 编译器的一些编译技术,当 suspend 函数被调用时,都有一个隐式的参数额外传入,这个参数是 Continuation 类型,封装了协程 resume 后执行的代码逻辑。
// Same sample as before, repeated to show the hidden Continuation parameter added by the compiler.
private suspend fun getId(): String {
return GlobalScope.async(Dispatchers.IO) {
delay(1000)
"hearing"
}.await()
}
// Decompiled to Java: the suspend function takes an extra Continuation parameter and returns Object.
final Object getId(@NotNull Continuation $completion) {
// ...
}
其中传入的 $completion 参数,可以看到是调用 getId 方法所在的协程体对象,也就是一个 SuspendLambda 对象。Continuation 的定义如下:
// A continuation: exposes a coroutine context and a single resumeWith callback taking a Result.
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
将 getId 方法编译后的字节码反编译成 Java 代码如下:
// Decompiled getId (abridged): the async block is an anonymous state machine; await receives
// the caller's continuation.
final Object getId(@NotNull Continuation $completion) {
// Create and start the coroutine
return BuildersKt.async$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
this.label = 1;
if (DelayKt.delay(1000L, this) == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return "hearing";
}
// ...
}), 2, (Object)null).await($completion); // calls the await() suspend function
}
- 在 getId,delay 未返回值时,返回 COROUTINE_SUSPENDED,即代表还没有值,此时协程挂起,但不阻塞线程;
- 当 suspend 函数有返回值时,会继续调用 invokeSuspend,恢复协程运行
父子协程
launch:
// Repeated here for the parent/child discussion: the coroutine object is constructed with
// newContext, and it is returned to the caller as the Job.
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
// AbstractCoroutine (fragment)
init {
// Links this coroutine to the Job found in the parent context, when requested.
if (initParentJob) initParentJob(parentContext[Job])
}
// Starts the block via the CoroutineStart invoke operator, passing this as the completion.
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
接着看 initParentJob():
// JobSupport
// Registers this job as a child of parent and stores the returned handle in parentHandle.
protected fun initParentJob(parent: Job?) { // parent is the parent Job (the caller passes parentContext[Job])
assert { parentHandle == null }
if (parent == null) {
// No parent job: nothing to attach to.
parentHandle = NonDisposableHandle
return
}
parent.start() // make sure the parent is started
val handle = parent.attachChild(this)
parentHandle = handle
// now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
if (isCompleted) {
handle.dispose()
parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
}
接下来重点在于 parent.attachChild 方法:
// Wraps the child in a ChildHandleNode and registers it through invokeOnCompletion with
// onCancelling = true; the resulting handle is returned as a ChildHandle.
public final override fun attachChild(child: ChildJob): ChildHandle {
return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle
}
invokeOnCompletion 方法主要是将 handler 节点添加到父协程的一个队列 (state.list) 中。
GlobalScope.launch 没有父协程
协程完成
协程的完成通过 AbstractCoroutine.resumeWith 实现
// AbstractCoroutine
// Returns early while the state is COMPLETING_WAITING_CHILDREN; otherwise calls afterResume.
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
调用路径:makeCompletingOnce -> tryMakeCompleting -> tryMakeCompletingSlowPath -> tryWaitForChild:
// Walks the child list starting at child: installs a ChildCompletion handler on the child's
// job and returns true if a real handle came back; otherwise moves on to the next child.
// Returns false when there is no next child.
private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
val handle = child.childJob.invokeOnCompletion(
invokeImmediately = false,
handler = ChildCompletion(this, state, child, proposedUpdate).asHandler
)
if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
val nextChild = child.nextChild() ?: return false
return tryWaitForChild(state, nextChild, proposedUpdate)
}
可知 tryWaitForChild 方法将 ChildCompletion 节点添加到了子协程的 state.list 队列中,当子协程完成或者取消时调用 ChildCompletion.invoke:
// ChildCompletion
// Handler body: tells the parent to continue its completion for this child.
override fun invoke(cause: Throwable?) {
parent.continueCompleting(state, child, proposedUpdate)
}
// Called after lastChild is done: keeps waiting if a later child is still pending, otherwise
// finalizes the finishing state and calls afterCompletion.
private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
// figure out if we need to wait for next child
val waitChild = lastChild.nextChild()
// try wait for next child
if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
// no more children to wait -- try update state
val finalState = finalizeFinishingState(state, proposedUpdate)
afterCompletion(finalState)
}
父协程需要等待所有子协程处于完成或者取消状态才能完成自身。
协程取消
// JobSupport.kt
// external cancel with cause, never invoked implicitly from internal machinery
// Uses defaultCancellationException() when no cause is supplied.
public override fun cancel(cause: CancellationException?) {
cancelInternal(cause ?: defaultCancellationException())
}
// Overridable hook; as written here it simply delegates to cancelImpl.
public open fun cancelInternal(cause: Throwable) {
cancelImpl(cause)
}
// Core cancellation routine. Tries cancelMakeCompleting when onCancelComplete is set, falls back
// to makeCancelling, and returns false only when the final state is TOO_LATE_TO_CANCEL.
internal fun cancelImpl(cause: Any?): Boolean {
var finalState: Any? = COMPLETING_ALREADY
if (onCancelComplete) {
// make sure it is completing, if cancelMakeCompleting returns state it means it had make it
// completing and had recorded exception
finalState = cancelMakeCompleting(cause)
if (finalState === COMPLETING_WAITING_CHILDREN) return true
}
if (finalState === COMPLETING_ALREADY) {
finalState = makeCancelling(cause)
}
return when {
finalState === COMPLETING_ALREADY -> true
finalState === COMPLETING_WAITING_CHILDREN -> true
finalState === TOO_LATE_TO_CANCEL -> false
else -> {
afterCompletion(finalState)
true
}
}
}
在 makeCancelling() 调用了 notifyCancelling()
// JobSupport.kt
// Author's note: when a child coroutine starts, initParentJob() wraps it in a ChildHandleNode and adds
// that node to the parent Job's list; `list` here is that node list.
private fun notifyCancelling(list: NodeList, cause: Throwable) {
// first cancel our own children
onCancelling(cause)
// Runs invoke() on each ChildHandleNode registered above, i.e. cancels the child coroutines one by one
notifyHandlers<JobCancellingNode>(list, cause)
// then cancel parent (the parent may end up being cancelled as a result)
cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}
下面看看 parent 如何取消 child,即 notifyHandlers<JobCancellingNode>(list, cause) 触发的 ChildHandleNode.invoke:
// Base type of the nodes that notifyCancelling() selects via notifyHandlers<JobCancellingNode>(...).
internal abstract class JobCancellingNode : JobNode()
/**
 * Node linking a parent job to one child. Inside this class `job` plays the parent role
 * (it is what [parent] returns) and [childJob] is the child being tracked.
 */
internal class ChildHandleNode(
    @JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {

    override val parent: Job
        get() = job

    // Parent -> child: forwards the parent's cancellation to the child.
    override fun invoke(cause: Throwable?) {
        childJob.parentCancelled(job)
    }

    // Child -> parent: reports the child's cancellation cause to the parent job.
    override fun childCancelled(cause: Throwable): Boolean {
        return job.childCancelled(cause)
    }
}
/**
 * Invoked on a child when its parent is cancelled: the child cancels itself,
 * using [parentJob] as the cancellation cause.
 */
public final override fun parentCancelled(parentJob: ParentJob) {
    cancelImpl(cause = parentJob)
}
/**
 * Invoked on a parent when one of its children is cancelled with [cause].
 *
 * A [CancellationException] is accepted as-is (returns `true` without cancelling this job);
 * any other cause cancels this job, and the result also depends on `handlesException`.
 */
public open fun childCancelled(cause: Throwable): Boolean =
    cause is CancellationException || (cancelImpl(cause) && handlesException)
下面看看 child 如何取消 parent,cancelParent():
/**
 * Propagates [cause] to the parent, if any.
 *
 * @return `true` when the exception needs no further handling by this job: the coroutine is scoped,
 * the cause is a [CancellationException], or the parent's `childCancelled` returned `true`.
 */
private fun cancelParent(cause: Throwable): Boolean {
    // Scoped coroutine (false by default, subclasses may override) -- don't propagate, will be rethrown
    if (isScopedCoroutine) return true

    /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
     * This allow parent to cancel its children (normally) without being cancelled itself, unless
     * child crashes and produce some other exception during its completion.
     */
    val isCancellation = cause is CancellationException
    val handle = parentHandle
    return if (handle === null || handle === NonDisposableHandle) {
        // No parent -- ignore CE, report other exceptions.
        isCancellation
    } else {
        // Notify parent but don't forget to check cancellation
        handle.childCancelled(cause) || isCancellation
    }
}
/**
 * Coroutine used by supervisorScope (per the author's note): cancellation and exceptions only
 * travel from the parent down to its children.
 */
private class SupervisorCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    // Always answers false, so a child's cancellation never cancels this parent.
    override fun childCancelled(cause: Throwable): Boolean {
        return false
    }
}
- 协程调用 cancel 时会取消它的所有子协程,默认不会取消它的父协程
- 协程的取消只是在第一层包装 AbstractCoroutine 中修改协程的状态,不会影响到第二层包装 BaseContinuationImpl 中的执行逻辑,即协程的取消只是修改状态,不会取消协程的实际执行逻辑
协程异常处理
异常处理入口:BaseContinuationImpl.resumeWith:
// Simplified excerpt: `// ...` marks elided code. NOTE(review): `current` and `param` are used below but
// declared in the elided part, and the enclosing loop mentioned by "unrolling recursion via loop" is not shown.
class BaseContinuationImpl {
fun resumeWith(result: Result<Any?>) {
// ...
// Run one step of the coroutine body and turn its result (or its exception) into a Result.
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
// The body suspended again: nothing to deliver yet.
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception) // an exception thrown by the coroutine body is caught here and becomes the outcome
}
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
在捕获了异常后,调用 AbstractCoroutine.resumeWith 来处理,其流程为:AbstractCoroutine.resumeWith -> JobSupport.makeCompletingOnce -> JobSupport.tryMakeCompleting -> JobSupport.tryMakeCompletingSlowPath
/**
 * Moves an incomplete job into the "finishing" state and either starts waiting for its children
 * or finalizes it immediately.
 *
 * @return COMPLETING_RETRY if the node list could not be obtained or the state CAS failed,
 * COMPLETING_ALREADY if completion was already in progress, COMPLETING_WAITING_CHILDREN if a child
 * is now being awaited, otherwise the result of finalizeFinishingState.
 */
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
// Reuse the current state if it is already a Finishing instance, otherwise create a fresh one.
val finishing = state as? Finishing ?: Finishing(list, false, null)
var notifyRootCause: Throwable? = null
synchronized(finishing) {
if (finishing.isCompleting) return COMPLETING_ALREADY
finishing.isCompleting = true
// A freshly created Finishing must be installed into _state; a lost race means the caller retries.
if (finishing !== state) {
if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
}
val wasCancelling = finishing.isCancelling
// Record the exception carried by an exceptional update before reading rootCause.
(proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
// If it just becomes cancelling --> must process cancelling notifications
notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
}
// process cancelling notification here -- it cancels all the children _before_ we start to to wait them (sic!!!)
notifyRootCause?.let { notifyCancelling(list, it) }
val child = firstChild(state) // now wait for children
if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) return COMPLETING_WAITING_CHILDREN
// otherwise -- we have not children left (all were already cancelled?)
return finalizeFinishingState(finishing, proposedUpdate)
}
- 当协程发生异常时会取消它的所有子协程,默认会取消它的父协程
接下来看看 finalizeFinishingState 方法:
// JobSupport.kt
// Excerpt: `// ...` marks elided code; `finalException` and `finalState` are computed in the elided part.
private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
// ...
if (finalException != null) {
// cancelParent is tried first; handleJobException only runs when cancelParent returned false (short-circuit ||).
val handled = cancelParent(finalException) || handleJobException(finalException)
// Mark the exceptional final state as handled once either of them accepted the exception.
if (handled) (finalState as CompletedExceptionally).makeHandled()
}
// ...
}
- cancelParent 在异常是 CancellationException 时会返回 true:子协程抛出 CancellationException 时,父协程会忽略它,不会因此取消自己
- 如果协程抛出未捕获的非取消异常,则会一步步取消上层的协程,最后根协程调用 handleJobException 处理异常
// JobSupport.kt
/**
 * Hook for an exception that was not handled by the parent coroutine.
 *
 * @return `true` if the exception was handled here; this base version handles nothing.
 */
protected open fun handleJobException(exception: Throwable): Boolean {
    return false
}
重写了该方法的类有 StandaloneCoroutine 和 ActorCoroutine,以 StandaloneCoroutine 为例:
/**
 * Coroutine whose unhandled exceptions are routed to `handleCoroutineException`
 * instead of being left unhandled.
 */
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {

    // Delegates to the context-based handler and always reports the exception as handled.
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context = context, exception = exception)
        return true
    }
}
调用 handleCoroutineException 来处理异常:
// CoroutineExceptionHandler.kt
/**
 * Routes an uncaught coroutine exception: first to the [CoroutineExceptionHandler] found in
 * [context]; if there is none, or that handler itself throws, to `handleCoroutineExceptionImpl`.
 */
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
    try {
        val installed = context[CoroutineExceptionHandler]
        if (installed != null) {
            // A handler is present in the context, so it takes the exception and we are done.
            installed.handleException(context, exception)
            return
        }
    } catch (t: Throwable) {
        // The installed handler threw: pass whatever handlerException builds from both throwables to the fallback.
        handleCoroutineExceptionImpl(context, handlerException(exception, t))
        return
    }
    // No handler in the context: fall back to the global handler.
    handleCoroutineExceptionImpl(context, exception)
}
// CoroutineExceptionHandlerImpl.kt
// Extension handlers discovered through ServiceLoader. Author's note: on Android this is how
// AndroidExceptionPreHandler also gets to process exceptions.
private val handlers: List<CoroutineExceptionHandler> = run {
    val handlerClass = CoroutineExceptionHandler::class.java
    ServiceLoader.load(handlerClass, handlerClass.classLoader)
        .iterator()
        .asSequence()
        .toList()
}
/**
 * Global fallback: offers [exception] to every ServiceLoader-provided handler, then always
 * passes it to the current thread's uncaught exception handler.
 */
internal actual fun handleCoroutineExceptionImpl(context: CoroutineContext, exception: Throwable) {
    // use additional extension handlers
    handlers.forEach { extension ->
        try {
            extension.handleException(context, exception)
        } catch (failure: Throwable) {
            // Use thread's handler if custom handler failed to handle exception
            Thread.currentThread().let { thread ->
                thread.uncaughtExceptionHandler.uncaughtException(thread, handlerException(exception, failure))
            }
        }
    }

    // use thread's handler
    Thread.currentThread().let { thread ->
        thread.uncaughtExceptionHandler.uncaughtException(thread, exception)
    }
}
- 不要在 CoroutineExceptionHandler 中再抛出异常,这是最后一道能处理异常的地方
- 从 CoroutineContext 取出 CoroutineExceptionHandler 来处理异常
- 如果没有 CoroutineExceptionHandler,或者 CoroutineExceptionHandler 在处理时又抛出了异常,则交给 handleCoroutineExceptionImpl 处理
- 没有处理的异常交给 UncaughtExceptionHandler 来处理
- AndroidExceptionPreHandler 是对 CoroutineExceptionHandler 实现的 spi
结构化并发 (Structured Concurrency) 原理
疑问
协程何时需要线程切换?context[ContinuationInterceptor] 什么时候有值
当 CoroutineContext 中指定了调度器(ContinuationInterceptor)时,就可能需要切换线程;
即使没有显式指定,newCoroutineContext 也会默认添加 Dispatchers.Default,此时 context[ContinuationInterceptor] 就会有值
协程如何切线程?
通过 Continuation.intercepted():ContinuationInterceptor 会拦截 Continuation,而 CoroutineDispatcher 实现了 ContinuationInterceptor,所以协程的线程切换是以拦截器的方式实现的。
协程如何处理异常?
入口:
在 BaseContinuationImpl.resumeWith,Result.failure(exception)
CancellationException 异常,会被忽略掉,不会取消父协程,只会取消其下所有子协程
/**
 * Propagates [cause] to the parent, if any.
 *
 * @return `true` when the exception needs no further handling by this job: the coroutine is scoped,
 * the cause is a [CancellationException], or the parent's `childCancelled` returned `true`.
 */
private fun cancelParent(cause: Throwable): Boolean {
    // Is scoped coroutine -- don't propagate, will be rethrown
    if (isScopedCoroutine) return true

    /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
     * This allow parent to cancel its children (normally) without being cancelled itself, unless
     * child crashes and produce some other exception during its completion.
     */
    val isCancellation = cause is CancellationException
    val handle = parentHandle
    return if (handle === null || handle === NonDisposableHandle) {
        // No parent -- ignore CE, report other exceptions.
        isCancellation
    } else {
        // Notify parent but don't forget to check cancellation
        handle.childCancelled(cause) || isCancellation
    }
}