我在flink 1.11.1写了一个flink batch job。作业成功完成后,我想做一些类似调用 http 服务的事情。
我添加了一个简单的作业监听器来 Hook 作业状态。问题是当 kafka 接收器运算符(operator)抛出错误时,不会触发作业监听器。我希望当我的工作失败时,它应该触发我的工作监听器并打印失败日志。
我如何确定工作是否成功完成?
我们将不胜感激。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerJobListener(new JobListener {
override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
if (throwable == null) {
log.info("SUCCESS")
} else {
log.info("FAIL")
}
}
override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {
if (throwable == null) {
log.info("SUCCESS")
} else {
log.info("FAIL")
}
}
})
env.createInput(input)
.filter(r => Option(r.token).getOrElse("").nonEmpty)
.addSink(kafkaProducer)
最佳答案
如果您尝试在集群上运行该作业,您可以使用您的作业 ID 在控制台中查看您的记录器消息和标准输出。请引用随附的屏幕截图,
如果您在本地集群上运行,则默认 url 可以是 http://localhost:8081。
同样,以下不是检查您的工作是否成功的正确方法。
if (throwable == null) {
log.info("SUCCESS")
} else {
log.info("FAIL")
}
https://stackoverflow.com/questions/63618060/
相关文章:
mongodb - Azure cosmosDB : Getting error on sortin
markdown - 如何在 Sublime Text 3 中为 Markdown 启用 Emmet
html - CSS Grid - 在自动流列上自动生成新的行中断
php - 如何使用 line bot webhook 请求在线路组中使用 @mention/@ta
javascript - Greasemonkey 用户脚本被内容安全策略阻止
linux - 安装包 : installed grub-efi-amd64 package pos
javascript - 如果导航到 React Native 中的其他屏幕,如何保留 formik