简介
协程并不是线程,而是一个异步框架,让开发者能够使用同步的风格去编写异步代码,并且能够以无锁的方式切换线程执行顺序,简化了异步编程。
协程让开发者可以用代码进行线程切换和调度,线程是由系统进行调度
优点:
- 轻量:可以在单个线程上运行多个协程,因为协程支持挂起,不会使正在运行协程的线程阻塞。
- 内存泄漏更少:在一个作用域内可以启动多个协程,并在不需要时统一销毁。
- 内置取消支持:递归的取消协程
- Jetpack 集成:许多 Jetpack 库都提供支持协程的扩展,或者提供自定义协程作用域。
类似Dart语言的async、await
使用
class LoginViewModel(private val loginRepository: LoginRepository): ViewModel() {
fun login(username: String, token: String) {
// 创建协程,在IO线程中运行
viewModelScope.launch(Dispatchers.IO) {
val jsonBody = "{ username: \"$username\", token: \"$token\"}"
loginRepository.makeLoginRequest(jsonBody)
}
}
}
launch
创建协程,并执行代码块
Dispatcher
调度程序,指定在何处运行协程:
Dispatchers.Main
:主线程,用于更新UI,更新LiveData数据等Dispatchers.IO
:用于执行I/O操作,例如数据库、文件、网络I/ODispatchers.Default
:用于执行CPU耗时操作,例如Json解析
Dispatchers内部使用线程池进行调度,不能保证两个同样使用
`Dispatchers.IO
的代码块按顺序执行。
使用withContext
在协程中切换线程,withContext
是一个挂起函数。
使用
suspend
声明挂起函数,挂起函数需要在协程作用域、或其他挂起函数中调用
class LoginRepository(...) {
suspend fun makeLoginRequest(jsonBody: String): Result<LoginResponse> {
//切换到I/O线程
return withContext(Dispatchers.IO) {
//执行网络请求
}
}
}
class LoginViewModel(private val loginRepository: LoginRepository): ViewModel() {
fun login(username: String, token: String) {
// 在主线程创建一个协程
viewModelScope.launch {
val jsonBody = "{ username: \"$username\", token: \"$token\"}"
// 挂起协程,在I/O线程进行网络请求,直到结果返回
val result = try {
loginRepository.makeLoginRequest(jsonBody)
} catch(e: Exception) {
//捕获异常
Result.Error(Exception("Network request failed"))
}
// 恢复协程,主线程操作UI
when (result) {
is Result.Success<LoginResponse> -> // 执行成功
else -> // 执行失败
}
}
}
}
上面的代码在请求的函数中使用了
withContext
切换线程,实际上也可以在ViewModel
中切换线程。这么写的好处是ViewModel调用方不需要关心在哪个线程执行函数:例如
makeLoginRequest
可以在IO线程、也可以在Default线程,对于调用方来说是未知的,因此将决定权交给函数自身。
suspend
:挂起当前协程,并保存堆栈帧(函数信息和局部变量)resume
:恢复协程,将堆栈帧复制回来,从挂起处继续执行
suspend
并不会切换线程,只是告诉协程编译器从当前位置挂起,主线程也可以使用协程,切换线程需要使用withContext
调度程序如果suspend中不创建协程、或者切换线程,则该标记没有实际意义,编译时不会做特殊处理。
launch启动
launch
:启动新协程,不等待结果返回
- 通过自定义协程作用域launch,可以手动取消协程:
viewModelScope.launch
- 通过Global作用域launch,无法取消,可能会产生内存泄漏:
GlobalScope.launch
async启动
async
:启动新协程,使用await
获取执行结果。可以并行两个任务,直到两个任务都完成才继续执行。
suspend fun fetchTwoDocs() =
coroutineScope {
//使用await等待
val deferredOne = async { fetchDoc(1) }
val deferredTwo = async { fetchDoc(2) }
deferredOne.await()
deferredTwo.await()
//使用awaitAll等待
val deferreds = listOf( // fetch two docs at the same time
async { fetchDoc(1) }, // async returns a result for the first doc
async { fetchDoc(2) } // async returns a result for the second doc
)
deferreds.awaitAll()
}
类似于Java和Dart的Future
Job
通过launch
、async
创建协程后会返回一个协程的句柄Job,可以通过Job取消协程、获取协程状态等
协程作用域
CoroutineScope
:协程作用域,用于管理它创建的所有协程,使用cancel
取消所有该作用域内的协程。GlobalScope
:全局作用域,无法取消该作用域,只能通过Job取消协程。
不建议使用
GlobalScope
创建协程(如果一定要使用的话,用于不需要取消的任务)
- 难以进行单元测试,无法传入测试作用域
- 不方便统一管理,多个协程无法共享
CoroutineContext
上下文如果需要更长生命周期的协程作用域,可以由外部注入
ViewModel
KTX扩展库:
- 提供预定义的协程作用域(
CoroutineScope
):viewModelScope
- 页面销毁时ViewModel clear会自动取消
viewModelScope
此外还有Lifecycle
KTX扩展库,提供lifecycleScope
CoroutineContext
CoroutineContext
:协程上下文
Job
:控制协程生命周期CoroutineDispatcher
:切换线程CoroutineName
:协程名称CoroutineExceptionHandler
:处理未捕获的异常
重载了+
运算符,可以将多个CoroutineContext
相加,得到一个新的上下文
suspendCancellableCoroutine
手动resume返回执行结果
协程原理
本质就是CPS+状态机流转,将协程代码编译成CPS写法,通过label状态机,保存代码执行点,并传入上一次挂起的执行结果。
CPS
CPS(Continuation Passing-Style,续体传递风格):简单来说将函数返回结果通过回调参数形式传递给接下来的程序。
常规写法如下
class Test {
public int plus(int i1, int i2) {
return i1 + i2;
}
public void main() {
int ret = plus(1, 2);
// plus下方写两行代码,便于理解
System.out.println(ret);
System.out.println("Hello World");
}
}
转换为CPS写法:
class Test {
interface Continuation {
void next(int result);
}
public void plus(int i1, int i2, Continuation continuation) {
int ret = i1 + i2;
continuation.next(ret);
}
public void main() {
// 把plus下方的代码块使用回调接口包裹起来,作为plus方法的最后一个参数,等原plus方法体执行完之后回调
plus(1, 2, result -> {
System.out.println(result);
System.out.println("Hello World");
});
}
}
结合状态机流转
上面的代码看起来没什么问题,但是如果有多次plus操作,会不断地增加嵌套和缩进,可能会变成这样:
class Test {
// 编译前
public void main(String[] args) {
int ret1 = plus(1, 2);
System.out.println(ret1);
int ret2 = plus(3, 4);
System.out.println(ret2);
int ret3 = plus(5, 6);
System.out.println(ret3);
}
// 编译后
public void main() {
plus(1, 2, result -> {
System.out.println(result);
plus(3, 4, result -> {
System.out.println(result);
plus(5, 6, result -> {
System.out.println(result);
});
});
});
}
}
结合状态机流转之后,上面的代码可能会变成这样:三次plus调用,划分为了四步状态。消除了代码嵌套
class Test {
int state = 0;
public void main() {
new Continution() {
@Override
public void next(int result) {
switch(state) {
case 0:
state = 1; // 修改状态,跳到下一个case执行
plus(1, 2, this); // 每次传入this,plus执行完之后再次调用next,传入结果
case 1:
System.out.println(result);
state = 2;
plus(3, 4, this);
case 2:
System.out.println(result);
state = 3;
plus(5, 6, this);
case 3:
System.out.println(result);
}
}
}
}
}
Kotlin协程原理
Kotlin原理类似上述步骤,只不过生成的代码更复杂
Kotlin协程续体函数如下:
//续体函数,调用resumeWith恢复
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
写一段示例代码
// 定义2个挂起函数
suspend fun foo1(): String = withContext(Dispatchers.IO) { "foo1" }
suspend fun foo2(): String = withContext(Dispatchers.IO) { "foo2" }
class Main {
fun main() {
GlobalScope.launch {
println("Hello")
val foo1 = foo1() // 挂起协程,执行任务
println(foo1) // 恢复协程
val foo2 = foo2() // 挂起协程,执行任务
println(foo2)
}
}
}
使用AS工具反编译Kotlin字节码为Java代码,伪代码如下(原来的格式不好看,这里做下调整):
- 可以看到
foo1()
和foo2()
经过编译,多了一个Continuation
参数 - main方法中启动协程,初始状态为0,执行
launch
中第1、2行代码,并修改状态为1- 打印Hello
- 执行
foo1()
方法挂起协程,并传入this作为续体,等foo1()
执行完成之后调用续体函数
foo1()
中调用invoke
方法,传入foo1()
的执行结果,判断状态为1,执行launch
中第3、4行代码,并修改状态为2- 打印foo1
- 执行
foo2()
方法挂起协程,并传入this作为续体,等foo1()
执行完成之后调用续体函数
foo2()
中调用invoke
方法,传入foo2()
的执行结果,判断状态为2,执行launch
中第5行代码并推出- 打印foo2
// FooKt.java
public final class FooKt {
@Nullable
public static final Object foo1(@NotNull Continuation $completion) {
// 内部会经过几层包装,切换线程,执行任务,最终会调用Continuation的resumeWith恢复
...
}
@Nullable
public static final Object foo2(@NotNull Continuation $completion) {
...
}
}
// Main.java
public final class Main {
public final void main() {
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label; //当前状态,初始为0
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var10000;
label17: {
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); // 用于判断是否挂起状态
switch(this.label) {
case 0: // 首次调用invoke
ResultKt.throwOnFailure($result);
System.out.println("Hello"); // 打印Hello
this.label = 1; // 将状态改为1
var10000 = FooKt.foo1(this); // 调用挂起函数,将this作为续体传入
if (var10000 == var5) { // 判断是挂起状态,直接返回,等待外部下次调用invoke
return var5;
}
break;
case 1: // 再次调用invoke,并传入foo1的结果恢复协程
ResultKt.throwOnFailure($result);
var10000 = $result;
System.out.println(var10000); // 打印foo1
this.label = 2; // 将状态改为2
var10000 = FooKt.foo2(this); // 调用挂起函数,将this作为续体传入
if (var10000 == var5) { // 判断是挂起状态,直接返回,等待外部下次调用invoke
return var5;
}
break;
case 2: // 再次调用invoke,并传入foo2的结果恢复协程
ResultKt.throwOnFailure($result);
var10000 = $result;
System.out.println(var10000); // 打印foo2
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
}
// 创建实例
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkParameterIsNotNull(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
// 外部调用Continuation的resumeWith方法恢复协程,最终会调用到invokeSuspend
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
}
}
结语
参考资料