python - 如何在 Nifi 中将 pandas 数据帧从一个处理器传递到另一个处理器?

使用 Nifi,我想:

  1. 运行导出 pandas 数据框的 Python 脚本
  2. 发送通过 ExecuteStreamCommand 到各种输入和输出 pandas 数据帧的即插即用 Python 脚本,不知道它们是通过 Nifi 运行的,我无法修改它以使用 STDIN/STDOUT 而不是 pandas。
  3. 将输出数据帧传递给进一步处理。

这可能吗?如果是,怎么办?

换句话说:

  1. 第一个脚本:flowfile -> pandas
  2. 许多脚本:用 pandas 做事
  3. 最后一个脚本:pandas -> flowfile

最佳答案

NiFi的ExecuteScript支持Jython,它不允许Python原生库(pandas是原生库),所以你不能直接在NiFi中执行这个 Action 。我建议您编写一个完整的 Python shell 脚本,它执行以下操作并使用 ExecuteStreamCommand 处理器从 NiFi 调用它:

Python 封装脚本:

  1. 接受来自 STDIN 的输入(这将是流文件内容)
    • 您还可以使用 ESC 处理器的“命令参数”属性将流文件属性作为参数放在命令行上
  2. STDIN 输入转换为 pandas 数据帧
  3. 在不了解 NiFi 的任意 Python 脚本之间传递数据帧
  4. 将最终数据帧输出为STDOUT

这将允许将传入的流文件内容发送到此包装器脚本,使用包含的脚本进行所有内部修改,然后将输出从 STDOUT 转换回流文件内容。

https://stackoverflow.com/questions/61785013/

相关文章:

c - 等待运行目标的 makefile 列表

bash - 如何捕获输入来控制截图功能?

amazon-web-services - 如何替换 AWS Auto Scaling 组中的特定实

wordpress - Post Object Timber/Twig WordPress 问题

java - Intellij Idea : sbt refresh FetchError$Down

django - 将 Django 连接到 Microsoft SQL 数据库

javascript - 在 includes() 中使用 "OR"运算符来试验字符串中是否存在任何

javascript - 如何使用 nestfastify 编写来自 nestjs 异常过滤器的响应

javascript - web3.eth.accounts[0] 返回未定义和 app.vote(

google-cloud-platform - GCP Nat 后超时