hadoop - Spring Cloud Dataflow-http |卡夫卡和卡夫卡| hdfs

我在SCDF(本地服务器1.7.3)中创建一个基本流,在其中配置2个流。
1. HTTP-> Kafka主题
2. Kafka主题-> HDFS

流:

stream create --name ingest_from_http --definition "http --port=8000 --path-pattern=/test > :streamtest1"
stream deploy --name ingest_from_http --properties "app.http.spring.cloud.stream.bindings.output.producer.headerMode=raw"

stream create --name ingest_to_hdfs --definition ":streamtest1 > hdfs --fs-uri=hdfs://<host>:8020 --directory=/tmp/hive/sensedev/streamdemo/ --file-extension=xml --spring.cloud.stream.bindings.input.consumer.headerMode=raw" 

我在/ tmp / hive / sensedev / streamdemo /位置上创建了一个Hive托管表
DROP TABLE IF EXISTS gwdemo.xml_test;
CREATE TABLE gwdemo.xml_test(

id int,

name string

 )

ROW FORMAT SERDE 'com.ibm.spss.hive.serde2.xml.XmlSerDe'

WITH SERDEPROPERTIES (

"column.xpath.id"="/body/id/text()",

"column.xpath.name"="/body/name/text()"


)

STORED AS

INPUTFORMAT 'com.ibm.spss.hive.serde2.xml.XmlInputFormat'

OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'

LOCATION '/tmp/hive/sensedev/streamdemo'

TBLPROPERTIES (

"xmlinput.start"="<body>",

"xmlinput.end"="</body>")

;

测试:
  • Hive是否能够读取XML:将xml文件放在该位置
    / tmp / hive / sensedev / streamdemo。

  • 文件内容:<body><id>1</id><name>Test1</name></body>
    在表上运行SELECT命令时,它正确显示了上面的记录。
  • 当使用http post在SCDF中发布记录时,我得到了正确的数据
    在Kafka Consumer中,但是当我检查HDFS时,xml文件是
    被创建,但是我在那些文件中收到原始消息。
    例:

    dataflow> http post --target http:/// test
    --data“<body><id>2</id><name>Test2</name></body>” --contentType应用程序/ xml

  • 在Kafka Console使用者中,我能够读取正确的XML消息:<body><id>2</id><name>Test2</name></body>
     $ hdfs dfs -cat /tmp/hive/sensedev/streamdemo/hdfs-sink-2.xml
    [B@31d94539
    


    问题:
    1.我想念什么?如何在HDFS中新创建的XML文件中获取正确的XML记录?

    最佳答案

    HDFS Sink需要一个Java序列化对象。

    关于hadoop - Spring Cloud Dataflow-http |卡夫卡和卡夫卡| hdfs-在HDFS中获取原始消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55123919/

    相关文章:

    hadoop - 如何将新记录更新到临时表中的主表

    apache-spark - Spark的示例在客户端模式下引发FileNotFoundExcept

    sql - 配置单元确定日期重叠的记录

    amazon-web-services - 需要有关数据管道创建的设计输入

    hadoop - HDFS的默认 block 大小为128 MB,而Hive的默认 strip 大小

    ubuntu - docker登录错误: no such host

    hadoop - 为什么 hive 中的CTAS查询没有得到预期的结果?

    python - Fedora 22 : ERROR: No module named '_rpmb

    nginx - 多个网站在Docker容器中运行,如何实现?

    java - 如何在 Hadoop 中对自定义可写类型进行排序