多线程 – Akka HTTP和长时间运行的请求

我们有一个用简单的
Scala Akka HTTP实现的API – 面向繁重计算(CPU和内存密集型)的几条路径.没有集群 – 所有都在一台强大的机器上运行.计算量很大 – 一个隔离请求可能需要60多秒才能完成.我们并不关心这么快的速度.没有阻塞IO,只有很多CPU处理.

当我开始对该事物进行性能测试时,一个有趣的模式表明:请求A1,A2,…,A10通过.他们使用的资源非常多,事实证明,Akka将为请求A5-A10返回HTTP 503.问题是即使没有人拿起结果,计算仍在运行.

从那里我们看到级联性能崩溃:请求A11-A20到达仍在处理请求A5-A10的服务器.显然,这些新请求也有可能超出 – 甚至更高,因为服务器更繁忙.因此,当Akka触发超时时,它们中的一些将运行,使服务器更加繁忙和更慢,然后新一批请求通过……所以在运行系统一段时间后,您会看到几乎所有请求点开始失败并超时.在您停止加载后,您会在日志中看到一些请求仍在处理中.

我尝试在单独的ExecutionContext和系统调度程序中运行计算,尝试使其完全异步(通过Future组合),但结果仍然相同.繁琐的工作使服务器如此繁忙,最终几乎每个请求都失败了.

https://github.com/zcox/spray-blocking-test中描述了类似的情况,但重点转移到那里 – / ping对于我们来说无关紧要,因为在处理长时间运行的请求的端点上的责任或多或少是稳定的.

问题是:如何设计我的应用程序以更好地中断挂起请求?在重负载下,我可以容忍一小部分失败的请求,但几秒钟后将整个系统停止运行是不可接受的.

最佳答案 Akka HTTP不会自动终止已超时的请求的处理.通常需要额外的簿记才能获得回报,因此默认情况下不会打开.我认为这是一种疏忽,TBH,我自己也遇到过与Akka HTTP类似的问题.

我认为您需要在请求超时时手动中止处理,否则服务器在超载时将无法恢复,如您所见.

没有一种标准机制可以用来实现它(参见“How to cancel Future in Scala?”).如果线程在没有i / o的情况下进行CPU工作,那么Thread.interrupt()将没有用处.相反,您应该创建一个截止日期或承诺或类似情况,以显示请求是否仍处于打开状态,并在计算过程中传递并定期检查超时:

// in the HTTP server class:
val responseTimeout: Duration = 30.seconds

val routes = 
  path("slowComputation") {
    complete {
      val responseTimeoutDeadline: Deadline = responseTimeout.fromNow
      computeSlowResult(responseTimeoutDeadline)
    }
  }

// in the processing code:
def computeSlowResult(responseDeadline: Deadline): Future[HttpResponse] = Future {
  val gatherInputs: List[_] = ???
  gatherInputs.fold(0) { (acc, next) =>

    // check if the response has timed out
    if (responseDeadline.isOverdue())
      throw new TimeoutException()

    acc + next // proceed with the calculation a little
  }
}

(检查Promise是否已经完成将比检查截止日期是否已经过期便宜得多.我已经将后面的代码放在上面,因为它更容易编写.)

点赞