apache-kafka - 为什么 KSQL 查询从流-流连接创建的流中返回空值?

我在流-流连接查询结果中遇到意外行为。

情况

KSQL 版本:5.1.3

  1. 从每个 kafka 主题创建了 2 个流
#1.
CREATE STREAM streamA
(id VARCHAR)
WITH (KAFKA_TOPIC='topicA', VALUE_FORMAT='JSON');

#2.
CREATE STREAM streamB
(id VARCHAR,
date VARCHAR,
count INT)
WITH (KAFKA_TOPIC='topicB', VALUE_FORMAT='JSON')
  1. stream-stream 加入创建 kafka 主题
CREATE STREAM streamC
WITH (KAFKA_TOPIC='topicC', VALUE_FORMAT='JSON', PARTITIONS=5) AS
SELECT b.id AS `id`,
    b.date AS `date`,
    b.count AS `count`
FROM streamB b
INNER JOIN streamA a WITHIN 1 DAY
    on b.id = a.id;

问题

在这种情况下,当我进行以下 2 个查询时,一个可以获取所有信息,但另一个不能。您是否知道为什么会发生这种情况或我的查询存在一些问题?

引用

以下查询返回预期结果。

# OK
ksql> select a.id as `id`, a.date as `date`, a.count as `count` from streamA a inner join streamB b within 1 day on a.id = b.id;
# 00000001 | 2020-06-22 | 3

# OK
ksql> print 'topicC' from beginning;
# {"ROWTIME":1592804456184,"ROWKEY":"00000001","date":"2020-06-22","count":3}

但是,下面的查询返回了一个意外的结果。

# NG
ksql> select * from streamC;
# 1592804456184 | 00000001 | null | null

# (expected result)
# 1592804456184 | 00000001 | 2020-06-22 | 3

附加信息

ksql> DESCRIBE streamC;
Name                 : STREAMC
 Field   | Type
-------------------------------------
 ROWTIME | BIGINT           (system)
 ROWKEY  | VARCHAR(STRING)  (system)
 date    | VARCHAR(STRING)
 count   | INTEGER
-------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

最佳答案

您正在运行 Confluent Platform 版本 5.1.3 附带的 KSQL 版本,看起来您遇到了错误。我没有找到涵盖您确切问题的问题。但是,最新版本确实有涵盖类似使用模式的测试用例,因此我相信您的问题已经得到解决。您需要做的就是升级到包含修复程序的版本。

我建议升级到 5.5.1 CP 版本、即将发布的 6.0.0 CP 版本或社区 0.10 ksqlDB 版本,因为我知道这些包含修复。

https://stackoverflow.com/questions/62525563/

相关文章:

node.js - 与 Prisma 2 相关的多个过滤器

ios - Moya+Alamofire POST 请求在应用程序之间切换或进入后台时超时

ios - Safari 或 IOS 中视频的 aws-sdk getSignedUrl 不起作用

postgresql - 无法使用 deno.js 连接到 postgres

amazon-web-services - 由于 Amplify 上的身份验证 token 过期较短

python - 如何在 python 中使用 altair 包加载和绘制 csv 文件?

git - $(NugetPackageRoot) 宏在 VS2019 中自动更改为 *.sfpro

node.js - 如何在 Couchbase NodeJS SDK 3X 中设置 operatio

java - 如何在 Tycho 构建中设置 Java 编译器兼容性?

python - Pandas 的 Mypy/typeshed stub