json - How to parse a malformed JSON string with whitespace, extra double quotes and backslashes in Spark 1.6 using Python?

This question already has answers here:
How to query JSON data column using Spark DataFrames?

(5 answers)

Closed last year.




Using PySpark 1.6, I am trying to parse, read and load a given JSON file from an HDFS file location on a Cloudera Hadoop distribution, but I keep hitting a brick wall. This is what the JSON string looks like after doing a -cat on the file in hadoop:
{"json_data":"{\"table\":\"TEST.FUBAR\",\"op_type\":\"I\",\"op_ts\":\"2019-03-14 15:33:50.031848\",\"current_ts\":\"2019-03-14T15:33:57.479002\",\"pos\":\"1111\",\"after\":{\"COL1\":949494949494949494,\"COL2\":99,\"COL3\":2,\"COL4\":\"            99999\",\"COL5\":9999999,\"COL6\":90,\"COL7\":42478,\"COL8\":\"I\",\"COL9\":null,\"COL10\":\"2019-03-14 15:33:49\",\"COL11\":null,\"COL12\":null,\"COL13\":null,\"COL14\":\"x222263 \",\"COL15\":\"2019-03-14 15:33:49\",\"COL16\":\"x222263 \",\"COL17\":\"2019-03-14 15:33:49\",\"COL18\":\"2020-09-10 00:00:00\",\"COL19\":\"A\",\"COL20\":\"A\",\"COL21\":0,\"COL22\":null,\"COL23\":\"2019-03-14 15:33:47\",\"COL24\":2,\"COL25\":2,\"COL26\":\"R\",\"COL27\":\"2019-03-14 15:33:49\",\"COL28\":\"  \",\"COL29\":\"PBU67H   \",\"COL30\":\"            20000\",\"COL31\":2,\"COL32\":null}}"}

I then try to create a dataframe from the JSON file with the following:
df = sqlContext.read.json("test_data.json")
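For reference (this is not from the original post): with the escaped file, Spark 1.6 infers json_data as a single string column rather than a struct, which is why the whole payload shows up as one long string below. A quick way to confirm that:

df.printSchema()
# Expected with the escaped input:
# root
#  |-- json_data: string (nullable = true)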

I then run the pyspark script and insert the dataframe into a temporary Hive table in parquet format, which looks like this when selected (a rough sketch of that write step follows the output below):

+------------------+
|         json_data|
+------------------+
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
|{"table":"TEST....|
+------------------+
only showing top 20 rows
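The write step itself is not shown in the original post; a minimal sketch of what it might look like in PySpark 1.6, with an assumed database and table name, is:

# Hypothetical sketch of writing the dataframe to a parquet-backed temporary Hive table.
# "tmp_db.json_raw" is an assumed name, not the table from the original post.
df.write.format("parquet").mode("overwrite").saveAsTable("tmp_db.json_raw")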

I need all of the JSON attributes and values to show up in row-and-column format, but it comes back as one long string. Since my work goes through a proxy with strict firewall settings, I currently cannot use the 'Hive-JSON-Serde' jar to parse it as JSON (which would solve many of my problems). I tried a posexplode and a lateral view explode, but those are for struct types rather than the ArrayBuffer type.

Without installing any third-party software on the cluster or changing the Spark SQL execution engine or any other admin setting (not an option, since I am an ordinary user), is there any workaround on Cloudera 5.x to parse this string as JSON, or to insert the values into a denormalized table with rows, columns and so on?

Best answer

The "json_data" content is actually a string, not JSON, which has built-in schema structures such as arrays, maps and structs. My problem was the extra double quotes (") wrapped around the actual inner contents of "json_data", which caused problems when Spark tried to read it. Example:

{"json_data":"{"table":"TEST.FUBAR","op_type":"I","op_ts":"2019-03-14 15:33:50.031848","current_ts":"2019-03-14T15:33:57.479002","pos":"1111","after":{"COL1":949494949494949494,"COL2":99,"COL3":2,"COL4":"            99999","COL5":9999999,"COL6":90,"COL7":42478,"COL8":"I","COL9":null,"COL10":"2019-03-14 15:33:49","COL11":null,"COL12":null,"COL13":null,"COL14":"x222263 ","COL15":"2019-03-14 15:33:49","COL16":"x222263 ","COL17":"2019-03-14 15:33:49","COL18":"2020-09-10 00:00:00","COL19":"A","COL20":"A","COL21":0,"COL22":null,"COL23":"2019-03-14 15:33:47","COL24":2,"COL25":2,"COL26":"R","COL27":"2019-03-14 15:33:49","COL28":"  ","COL29":"PBU67H   ","COL30":"            20000","COL31":2,"COL32":null}}"}

After removing those double quotes, I used this instead:
{"json_data":{"table":"TEST.FUBAR","op_type":"I","op_ts":"2019-03-14 15:33:50.031848","current_ts":"2019-03-14T15:33:57.479002","pos":"1111","after":{"COL1":949494949494949494,"COL2":99,"COL3":2,"COL4":"            99999","COL5":9999999,"COL6":90,"COL7":42478,"COL8":"I","COL9":null,"COL10":"2019-03-14 15:33:49","COL11":null,"COL12":null,"COL13":null,"COL14":"x222263 ","COL15":"2019-03-14 15:33:49","COL16":"x222263 ","COL17":"2019-03-14 15:33:49","COL18":"2020-09-10 00:00:00","COL19":"A","COL20":"A","COL21":0,"COL22":null,"COL23":"2019-03-14 15:33:47","COL24":2,"COL25":2,"COL26":"R","COL27":"2019-03-14 15:33:49","COL28":"  ","COL29":"PBU67H   ","COL30":"            20000","COL31":2,"COL32":null}}}

I will probably have to use some regex or a small utility function to strip the double quotes around the data (a sketch of that cleanup step follows the session output below). But after making that change and running pyspark, I got the following:
    Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Python version 2.7.13 (default, Dec 20 2016 23:09:15)
SparkContext available as sc, HiveContext available as sqlContext.
>>> filePath = "/user/no_quote_json.json"
>>> df = sqlContext.read.json(filePath)
>>> df.printSchema()
root
 |-- json_data: struct (nullable = true)
 |    |-- after: struct (nullable = true)
 |    |    |-- COL1: long (nullable = true)
 |    |    |-- COL10: string (nullable = true)
 |    |    |-- COL11: string (nullable = true)
 |    |    |-- COL12: string (nullable = true)
 |    |    |-- COL13: string (nullable = true)
 |    |    |-- COL14: string (nullable = true)
 |    |    |-- COL15: string (nullable = true)
 |    |    |-- COL16: string (nullable = true)
 |    |    |-- COL17: string (nullable = true)
 |    |    |-- COL18: string (nullable = true)
 |    |    |-- COL19: string (nullable = true)
 |    |    |-- COL2: long (nullable = true)
 |    |    |-- COL20: string (nullable = true)
 |    |    |-- COL21: long (nullable = true)
 |    |    |-- COL22: string (nullable = true)
 |    |    |-- COL23: string (nullable = true)
 |    |    |-- COL24: long (nullable = true)
 |    |    |-- COL25: long (nullable = true)
 |    |    |-- COL26: string (nullable = true)
 |    |    |-- COL27: string (nullable = true)
 |    |    |-- COL28: string (nullable = true)
 |    |    |-- COL29: string (nullable = true)
 |    |    |-- COL3: long (nullable = true)
 |    |    |-- COL30: string (nullable = true)
 |    |    |-- COL31: long (nullable = true)
 |    |    |-- COL32: string (nullable = true)
 |    |    |-- COL4: string (nullable = true)
 |    |    |-- COL5: long (nullable = true)
 |    |    |-- COL6: long (nullable = true)
 |    |    |-- COL7: long (nullable = true)
 |    |    |-- COL8: string (nullable = true)
 |    |    |-- COL9: string (nullable = true)
 |    |-- current_ts: string (nullable = true)
 |    |-- op_ts: string (nullable = true)
 |    |-- op_type: string (nullable = true)
 |    |-- pos: string (nullable = true)
 |    |-- table: string (nullable = true)

>>> df.select("json_data.after.col29").show()
+---------+
|    col29|
+---------+
|PBU67H   |
+---------+
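The quote-stripping utility mentioned above is not shown in the answer. A minimal sketch of that preprocessing step in plain Python, using the json module rather than a regex, assuming one JSON object per line and hypothetical file paths, could be:

import json

# Hypothetical preprocessing: parse the escaped inner string so that
# "json_data" becomes a real nested object instead of a quoted string.
# Input/output paths are assumptions for illustration only.
with open("test_data.json") as src, open("no_quote_json.json", "w") as dst:
    for line in src:
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)  # outer object: {"json_data": "<escaped string>"}
        record["json_data"] = json.loads(record["json_data"])  # inner string -> dict
        dst.write(json.dumps(record) + "\n")  # one clean JSON object per line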

Once the data is in this format, all the other code I wrote to flatten the struct object and insert it into a flattened Hive table works, as does the rest of my logic. Hopefully this helps anyone who runs into a similar problem in the future.
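The flattening step referred to above is also not shown. Under the schema printed earlier, a hedged sketch of projecting the nested fields out to flat columns (column list abbreviated, table name assumed) could be:

# Hypothetical flattening: pull nested struct fields up to top-level columns
# before inserting into a flat Hive table. Only a few of the columns are shown.
flat_df = df.select(
    "json_data.op_type",
    "json_data.op_ts",
    "json_data.current_ts",
    "json_data.pos",
    "json_data.after.COL1",
    "json_data.after.COL29",
)
flat_df.write.format("parquet").mode("overwrite").saveAsTable("tmp_db.json_flat")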

This post on json - how to parse a malformed JSON string with whitespace, extra double quotes and backslashes in Spark 1.6 using Python is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/55230220/
