这是当前代码:
from pyspark.sql import SparkSession
park_session = SparkSession\
.builder\
.appName("test")\
.getOrCreate()
lines = spark_session\
.readStream\
.format("socket")\
.option("host", "127.0.0.1")\
.option("port", 9998)\
.load()
The 'lines' looks like this:
+-------------+
| value |
+-------------+
| a,b,c |
+-------------+
But I want to look like this:
+---+---+---+
| a | b | c |
+---+---+---+
我尝试使用“split()”方法,但没有用。您只能将每个字符串拆分为一列中的列表,而不能拆分为多列
我该怎么办?
最佳答案
拆分
值列并通过访问array index
(或)element_at(from spark-2.4)
(或)getItem()
函数到 create new columns.
from pyspark.sql.functions import *
lines.withColumn("tmp",split(col("value"),',')).\
withColumn("col1",col("tmp")[0]).\
withColumn("col2",col("tmp").getItem(1)).\
withColumn("col3",element_at(col("tmp"),3))
drop("tmp","value").\
show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#| a| b| c|
#+----+----+----+
关于pyspark - Spark Structured Streaming时DataFrame中的字符串列如何拆分为多列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61324549/
相关文章:
c# - Blazor + MongoDb 身份 : Value cannot be null.(参
windows - 如何使用 ssh 将 git push 到远程 Windows 机器
amazon-web-services - 注册到 AWS Chime session 通知
python - 管道中的第二个 `ParallelRunStep` 在启动时超时
uwp - 如何支持使用 Windows Cloud Sync Engine API 进行删除?
ruby-on-rails - 指定环境时 Webpacker 不替换 "process.env"变
python - 使用 Python Selenium 下载文件