publicactualfun<T>runBlocking(context: CoroutineContext, block: suspendCoroutineScope.() -> T): T { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } val currentThread = Thread.currentThread() val contextInterceptor = context[ContinuationInterceptor] val eventLoop: EventLoop? val newContext: CoroutineContext if (contextInterceptor == null) { // create or use private event loop if no dispatcher is specified eventLoop = ThreadLocalEventLoop.eventLoop newContext = GlobalScope.newCoroutineContext(context + eventLoop) } else { // See if context's interceptor is an event loop that we shall use (to support TestContext) // or take an existing thread-local event loop if present to avoid blocking it (but don't create one) eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } ?: ThreadLocalEventLoop.currentOrNull() newContext = GlobalScope.newCoroutineContext(context) } val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop) coroutine.start(CoroutineStart.DEFAULT, coroutine, block) return coroutine.joinBlocking() }
/** * Use this function to start coroutine in a cancellable way, so that it can be cancelled * while waiting to be dispatched. */ internalfun<R, T>(suspend (R) -> T).startCoroutineCancellable( receiver: R, completion: Continuation<T>, onCancellation: ((cause: Throwable) -> Unit)? = null ) = runSafely(completion) { createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation) }
publicinterfaceFunction2<in P1, in P2, out R> : Function<R> { /** Invokes the function with the specified arguments. */ publicoperatorfuninvoke(p1: P1, p2: P2): R }
Kotlin 编译器用 Function1,Function2 … FuncitonX 接口来实现 lambda 表达式,Function 后面的数字表示 lambda 参数的数量。如果 lambda 有 receiver,receiver 会被视为其第一个参数,则 invoke() 的第一个参数为 receiver,后续参数为 lambda 的实际参数。例如,lambda 表达式 val a: Int.(Int, Int) -> Int = { x: Int, y: Int -> this + x + y } 会用以下代码来实现:
1 2 3 4 5 6
Function3a=newFunction3<Integer, Integer, Integer, Object> { /** Invokes the function with the specified arguments. */ publicfinal Object invoke(Integer p1, Integer p2, Integer p3) { return p1 + p2 + p3; } }
对于 suspend lambda,实现则略有不同,例如对于一个空的 lambda: val a: suspend () -> Unit = {},实际上生成的对象通常长这样的:
publicinterfaceContinuation<in T> { /** * The context of the coroutine that corresponds to this continuation. */ publicval context: CoroutineContext
/** * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the * return value of the last suspension point. */ publicfunresumeWith(result: Result<T>) }
publicfinaloverridefunresumeWith(result: Result<Any?>) { var current = this var param = result while (true) { probeCoroutineResumed(current) with(current) { val completion = completion!! // fail fast when trying to resume continuation without completion val outcome: Result<Any?> = try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { // unrolling recursion via loop current = completion param = outcome } else { // top-level completion reached -- invoke and return completion.resumeWith(outcome) return } } } }
publicfinaloverridefunresumeWith(result: Result<Any?>) { probeCoroutineResumed(this) val completion = completion ?: error("Completion should not be null")
val outcome: Result<Any?> = try { val outcome = invokeSuspend(result) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (e: Throwable) { Result.failure(e) }
releaseIntercepted()
if (completion is BaseContinuationImpl) { // 递归调用上游 Continuation completion.resumeWith(outcome) } else { // 调用到最顶层 Continuation completion.resumeWith(outcome) } }
privateclassFun1Continuation( val completion: Continuation<Any?> ) : ContinuationImpl<Any?>(completion) { var label = 0 var result: Any? = null var I$0: Int = 0
funfun1(continuation: Continuation<Any?>): Any? { // 如果 continuation 是之前包装过的,直接使用;否则将 continuation 包装成一个 Fun1Continuation,将其作为上游 Continuation 持有 val cont = if (continuation is Fun1Continuation) { if ((continuation.label and 0x80000000) != 0) { continuation.label = continuation.label and 0x7fffffff continuation } else { Fun1Continuation(continuation) } } else { Fun1Continuation(continuation) }
var result = cont.result
run handleAfterFun3@{ run handleAfterFun2@{ when (cont.label) { 0 -> { // 初始状态 Result.throwOnFailure(result) val localInt = 0 cont.I$0 = localInt cont.label = 1 val res = fun2(cont) if (res === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED result = res }
else -> throw IllegalStateException("call to 'resume' before 'invoke' with coroutine") }
} // fun2 执行完毕后的逻辑,无论同步异步都会走到这 val localInt = cont.I$0 cont.I$0 = localInt + result cont.label = 2 val res = fun3(cont) if (res === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED result = res } // fun3 执行完毕后的逻辑,无论同步异步都会走到这 val localInt = cont.I$0 val finalResult = localInt + result return finalResult }
/** * Completes execution of this with coroutine with the specified result. */ publicfinaloverridefunresumeWith(result: Result<T>) { val state = makeCompletingOnce(result.toState()) if (state === COMPLETING_WAITING_CHILDREN) return afterResume(state) }
protectedopenfunafterResume(state: Any?): Unit = afterCompletion(state)
overridefunafterCompletion(state: Any?) { // wake up blocked thread if (Thread.currentThread() != blockedThread) unpark(blockedThread) }
@Suppress("UNCHECKED_CAST") funjoinBlocking(): T { registerTimeLoopThread() try { eventLoop?.incrementUseCount() try { while (true) { @Suppress("DEPRECATION") if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) } val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE // note: process next even may loose unpark flag, so check if completed before parking if (isCompleted) break parkNanos(this, parkNanos) } } finally { // paranoia eventLoop?.decrementUseCount() } } finally { // paranoia unregisterTimeLoopThread() } // now return result val state = this.state.unboxState() (state as? CompletedExceptionally)?.let { throw it.cause } return state as T } }