apache-flink - Apache Flink JobListener 无法正常工作

我在flink 1.11.1写了一个flink batch job。作业成功完成后,我想做一些类似调用 http 服务的事情。

我添加了一个简单的作业监听器来 Hook 作业状态。问题是当 kafka 接收器运算符(operator)抛出错误时,不会触发作业监听器。我希望当我的工作失败时,它应该触发我的工作监听器并打印失败日志。

我如何确定工作是否成功完成?

我们将不胜感激。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerJobListener(new JobListener {
      override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
        if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }
      }

      override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {

        if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }
      }
    })

    env.createInput(input)
      .filter(r => Option(r.token).getOrElse("").nonEmpty)
      .addSink(kafkaProducer)

最佳答案

如果您尝试在集群上运行该作业,您可以使用您的作业 ID 在控制台中查看您的记录器消息和标准输出。请引用随附的屏幕截图,

如果您在本地集群上运行,则默认 url 可以是 http://localhost:8081。

同样,以下不是检查您的工作是否成功的正确方法。

if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }

https://stackoverflow.com/questions/63618060/

相关文章:

mongodb - Azure cosmosDB : Getting error on sortin

markdown - 如何在 Sublime Text 3 中为 Markdown 启用 Emmet

python-attrs - attrs转换器的装饰器

html - CSS Grid - 在自动流列上自动生成新的行中断

php - 如何使用 line bot webhook 请求在线路组中使用 @mention/@ta

javascript - Greasemonkey 用户脚本被内容安全策略阻止

linux - 安装包 : installed grub-efi-amd64 package pos

javascript - 如果导航到 React Native 中的其他屏幕,如何保留 formik

python - (-212 :Parsing error) in Object Detection

visual-studio - error : The imported project was n