apache-kafka - 一对多KStream-KTable连接

我有一个 kStream 的大学 - 什么时候上大学 -

University(universityId: String, name: String, studentIds: Seq[String])

val universityKStream = builder.stream[String, University](...)

还有一个 kTable of Students, 当学生是 -

Student(studentId: String, name: String)

val studentsKtable = builder.table[String, Student](...)

我想加入两者并产生一个 ResolvedUniversity 对象的主题:

ResolvedUniversity(universityId: String, name: String, students: Seq[Student])

我不能使用 universityId groupBy 和聚合学生,因为 universityId 字段在 Student 对象中不存在。

最佳答案

仅使用 DSL,我认为您可以做的最简单的事情是 (Java):

    class Student {
        String studentId;
        String name;
    }
    class University {
        String universityId;
        String name;
        List<String> studentIds;
    }
    class ResolvedUniversity {
        String universityId;
        String name;
        List<Student> students;
    }
    Serde<String> stringSerde = null;
    Serde<Student> studentSerde = null;
    Serde<University> universitySerde = null;
    Serde<ResolvedUniversity> resolvedUniversitySerde = null;

    KStream<String, University> universities = topology
      .stream("universities", Consumed.with(stringSerde, universitySerde));

    KTable<String, Student> students = topology
      .table("students", Consumed.with(stringSerde, studentSerde));

    KTable<String, ResolvedUniversity> resolvedUniversities = universities
      .flatMap((k, v) -> {
          return v.studentIds.stream()
            .map(id -> new KeyValue<>(id, v))
            .collect(Collectors.toList());
      })
      .join(students, Pair::pair, Joined.with(stringSerde, universitySerde, studentSerde))
      .groupBy((k, v) -> v.left().universityId)
      .aggregate(ResolvedUniversity::new,
                 (k, v, a) -> {
                     a.universityId = v.left().universityId;
                     a.name = v.left().name;
                     a.students.add(v.right());
                     return a;
                 },
                 Materialized.with(stringSerde, resolvedUniversitySerde));

对于这种类型的连接,对于历史处理,你的 KTable 大学必须在 KStream 连接之前用它的数据“准备好”。

https://stackoverflow.com/questions/49259477/

相关文章:

reactjs - 我如何将两个 Prop 从 jsx 传递到 scss

python - 类型错误 : '>' not supported between instance

ms-access - 从 VBA Access 中的函数返回数据类型时收到 'Invalid Us

php - Laravel 存储库和多个相关模型

c# - 如何确定在 Simple Injector 中使用哪种生活方式

json - 如何从 Oracle JSON 列获取数组索引?

antd - 为选择的多个组件设置默认初始值

python - 从python字符串中删除大写字母

azure - 尝试了解 azure 云服务中的负载平衡

angular - 为什么在导航路线时 Angular canActivate 函数保护执行两次