google-cloud-platform - Why doesn't Airflow PubSubPullOperator pull the maximum number of messages?

I'm using PubSubPullOperator in Airflow to pull messages from a GCP subscription.

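from airflow.providers.google.cloud.operators.pubsub import PubSubPullOperator

# GCP_PROJECT_ID and GCP_CONN_ID are defined elsewhere in the DAG file
pull_messages_task = PubSubPullOperator(
    task_id="pull_messages",
    ack_messages=True,           # acknowledge messages after pulling them
    project_id=GCP_PROJECT_ID,
    subscription="k8s-sub",
    gcp_conn_id=GCP_CONN_ID,
    max_messages=50,             # maximum number of messages per pull
)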

Pulling messages from the subscription and saving them in XCom works fine. My question is: why doesn't PubSubPullOperator pull exactly max_messages messages every time?

For example, I published 250 messages to the GCP topic. My DAG runs once a minute and pulls 50 messages per run.

Below are the task logs from Airflow:

[2022-05-17 14:53:04,630] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 14:53:06,661] {pubsub.py:550} INFO - Pulled 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

[2022-05-17 14:54:04,312] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 14:54:06,239] {pubsub.py:550} INFO - Pulled 16 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

[2022-05-17 14:55:04,055] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 14:55:05,259] {pubsub.py:550} INFO - Pulled 4 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

[2022-05-17 14:56:04,590] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 14:56:06,527] {pubsub.py:550} INFO - Pulled 20 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

[2022-05-17 14:57:04,083] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 14:57:07,428] {pubsub.py:550} INFO - Pulled 38 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

[2022-05-17 14:58:05,561] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 14:58:07,431] {pubsub.py:550} INFO - Pulled 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

[2022-05-17 14:59:04,348] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 14:59:05,462] {pubsub.py:550} INFO - Pulled 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

[2022-05-17 15:00:06,882] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 15:00:08,710] {pubsub.py:550} INFO - Pulled 2 messages from subscription (path) projects/production-1/subscriptions/k8s-sub

[2022-05-17 15:01:03,519] {pubsub.py:536} INFO - Pulling max 50 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
[2022-05-17 15:01:03,688] {pubsub.py:550} INFO - Pulled 20 messages from subscription (path) projects/production-1/subscriptions/k8s-sub
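Tallying the "Pulled N messages" lines above helps separate two possible problems: lost messages versus messages that are merely spread over more runs. The sketch below only uses the counts copied from these logs and assumes these nine runs were the only pulls against the subscription, with no redeliveries.

```python
import math

# Per-run counts copied from the "Pulled N messages" log lines above.
pulled_per_run = [50, 16, 4, 20, 38, 50, 50, 2, 20]

published = 250     # messages published to the topic
max_messages = 50   # max_messages configured on the operator

total_pulled = sum(pulled_per_run)
# Number of runs needed if every pull returned a full batch.
ideal_runs = math.ceil(published / max_messages)

print(f"total pulled: {total_pulled} of {published}")
print(f"runs used: {len(pulled_per_run)} (ideal with full batches: {ideal_runs})")
```

The tally comes to all 250 messages, delivered over 9 runs instead of the 5 that full batches would need. Nothing was dropped; the batches were just smaller than max_messages.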

I'm fairly sure each DAG run finishes within 1 minute, and 50 messages do not exceed the XCom size limit (48KB).
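One rough way to check the XCom claim is to measure the JSON-serialized size of a batch. This is a minimal sketch under several assumptions: the message dict shape, the `make_message` and `xcom_size_bytes` helpers, and the 200-byte payload are all hypothetical, and Airflow's real XCom serialization (and the actual size limit, which depends on your setup) may differ. The 48KB figure is the one quoted in the question.

```python
import base64
import json

XCOM_LIMIT_BYTES = 48 * 1024  # the 48KB limit cited in the question


def make_message(i: int, payload: bytes) -> dict:
    # Hypothetical dict shape, loosely modeled on a pulled Pub/Sub message.
    return {
        "ack_id": f"ack-{i}",
        "message": {
            "data": base64.b64encode(payload).decode("ascii"),
            "message_id": str(i),
            "attributes": {},
        },
    }


def xcom_size_bytes(messages: list) -> int:
    # Size of the list once serialized to JSON and encoded as UTF-8.
    return len(json.dumps(messages).encode("utf-8"))


# 50 hypothetical messages with a 200-byte payload each.
messages = [make_message(i, b"x" * 200) for i in range(50)]
size = xcom_size_bytes(messages)
print(size, "bytes; fits under limit:", size <= XCOM_LIMIT_BYTES)
```

Swapping in a realistic payload size for your own messages gives a quick sanity check before relying on XCom for the batch.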

Has anyone seen this behavior? Or does anyone know how the operator decides how many messages to pull?

Thanks a lot.

Best answer

This behavior appears to come from Google's Pub/Sub client library rather than being a feature or bug of the Airflow operator itself.

From Google's documentation:

  The maximum number of messages to return for this request. Must be a positive integer. The Pub/Sub system may return fewer than the number specified.

The operator's dependency chain: PubSubPullOperator uses PubSubHook, which in turn uses SubscriberClient.
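Because a single pull may return fewer messages than requested, code that needs a fuller batch has to pull repeatedly and accumulate. The sketch below shows that loop in plain Python; `pull_until` and `fake_pull` are hypothetical names, and `fake_pull` only simulates a subscription that returns a random-sized batch. It is not the Airflow or Pub/Sub API, and acknowledging each batch is left out.

```python
import random
from typing import Callable, List


def pull_until(pull_fn: Callable[[int], List[str]], target: int,
               max_attempts: int = 10) -> List[str]:
    """Call pull_fn until `target` messages are collected, a pull comes back
    empty, or max_attempts is reached."""
    collected: List[str] = []
    for _ in range(max_attempts):
        remaining = target - len(collected)
        if remaining <= 0:
            break
        batch = pull_fn(remaining)  # may return fewer than requested
        if not batch:
            break  # treat an empty result as "nothing available right now"
        collected.extend(batch)
    return collected


# Hypothetical stand-in for a subscription holding 250 messages.
backlog = [f"msg-{i}" for i in range(250)]
rng = random.Random(0)


def fake_pull(max_messages: int) -> List[str]:
    # Return between 1 and max_messages messages, mimicking
    # "may return fewer than the number specified".
    n = min(rng.randint(1, max_messages), len(backlog))
    batch = backlog[:n]
    del backlog[:n]
    return batch


first = pull_until(fake_pull, target=50, max_attempts=50)
print(len(first), "collected;", len(backlog), "left in backlog")
```

Stopping on an empty batch keeps the loop from spinning on an idle subscription, and `max_attempts` bounds how long a single task can keep pulling.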

A similar question, "Why doesn't Airflow PubSubPullOperator pull the maximum number of messages?", can be found on Stack Overflow: https://stackoverflow.com/questions/72276934/
