java - 在 Apache Spark SQL 中对多行进行操作

我正在尝试在 Apache Spark SQL 中创建一个函数,该函数对多行数据进行操作,但无法找到直接在 Spark 中执行此操作的方法 - 在 Java 中。

我目前的解决方案是将数据从 Spark DataFrame 中提取出来并放入 Java 列表中进行处理,然后再返回到 Spark DataFrame。这在性能方面并不理想。

最好的选择似乎是Window functions ,但不幸的是,这些需要 Hive 上下文,我无权访问。 explode() function似乎是另一种选择,但同样,这是特定于 Scala 的,我无法让它在 Java 中工作。

也许这可以通过将 DataFrame 转换回 RDD 来完成?

如果有人对如何在 Java 中为 Apache Spark SQL 完成此操作有任何提示或建议,我们将不胜感激。谢谢你。

更新:提供的示例:

+----------+-----------+------------+
|   Item   | Timestamp | Difference |
+----------+-----------+------------+
|     A    |   11:00   |    02:00   |
|     A    |   13:00   |      -     |
+----------+-----------+------------+
|     B    |   09:00   |      -     |
+----------+-----------+------------+
|     C    |   15:15   |    00:20   |
|     C    |   15:35   |    01:30   |
|     C    |   17:05   |      -     |
+----------+-----------+------------+

所以在示例中,我尝试对按项目分组的行对进行操作,以计算每个项目行之间的时间差。

这样的任务可以通过 SQL 中的 LAG() 和 LEAD() 函数实现,但这些需要 Spark 中的 Hive。

最佳答案

从 Spark 1.5 开始,您现在可以定义 UDAF 或用户定义的聚合函数,以允许您对输入数据组执行自定义聚合。我认为这可能是我所看到的最接近您正在寻找的东西。

通常,您需要创建一个类来扩展 UserDefinedAggregateFunction 并实现所需的方法,包括初始化、合并和聚合。

一旦您创建了它,您就可以对其进行实例化、注册,然后在您的 SQL 中使用它。

val myAggregation = new MyAggregation 
sqlContext.udf.register("MY_AGG", myAggregation)

https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html

https://stackoverflow.com/questions/34140418/

相关文章:

java - 收听 RabbitMQ 队列并获取事件通知

php - Symfony 2 - 登录服务

xml - 将带有越南字符的数据导入 R

matrix - Spark RDD 到 Matrix

git - 为什么 cherry-pick 告诉我所有线路都已更改?

mongodb - 如何运行 mongo-express(无需身份验证)?

d3.js - d3 中的自定义比例

types - 如何声明 TypeScript 导入 * 类型?

sql-server - 在 SQL Server 2012 的情况下,尝试在我的 select 语

node.js - 如何在 mongodb 中插入嵌套对象?