我正在尝试在 Apache Spark SQL 中创建一个函数,该函数对多行数据进行操作,但无法找到直接在 Spark 中执行此操作的方法 - 在 Java 中。
我目前的解决方案是将数据从 Spark DataFrame 中提取出来并放入 Java 列表中进行处理,然后再返回到 Spark DataFrame。这在性能方面并不理想。
最好的选择似乎是Window functions ,但不幸的是,这些需要 Hive 上下文,我无权访问。 explode() function似乎是另一种选择,但同样,这是特定于 Scala 的,我无法让它在 Java 中工作。
也许这可以通过将 DataFrame 转换回 RDD 来完成?
如果有人对如何在 Java 中为 Apache Spark SQL 完成此操作有任何提示或建议,我们将不胜感激。谢谢你。
更新:提供的示例:
+----------+-----------+------------+
| Item | Timestamp | Difference |
+----------+-----------+------------+
| A | 11:00 | 02:00 |
| A | 13:00 | - |
+----------+-----------+------------+
| B | 09:00 | - |
+----------+-----------+------------+
| C | 15:15 | 00:20 |
| C | 15:35 | 01:30 |
| C | 17:05 | - |
+----------+-----------+------------+
所以在示例中,我尝试对按项目分组的行对进行操作,以计算每个项目行之间的时间差。
这样的任务可以通过 SQL 中的 LAG() 和 LEAD() 函数实现,但这些需要 Spark 中的 Hive。
最佳答案
从 Spark 1.5 开始,您现在可以定义 UDAF 或用户定义的聚合函数,以允许您对输入数据组执行自定义聚合。我认为这可能是我所看到的最接近您正在寻找的东西。
通常,您需要创建一个类来扩展 UserDefinedAggregateFunction
并实现所需的方法,包括初始化、合并和聚合。
一旦您创建了它,您就可以对其进行实例化、注册,然后在您的 SQL 中使用它。
val myAggregation = new MyAggregation
sqlContext.udf.register("MY_AGG", myAggregation)
https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html
https://stackoverflow.com/questions/34140418/