apache-kafka - 如何批量处理最大大小的 KStream 或回退到时间窗口?

我想创建一个基于 Kafka 流的应用程序,它处理一个主题并以大小为 X(即 50)的批处理获取消息,但如果流的流量较低,则在 Y 秒内(即5).

因此,我没有一条一条地处理消息,而是处理一个 List[Record],其中列表的大小为 50(或可能更少)。

这是为了让一些 I/O 绑定(bind)处理更有效率。

我知道这可以用经典的 Kafka API 来实现,但我一直在寻找一种基于流的实现,它也可以在本地处理偏移量提交,并考虑到错误/失败。 我找不到任何与他的文档或四处搜索相关的内容,想知道是否有人可以解决这个问题。

最佳答案

@Matthias J. Sax 回答很好,我只想为此添加一个示例,我认为它可能对某些人有用。 假设我们要将传入值组合成以下类型:

public class MultipleValues { private List<String> values; }

要将消息收集到最大尺寸的批处理中,我们需要创建转换器:

public class MultipleValuesTransformer implements Transformer<String, String, KeyValue<String, MultipleValues>> {
    private ProcessorContext processorContext;
    private String stateStoreName;
    private KeyValueStore<String, MultipleValues> keyValueStore;
    private Cancellable scheduledPunctuator;

    public MultipleValuesTransformer(String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    @Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
        this.keyValueStore = (KeyValueStore) processorContext.getStateStore(stateStoreName);
        scheduledPunctuator = processorContext.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, this::doPunctuate);
    }

    @Override
    public KeyValue<String, MultipleValues> transform(String key, String value) {
        MultipleValues itemValueFromStore = keyValueStore.get(key);
        if (isNull(itemValueFromStore)) {
            itemValueFromStore = MultipleValues.builder().values(Collections.singletonList(value)).build();
        } else {
            List<String> values = new ArrayList<>(itemValueFromStore.getValues());
            values.add(value);
            itemValueFromStore = itemValueFromStore.toBuilder()
                    .values(values)
                    .build();
        }
        if (itemValueFromStore.getValues().size() >= 50) {
            processorContext.forward(key, itemValueFromStore);
            keyValueStore.put(key, null);
        } else {
            keyValueStore.put(key, itemValueFromStore);
        }
        return null;
    }

    private void doPunctuate(long timestamp) {
        KeyValueIterator<String, MultipleValues> valuesIterator = keyValueStore.all();
        while (valuesIterator.hasNext()) {
            KeyValue<String, MultipleValues> keyValue = valuesIterator.next();
            if (nonNull(keyValue.value)) {
                processorContext.forward(keyValue.key, keyValue.value);
                keyValueStore.put(keyValue.key, null);
            }
        }
    }

    @Override
    public void close() {
        scheduledPunctuator.cancel();
    }
}

我们需要创建键值存储,将其添加到StreamsBuilder,并使用transform 方法构建KStream

Properties props = new Properties();
...
Serde<MultipleValues> multipleValuesSerge = Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(MultipleValues.class));
StreamsBuilder builder = new StreamsBuilder();
String storeName = "multipleValuesStore";
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(storeName);
StoreBuilder<KeyValueStore<String, MultipleValues>> storeBuilder =
        Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), multipleValuesSerge);
builder.addStateStore(storeBuilder);

builder.stream("source", Consumed.with(Serdes.String(), Serdes.String()))
        .transform(() -> new MultipleValuesTransformer(storeName), storeName)
        .print(Printed.<String, MultipleValues>toSysOut().withLabel("transformedMultipleValues"));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.start();

通过这种方法,我们使用了我们进行聚合的传入 key 。如果您需要不是通过键而是通过某些消息的字段来收集消息,则需要以下流程来触发 KStream 上的重新平衡(通过使用中间主题):

.selectKey(..)
.through(intermediateTopicName)
.transform( ..)

https://stackoverflow.com/questions/53628143/

相关文章:

react-native - React native如何改变WebView的背景色?

git - 如何通过 GitHub 的 API 进行 cherry-pick

laravel - 如何在 Laravel 迁移中的特定列之后订购多个新列

macos - 如何在 macOS 上获取 awk 版本?

matlab - 根据给定矩阵的对角线和反对角线创建新矩阵

python - 如何修复-没有这样的表 : main. auth_user__old

google-colaboratory - 在 Google Colab notebook 上安装

html-table - 表中 1.5 的 Colspan

python - Pandas - 从索引中提取月份和年份

vue.js - 如何使用 Nuxt.js 在 vuex 状态中获取本地存储数据