I am creating a basic stream in SCDF (Local Server 1.7.3) in which I configure 2 streams.
1. HTTP -> Kafka topic
2. Kafka topic -> HDFS
Streams:
stream create --name ingest_from_http --definition "http --port=8000 --path-pattern=/test > :streamtest1"
stream deploy --name ingest_from_http --properties "app.http.spring.cloud.stream.bindings.output.producer.headerMode=raw"
stream create --name ingest_to_hdfs --definition ":streamtest1 > hdfs --fs-uri=hdfs://<host>:8020 --directory=/tmp/hive/sensedev/streamdemo/ --file-extension=xml --spring.cloud.stream.bindings.input.consumer.headerMode=raw"
DROP TABLE IF EXISTS gwdemo.xml_test;
CREATE TABLE gwdemo.xml_test(
id int,
name string
)
ROW FORMAT SERDE 'com.ibm.spss.hive.serde2.xml.XmlSerDe'
WITH SERDEPROPERTIES (
"column.xpath.id"="/body/id/text()",
"column.xpath.name"="/body/name/text()"
)
STORED AS
INPUTFORMAT 'com.ibm.spss.hive.serde2.xml.XmlInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION '/tmp/hive/sensedev/streamdemo'
TBLPROPERTIES (
"xmlinput.start"="<body>",
"xmlinput.end"="</body>")
;
<body><id>1</id><name>Test1</name></body>
<body><id>2</id><name>Test2</name></body>
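To make the `column.xpath.id` and `column.xpath.name` mappings in the table definition concrete, here is a minimal sketch that evaluates the same two XPath expressions against one of the sample records, using only the JDK's `javax.xml.xpath` API. The class name `XPathColumnDemo` and its `evaluate` helper are hypothetical and exist only for this illustration. It does not reproduce how the XML SerDe works internally; it only shows what the expressions select when the file really contains the XML text.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical helper for illustration; not part of Hive or the XML SerDe.
final class XPathColumnDemo {

    // Evaluates an XPath expression against an XML string and returns the result as text.
    static String evaluate(String xml, String expression) {
        XPath xpath = XPathFactory.newInstance().newXPath();
        try {
            return xpath.evaluate(expression, new InputSource(new StringReader(xml)));
        } catch (XPathExpressionException e) {
            // Unparseable input (for example a non-XML payload) ends up here.
            throw new IllegalStateException("Could not evaluate " + expression, e);
        }
    }

    public static void main(String[] args) {
        String record = "<body><id>1</id><name>Test1</name></body>";
        System.out.println(evaluate(record, "/body/id/text()"));   // prints 1
        System.out.println(evaluate(record, "/body/name/text()")); // prints Test1
    }
}
```

The table can only return these values if the files under the table location contain the XML itself, which is why the content actually written to HDFS matters.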
" --contentType application/xml <body><id>2</id><name>Test2</name></body>
$ hdfs dfs -cat /tmp/hive/sensedev/streamdemo/hdfs-sink-2.xml
[B@31d94539
Best answer
The HDFS sink expects a Java-serialized object.
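For context, the `[B@31d94539` seen in the HDFS file has the shape of Java's default `Object.toString()` result for a `byte[]`: the type tag `[B` followed by an identity hash code. The sketch below reproduces that effect in plain Java. The class `RawPayloadDemo` and its two methods are hypothetical names for this illustration, not code from the hdfs sink, and the UTF-8 encoding is an assumption about how the payload was produced.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; not part of the hdfs sink or Spring Cloud Stream.
final class RawPayloadDemo {

    // Mimics code that treats the payload as a generic Object and stringifies it.
    static String writeNaively(Object payload) {
        return String.valueOf(payload); // calls payload.toString()
    }

    // Decodes the raw bytes explicitly, assuming the producer sent UTF-8 text.
    static String writeDecoded(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = "<body><id>1</id><name>Test1</name></body>"
                .getBytes(StandardCharsets.UTF_8);

        // Prints "[B@" followed by a hex hash that varies from run to run.
        System.out.println(writeNaively(payload));

        // Prints <body><id>1</id><name>Test1</name></body>
        System.out.println(writeDecoded(payload));
    }
}
```

This is consistent with the answer: with `headerMode=raw` on both bindings the payload arrives as raw bytes, and something downstream appears to stringify the array instead of decoding it. How to correct that in the stream definition depends on the sink and binder versions, which are not covered here.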
Regarding hadoop - Spring Cloud Dataflow - http | kafka and kafka | hdfs - getting raw message in HDFS, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/55123919/