pyspark - Spark Structured Streaming时DataFrame中的字符

这是当前代码:

from pyspark.sql import SparkSession

park_session = SparkSession\
    .builder\
    .appName("test")\
    .getOrCreate()

lines = spark_session\
    .readStream\
    .format("socket")\
    .option("host", "127.0.0.1")\
    .option("port", 9998)\
    .load()

The 'lines' looks like this:
+-------------+
|    value    |
+-------------+
|     a,b,c   |
+-------------+

But I want to look like this:
+---+---+---+
| a | b | c |
+---+---+---+

我尝试使用“split()”方法,但没有用。您只能将每个字符串拆分为一列中的列表,而不能拆分为多列

我该怎么办?

最佳答案

拆分值列并通过访问array index(或)element_at(from spark-2.4) (或)getItem() 函数到 create new columns.


from pyspark.sql.functions import *

lines.withColumn("tmp",split(col("value"),',')).\
withColumn("col1",col("tmp")[0]).\
withColumn("col2",col("tmp").getItem(1)).\
withColumn("col3",element_at(col("tmp"),3))
drop("tmp","value").\
show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|   a|   b|   c|
#+----+----+----+

关于pyspark - Spark Structured Streaming时DataFrame中的字符串列如何拆分为多列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61324549/

相关文章:

c# - Blazor + MongoDb 身份 : Value cannot be null.(参

windows - 如何使用 ssh 将 git push 到远程 Windows 机器

amazon-web-services - 注册到 AWS Chime session 通知

python - 管道中的第二个 `ParallelRunStep` 在启动时超时

uwp - 如何支持使用 Windows Cloud Sync Engine API 进行删除?

ruby-on-rails - 指定环境时 Webpacker 不替换 "process.env"变

python - 使用 Python Selenium 下载文件

reactjs - 为什么 react-router 在调度时自动返回到以前的路由

mysql - SQL:如果*条件*在另一行,则更新一行

reactjs - 为什么在主题 UI 上使用 Rebass?