python - 在使用 pyspark 和预定义的结构模式读取嵌套的 JSON 时,如何将缺失的列

python =3.6

Spark =2.4

我的示例 JSON 数据:

{"data":{"header":"someheader","body":{"name":"somename","value":"somevalue","books":[{"name":"somename"},{"value":"somevalue"},{"author":"someauthor"}]}}},
{"data":{"header":"someheader1","body":{"name":"somename1","value":"somevalue1","books":[{"name":"somename1"},{"value":"somevalue1"},{"author":"someauthor1"}]}}},....

我的结构模式:

Schema = StructType([StructField('header',StringType(),True),StructField('body',StructType([StructField('name1',StringType(),True),StructField('value',StringType(),True),StructField('books',ArrayType(StructType([StructField('name1',StringType(),True),StructField('value',StringType(),True),StructField('author',StringType(),True),StructField('publisher',StringType(),True)]),True),True)]),True)])

我想传递此架构并能够将所有字段(包括数据中缺少的字段)填充为 NULL。

因为,对于某一天的负载,可能会发生任何输入数据在结构字段的书籍数组中没有作者列。

因此,如果我不使用架构,spark 将无法推断该列,因为任何输入数据都没有它。

这是我尝试过的,

1> df = spark.read.schema(schema).json('/input/data/path')

这给了我所有的空行,因为输入文件在数据字段中有标题和正文,而结构模式中不存在数据字段

2> df = spark.read.json('/input/data/path').select(col("data.*")) df.coalesce(1).write.json('/输出/路径') df2 = spark.read.schema(schema).json('/输出/路径')

这也为我提供了所有空行,因为结构模式具有数据中不存在的额外列。

3> df = spark.read.json('/input/data/path').select(col("data.*")) df2 = spark.createDataFrame(df.rdd, schema)

这失败了,至于这个工作,列的顺序和所有嵌套列需要在数据和模式中完全相同,这是不可行的。

4> 在这种方法中,我尝试从输入中读取没有模式的数据并将其写回临时路径。

然后使用来自输入的模式再次读取数据,它为我提供所有空行,然后将空值替换为“1”,然后以追加模式写入相同的临时路径。

然后从这个临时路径再次读取并让 spark 推断模式。

但这也不起作用,因为嵌套的结构空列没有被非空值替换,当我写它时,输出路径没有所有列。

df.coalesce(1).write.json('/output/path')
df_input_with_schema = spark.read.schema(schema).json('/input/data/path') --all null rows
df_input_with_schema.na.fill('1').format('json').write.mode('append').save('/output/path')
df_final = spark.read.json('/output/path').filter(col("keycolumn") === 1)

有人可以帮忙吗?

最佳答案

从 spark 3 开始就有了 ignoreNullFields选项(默认为 True )

你可以这样做:

df.coalesce(1).write.mode('overwrite').json(ignoreNullFields=False,path="a")

保留只有空值的列

关于python - 在使用 pyspark 和预定义的结构模式读取嵌套的 JSON 时,如何将缺失的列添加为 null,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63870745/

相关文章:

python-3.x - 安装 github 子模块时,诗歌安装失败并显示 [CalledProce

variables - 如何在 Pyomo 模型中设置决策变量的初始值

linux - 用 n 步旋转文本文件中的行

node.js - 即时将文件从 S3 移动到 AWS EFS

visual-studio - VS代码-OSS : Downloading files and f

google-cloud-platform - 接受 Google 帐户转移请求时如何解决消息 "T

python - 使用 Python 的双向方差分析

node.js - NodeJS Docker 当前与最新

github - 最初发布一个github包显示 "Username must not be nul

java - 使用 Spring JPA/Hibernate 按不同值批量更新