asynchronous - 带有 zmq_proxy 和回复的 ZeroMQ 异步客户端

我正在尝试使用 libzmq C API 使 C 模型中的异步客户端/服务器运行。

我在 Linux 平台上使用 ZeroMQ 3.2.2,我尝试使用的模式如下所示:

>Client -> DEALER  
>
>Router -> ROUTER  
>---   proxy   ---  
>Dealer -> DEALER  
>
>Workers-> DEALER  

我需要客户端是非阻塞的,还需要接收对消息的响应。从我看到的示例中,我的理解是将 ROUTER-DEALER 与 zmq_proxy 一起使用应该将消息返回给初始客户端。

但是,通过将捕获套接字附加到 zmq_proxy,似乎没有将响应路由回。

当我将客户端更改为 REQ 并将工作人员更改为 REP 时,一切都按预期工作。 欢迎任何对我哪里出错或误解的反馈。

下面是 3 个组件(客户端、代理和工作程序)。

测试客户端

#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>

int main (void)
{
    printf ("Connecting to hello world broker...\n");

    void *ctx = zmq_ctx_new ();
    void *requester = zmq_socket (ctx, ZMQ_DEALER);
     //void *requester = zmq_socket (ctx, ZMQ_REQ);
    zmq_connect (requester, "tcp://10.1.1.31:5555");

    printf ("Sending Request  : HELLO \n");
    int rc = zmq_send (requester, "HELLO", 6, 0);

    if (rc > 0) {
        printf ("Success : Sent size ... %d!\n",rc);
    } else {
        printf("Error: %s\n", zmq_strerror(errno)); 
    }

    printf ("Wait for response ..\n");

    char buffer [256];
    zmq_recv (requester, buffer, 256, 0);

    printf ("Response Received : %s\n",&buffer);

    zmq_close (requester);
    zmq_ctx_destroy (ctx);
    return 0;
}

测试代理

#define _MULTI_THREADED
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <zmq.h>
#include <unistd.h>
#include <assert.h>
#include <pthread.h>

#define ZFRAME_MORE     1
#define ZFRAME_REUSE    2
#define ZFRAME_DONTWAIT 4

static void *proxy_capture (void *ctx)
{
    int zerr = 0 ;
    int rRes;
    void *worker = zmq_socket (ctx, ZMQ_DEALER);
    zerr = zmq_connect (worker, "ipc://capture.ipc");

    if (zerr != 0)
    {
        printf ("\n-------------- > proxy_capture bind error : %s\n", zmq_strerror(errno));
        return 0;
    }

    while (1) {
        char buf [256];
        int rc = zmq_recv (worker, buf, 256, 0); 
        assert (rc != -1);
        printf ("Capture value : %s !\n", &buf);
    }
}

int main(int argc, char *argv[])
{
    int zerr = 0 ;
    int rc = 0 ;
    int rRes;

    // Frontend socket talks to clients over TCP Port
    void *ctx = zmq_ctx_new ();
    void *frontend = zmq_socket (ctx, ZMQ_ROUTER);
    zerr = zmq_bind (frontend, "tcp://10.1.1.31:5555");

    if (zerr != 0)
    {
        printf ("\nFrontend bind error : %s\n", zmq_strerror(errno));
        return 0;
    }

    // Backend socket talks to workers 
    void *backend = zmq_socket (ctx, ZMQ_DEALER);
    zerr = zmq_bind (backend, "tcp://10.1.1.31:6555");

    if (zerr != 0)
    {
        printf ("\nBackend bind error : %s\n", zmq_strerror(errno));
        return 0;
    }

    void *capture = zmq_socket (ctx, ZMQ_DEALER);
    zerr = zmq_bind (capture, "ipc://capture.ipc");

    if (zerr != 0)
    {
        printf ("\nCapture bind error : %s\n", zmq_strerror(errno));
        return 0;
    }

    pthread_t capworker;
    rc = pthread_create(&capworker, NULL, proxy_capture, ctx);

    zmq_proxy (frontend, backend, capture);

    while (1) {
        printf ("Broker loop …\n");
        sleep(1);
    }

    sleep(1);
    zmq_ctx_destroy (&ctx);
    printf ("\nEnd server…\n");

    return 0;
 }

测试人员

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <zmq.h>
#include <unistd.h>
#include <assert.h>

#define ZFRAME_MORE     1
#define ZFRAME_REUSE    2
#define ZFRAME_DONTWAIT 4

int main(int argc, char *argv[])
{
    int zerr = 0 ;
    int rc = 0 ;
    int rRes;

    void *ctx = zmq_ctx_new ();
    //void *worker = zmq_socket (ctx, ZMQ_DEALER);
    void *worker = zmq_socket (ctx, ZMQ_REP);
    zerr = zmq_connect (worker, "tcp://10.1.1.31:6555");

    sleep(1);

    if (zerr != 0)
    {
        printf ("Worker connect error : %s\n", zmq_strerror(errno));
        return 0;
    }

    while (1) {
        char buf [256];
        rc = zmq_recv (worker, buf, 256, 0); 
        assert (rc != -1);
        printf ("Received : %s !\n", &buf);
        printf ("Responding to Client... !\n");

        rc = zmq_send(worker, "WORLD", 6, 0);

        if (rc > 0) {
            printf ("Success : Sent size ... %d!\n",rc);
            //break;
        } else {
            printf("Error: %s\n", zmq_strerror(errno)); 
        }
    }
    zmq_close (worker);
    zmq_ctx_destroy (ctx);

    return 0;
 }

非工作输出(客户端 DEALER 和 worker DEALER)

TestClient

Connecting to hello world broker...
Sending Request : HELLO
Success : Sent size ... 6!
Wait for response ..

TestBroker

Capture value : !
Capture value : HELLO ! <-- Req from client Capture
value : WORLD ! <-- Resp from worker
Capture value : WORLD !

TestWorker

Received : !
Responding to Client... !
Success : Sent size ... 6!
Received : HELLO !
Responding to Client... !
Success : Sent size ...
6!

所以看起来工作人员响应了,但是响应丢失了或者路由器错误地定向到客户端?

感谢任何帮助

最佳答案

我在捕获部分偶然发现了您对我的 zmq_proxy 问题的回答,我正试图理解这一点。

对于您的问题,您希望客户端是异步的,而不是使用 REQ,而是使用 DEALER。

下面是我如何让它异步的。我的 zeromq 版本是 4.2.1。

        //  Socket to talk to server
       void *context = zmq_ctx_new();

        void *requester = zmq_socket (context, ZMQ_REQ);
        int timeout = 5000; //Timeout of 5 seconds to make sure not having it hang either while sending or receving...
        int linger = 0;
        zmq_setsockopt (requester, ZMQ_LINGER, &linger, sizeof(int));
        zmq_setsockopt (requester, ZMQ_SNDTIMEO, &timeout, sizeof(int));
        zmq_setsockopt (requester, ZMQ_RCVTIMEO, &timeout, sizeof(int));
        int connection_status = zmq_connect (requester, "tcp://localhost:5559");

您可以通过将其设置为 0 来使用 linger,也可以在从 REQ 发送到代理时使用标志 ZMQ_DONTWAIT

https://stackoverflow.com/questions/23470208/

相关文章:

python - 用python编写的鼠标运动跟踪程序

sql - 在 NLTK 中将英语转换为 SQL

mongodb - 为什么 mongo 的升序/降序排序顺序会为日期字段翻转?

macos - 通过 XPC 与应用程序通信并启动以 root 身份运行的守护进程

asp.net-web-api - 当 WebApi 序列化失败时,我希望异常冒泡而不是 null

xml - 使用 XSLT 转换在 WordProcessingML 中保留换行符和空格

r - 警告 : replacing previous import ‘head’ when loa

session - Laravel 4.1 session 变量被随机遗忘

ios - 解码Code128条码?

php - Laravel:模型内验证。多重验证规则