apache-spark - Kafka 结构化流 java.lang.NoClassDefFoun

之前能跑Kafka结构流式编程。但是突然间,我所有的结构流 python 程序都失败了,并出现错误。我从 Spark 网站上获取了基本的 Kafka 结构流编程,该网站也因同样的错误而失败。

py4j.protocol.Py4JJavaError: An error occurred while calling o31.load. : java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala:376) at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala)

我正在使用的 Spark 提交

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 C:\Users\ranjith.gangam\PycharmProjects\sparktest\Structured_streaming.py

这是我从 Spark github 上拿来的代码

spark = SparkSession\
      .builder\
      .appName("StructuredKafkaWordCount")\
      .getOrCreate()

# Create DataSet representing the stream of input lines from kafka
lines = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", bootstrapServers)\
    .option(subscribeType, topics)\
    .load()\
    .selectExpr("CAST(value AS STRING)")

words = lines.select(
    # explode turns each item in an array into a separate row
    explode(
        split(lines.value, ' ')
    ).alias('word')
)

# Generate running word count
wordCounts = words.groupBy('word').count()
# Start running the query that prints the running counts to the console
query = wordCounts\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .start()

query.awaitTermination()

最佳答案

您的方法是正确的,但不幸的是,PySpark 尚不支持 Kafka 0.10。正如您在 SPARK-16534 中看到的那样.

到目前为止,对 pySpark 的唯一支持是 Kafka 0.8。因此,您可以迁移到 spark 0.8 或将代码更改为 Scala。

关于apache-spark - Kafka 结构化流 java.lang.NoClassDefFoundError,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46184605/

相关文章:

php - 找不到属性 Application\Sonata\NewsBundle\Entity\P

google-cloud-platform - Google Cloud Endpoints 以 V

string - 如何删除大括号之间的文本

reactjs - reactjs 应用程序中的全局 SASS 变量

php - 第 0 行未知中的警告 : Unknown: Failed to write sessi

java - 反编译MCP报错(9.18版)返回 'Decompile Failed' 1

azure - Azure WebApp 是否自动进行速率限制/DOS 保护?

dataframe - 通过 [PySpark] 列连接两个 DataFrame

asp.net-mvc - ASP.NET Core 模型状态不返回 DataMember 名称

jekyll - 如何使用 Jekyll 将 CSS 导入 SCSS 文件?