根据提供的文档 here ,我正在尝试使用 POC 将消息发送到 same documentation 中提到的监听器。 , 下面是我写的配置。
@Configuration
public class KafkaConsumerConfig {
public static final String TEST_TOPIC_ID = "record-stream";
@Value("${kafka.topic:" + TEST_TOPIC_ID + "}")
private String topic;
@Value("${kafka.address:localhost:9092}")
private String brokerAddress;
/*
@Bean public KafkaMessageDrivenChannelAdapter<String, String> adapter(
KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String>
kafkaMessageDrivenChannelAdapter = new
KafkaMessageDrivenChannelAdapter<>( container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); return
kafkaMessageDrivenChannelAdapter; }
@Bean public QueueChannel received() { return new QueueChannel(); }
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(30000);
return factory;
}
/*
* @Bean public KafkaMessageListenerContainer<String, String> container()
* throws Exception { ContainerProperties properties = new
* ContainerProperties(this.topic); // set more properties return new
* KafkaMessageListenerContainer<>(consumerFactory(), properties); }
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest
// smallest
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
}
监听器如下,
@Service
public class Listener {
private Logger log = Logger.getLogger(Listener.class);
@KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID, containerFactory = "kafkaListenerContainerFactory")
public void process(String message/* , Acknowledgment ack */) {
Gson gson = new Gson();
Record record = gson.fromJson(message, Record.class);
log.info(record.getId() + " " + record.getName());
// ack.acknowledge();
}
}
即使我正在为同一主题生成消息并且此使用者正在处理同一主题,但监听器并未执行。
我正在运行 Kafka 0.10.0.1,这是我当前的 pom。与许多命令行示例不同,此使用者作为 Spring Boot Web 应用程序工作。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
我花了很多时间来弄清楚为什么当主题有消息时这个监听器没有被命中,我做错了什么。
我知道我可以使用 channel 接收消息(我已经在代码中注释掉了配置部分),但是这里的并发处理是干净的。
这种实现是否可以通过异步消息消费实现。
最佳答案
您必须在 @Configuration
旁边添加 @EnableKafka
。
将add很快就会有一些描述。
同时:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
关于java - Spring Integration Kafka Consumer Listener 不接收消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39903880/
相关文章:
java - 如何使用spring websocket向自定义用户发送自定义消息?
java - spring mvc 表单 :select tag
java - 使用 Spring JavaConfig 和 @Autowired 注入(inject
spring - 如何在运行时将新用户添加到 Spring Security
java - Spring WS : How to get and save XSD validat
java - Spring Security 3 - 总是返回错误 302
java - Spring MVC : Resolving the view based on Us