简介

协程并不是线程,而是一个异步框架,让开发者能够使用同步的风格去编写异步代码,并且能够以无锁的方式切换线程执行顺序,简化了异步编程。

协程让开发者可以用代码进行线程切换和调度,线程是由系统进行调度

优点:

  • 轻量:可以在单个线程上运行多个协程,因为协程支持挂起,不会使正在运行协程的线程阻塞。
  • 内存泄漏更少:在一个作用域内可以启动多个协程,并在不需要时统一销毁。
  • 内置取消支持:递归的取消协程
  • Jetpack 集成:许多 Jetpack 库都提供支持协程的扩展,或者提供自定义协程作用域。

类似Dart语言的async、await

使用

class LoginViewModel(private val loginRepository: LoginRepository): ViewModel() {
    fun login(username: String, token: String) {
        // 创建协程,在IO线程中运行
        viewModelScope.launch(Dispatchers.IO) {
            val jsonBody = "{ username: \"$username\", token: \"$token\"}"
            loginRepository.makeLoginRequest(jsonBody)
        }
    }
}

launch创建协程,并执行代码块

Dispatcher调度程序,指定在何处运行协程:

  • Dispatchers.Main:主线程,用于更新UI,更新LiveData数据等
  • Dispatchers.IO :用于执行I/O操作,例如数据库、文件、网络I/O
  • Dispatchers.Default:用于执行CPU耗时操作,例如Json解析

Dispatchers内部使用线程池进行调度,不能保证两个同样使用`Dispatchers.IO的代码块按顺序执行

使用withContext在协程中切换线程,withContext是一个挂起函数。

使用suspend声明挂起函数,挂起函数需要在协程作用域、或其他挂起函数中调用

class LoginRepository(...) {
    suspend fun makeLoginRequest(jsonBody: String): Result<LoginResponse> {
        //切换到I/O线程
        return withContext(Dispatchers.IO) {
            //执行网络请求
        }
    }
}
class LoginViewModel(private val loginRepository: LoginRepository): ViewModel() {
    fun login(username: String, token: String) {
        // 在主线程创建一个协程
        viewModelScope.launch {
            val jsonBody = "{ username: \"$username\", token: \"$token\"}"
            // 挂起协程,在I/O线程进行网络请求,直到结果返回
            val result = try {
              loginRepository.makeLoginRequest(jsonBody)
            } catch(e: Exception) {
                //捕获异常
                Result.Error(Exception("Network request failed"))
            }
            // 恢复协程,主线程操作UI
            when (result) {
                is Result.Success<LoginResponse> -> // 执行成功
                else -> // 执行失败
            }
        }
    }
}

上面的代码在请求的函数中使用了withContext切换线程,实际上也可以在ViewModel中切换线程。

这么写的好处是ViewModel调用方不需要关心在哪个线程执行函数:例如makeLoginRequest可以在IO线程、也可以在Default线程,对于调用方来说是未知的,因此将决定权交给函数自身。

  • suspend:挂起当前协程,并保存堆栈帧(函数信息和局部变量)
  • resume:恢复协程,将堆栈帧复制回来,从挂起处继续执行

suspend并不会切换线程,只是告诉协程编译器从当前位置挂起,主线程也可以使用协程,切换线程需要使用withContext调度程序

如果suspend中不创建协程、或者切换线程,则该标记没有实际意义,编译时不会做特殊处理。

launch启动

launch:启动新协程,不等待结果返回

  1. 通过自定义协程作用域launch,可以手动取消协程:viewModelScope.launch
  2. 通过Global作用域launch,无法取消,可能会产生内存泄漏:GlobalScope.launch

async启动

async:启动新协程,使用await获取执行结果。可以并行两个任务,直到两个任务都完成才继续执行。

suspend fun fetchTwoDocs() =
    coroutineScope {
        //使用await等待
        val deferredOne = async { fetchDoc(1) }
        val deferredTwo = async { fetchDoc(2) }
        deferredOne.await()
        deferredTwo.await()
        //使用awaitAll等待
        val deferreds = listOf(     // fetch two docs at the same time
            async { fetchDoc(1) },  // async returns a result for the first doc
            async { fetchDoc(2) }   // async returns a result for the second doc
        )
        deferreds.awaitAll()
    }

类似于Java和Dart的Future

Job

通过launchasync创建协程后会返回一个协程的句柄Job,可以通过Job取消协程、获取协程状态等

协程作用域

  • CoroutineScope:协程作用域,用于管理它创建的所有协程,使用cancel取消所有该作用域内的协程。
  • GlobalScope:全局作用域,无法取消该作用域,只能通过Job取消协程。

不建议使用GlobalScope创建协程(如果一定要使用的话,用于不需要取消的任务)

  • 难以进行单元测试,无法传入测试作用域
  • 不方便统一管理,多个协程无法共享CoroutineContext上下文

如果需要更长生命周期的协程作用域,可以由外部注入

ViewModelKTX扩展库:

  1. 提供预定义的协程作用域(CoroutineScope):viewModelScope
  2. 页面销毁时ViewModel clear会自动取消viewModelScope

此外还有LifecycleKTX扩展库,提供lifecycleScope

CoroutineContext

CoroutineContext:协程上下文

  • Job:控制协程生命周期
  • CoroutineDispatcher:切换线程
  • CoroutineName:协程名称
  • CoroutineExceptionHandler:处理未捕获的异常

重载了+运算符,可以将多个CoroutineContext相加,得到一个新的上下文

suspendCancellableCoroutine

手动resume返回执行结果

协程原理

本质就是CPS+状态机流转,将协程代码编译成CPS写法,通过label状态机,保存代码执行点,并传入上一次挂起的执行结果。

CPS

CPS(Continuation Passing-Style,续体传递风格):简单来说将函数返回结果通过回调参数形式传递给接下来的程序。

常规写法如下

class Test {
    public int plus(int i1, int i2) {
        return i1 + i2;
    }
    public void main() {
        int ret = plus(1, 2);
        // plus下方写两行代码,便于理解
        System.out.println(ret);
        System.out.println("Hello World");
    }
}

转换为CPS写法:

class Test {
    interface Continuation {
        void next(int result);
    }
    public void plus(int i1, int i2, Continuation continuation) {
        int ret = i1 + i2;
        continuation.next(ret);
    }
    public void main() {
        // 把plus下方的代码块使用回调接口包裹起来,作为plus方法的最后一个参数,等原plus方法体执行完之后回调
        plus(1, 2, result -> {
          System.out.println(result);
          System.out.println("Hello World");
        });
    }
}

结合状态机流转

上面的代码看起来没什么问题,但是如果有多次plus操作,会不断地增加嵌套和缩进,可能会变成这样:

class Test {
    // 编译前
    public void main(String[] args) {
        int ret1 = plus(1, 2);
        System.out.println(ret1);
        int ret2 = plus(3, 4);
        System.out.println(ret2);
        int ret3 = plus(5, 6);
        System.out.println(ret3);
    }
    // 编译后
    public void main() {
        plus(1, 2, result -> {
            System.out.println(result);
            plus(3, 4, result -> { 
                System.out.println(result);
                plus(5, 6, result -> {
                    System.out.println(result);
                });
             });
        });
    }
}

结合状态机流转之后,上面的代码可能会变成这样:三次plus调用,划分为了四步状态。消除了代码嵌套

class Test {
    int state = 0;
    public void main() {
        new Continution() {
          @Override
          public void next(int result) {
              switch(state) {
              case 0:
                  state = 1;  // 修改状态,跳到下一个case执行
                  plus(1, 2, this);  // 每次传入this,plus执行完之后再次调用next,传入结果
              case 1:
                  System.out.println(result);
                  state = 2;
                  plus(3, 4, this);
              case 2:
                  System.out.println(result);
                  state = 3plus(5, 6, this);
              case 3:    
                  System.out.println(result);
              }
          }
        }
    }
}

Kotlin协程原理

Kotlin原理类似上述步骤,只不过生成的代码更复杂

Kotlin协程续体函数如下:

//续体函数,调用resumeWith恢复
public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)
}

写一段示例代码

// 定义2个挂起函数
suspend fun foo1(): String = withContext(Dispatchers.IO) { "foo1" }
suspend fun foo2(): String = withContext(Dispatchers.IO) { "foo2" }
class Main {
    fun main() {
        GlobalScope.launch {
            println("Hello")
            val foo1 = foo1() // 挂起协程,执行任务
            println(foo1) // 恢复协程
            val foo2 = foo2() // 挂起协程,执行任务
            println(foo2)
        }
    }
}

使用AS工具反编译Kotlin字节码为Java代码,伪代码如下(原来的格式不好看,这里做下调整):

  1. 可以看到foo1()foo2()经过编译,多了一个Continuation参数
  2. main方法中启动协程,初始状态为0,执行launch中第1、2行代码,并修改状态为1
    1. 打印Hello
    2. 执行foo1()方法挂起协程,并传入this作为续体,等foo1()执行完成之后调用续体函数
  3. foo1()中调用invoke方法,传入foo1()的执行结果,判断状态为1,执行launch中第3、4行代码,并修改状态为2
    1. 打印foo1
    2. 执行foo2()方法挂起协程,并传入this作为续体,等foo1()执行完成之后调用续体函数
  4. foo2()中调用invoke方法,传入foo2()的执行结果,判断状态为2,执行launch中第5行代码并推出
    1. 打印foo2
// FooKt.java
public final class FooKt {
   @Nullable
   public static final Object foo1(@NotNull Continuation $completion) {
     // 内部会经过几层包装,切换线程,执行任务,最终会调用Continuation的resumeWith恢复
     ...
   }

   @Nullable
   public static final Object foo2(@NotNull Continuation $completion) {
      ...
   }
}
// Main.java
public final class Main {
   public final void main() {
      BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
         int label; //当前状态,初始为0

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var10000;
            label17: {
               Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); // 用于判断是否挂起状态
               switch(this.label) {
               case 0: // 首次调用invoke
                  ResultKt.throwOnFailure($result);
                  System.out.println("Hello"); // 打印Hello
                  this.label = 1; // 将状态改为1
                  var10000 = FooKt.foo1(this);  // 调用挂起函数,将this作为续体传入
                  if (var10000 == var5) { // 判断是挂起状态,直接返回,等待外部下次调用invoke
                     return var5;
                  }
                  break;
               case 1: // 再次调用invoke,并传入foo1的结果恢复协程
                  ResultKt.throwOnFailure($result);
                  var10000 = $result; 
                  System.out.println(var10000); // 打印foo1
                  this.label = 2; // 将状态改为2
                  var10000 = FooKt.foo2(this); // 调用挂起函数,将this作为续体传入
                  if (var10000 == var5) { // 判断是挂起状态,直接返回,等待外部下次调用invoke
                     return var5;
                  }
                  break;
               case 2: // 再次调用invoke,并传入foo2的结果恢复协程
                  ResultKt.throwOnFailure($result);
                  var10000 = $result; 
                  System.out.println(var10000); // 打印foo2
                  return Unit.INSTANCE;
               default:
                  throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
               }
            }
         }

         // 创建实例
         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkParameterIsNotNull(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

         // 外部调用Continuation的resumeWith方法恢复协程,最终会调用到invokeSuspend
         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      }), 3, (Object)null);
   }
}

结语

参考资料

results matching ""

    No results matching ""