为R shiny中的异步期货获取子流程的PID

server <- function(input, output, session) {
  out1_rows <- reactiveVal()

   observeEvent(input$run1, {
   prog <- Progress$new(session)
   prog$set(message = "Analysis in progress",
         detail = "This may take a while...",
         value = NULL)

  fut1 = future({
  system(paste("Command1" , input$file ">", "out1.txt"))

  system(paste("Command2" , out1.txt ">", "out2.txt"))
  head_rows <- read.delim("out2.txt")
    return(head_rows)
    }) %...>%
     out1_rows() %>%
  finally( ~ prog$close())
NULL
})


 observeEvent(req(out1_rows()), {
 output$out_table <-
  DT::renderDataTable(DT::datatable(
    out1_rows(),
    )
  ))

observeEvent(input$cancel, {
    async_pid <- fut1$job$pid  ##this is empty
    #async_pid <- Sys.getpid()  ##this return PID for main process and kills "/opt/shiny-server/R/SockJSAdapter.R"  but not for subprocesses inside future()
    system(paste("kill -15", async_pid))
  })
}

在这里,我需要杀死在future()中运行命令的进程.我尝试以上述方式获取运行future()过程的PID,并在触发输入$cancel时终止.但是,fut1 $job $pid没有返回任何PID值,因此kill操作不成功.

此链接from future vignettes显示了如何为future()作业获取PID.但是,在我的情况下,我无法在future()中使用Sys.getpid(),因为我不知道如何存储PID值,因为进程已经从我的系统命令返回一些输出.

此页面future GIT显示了使用语法fut1 $job $pid的另一种外部杀戮方式.但这无法获取PID.

在尝试不同的方法或用语法盲目后,我无法弄清楚这一点.有人可能暗示这样做的方式.

最佳答案 能否请您提供完整的可重复示例?

您可能想查看库(future.callr):

使用plan(callr)你可以得到pid并杀死这个过程:

library(future)
library(promises)
library(future.callr)

plan(callr)

myFuture <- future({
  Sys.sleep(5)
  return(runif(1))
})

myFuture$process$get_pid()
myFuture$process$is_alive()

# myFuture$process$kill()
# myFuture$process$is_alive()

then(myFuture, onFulfilled = function(value){
print(value)
}, onRejected = NULL)

编辑 – 根据您的代码改编:

library(shiny)
library(DT)
library(future)
library(promises)
library(future.callr)
library(shinyjs)
library(V8)

plan(callr)

ui <- fluidPage(
  useShinyjs(),
  titlePanel("Trigger & kill future"),
  sidebarLayout(
    sidebarPanel(
      actionButton(inputId="run1", label="run future"),
      actionButton(inputId="cancel", label="kill future")
    ),
    mainPanel(
      dataTableOutput('out_table')
    )
  )
)

server <- function(input, output, session) {

  disable("cancel") 
  out1 <- reactiveValues(rows=NULL)

  observeEvent(input$run1, {

    disable("run1")
    enable("cancel")
    out1$rows <- NULL

    prog <- Progress$new(session)
    prog$set(message = "Analysis in progress",
             detail = "This may take a while...",
             value = NULL)

    fut1 <<- future({
      # system(paste("Command1" , input$file, ">", "out1.txt"))
      # system(paste("Command2" , out1.txt, ">", "out2.txt"))
      # head_rows <- read.delim("out2.txt")
      head_rows <- data.frame(replicate(5, sample(runif(20, 0, 1), 20, rep=TRUE)))
      Sys.sleep(5)
      return(head_rows)
    })

    print(paste("Running async process with PID:", fut1$process$get_pid()))

    then(fut1, onFulfilled = function(value){
      out1$rows <<- value
    }, onRejected = function(error){NULL})

    finally(fut1, function(){
      prog$close()
      disable("cancel")
      enable("run1")
    })

    return(NULL)
  }, ignoreInit = TRUE)


  observeEvent(req(out1$rows), {
    output$out_table <- DT::renderDataTable(DT::datatable(out1$rows))
  })

  observeEvent(input$cancel, {
    async_pid <- fut1$process$get_pid()
    print(paste("Killing PID:", async_pid))
    # system(paste("kill -9", async_pid)) # Linux - kill
    # system(paste("taskkill /f /pid", async_pid)) # Windows - kill
    fut1$process$kill() # library(future.callr) - kill
    out1$rows <- NULL
    disable("cancel")
    enable("run1")
  }, ignoreInit = TRUE)

}

shinyApp(ui = ui, server = server)

偶尔会出现错误:

Unhandled promise error: callr failed, could not start R, exited with non-zero status, has crashed or was killed 
Warning: Error in : callr failed, could not start R, exited with non-zero status, has crashed or was killed 
  95: <Anonymous>

也许@HenrikB的statement告诉我们我们遇到了什么:

However, to get this working properly you probably also need to make
your future expression / future code interrupt aware using
withCallingHandlers() etc. It’ll also unknown to me what happens if
you signal too many interrupts in a row – it might be that you manage
to interrupt the main R-loop of the worker, which then will cause the
R worker to terminate. That’ll result in a missing R worker and you’ve
got that problem you mention at the beginning.

该错误也在here中提到,但目前在将来.callr-context我不知道如何解决它.

第二编辑:
到目前为止,我还从Henrik Bengtsson那里得到了一些feedback.他再次提到核心未来API目前不支持终止期货.所以,无论我们使用什么后端,我们最终都会遇到问题.

忽略这些信息,我将再看一下库(ipc)vignette,它提供了关于杀死长时间运行过程主题的两个例子.但我已经指出了这个here – 这可能导致了question.

最后,所有这些对于您的场景可能毫无用处,因为您正在使用system()调用来创建自己的子进程(并相应地拥有自己的pid).因此,为什么不在系统命令中使用wait = FALSE(正如我在here评论中提到的那样)来获取异步行为并使用myCommand&回声$! (见this).这样你就不需要任何未来.

点赞