之前能跑Kafka结构流式编程。但是突然间,我所有的结构流 python 程序都失败了,并出现错误。我从 Spark 网站上获取了基本的 Kafka 结构流编程,该网站也因同样的错误而失败。
py4j.protocol.Py4JJavaError: An error occurred while calling o31.load. : java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala:376) at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala)
我正在使用的 Spark 提交
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 C:\Users\ranjith.gangam\PycharmProjects\sparktest\Structured_streaming.py
这是我从 Spark github 上拿来的代码
spark = SparkSession\
.builder\
.appName("StructuredKafkaWordCount")\
.getOrCreate()
# Create DataSet representing the stream of input lines from kafka
lines = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", bootstrapServers)\
.option(subscribeType, topics)\
.load()\
.selectExpr("CAST(value AS STRING)")
words = lines.select(
# explode turns each item in an array into a separate row
explode(
split(lines.value, ' ')
).alias('word')
)
# Generate running word count
wordCounts = words.groupBy('word').count()
# Start running the query that prints the running counts to the console
query = wordCounts\
.writeStream\
.outputMode('complete')\
.format('console')\
.start()
query.awaitTermination()
最佳答案
您的方法是正确的,但不幸的是,PySpark 尚不支持 Kafka 0.10。正如您在 SPARK-16534 中看到的那样.
到目前为止,对 pySpark 的唯一支持是 Kafka 0.8。因此,您可以迁移到 spark 0.8 或将代码更改为 Scala。
关于apache-spark - Kafka 结构化流 java.lang.NoClassDefFoundError,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46184605/
相关文章:
php - 找不到属性 Application\Sonata\NewsBundle\Entity\P
google-cloud-platform - Google Cloud Endpoints 以 V
reactjs - reactjs 应用程序中的全局 SASS 变量
php - 第 0 行未知中的警告 : Unknown: Failed to write sessi
java - 反编译MCP报错(9.18版)返回 'Decompile Failed' 1
azure - Azure WebApp 是否自动进行速率限制/DOS 保护?
dataframe - 通过 [PySpark] 列连接两个 DataFrame