apache-spark - Pyspark - 圆时间表示为最接近刻钟(15 分钟)的整数

我正在 Pyspark 上处理数据框。一列由时间对应的整数值组成:

data.select('TIME').show(4)
+------------------+
|TIME              |
+------------------+
|               925|
|              2205|
|              2205|
|              2205|
+------------------+

我想将这个时间四舍五入到最接近的值,时间步长为 15 分钟,以获得:

+------------------+
|TIME_15_MIN_STEP  |
+------------------+
|               930|
|              2200|
|              2200|
|              2200|
+------------------+

有人知道怎么做吗?

非常感谢!!

最佳答案

不使用 udf 的一种方法是首先将整数列转换为虚拟时间戳,然后执行与 my answer 中概述的几乎相同的操作。到 similar question .最后将结果转换回所需格式的整数。

更详尽的示例数据

我创建了一个具有更多可变性的示例来证明此方法正常工作。

data = sqlCtx.createDataFrame([(925,), (2205,), (2210,), (2242,), (2255,)], ["TIME"])
data.show()
#+----+
#|TIME|
#+----+
#| 925|
#|2205|
#|2210|
#|2242|
#|2255|
#+----+

将整数列转换为虚拟时间戳

要将整数小时-分钟列转换为时间戳,我们首先使用 pyspark.sql.functions.format_string()至 add leading zeros到合适的时候。接下来,我们将一个虚拟日期(我使用了 "2018-01-01")与转换后的时间连接起来,并在末尾添加 ":00"(表示秒)。

data = data.withColumn("time_string", f.format_string("%04d", f.col("TIME")))\
    .withColumn(
        "time_string",
        f.concat_ws(
            ":",
            f.array(
                [
                    f.substring(
                        "time_string",
                        1,
                        2
                    ),
                    f.substring(
                        "time_string",
                        3,
                        2
                    ),
                    f.lit("00")
                ]
            )
        )
    )\
    .withColumn("time_string", f.concat(f.lit("2018-01-01 "), f.col("time_string")))

data.show()
#+----+-------------------+
#|TIME|        time_string|
#+----+-------------------+
#| 925|2018-01-01 09:25:00|
#|2205|2018-01-01 22:05:00|
#|2210|2018-01-01 22:10:00|
#|2242|2018-01-01 22:42:00|
#|2255|2018-01-01 22:55:00|
#+----+-------------------+

计算偏移时间戳的分钟数

使用pyspark.sql.functions.minute()从虚拟时间戳中获取分钟。我们除以 15,四舍五入,然后乘以 15 得到"new"分钟。 (此逻辑在 linked answer 中有更详细的解释。)

data = data.withColumn("minute", f.minute("time_string"))\
    .withColumn("new_minute", f.round(f.col("minute")/15)*15)\
    .withColumn("minute_add", f.col("new_minute") - f.col("minute"))\

data.show()
#+----+-------------------+------+----------+----------+
#|TIME|        time_string|minute|new_minute|minute_add|
#+----+-------------------+------+----------+----------+
#| 925|2018-01-01 09:25:00|    25|      30.0|       5.0|
#|2205|2018-01-01 22:05:00|     5|       0.0|      -5.0|
#|2210|2018-01-01 22:10:00|    10|      15.0|       5.0|
#|2242|2018-01-01 22:42:00|    42|      45.0|       3.0|
#|2255|2018-01-01 22:55:00|    55|      60.0|       5.0|
#+----+-------------------+------+----------+----------+

以秒为单位添加偏移量,转换回整数

minute_add 列乘以 60 以获得以秒为单位的偏移量。将此添加到 time_string 以获得"new"时间。

data = data.withColumn(
        "new_time",
        f.from_unixtime(f.unix_timestamp("time_string") + f.col("minute_add")*60)
    )\
    .withColumn(
        "NEW_TIME",
        f.format_string("%02d%02d", f.hour("new_time"), f.minute("new_time")).cast("int")
    )
data.select("TIME", "NEW_TIME").show()
#+----+--------+
#|TIME|NEW_TIME|
#+----+--------+
#| 925|     930|
#|2205|    2200|
#|2210|    2215|
#|2242|    2245|
#|2255|    2300|
#+----+--------+

https://stackoverflow.com/questions/50345282/

相关文章:

intellij-idea - IntelliJ IDEA - 在路径中查找被锁定在另一个窗口监视器

android - 使用限制查询 Firebase 数据库

apache-spark - Spark UI 中的 "red"executors 是什么意思?

typescript - 在 Typescript 中构建 const 键的类型

reactjs - 无法使用 axios 和 ReactJS 执行获取请求

python - 如何永远运行异步函数(Python)

ansible - 例如,如何限制 Ansible 的设置模块 (gather_facts) 仅检索

wordpress - 使用 AWS RDS 和我自己的数据库哪个最便宜?

python - 使用 bigquery tables GET api 获取表的最后修改日期

amazon-web-services - 如何在不使用 IAM 的情况下允许第三方文件上传到私有(