我有一些使用管道的 Haskell 代码:
module Main(main) where
import Pipes
a :: Producer Int IO ()
a = each [1..10]
b :: Pipe Int Int IO ()
b = do
x <- await
yield (x*2)
b
c :: Consumer Int IO ()
c = do
x <- await
lift $ print x
c
main :: IO ()
main = runEffect $ a >-> b >-> c
Pipes.Concurrent tutorial演示了使用多个 worker 以及工作窃取。我怎样才能在 b
中做类似的事情?我希望 b
使用一定数量的工作人员同时执行它的工作。
显然,并发在这种情况下没有用,但这是我能想到的最简单的示例。在我的实际用例中,我想使用有限数量的工作人员并发发出一些 Web 请求。
最佳答案
编辑:我误解了你的意思; 您也许可以在管道内执行此操作,但我不确定动机是什么。我建议构建可重复使用的管道链,并使用 worker 向他们 dispatch ,而不是尝试在管道内构建 worker。如果您将它构建到管道本身,您将失去任何先进先出的顺序保证。
关于 Work Stealing 的部分是您要找的,这段代码基本上是教程中的逐字记录,但让我们分解一下它的工作原理。这是我们可以做你想做的事情的一种方法:
module Main(main) where
import Pipes
import Pipes.Concurrent
import Control.Concurrent.Async (async, wait)
import Control.Concurrent (threadDelay)
import Control.Monad (forM)
a :: Producer Int IO ()
a = each [1..10]
b :: Pipe Int Int IO ()
b = do
x <- await
yield (x*2)
b
c :: Consumer Int IO ()
c = do
x <- await
lift $ print x
c
main :: IO ()
main = do
(output, input) <- spawn unbounded
feeder <- async $ do runEffect $ a >-> toOutput output
performGC
workers <- forM [1..3] $ \i ->
async $ do runEffect $ fromInput input >-> b >-> c
performGC
mapM_ wait (feeder:workers)
第一行 spawn unbounded
来自 Pipes.Concurrent,它初始化一个具有输入和输出句柄的“邮箱”。一开始我很困惑,但在这种情况下,我们将消息发送到输出并从输入中提取它们。这类似于 golang 等语言中的推拉式消息 channel 。
我们指定一个 Buffer说我们可以存储多少条消息,在这种情况下,我们将无限制设置为无界。
好的,邮箱已初始化,我们现在可以创建向其发送消息的 Effect
。邮箱 channel 是使用 STM 实现的,所以这就是它可以异步收集消息的方式。
让我们创建一个为邮箱提供数据的异步作业;
feeder <- async $ do runEffect $ a >-> toOutput output
performGC
a >-> toOutput 输出
只是普通的管道组合,我们需要toOutput
将输出转换为管道。请注意 performGC
调用也是 IO 的一部分,它允许 Pipes.Concurrent 知道在作业完成后进行清理。如果愿意,我们可以使用 forkIO
运行它,但在这种情况下,我们使用 async
以便我们可以等待结果稍后完成。好的,所以我们的邮箱应该异步接收消息,让我们把它们拉出来做一些工作。
workers <- forM [1..3] $ \i ->
async $ do runEffect $ fromInput input >-> b >-> c
performGC
与之前的想法相同,但这次我们只是生成其中的一些。我们使用 fromInput
从输入中读取输入,就像普通管道一样,然后在链的其余部分运行它,完成后进行清理。 input
将确保每次提取一个值时只有一个工作人员接收它。当输入 output
的所有作业完成时(它会跟踪所有打开的作业),然后它将关闭 input
管道,worker 将完成。
如果您在 web-worker 场景中使用它,您将有一个主循环,它不断向 toOutput output
channel 发送请求,然后生成任意数量的 worker 进入他们的管道来自 fromInput 输入
。
https://stackoverflow.com/questions/41412823/