apache-spark - Spark : save ordered data to parque

我有 30TB 的数据按日期和小时划分,每小时分成 300 个文件。我进行了一些数据转换,然后希望对数据进行排序并按排序顺序保存,以便 C++ 程序轻松摄取。我知道当你序列化时,顺序只在文件中是正确的。我希望通过更好地划分数据来避免这种情况。

我想同时按 sessionID 和时间戳排序。我不希望 sessionIDs 在不同的文件之间分割。如果我在 SessionID 上分区,我将拥有太多,所以我对 N 取模以生成 N 个桶,旨在获得 1 个桶的数据大约 100-200MB:

df = df.withColumn("bucket", F.abs(F.col("sessionId")) % F.lit(50))

然后我在排序之前按日期、小时和桶遣返

df = df.repartition(50,"dt","hr","bucket")
df = df.sortWithinPartitions("sessionId","timestamp")
df.write.option("compression","gzip").partitionBy("dt","hr","bucket").parquet(SAVE_PATH)

这会将数据保存到 dt/hr/bucket,每个 bucket 中有 1 个文件,但顺序丢失了。如果我不创建存储桶和重新分区,那么我最终会得到 200 个文件,数据是有序的,但 sessionId 被拆分到多个文件中。

编辑: 问题似乎出在使用 partitionBy("dt","hr","bucket") 保存时,它会随机重新分区数据,因此不再排序。如果我在没有 partitionBy 的情况下保存,那么我得到的正是我所期望的 - N 个文件用于 N 个存储桶/分区和 sessionIds 跨单个文件,所有文件都正确排序。所以我有一个 non-spark hack 手动迭代所有日期 + 小时目录

如果您按列分区、排序,然后使用 partitionBy 写入同一列,那么您希望直接转储已排序的分区,而不是对数据进行一些随机重新洗牌,这似乎是一个错误。

最佳答案

将分区列放在已排序的列列表中可能会成功。

完整描述在这里 - https://stackoverflow.com/a/59161488/3061686

关于apache-spark - Spark : save ordered data to parquet,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58881553/

相关文章:

google-cloud-platform - 使用 --allow-unauthenticated

apache-zookeeper - Chubby Sequencer 的目的是什么

java - Canny 边缘检测不检测 100% 水平/未旋转的线

python - 设置 scipy.signal.peak_widths 的绝对值

node.js - 使用 fetch API 时如何记录实际的请求 header

javascript - 如何使用 ngx-export-as 中的选项

amazon-web-services - 您应该使用客户端凭据授权类型来验证服务器到服务器吗?

leaflet - 校正 Rayshader 的卫星图像覆盖

docker - 让 docker build --memory-swap=20g 使用可用的交换空

mysql - 在 macOS Catalina 10.15.1 下安装 DBD::mysql 时遇