python - PySpark:在 UDF 中使用列名并根据逻辑连接列名

我有一个这样的 Spark 数据框...

身份证 A B C D id1 1 0 0 2 id2 0 3 0 1 id3 1 2 5 0 id4 4 0 0 1 表>

我想要一个基于此逻辑的新数据框...

  1. 接受任何具有正值的列
  2. 连接他们的名字

结果会是这样的...

身份证 新列 id1 A,D id2 B,D id3 A,B,C id4 A,D 表>

我的努力:

A) 对于第一步,我认为我会将整数转换为列的名称... 所以它看起来像这样......

身份证 A B C D id1 一个 0 0 D id2 0 B 0 D id3 一个 B C 0 id4 一个 0 0 D 表>

我正在尝试使用 UDF,但它不起作用...

def CountSelect(colname, x):
  if x>0 :
    return colname
  else:
    return ""

countUDF = UserDefinedFunction(CountSelect, T.StringType())

cols = inoutDF.columns
cols.remove("ID")

intermediateDF = inputDF.select("ID", *(countDF(c, col(c)).alias(c) for c in cols))

但是它不起作用...

请问有人能帮忙吗?

B) 然后我将在所有列上使用字符串连接函数

这部分应该更容易,但如果您能将两个逻辑组合成一个更简单的工作代码,我将非常感谢您。

非常感谢

最佳答案

想法是标记列中的行是positive 并返回相应列的值。

您可以使用 reduce标记列并创建一个新的 DataFrame 最后使用 concat_ws形成所需的值

@anky 提供的更简洁的解决方案

简洁的解决方案-

sparkDF.withColumn("GreaterThanZero",F.concat_ws(",",*[F.when(F.col(col)>0,col) for col in to_concat]))\
.select("id","GreaterThanZero").show()

+---+---------------+
| id|GreaterThanZero|
+---+---------------+
|id1|            A,D|
|id2|            B,D|
|id3|          A,B,C|
|id4|            A,D|
+---+---------------+

数据准备

input_str = """
id1 1   0   0   2
id2 0   3   0   1
id3 1   2   5   0
id4 4   0   0   1
""".split()

input_values = list(map(lambda x: x.strip() if x.strip() != 'null' else None, input_str))

cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "ID   A   B   C   D".split()))
            
n = len(input_values)
n_cols = 5

input_list = [tuple(input_values[i:i+n_cols]) for i in range(0,n,n_cols)]

sparkDF = sql.createDataFrame(input_list, cols)

sparkDF.show()

+---+---+---+---+---+
| ID|  A|  B|  C|  D|
+---+---+---+---+---+
|id1|  1|  0|  0|  2|
|id2|  0|  3|  0|  1|
|id3|  1|  2|  5|  0|
|id4|  4|  0|  0|  1|
+---+---+---+---+---+

减少

to_check = ['id','A','B','C','D']

sparkDF_marked = reduce(lambda df
                , x: df.withColumn(x,F.when(F.col(x) > 0 ,x).otherwise(None))\
                        if x != 'id' else df.withColumn(x,F.col(x)) \
                ,to_check, sparkDF
            )
                      
sparkDF_marked.show()

+---+----+----+----+----+
| id|   A|   B|   C|   D|
+---+----+----+----+----+
|id1|   A|null|null|   D|
|id2|null|   B|null|   D|
|id3|   A|   B|   C|null|
|id4|   A|null|null|   D|
+---+----+----+----+----+

连接

to_concat = ['A','B','C','D']

sparkDF_marked.select(['id',F.concat_ws(',',*to_concat).alias('GreaterThanZero')]).show()

+---+---------------+
| id|GreaterThanZero|
+---+---------------+
|id1|            A,D|
|id2|            B,D|
|id3|          A,B,C|
|id4|            A,D|
+---+---------------+

该解决方案虽然有效,但有一些细微差别,您需要注意,尤其是 reduce 代码片段to_checkto_concat.

to_check 可以很容易地替换为 - sparkDF.columns 用于实际数据,但请让我知道更大数据集的性能。

https://stackoverflow.com/questions/69231846/

相关文章:

c# - 如何将 ViewData 传递给类中的 Action 方法?

c++ - 为什么 C++ 数组的括号在变量上而不是在数据类型上?

c - Valgrind : Invalid read of size 1

apache-kafka - 如何为 Kafka 主题实现生存时间 (TTL)

swift - 是否有可能在 Swift 的枚举中将多个案例分组并表示为另一个案例?

android - Jetpack Compose 中的副作用

javascript - leetcode 如何做到这一点 : Read user's input

r - 如何按 r 中数据框中的行对数字数据进行排名?

javascript - 如果 javascript 中的数据属性(HTML)=== 输入文本,如何

java - 如何检查至少一个 boolean 值是否为真?