Kotlin语言基础笔记
Kotlin流程控制语句笔记
Kotlin操作符重载与中缀表示法笔记
Kotlin扩展函数和扩展属性笔记
Kotlin空指针安全(null-safety)笔记
Kotlin类型系统笔记
Kotlin面向对象编程笔记
Kotlin委托(Delegation)笔记
Kotlin泛型型笔记
Kotlin函数式编程笔记
Kotlin与Java互操作笔记
Kotlin协程笔记
很多小伙伴可能会觉得Java有了线程、线程池了,我们还要协程(Coroutines)干嘛。这里还是有些区别的。区别有:
- 线程是为了提高CPU的利用率,调度是由操作系统决定的,而协程是为了解决多个任务更好的协作,调度是由我们代码控制的。
- 协程并不是为了取代线程,协程对线程进行抽象,你可以看成协程是一个异步调用的框架,解决了之前线程间协作代码繁琐的问题。
我们先来看一段代码,如下:
data class Product(var id: String, var title: String)
data class Stock(var pid: String, var stock: Int)
data class Pms(var pid: String, var pmsTips: String)
suspend fun getProductsByIds(pids: List<String>): List<Product> {
delay(1000)
return listOf(Product("1", "a"), Product("2", "b"))
}
suspend fun getProductStocksByIds(pids: List<String>): List<Stock> {
delay(2000)
return listOf(Stock("1", 2), Stock("2", 4))
}
suspend fun getProductPMSByIds(pids: List<String>): List<Pms> {
delay(3000)
return listOf(Pms("1", "100减99"), Pms("2", "100减99"))
}
fun combine(products: List<Product>?, productStocks: List<Stock>?, productPMS: List<Pms>?) {
println(products)
println(productStocks)
println(productPMS)
}
fun main(args: Array<String>) = runBlocking<Unit> {
val pids = listOf<String>("1", "2")
val products = async {
withTimeoutOrNull(1500) {
getProductsByIds(pids)
}
}
val productStocks = async {
withTimeoutOrNull(2500) {
getProductStocksByIds(pids)
}
}
val productPMS = async {
withTimeoutOrNull(2500) {
getProductPMSByIds(pids)
}
}
val measureTimeMillis = measureTimeMillis {
combine(products.await(), productStocks.await(), productPMS.await())
}
println(measureTimeMillis)
}
这段代码看起来就像是伪代码,不过还是非常容易理解,就是通过一批商品id,分别调用三个接口拿到商品的信息,商品的库存,商品的优惠信息,然后再合并数据,这个场景无论在后端还是前端都会经常遇到,比如APP调用的一个接口,需要从不同的底层系统获取到不同部分的数据,然后聚合好一次性返回给APP。想想如果是用Java来实现的会有多复杂。用Kotlin的协程实现就像是写顺序执行的代码,但实际上你做的是异步调用。
1.第一个协程代码
fun main(args: Array<String>) {
launch { // launch new coroutine in background and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello,") // main thread continues while coroutine is delayed
Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
我们使用lauch
来启动一个协程,其中要注意的是delay
这个函数,看起来它跟Thread.sleep
是一样的作用,但是他们有本质的区别,Thread.sleep
会阻塞当前线程(线程就傻傻的在等待),而delay
是暂停当前的协程,不会阻塞当前线程,这个线程可以去做其他事情。delay
是一个suspending function,它只能运行在协程里面,如果不在协程中运行,会报以下异常。
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
2. runBlocking
runBlocking
函数会阻塞当前线程,一直等到协程运行完。上面的例子可以改成下面的:
fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
launch { // launch new coroutine in background and continue
delay(1000L)
println("World!")
}
println("Hello,") // main coroutine continues here immediately
delay(2000L) // delaying for 2 seconds to keep JVM alive
}
3.等待协程完成
延时一段时间来等待协程完成通常不是很高效,我们可以通过join
来实现一旦协程完成就退出main函数。
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch { // launch new coroutine and keep a reference to its Job
delay(1000L)
println("World!")
}
println("Hello,")
job.join() // wait until child coroutine completes
}
4. suspending function 暂停函数
我们也可以使用suspending function重构下。
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch { doWorld() }
println("Hello,")
job.join()
}
// this is your first suspending function
suspend fun doWorld() {
delay(1000L)
println("World!")
}
注意:delay
也是一个suspending function,所以depay
只能放在suspending function或者协程代码(lanuch)里面。
5. 协程是非常轻量级的
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = List(100_000) { // launch a lot of coroutines and list their jobs
launch {
delay(1000L)
print(".")
}
}
jobs.forEach { it.join() } // wait for all jobs to complete
}
启动了10万个协程,最后代码能够成功的执行完成。同样,大家可以试试换成起10万个线程试试,应该会得出OOM的结果。
6. 协程像守护线程
请看下面这段代码:
fun main(args: Array<String>) = runBlocking<Unit> {
launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // just quit after delay
}
输出如下:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
可以知道,等待1.3秒后,main退出了。不会等待launch的协程运行完。
7. 协程取消
launch
返回一个Job对象,它可以被取消:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion
println("main: Now I can quit.")
}
输出如下:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
可以看到,一旦调用了job.cancel(),就退出了main函数。Job还有一个cancelAndJoin方法,合并了cancel和join操作。
8. 协程的取消可能需要协作完成
协程的取消可能需要协作完成,所有在kotlinx.coroutines
包下面的suspending functions都可以被取消,但是如果一个协程处在计算中,他是不能被取消的,比如这个例子:
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
你可以看到调用取消后,还在打印。
9. 让处于计算中的协程可取消
有两种方式可以做到:
- 最简单的在while循环最后面调用下yield函数。这样就在每次循环后让协程有了被取消的机会,yield是
kotlinx.coroutines
包下的suspending functions。 - 检查协程取消的状态,如果发现被取消,则退出循环。
下面我们以第二种方式演示下:
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (isActive) { // cancellable computation loop
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
isActive是协程的CoroutineScope的一个属性。
10. 协程中try catch finally
当协程被取消时,catch和finally可以被执行。
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}catch (e:Throwable){
println("I'm running catch")
} finally {
println("I'm running finally")
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
输出:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running catch
I'm running finally
main: Now I can quit.
11. withContext函数
在上个例子中,如果我们在finally块中调用suspending functions的话,会抛出CancellationException,因为协程已经被取消了。不过一般来说没什么太大问题,只要不调用suspending functions。如果你一定要在调用的话,你可以使用withContext(NonCancellable) {...}
。如下:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("I'm running finally")
delay(1000L)
println("And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
12. Timeout超时
如果要设定协程调用超时时间,我们可以使用withTimeout函数,如下:
fun main(args: Array<String>) = runBlocking<Unit> {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
}
输出如下:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.experimental.TimeoutCancellationException: Timed out waiting for 1300 MILLISECONDS
如果超时的时候你不想抛出异常,你可以使用withTimeoutOrNull函数,超时的时候它会返回null。
fun main(args: Array<String>) = runBlocking<Unit> {
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // will get cancelled before it produces this result
}
println("Result is $result")
}
输出如下:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null
13. 使用async并发调用
async与launch类似,它也是启动一个协程,只不过lauch返回的是Job(没有返回值),而async返回的是Deferred(带返回值),你可以使用.await()
来获取Deferred的值。Deferred是Job的子类,所以Deferred也可以被取消。看看下面这段代码:
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // pretend we are doing something useful here
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // pretend we are doing something useful here, too
return 29
}
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
输出如下:
The answer is 42
Completed in 1016 ms
因为是并行调用,所以时间差不多是1秒。
14. async延时调用
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
如果async带上了start = CoroutineStart.LAZY
参数,协程不会立即执行,会等到调用await的时候才开始执行。上面代码输出如下:
The answer is 42
Completed in 2017 ms
执行结果看起来变成了顺序执行,那是因为one.await执行完成之后,才会开始调用two.await()执行。所以变成了顺序执行。
15. Async-style functions
// The result type of somethingUsefulOneAsync is Deferred<Int>
fun somethingUsefulOneAsync() = async {
doSomethingUsefulOne()
}
// The result type of somethingUsefulTwoAsync is Deferred<Int>
fun somethingUsefulTwoAsync() = async {
doSomethingUsefulTwo()
}
上面两个方法xxxAsync
并不是suspending functions,所以他们可以在任何地方调用。
// note, that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
val time = measureTimeMillis {
// we can initiate async actions outside of a coroutine
val one = somethingUsefulOneAsync()
val two = somethingUsefulTwoAsync()
// but waiting for a result must involve either suspending or blocking.
// here we use `runBlocking { ... }` to block the main thread while waiting for the result
runBlocking {
println("The answer is ${one.await() + two.await()}")
}
}
println("Completed in $time ms")
}
16. Dispatchers and threads
launch和async都接收一个可选的CoroutineContext参数可以用来指定CoroutineDispatcher。如下:
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
输出如下:
'Unconfined': I'm working in thread main
'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
'newSTC': I'm working in thread MyOwnThread
'coroutineContext': I'm working in thread main
默认的dispatcher是DefaultDispatcher当前的实现是CommonPool
17. Unconfined vs confined dispatcher
Unconfined dispatcher会在当前线程开始执行协程,但是仅仅是在第一个暂停点,之后它恢复后的dispatcher取决于那个线程执行suspending function。
coroutineContext 是CoroutineScope的一个属性,它的dispatcher会继承它parent线程的dispatcher。 代码如下:
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
delay(500)
println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
delay(1000)
println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
输出如下:
'Unconfined': I'm working in thread main
'coroutineContext': I'm working in thread main
'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor
'coroutineContext': After delay in thread main
所以,coroutineContext继承了runBlocking的main线程,而unconfined恢复后变成了default executor线程。
18. 线程切换
加上-Dkotlinx.coroutines.debug
JVM参数运行下面的代码:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) {
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
log("Started in ctx1")
withContext(ctx2) {
log("Working in ctx2")
}
log("Back to ctx1")
}
}
}
}
这里展示了几个用法:一个是使用runBlocking指明一个特殊的Context,另外一个是使用withContext来切换Context,输出如下:
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
还有就是run
来释放线程。
19. 通过Context来获取Job
协程的Job是Context的一个属性,如下:
fun main(args: Array<String>) = runBlocking<Unit> {
println("My job is ${coroutineContext[Job]}")
}
19. 子协程
在协程中使用coroutineContext来启动另一个协程,新协程的Job变成了父协程的子Job,当父协程取消时,子协程也会被取消。
fun main(args: Array<String>) = runBlocking<Unit> {
// launch a coroutine to process some kind of incoming request
val request = launch {
// it spawns two other jobs, one with its separate context
val job1 = launch {
println("job1: I have my own context and execute independently!")
delay(1000)
println("job1: I am not affected by cancellation of the request")
}
// and the other inherits the parent context
val job2 = launch(coroutineContext) {
delay(100)
println("job2: I am a child of the request coroutine")
delay(1000)
println("job2: I will not execute this line if my parent request is cancelled")
}
// request completes when both its sub-jobs complete:
job1.join()
job2.join()
}
delay(500)
request.cancel() // cancel processing of the request
delay(1000) // delay a second to see what happens
println("main: Who has survived request cancellation?")
}
输出结果如下:
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
20. Context联合
协程Context可以使用+
联合,如下:
fun main(args: Array<String>) = runBlocking<Unit> {
// start a coroutine to process some kind of incoming request
val request = launch(coroutineContext) { // use the context of `runBlocking`
// spawns CPU-intensive child job in CommonPool !!!
val job = launch(coroutineContext + CommonPool) {
println("job: I am a child of the request coroutine, but with a different dispatcher")
delay(1000)
println("job: I will not execute this line if my parent request is cancelled")
}
job.join() // request completes when its sub-job completes
}
delay(500)
request.cancel() // cancel processing of the request
delay(1000) // delay a second to see what happens
println("main: Who has survived request cancellation?")
}
job是request的子协程,但是是在CommonPool的线程中执行操作。所以取消request,job也会取消。
21. 父协程会等待子协程完成
父协程会等待子协程完成,不需要使用join来等待他们完成。
fun main(args: Array<String>) = runBlocking<Unit> {
// launch a coroutine to process some kind of incoming request
val request = launch {
repeat(3) { i -> // launch a few children jobs
launch(coroutineContext) {
delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
println("Coroutine $i is done")
}
}
println("request: I'm done and I don't explicitly join my children that are still active")
}
request.join() // wait for completion of the request, including all its children
println("Now processing of the request is complete")
}
输出如下:
request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete
22. Tricks
假如我们现在在写一个anroid app,在activity中启动了很多协程异步调用接口获取数据,当这个activity被destory后,所有的协程需要被取消,要不然就可能会发生内存泄漏。
我们可以创建一个Job实例,然后使用launch(coroutineContext, parent = job)
来明确指定parent job。
这样的话,我们可以调用Job.cancel来取消所有的子协程,而Job.join可以等待所有的子协程完成。如下:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = Job() // create a job object to manage our lifecycle
// now launch ten coroutines for a demo, each working for a different time
val coroutines = List(10) { i ->
// they are all children of our job object
launch(coroutineContext, parent = job) { // we use the context of main runBlocking thread, but with our parent job
delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
println("Coroutine $i is done")
}
}
println("Launched ${coroutines.size} coroutines")
delay(500L) // delay for half a second
println("Cancelling the job!")
job.cancelAndJoin() // cancel all our coroutines and wait for all of them to complete
}
输出如下:
Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Cancelling the job!
23. channel, select, actor
请看:https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md