haskell - 如何使 Pipe 与 Haskell 的 Pipe 库并发?

我有一些使用管道的 Haskell 代码:

module Main(main) where
import Pipes

a :: Producer Int IO ()
a = each [1..10]

b :: Pipe Int Int IO ()
b = do
  x <- await
  yield (x*2)
  b

c :: Consumer Int IO ()
c = do
  x <- await
  lift $ print x
  c

main :: IO ()
main = runEffect $ a >-> b >-> c

Pipes.Concurrent tutorial演示了使用多个 worker 以及工作窃取。我怎样才能在 b 中做类似的事情?我希望 b 使用一定数量的工作人员同时执行它的工作。

显然,并发在这种情况下没有用,但这是我能想到的最简单的示例。在我的实际用例中,我想使用有限数量的工作人员并发发出一些 Web 请求。

最佳答案

编辑:我误解了你的意思; 您也许可以在管道内执行此操作,但我不确定动机是什么。我建议构建可重复使用的管道链,并使用 worker 向他们 dispatch ,而不是尝试在管道内构建 worker。如果您将它构建到管道本身,您将失去任何先进先出的顺序保证。

关于 Work Stealing 的部分是您要找的,这段代码基本上是教程中的逐字记录,但让我们分解一下它的工作原理。这是我们可以做你想做的事情的一种方法:

module Main(main) where
import Pipes
import Pipes.Concurrent

import Control.Concurrent.Async (async, wait)
import Control.Concurrent (threadDelay)
import Control.Monad (forM)

a :: Producer Int IO ()
a = each [1..10]

b :: Pipe Int Int IO ()
b = do
  x <- await
  yield (x*2)
  b

c :: Consumer Int IO ()
c = do
  x <- await
  lift $ print x
  c

main :: IO ()
main = do
  (output, input) <- spawn unbounded
  feeder <- async $ do runEffect $ a >-> toOutput output
                       performGC

  workers <- forM [1..3] $ \i ->
    async $ do runEffect $ fromInput input  >-> b >-> c
               performGC

  mapM_ wait (feeder:workers)

第一行 spawn unbounded 来自 Pipes.Concurrent,它初始化一个具有输入和输出句柄的“邮箱”。一开始我很困惑,但在这种情况下,我们将消息发送到输出并从输入中提取它们。这类似于 golang 等语言中的推拉式消息 channel 。

我们指定一个 Buffer说我们可以存储多少条消息,在这种情况下,我们将无限制设置为无界。

好的,邮箱已初始化,我们现在可以创建向其发送消息的 Effect。邮箱 channel 是使用 STM 实现的,所以这就是它可以异步收集消息的方式。

让我们创建一个为邮箱提供数据的异步作业;

feeder <- async $ do runEffect $ a >-> toOutput output
                     performGC

a >-> toOutput 输出 只是普通的管道组合,我们需要toOutput 将输出转换为管道。请注意 performGC 调用也是 IO 的一部分,它允许 Pipes.Concurrent 知道在作业完成后进行清理。如果愿意,我们可以使用 forkIO 运行它,但在这种情况下,我们使用 async 以便我们可以等待结果稍后完成。好的,所以我们的邮箱应该异步接收消息,让我们把它们拉出来做一些工作。

workers <- forM [1..3] $ \i ->
  async $ do runEffect $ fromInput input  >-> b >-> c
             performGC

与之前的想法相同,但这次我们只是生成其中的一些。我们使用 fromInput 从输入中读取输入,就像普通管道一样,然后在链的其余部分运行它,完成后进行清理。 input 将确保每次提取一个值时只有一个工作人员接收它。当输入 output 的所有作业完成时(它会跟踪所有打开的作业),然后它将关闭 input 管道,worker 将完成。

如果您在 web-worker 场景中使用它,您将有一个主循环,它不断向 toOutput output channel 发送请求,然后生成任意数量的 worker 进入他们的管道来自 fromInput 输入

https://stackoverflow.com/questions/41412823/

相关文章:

c - 这个函数被认为是可重入的吗?

spring-mvc - 处理 org.thymeleaf.exceptions.TemplateI

pointers - SGX - 受信任的网桥和受信任的代理有什么区别?

class - 为什么我不能定义 `delete` 方法?

python-3.x - 模块 'urllib' 没有属性 'request'

c - 这是什么意思,我该如何纠正它 *** 检测到堆栈粉碎 *** : ./array1outpu

visual-studio - 如何在 Visual Studio 2015(对于 C++)中仅禁用

css - 由于非整数宽度,媒体查询运行异常

haskell - Machines 和 Conduits(或其他类似库)之间的概念区别是什么?

datetime - 如何在Lua中的给定日期时间添加天数