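python - How do I run tests in parallel in Django with Elasticsearch-dsl?

Has anyone gotten parallel tests to work in Django with Elasticsearch? If so, can you share the configuration changes that were needed to make it happen?

I have tried almost everything I can think of to get this working, including the solution outlined here. Taking inspiration from how Django itself handles parallel databases, I currently have a custom ParallelTestSuite that overrides init_worker to iterate over each index/doctype and change the index names, roughly as follows: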

_worker_id = 0
def _elastic_search_init_worker(counter):
    global _worker_id

    with counter.get_lock():
        counter.value += 1
        _worker_id = counter.value

    for alias in connections:
        connection = connections[alias]
        settings_dict = connection.creation.get_test_db_clone_settings(_worker_id)
        # connection.settings_dict must be updated in place for changes to be
        # reflected in django.db.connections. If the following line assigned
        # connection.settings_dict = settings_dict, new threads would connect
        # to the default database instead of the appropriate clone.
        connection.settings_dict.update(settings_dict)
        connection.close()

    ### Everything above this is from the Django version of this function ###

    # Update index names in doctypes
    for doc in registry.get_documents():
        doc._doc_type.index += f"_{_worker_id}"

    # Update index names for indexes and create new indexes
    for index in registry.get_indices():
        index._name += f"_{_worker_id}"
        index.delete(ignore=[404])
        index.create()

    print(f"Started thread # {_worker_id}")

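This generally seems to work, but some weirdness happens seemingly at random: running the test suite again does not reliably reproduce the problem, and/or the error messages change. These are the errors I have hit so far, and each test run seems to fail on a random one of them:

  • A 404 is raised when trying to create the index in the function above (I have confirmed that the 404 comes back from the PUT request, yet the Elasticsearch server logs say the index was created without issue)
  • A 500 when trying to create the index, although this one hasn't happened in a while, so I think it was fixed by something else
  • Query responses sometimes lack the items dictionary value inside the _process_bulk_chunk function of the elasticsearch library

I'm thinking that there's something weird going on at the connection layer (like somehow the connections between Django test runner processes are getting the responses mixed up?) but I'm at a loss as to how that would even be possible, since Django uses multiprocessing to parallelize the tests and thus each of them runs in its own process. Is it somehow possible that the spun-off processes are still trying to use the connection pool of the original process or something? I really have no idea what else to try from here and would greatly appreciate some hints, or even just confirmation that this is in fact possible.

Best answer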

I'm thinking that there's something weird going on at the connection layer (like somehow the connections between Django test runner processes are getting the responses mixed up?) but I'm at a loss as to how that would be even possible since Django uses multiprocessing to parallelize the tests and thus they are each running in their own process. Is it somehow possible that the spun-off processes are still trying to use the connection pool of the original process or something?

This is exactly what is happening. From the Elasticsearch DSL docs:

Since we use persistent connections throughout the client it means that the client doesn’t tolerate fork very well. If your application calls for multiple processes make sure you create a fresh client after call to fork. Note that Python’s multiprocessing module uses fork to create new processes on POSIX systems.

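What I observed is that responses get interleaved very strangely: a response can be delivered to a seemingly random client rather than the one that started the request. So a request to index a document might end up with the response to an index-creation request, which has very different attributes.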
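
The mechanism can be shown without Elasticsearch at all. The following is a minimal sketch using only the standard library: a socket pair stands in for one persistent connection opened before the workers exist, and the names `worker` and `shared_connection_demo` are made up for illustration. It assumes a POSIX system, since it relies on the fork start method.

```python
import multiprocessing as mp
import socket


def worker(sock, label):
    # The child never opened this socket; it inherited it through fork().
    sock.sendall(label.encode() + b"\n")


def shared_connection_demo(processes=3):
    # One "persistent connection", created before any worker is forked.
    client_end, server_end = socket.socketpair()
    ctx = mp.get_context("fork")  # POSIX only
    procs = [
        ctx.Process(target=worker, args=(client_end, f"worker-{i}"))
        for i in range(processes)
    ]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    client_end.close()

    # Read until EOF: every worker wrote into the same underlying connection.
    data = b""
    while True:
        chunk = server_end.recv(1024)
        if not chunk:
            break
        data += chunk
    server_end.close()
    return sorted(data.decode().split())


if __name__ == "__main__":
    print(shared_connection_demo())
```

All workers end up talking over the single connection the parent created. With a real client, the replies on that connection are read by whichever process gets to them first, which would be consistent with the mismatched responses described above.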

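The fix is to make sure each test worker has its own Elasticsearch client. This can be done by creating worker-specific connection aliases and then overwriting the current connection aliases (through the private attribute _using) with the worker-specific ones. Below is a modified version of the code you posted, with that change applied: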

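# Imports the snippet relies on; `registry` is assumed to come from django-elasticsearch-dsl.
from django.conf import settings
from django.db import connections
from django_elasticsearch_dsl.registries import registry

_worker_id = 0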
def _elastic_search_init_worker(counter):
    global _worker_id

    with counter.get_lock():
        counter.value += 1
        _worker_id = counter.value

    for alias in connections:
        connection = connections[alias]
        settings_dict = connection.creation.get_test_db_clone_settings(_worker_id)
        # connection.settings_dict must be updated in place for changes to be
        # reflected in django.db.connections. If the following line assigned
        # connection.settings_dict = settings_dict, new threads would connect
        # to the default database instead of the appropriate clone.
        connection.settings_dict.update(settings_dict)
        connection.close()

    ### Everything above this is from the Django version of this function ###

    # Imported under another name: a local import called `connections` would shadow
    # django.db.connections for the whole function and break the loop above.
    from elasticsearch_dsl.connections import connections as es_connections

    # Each worker needs its own connection to Elasticsearch: the client keeps
    # global, persistent connection objects that must not be shared across forks.
    worker_connection_postfix = f"_worker_{_worker_id}"
    for alias, config in settings.ELASTICSEARCH_DSL.items():
        # create_connection builds a fresh client and registers it under the new alias
        es_connections.create_connection(alias=alias + worker_connection_postfix, **config)

    # Update index names in doctypes
    for doc in registry.get_documents():
        doc._doc_type.index += f"_{_worker_id}"
        # Use the worker-specific connection (_using is a private attribute)
        doc._doc_type._using = doc._doc_type._using + worker_connection_postfix

    # Update index names for indexes and create new indexes
    for index in registry.get_indices():
        index._name += f"_{_worker_id}"
        # Derive the alias from the index itself, not from the leftover `doc` loop variable
        index._using = index._using + worker_connection_postfix
        index.delete(ignore=[404])
        index.create()

    print(f"Started thread # {_worker_id}")
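
The overall pattern (number each worker through a shared counter, then build a client and an index name inside the worker) can be tried in isolation. This is only a sketch under stated assumptions: `FakeClient` is a hypothetical stand-in for an Elasticsearch client, `"articles"` is a made-up index name, and each process calls `init_worker` directly, whereas Django hands such a function to its worker pool as the initializer. It assumes a POSIX system because it uses the fork start method.

```python
import multiprocessing as mp
import os

# Per-process state, filled in by init_worker after the fork.
_worker_id = 0
_client = None
_index_name = None


class FakeClient:
    """Hypothetical stand-in for a client that holds a persistent connection."""

    def __init__(self):
        self.created_in_pid = os.getpid()


def init_worker(counter, base_index):
    """Runs once inside each worker process."""
    global _worker_id, _client, _index_name
    with counter.get_lock():
        counter.value += 1
        _worker_id = counter.value
    _client = FakeClient()  # fresh client, created after the fork
    _index_name = f"{base_index}_{_worker_id}"


def worker_main(counter, base_index, results):
    init_worker(counter, base_index)
    owns_client = _client.created_in_pid == os.getpid()
    results.put((_worker_id, _index_name, owns_client))


def run_workers(processes=3):
    ctx = mp.get_context("fork")  # POSIX only
    counter = ctx.Value("i", 0)
    results = ctx.SimpleQueue()
    procs = [
        ctx.Process(target=worker_main, args=(counter, "articles", results))
        for _ in range(processes)
    ]
    for proc in procs:
        proc.start()
    collected = [results.get() for _ in procs]
    for proc in procs:
        proc.join()
    return sorted(collected)


if __name__ == "__main__":
    for row in run_workers():
        print(row)
```

In a Django project, a function like `_elastic_search_init_worker` is typically attached as the `init_worker` attribute of a `ParallelTestSuite` subclass, which a `DiscoverRunner` subclass then names in `parallel_test_suite`. Newer Django releases may pass extra arguments to `init_worker`, so the signature should be checked against the version in use.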

https://stackoverflow.com/questions/63238907/
