我们正在使用与 Spark 1.3.1 接口(interface)的 PySpark 库。
我们有两个数据框,documents_df := {document_id, document_text}
和 keywords_df := {keyword}
。我们想 JOIN 这两个数据帧,并使用 keyword_df.keyword 出现在 document_df.document_text 字符串中的条件返回一个包含 {document_id, keyword}
对的结果数据帧。
例如,在 PostgreSQL 中,我们可以使用以下形式的 ON 子句来实现此目的:
document_df.document_text 类似 '%' || keyword_df.关键字 || '%'
但是,在 PySpark 中,我无法使用任何形式的连接语法。以前有人取得过这样的成就吗?
谨致问候,
将
最佳答案
可以通过两种不同的方式实现,但一般来说不推荐。首先让我们创建一个虚拟数据:
from pyspark.sql import Row
document_row = Row("document_id", "document_text")
keyword_row = Row("keyword")
documents_df = sc.parallelize([
document_row(1L, "apache spark is the best"),
document_row(2L, "erlang rocks"),
document_row(3L, "but haskell is better")
]).toDF()
keywords_df = sc.parallelize([
keyword_row("erlang"),
keyword_row("haskell"),
keyword_row("spark")
]).toDF()
Hive UDF
documents_df.registerTempTable("documents")
keywords_df.registerTempTable("keywords")
query = """SELECT document_id, keyword
FROM documents JOIN keywords
ON document_text LIKE CONCAT('%', keyword, '%')"""
like_with_hive_udf = sqlContext.sql(query)
like_with_hive_udf.show()
## +-----------+-------+
## |document_id|keyword|
## +-----------+-------+
## | 1| spark|
## | 2| erlang|
## | 3|haskell|
## +-----------+-------+
Python 用户定义函数
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
# Of you can replace `in` with a regular expression
contains = udf(lambda s, q: q in s, BooleanType())
like_with_python_udf = (documents_df.join(keywords_df)
.where(contains(col("document_text"), col("keyword")))
.select(col("document_id"), col("keyword")))
like_with_python_udf.show()
## +-----------+-------+
## |document_id|keyword|
## +-----------+-------+
## | 1| spark|
## | 2| erlang|
## | 3|haskell|
## +-----------+-------+
为什么不推荐?因为在这两种情况下都需要笛卡尔积:
like_with_hive_udf.explain()
## TungstenProject [document_id#2L,keyword#4]
## Filter document_text#3 LIKE concat(%,keyword#4,%)
## CartesianProduct
## Scan PhysicalRDD[document_id#2L,document_text#3]
## Scan PhysicalRDD[keyword#4]
like_with_python_udf.explain()
## TungstenProject [document_id#2L,keyword#4]
## Filter pythonUDF#13
## !BatchPythonEvaluation PythonUDF#<lambda>(document_text#3,keyword#4), ...
## CartesianProduct
## Scan PhysicalRDD[document_id#2L,document_text#3]
## Scan PhysicalRDD[keyword#4]
还有其他方法可以在没有完全笛卡尔的情况下实现类似的效果。
加入标记化文档 - 如果关键字列表太大而无法在单台机器的内存中处理,则很有用
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import explode
tokenizer = Tokenizer(inputCol="document_text", outputCol="words")
tokenized = (tokenizer.transform(documents_df)
.select(col("document_id"), explode(col("words")).alias("token")))
like_with_tokenizer = (tokenized
.join(keywords_df, col("token") == col("keyword"))
.drop("token"))
like_with_tokenizer.show()
## +-----------+-------+
## |document_id|keyword|
## +-----------+-------+
## | 3|haskell|
## | 1| spark|
## | 2| erlang|
## +-----------+-------+
这需要洗牌而不是笛卡尔:
like_with_tokenizer.explain()
## TungstenProject [document_id#2L,keyword#4]
## SortMergeJoin [token#29], [keyword#4]
## TungstenSort [token#29 ASC], false, 0
## TungstenExchange hashpartitioning(token#29)
## TungstenProject [document_id#2L,token#29]
## !Generate explode(words#27), true, false, [document_id#2L, ...
## ConvertToSafe
## TungstenProject [document_id#2L,UDF(document_text#3) AS words#27]
## Scan PhysicalRDD[document_id#2L,document_text#3]
## TungstenSort [keyword#4 ASC], false, 0
## TungstenExchange hashpartitioning(keyword#4)
## ConvertToUnsafe
## Scan PhysicalRDD[keyword#4]
Python UDF 和广播变量 - 如果关键字列表相对较小
from pyspark.sql.types import ArrayType, StringType
keywords = sc.broadcast(set(
keywords_df.map(lambda row: row[0]).collect()))
bd_contains = udf(
lambda s: list(set(s.split()) & keywords.value),
ArrayType(StringType()))
like_with_bd = (documents_df.select(
col("document_id"),
explode(bd_contains(col("document_text"))).alias("keyword")))
like_with_bd.show()
## +-----------+-------+
## |document_id|keyword|
## +-----------+-------+
## | 1| spark|
## | 2| erlang|
## | 3|haskell|
## +-----------+-------+
它既不需要 shuffle 也不需要笛卡尔,但您仍然需要将广播变量传输到每个工作节点。
like_with_bd.explain()
## TungstenProject [document_id#2L,keyword#46]
## !Generate explode(pythonUDF#47), true, false, ...
## ConvertToSafe
## TungstenProject [document_id#2L,pythonUDF#47]
## !BatchPythonEvaluation PythonUDF#<lambda>(document_text#3), ...
## Scan PhysicalRDD[document_id#2L,document_text#3]
从 Spark 1.6.0 开始,您可以使用 sql.functions.broadcast
标记一个小数据框,以获得与上述类似的效果,而无需使用 UDF 和显式广播变量。重用标记化数据:
from pyspark.sql.functions import broadcast
like_with_tokenizer_and_bd = (broadcast(tokenized)
.join(keywords_df, col("token") == col("keyword"))
.drop("token"))
like_with_tokenizer.explain()
## TungstenProject [document_id#3L,keyword#5]
## BroadcastHashJoin [token#10], [keyword#5], BuildLeft
## TungstenProject [document_id#3L,token#10]
## !Generate explode(words#8), true, false, ...
## ConvertToSafe
## TungstenProject [document_id#3L,UDF(document_text#4) AS words#8]
## Scan PhysicalRDD[document_id#3L,document_text#4]
## ConvertToUnsafe
## Scan PhysicalRDD[keyword#5]
相关:
关于python - 我们如何使用 SQL-esque "LIKE"标准连接两个 Spark SQL 数据帧?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33400869/