kotlin – 有没有办法使用协同程序获得FixedTreadPool行为?

有没有办法获得与下面的代码片段相同的行为,但使用协同程序?

更新的代码段:

fun main(args: Array<String>) = runBlocking {
    val executor = Executors.newFixedThreadPool(50)
    log.info("Start")
    val jobs = List(300) {
        executor.submit {
            log.info("worker #$it started")
            sleep(1000L)
            log.info("worker #$it done")
        }
    }
    jobs.forEach { it.get() }
    executor.shutdown()
    log.info("All done!")
}

如何使用并行因子== 50运行300个作业,但没有创建50个真正的线程?

更新2:解决方案

在再次阅读Coroutines Guide之后,我发现Fan-out example正是我所寻找的.因此,我的例子如下:

fun produceTasks() = produce {
    for (taskId in 1..300) {
        send(
                async(start = CoroutineStart.LAZY) {
                    delay(1000) // simulate long work
                    taskId
                }
        )
    }
    close()
}

fun launchWorker(index: Int, channel: ProducerJob<Deferred<Int>>) = launch {
    channel.consumeEach {
        val result = it.await()
        log.info("Worker #$index done task #$result")
    }
}

fun main(args: Array<String>) = runBlocking {
    val tasks = produceTasks()
    val workers = List(50) { launchWorker(it + 1, tasks) }
    workers.forEach { it.join() }
    log.info("Done")
}

最佳答案 首先 – 请
check issue on Gitbub,可能有官方解决方案.

您可以使用协程通道来实现此逻辑

/**
 * How it works:
 * 1. Executors queue has N items
 * 2. Before each execution:
 * 2.1. Get item from queue (if it is possible)
 * 2.2. Execute action
 * 2.3. Return item to queue
 *
 * Example below allows us to execute methods in the resticted count of threads.
 * However we can create object pool from this Semaphore:
 * 1. Firstly - push N pooled objects into executors variable (e.g. we will have Channel<PooledObject>
 * 2. Then change signature of schedule method: just change arg "()" to PooledObject
 */
class Semapshore(maxParallelWorkers: Int) {
    private val executors = Channel<Unit>(maxParallelWorkers).apply {
        runBlocking {
            repeat(maxParallelWorkers) {
                send(Unit)
            }
        }
    }

    suspend fun <TResult> schedule(func: () -> TResult): TResult {
        val executor = executors.receive()

        return try {
            func()
        } finally {
            executors.send(executor)
        }
    }
}
点赞