有没有办法获得与下面的代码片段相同的行为,但使用协同程序?
更新的代码段:
fun main(args: Array<String>) = runBlocking {
val executor = Executors.newFixedThreadPool(50)
log.info("Start")
val jobs = List(300) {
executor.submit {
log.info("worker #$it started")
sleep(1000L)
log.info("worker #$it done")
}
}
jobs.forEach { it.get() }
executor.shutdown()
log.info("All done!")
}
如何使用并行因子== 50运行300个作业,但没有创建50个真正的线程?
更新2:解决方案
在再次阅读Coroutines Guide之后,我发现Fan-out example正是我所寻找的.因此,我的例子如下:
fun produceTasks() = produce {
for (taskId in 1..300) {
send(
async(start = CoroutineStart.LAZY) {
delay(1000) // simulate long work
taskId
}
)
}
close()
}
fun launchWorker(index: Int, channel: ProducerJob<Deferred<Int>>) = launch {
channel.consumeEach {
val result = it.await()
log.info("Worker #$index done task #$result")
}
}
fun main(args: Array<String>) = runBlocking {
val tasks = produceTasks()
val workers = List(50) { launchWorker(it + 1, tasks) }
workers.forEach { it.join() }
log.info("Done")
}
最佳答案 首先 – 请
check issue on Gitbub,可能有官方解决方案.
您可以使用协程通道来实现此逻辑
/**
* How it works:
* 1. Executors queue has N items
* 2. Before each execution:
* 2.1. Get item from queue (if it is possible)
* 2.2. Execute action
* 2.3. Return item to queue
*
* Example below allows us to execute methods in the resticted count of threads.
* However we can create object pool from this Semaphore:
* 1. Firstly - push N pooled objects into executors variable (e.g. we will have Channel<PooledObject>
* 2. Then change signature of schedule method: just change arg "()" to PooledObject
*/
class Semapshore(maxParallelWorkers: Int) {
private val executors = Channel<Unit>(maxParallelWorkers).apply {
runBlocking {
repeat(maxParallelWorkers) {
send(Unit)
}
}
}
suspend fun <TResult> schedule(func: () -> TResult): TResult {
val executor = executors.receive()
return try {
func()
} finally {
executors.send(executor)
}
}
}