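Has anyone gotten parallel tests to work in Django with Elasticsearch? If so, can you share what configuration changes were required to make it happen?
I've tried just about everything I can think of to make it work, including the solution outlined here. Taking inspiration from how Django itself handles parallel databases, I've currently created a custom ParallelTestSuite that overrides init_worker to iterate through each index/doctype and crudely change the index names, as follows: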
from django.db import connections
from django_elasticsearch_dsl.registries import registry

_worker_id = 0

def _elastic_search_init_worker(counter):
    global _worker_id
    with counter.get_lock():
        counter.value += 1
        _worker_id = counter.value
    for alias in connections:
        connection = connections[alias]
        settings_dict = connection.creation.get_test_db_clone_settings(_worker_id)
        # connection.settings_dict must be updated in place for changes to be
        # reflected in django.db.connections. If the following line assigned
        # connection.settings_dict = settings_dict, new threads would connect
        # to the default database instead of the appropriate clone.
        connection.settings_dict.update(settings_dict)
        connection.close()
    ### Everything above this is from the Django version of this function ###
    # Update index names in doctypes
    for doc in registry.get_documents():
        doc._doc_type.index += f"_{_worker_id}"
    # Update index names for indexes and create new indexes
    for index in registry.get_indices():
        index._name += f"_{_worker_id}"
        index.delete(ignore=[404])
        index.create()
    print(f"Started thread # {_worker_id}")
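This seems to generally work; however, odd things happen seemingly at random (i.e. running the test suite again doesn't reliably reproduce the problem, and/or the error message changes). Below are the various errors I've run into; each test run seems to fail randomly on one of them:
No items dictionary value in the _process_bulk_chunk function
I'm thinking that there's something weird going on at the connection layer (like somehow the connections between Django test runner processes are getting the responses mixed up?) but I'm at a loss as to how that would even be possible, since Django uses multiprocessing to parallelize the tests and thus they are each running in their own process. Is it somehow possible that the spun-off processes are still trying to use the connection pool of the original process or something? I really don't know what else to try from here and would greatly appreciate some hints, or even just confirmation that this is in fact possible.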
Best answer
I'm thinking that there's something weird going on at the connection layer (like somehow the connections between Django test runner processes are getting the responses mixed up?) but I'm at a loss as to how that would be even possible since Django uses multiprocessing to parallelize the tests and thus they are each running in their own process. Is it somehow possible that the spun-off processes are still trying to use the connection pool of the original process or something?
That is exactly what is happening. From the Elasticsearch DSL docs:
Since we use persistent connections throughout the client it means that the client doesn’t tolerate fork very well. If your application calls for multiple processes make sure you create a fresh client after call to fork. Note that Python’s multiprocessing module uses fork to create new processes on POSIX systems.
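What I observed is that responses get interleaved very strangely among seemingly random clients that may have started a request. So a request to index a document might end up receiving the response to a create-index request, which has very different attributes.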
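The docs' advice to create a fresh client after fork can be illustrated in isolation. The following is a minimal sketch, not part of the original answer: PerProcessClient and make_client are hypothetical names, and make_client merely stands in for constructing a real Elasticsearch client. The holder remembers which process built the client and rebuilds it when it is accessed from a different process.

```python
import os


class PerProcessClient:
    """Hold one client per process, rebuilding it after a fork.

    `factory` is any zero-argument callable returning a new client; in a real
    project it might wrap the construction of an Elasticsearch client
    (assumption, not shown here).
    """

    def __init__(self, factory):
        self._factory = factory
        self._client = None
        self._owner_pid = None

    def get(self):
        pid = os.getpid()
        # A forked child inherits _client from its parent, but its PID
        # differs, so the inherited client is discarded and rebuilt here.
        if self._client is None or self._owner_pid != pid:
            self._client = self._factory()
            self._owner_pid = pid
        return self._client


def make_client():
    # Hypothetical stand-in for constructing a real Elasticsearch client.
    return {"created_in_pid": os.getpid()}


holder = PerProcessClient(make_client)
client = holder.get()
print(client is holder.get())  # same process, so the client is reused
```

The PID check is only one way to detect a fork; the fix described below takes a different route and registers a separate connection alias for each worker.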
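The fix is to ensure that each test worker has its own Elasticsearch client. This can be done by creating worker-specific connection aliases and then overwriting the current connection aliases (using the private attribute _using) with the worker-specific ones. Below is a modified version of the code you posted with the change: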
from django.conf import settings
from django.db import connections
from django_elasticsearch_dsl.registries import registry
# Aliased so it does not shadow Django's database connections
from elasticsearch_dsl.connections import connections as es_connections

_worker_id = 0

def _elastic_search_init_worker(counter):
    global _worker_id
    with counter.get_lock():
        counter.value += 1
        _worker_id = counter.value
    for alias in connections:
        connection = connections[alias]
        settings_dict = connection.creation.get_test_db_clone_settings(_worker_id)
        # connection.settings_dict must be updated in place for changes to be
        # reflected in django.db.connections. If the following line assigned
        # connection.settings_dict = settings_dict, new threads would connect
        # to the default database instead of the appropriate clone.
        connection.settings_dict.update(settings_dict)
        connection.close()
    ### Everything above this is from the Django version of this function ###
    # Each worker needs its own connection to Elasticsearch; the client uses
    # global connection objects that do not play nice after a fork otherwise.
    worker_connection_postfix = f"_worker_{_worker_id}"
    for alias, config in settings.ELASTICSEARCH_DSL.items():
        es_connections.create_connection(alias + worker_connection_postfix, **config)
    # Update index names in doctypes
    for doc in registry.get_documents():
        doc._doc_type.index += f"_{_worker_id}"
        # Use the worker-specific connection
        doc._doc_type._using = doc._doc_type._using + worker_connection_postfix
    # Update index names for indexes and create new indexes
    for index in registry.get_indices():
        index._name += f"_{_worker_id}"
        # Point the index at its own alias plus the worker postfix
        index._using = index._using + worker_connection_postfix
        index.delete(ignore=[404])
        index.create()
    print(f"Started thread # {_worker_id}")
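For completeness, here is a sketch of how an initializer like the one above might be wired into the test run. This part is not in the original answer: the class names are hypothetical, the function body is a placeholder, and it assumes a Django version whose worker initializer receives only the counter (newer Django releases pass additional arguments to it, so check django.test.runner._init_worker in your version before relying on this).

```python
from django.test.runner import DiscoverRunner, ParallelTestSuite


def _elastic_search_init_worker(counter):
    # Placeholder: stands for the worker initializer shown above.
    ...


class ElasticsearchParallelTestSuite(ParallelTestSuite):
    # Django hands this callable to the worker pool as the per-process
    # initializer, so it runs once in every test worker.
    init_worker = _elastic_search_init_worker


class ElasticsearchTestRunner(DiscoverRunner):
    # DiscoverRunner uses this suite class when tests run in parallel.
    parallel_test_suite = ElasticsearchParallelTestSuite
```

To use it, point the TEST_RUNNER setting at the dotted path of the runner class (the path depends on where you put it in your project) and run the tests with the --parallel option.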
https://stackoverflow.com/questions/63238907/