haskell – MonadBaseControl:如何解除ThreadGroup

在模块
Control.Concurrent.Thread.Group中的
threads包中有一个函数forkIO:

forkIO :: ThreadGroup -> IO α -> IO (ThreadId, IO (Result α))

我想从monad-control使用MonadBaseControl解除它.这是我的尝试:

fork :: (MonadBase IO m) => TG.ThreadGroup -> m α -> m (ThreadId, m (Result α))
fork tg action = control (\runInBase -> TG.forkIO tg (runInBase action))

这是错误消息:

Couldn't match type `(ThreadId, IO (Result (StM m α)))'
              with `StM m (ThreadId, m (Result α))'
Expected type: IO (StM m (ThreadId, m (Result α)))
  Actual type: IO (ThreadId, IO (Result (StM m α)))
In the return type of a call of `TG.forkIO'
In the expression: TG.forkIO tg (runInBase action)
In the first argument of `control', namely
  `(\ runInBase -> TG.forkIO tg (runInBase action))'

要使类型匹配需要更改的内容?

最佳答案 主要问题是IO是forkIO的一个参数.要在IO中分叉操作,我们需要一种方法来运行m a到IO a.为此,我们可以尝试使具有runBase :: MonadBase的monad类b m => m a – > b一种方法,但很少有趣的变形金刚可以提供.如果我们考虑例如StateT转换器,它可以弄清楚如果首先有机会观察它自己的状态,如何用runStateT在base monad中运行一些东西.

runFork :: Monad m => StateT s m a -> StateT s m (m b)
runFork x = do
    s <- get
    return $do
        (a, s') <- runStateT x s
        return a

这表明类型runForkBase :: MonadBase b m => m a – > m(b a),我们将为以下类型类定居.

{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FunctionalDependencies #-}

import Control.Monad.Base

class (MonadBase b m) => MonadRunForkBase b m | m -> b where
    runForkBase :: m a -> m (b a)

我在名称中添加了Fork这个词,以强调未来的状态变化通常不会在两个期货之间分享.出于这个原因,像WriterT这样可以提供runBase的少数有趣的变换器只提供了一个无趣的runBase;它们会产生永远不会被观察到的副作用.

我们可以使用由MonadRunForkBase IO m实例提供的有限降低形式来编写类似fork的东西.我要从正常的forkIO中取出正常的forkIO,而不是threads的那个,你也可以这样做.

{-# LANGUAGE FlexibleContexts #-}

import Control.Concurrent

forkInIO :: (MonadRunForkBase IO m) => m () -> m ThreadId
forkInIO action = runForkBase action >>= liftBase . forkIO

实例

这提出了一个问题,“我们可以为MonadRunForkBase实例提供哪些变换器”?直接蝙蝠,我们可以轻松地为任何具有MonadBase实例的基础monad提供它们

import Control.Monad.Trans.Identity
import GHC.Conc.Sync (STM)

instance MonadRunForkBase [] [] where runForkBase = return 
instance MonadRunForkBase IO IO where runForkBase = return
instance MonadRunForkBase STM STM where runForkBase = return
instance MonadRunForkBase Maybe Maybe where runForkBase = return
instance MonadRunForkBase Identity Identity where runForkBase = return

对于变形金刚来说,通常更容易构建这样的功能.这是可以在直接底层monad中运行fork的变换器类.

import Control.Monad.Trans.Class

class (MonadTrans t) => MonadTransRunFork t where
    runFork :: Monad m => t m a -> t m (m a)

我们可以提供默认实现,以便在基础中一直运行

runForkBaseDefault :: (Monad (t m), MonadTransRunFork t, MonadRunForkBase b m) =>
                      t m a -> t m (b a)
runForkBaseDefault = (>>= lift . runForkBase) . runFork

这让我们分两步完成StateT的MonadRunForkBase实例.首先,我们将使用上面的runFork来创建MonadTransRunFork实例

import Control.Monad
import qualified Control.Monad.Trans.State.Lazy as State

instance MonadTransRunFork (State.StateT s) where
    runFork x = State.get >>= return . liftM fst . State.runStateT x

然后我们将使用默认值来提供MonadRunForkBase实例.

{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}

instance (MonadRunForkBase b m) => MonadRunForkBase b (State.StateT s m) where
    runForkBase = runForkBaseDefault

我们可以为RWS做同样的事情

import qualified Control.Monad.Trans.RWS.Lazy as RWS

instance (Monoid w) => MonadTransRunFork (RWS.RWST r w s) where
    runFork x = do
        r <- RWS.ask
        s <- RWS.get
        return $do 
            (a, s', w') <- RWS.runRWST x r s
            return a

instance (MonadRunForkBase b m, Monoid w) => MonadRunForkBase b (RWS.RWST r w s m) where
    runForkBase = runForkBaseDefault

MonadBaseControl

与我们在前两节中开发的MonadRunForkBase不同,monad-control的MonadBaseControl并未假设“未来的状态变化通常不会在两个期货之间共享”. MonadBaseContol和control通过restoreM :: StM m a – >努力从控制结构中的分支恢复状态.嘛.对于forkIO来说,这不会产生问题;使用forkIO是MonadBaseControl文档中提供的示例.对于forkIO from threads来说这将是一个小问题,因为返回了额外的m(结果a).

我们想要的m(结果a)实际上将作为IO(结果(StM m a))返回.我们可以摆脱IO并用带有liftBase的m替换它,留下m(结果(StM m a)).我们可以将StM m a转换为恢复状态的m a然后使用restoreM返回a,但是它被卡在Result~As SomeException中.要么l是仿函数,所以我们可以在其中的任何地方应用restoreM,将类型简化为m(Result(m a)).要么l也是Traversable,对于任何Traversable,我们总是可以在Monad或Applicative中用sequenceA :: t(f a) – >交换它. f(t a).在这种情况下,我们可以使用专用mapM,它是fmap和sequenceA的组合,只有Monad约束.这将给出m(m(结果a)),并且ms将通过Monad中的连接一起展平或者仅使用>> =.这导致了

{-# LANGUAGE FlexibleContexts #-}

import Control.Concurrent
import Control.Concurrent.Thread
import qualified Control.Concurrent.Thread.Group as TG

import Control.Monad.Base
import Control.Monad.Trans.Control

import Data.Functor
import Data.Traversable
import Prelude hiding (mapM)

fork :: (MonadBaseControl IO m) =>
        TG.ThreadGroup -> m a -> m (ThreadId, m (Result a))
fork tg action = do
    (tid, r) <- liftBaseWith (\runInBase -> TG.forkIO tg (runInBase action))    
    return (tid, liftBase r >>= mapM restoreM)

当我们在原始线程中运行m(结果a)时,它会将状态从分叉线程复制到原始线程,这可能很有用.如果要在读取结果后恢复主线程的状态,则需要先捕获它. checkpoint将捕获整个状态并返回一个动作来恢复它.

checkpoint :: MonadBaseControl b m => m (m ())
checkpoint = liftBaseWith (\runInBase -> runInBase (return ()))
             >>= return . restoreM

一个完整的例子将展示两个线程对状态的影响.无论在另一个线程中修改状态的努力如何,两个线程都从fork发生时获得状态.当我们在主线程中等待结果时,主线程中的状态被设置为分叉线程中的状态.我们可以通过运行checkpoint创建的操作来获取主线程的状态.

import Control.Monad.State hiding (mapM)

example :: (MonadState String m, MonadBase IO m, MonadBaseControl IO m) => m ()
example = do    
    get >>= liftBase . putStrLn
    tg <- liftBase TG.new
    (_, getResult) <- fork tg (get >>= put . ("In Fork:" ++)  >> return 7)
    get >>= put . ("In Main:" ++) 
    revert <- checkpoint
    result <- getResult
    (liftBase . print) result
    get >>= liftBase . putStrLn
    revert
    get >>= liftBase . putStrLn

main = do
    runStateT example "Initial"
    return ()

这输出

Initial
Right 7
In Fork:Initial
In Main:Initial
点赞