Getting Started with the zmq (ZeroMQ) Middleware Framework

02-29 · 1840 views

1. Introduction:

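  • zmq (full name: ZeroMQ) looks like an embeddable networking library, but it actually acts as a concurrency framework.
  • Its sockets carry atomic messages across several transports: in-process (between threads), inter-process, TCP, and multicast.
  • Its sockets can be connected many-to-many, using patterns such as fan-out, publish-subscribe, task distribution, and request-reply.
  • It is fast enough to serve as the messaging layer of distributed applications.
  • Its asynchronous I/O model lets you build multicore applications as asynchronous message-processing tasks.
  • It has bindings for many languages and runs on almost every operating system.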

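    • In short, zmq is a fast, concurrent messaging framework: middleware that is cross-platform and usable from many languages.
    • Official ZeroMQ documentation
      • https://zeromq.org/get-started/
      • https://zguide.zeromq.org/
      • Chinese translation of the official guide: https://gitee.com/solym/ZeroMQ-Guide-Zh
      • Official example code repository: https://github.com/imatix/zguide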

        2. Request-Reply mode

        The REQ-REP socket pair is in lockstep. The client issues zmq_send() and then zmq_recv(), in a loop (or once if that’s all it needs). Doing any other sequence (e.g., sending two messages in a row) will result in a return code of -1 from the send or recv call. Similarly, the service issues zmq_recv() and then zmq_send() in that order, as often as it needs to.
        ZeroMQ uses C as its reference language and this is the main language we’ll use for examples. If you’re reading this online, the link below the example takes you to translations into other programming languages.
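
The lockstep rule is easy to picture as a two-state machine. The sketch below is a toy model only, not libzmq code: the names `req_model_t`, `req_model_send`, and `req_model_recv` are made up for illustration. It mirrors the behaviour described above, where a REQ socket must alternate send and recv, and any other order is rejected with -1.

```c
#include <assert.h>
#include <stddef.h>

/* Toy model of the REQ socket's send/recv alternation.
   Hypothetical names; this only mirrors the rule, it is not libzmq. */
typedef enum { READY_TO_SEND, AWAITING_REPLY } req_state_t;

typedef struct {
    req_state_t state;
} req_model_t;

/* A REQ socket starts out ready to send a request. */
void req_model_init (req_model_t *self)
{
    assert (self != NULL);
    self->state = READY_TO_SEND;
}

/* Returns 0 if a send is allowed now, -1 if it would break the lockstep. */
int req_model_send (req_model_t *self)
{
    assert (self != NULL);
    if (self->state != READY_TO_SEND)
        return -1;
    self->state = AWAITING_REPLY;
    return 0;
}

/* Returns 0 if a recv is allowed now, -1 if it would break the lockstep. */
int req_model_recv (req_model_t *self)
{
    assert (self != NULL);
    if (self->state != AWAITING_REPLY)
        return -1;
    self->state = READY_TO_SEND;
    return 0;
}
```

In this model, two calls to `req_model_send` in a row make the second one return -1. With a real REQ socket, the failing call also sets `errno` to `EFSM`. A REP socket follows the same idea with the two states swapped: it starts out waiting for a request.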

        C++ style example:

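        //
        //  Hello World server in C++
        //  Binds REP socket to tcp://*:5555
        //  Expects "Hello" from client, replies with "World"
        //
        #include <zmq.hpp>
        #include <cstring>
        #include <iostream>
        #ifndef _WIN32
        #include <unistd.h>
        #else
        #include <windows.h>
        //  Sleep() takes milliseconds, sleep() takes seconds
        #define sleep(n)    Sleep((n) * 1000)
        #endif
        int main () {
            //  Prepare our context and socket
            zmq::context_t context (2);
            zmq::socket_t socket (context, zmq::socket_type::rep);
            socket.bind ("tcp://*:5555");
            while (true) {
                zmq::message_t request;
                //  Wait for next request from client
                socket.recv (request, zmq::recv_flags::none);
                std::cout << "Received Hello" << std::endl;
                //  Do some 'work'
                sleep (1);
                //  Send reply back to client
                zmq::message_t reply (5);
                memcpy (reply.data (), "World", 5);
                socket.send (reply, zmq::send_flags::none);
            }
            return 0;
        }

        //  Hello World client in C++
        //  Connects REQ socket to tcp://localhost:5555
        //  Sends "Hello" to server, expects "World" back
        #include <zmq.hpp>
        #include <cstring>
        #include <iostream>
        int main () {
            //  Prepare our context and socket
            zmq::context_t context (1);
            zmq::socket_t socket (context, zmq::socket_type::req);
            std::cout << "Connecting to hello world server..." << std::endl;
            socket.connect ("tcp://localhost:5555");
            //  Do 10 requests, waiting each time for a response
            for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
                zmq::message_t request (5);
                memcpy (request.data (), "Hello", 5);
                std::cout << "Sending Hello " << request_nbr << "..." << std::endl;
                socket.send (request, zmq::send_flags::none);
                //  Get the reply
                zmq::message_t reply;
                socket.recv (reply, zmq::recv_flags::none);
                std::cout << "Received World " << request_nbr << std::endl;
            }
            return 0;
        }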

        C style example:

        //  Hello World server in C
        //  Binds REP socket to tcp://*:5555
        #include <zmq.h>
        #include <stdio.h>
        #include <unistd.h>
        #include <assert.h>
        int main (void)
        {
            //  Socket to talk to clients
            void *context = zmq_ctx_new ();
            void *responder = zmq_socket (context, ZMQ_REP);
            int rc = zmq_bind (responder, "tcp://*:5555");
            assert (rc == 0);
            while (1) {
                char buffer [10];
                zmq_recv (responder, buffer, 10, 0);
                printf ("Received Hello\n");
                sleep (1);          //  Do some 'work'
                zmq_send (responder, "World", 5, 0);
            }
            return 0;
        }
        
        //  Hello World client in C
        //  Connects REQ socket to tcp://localhost:5555
        #include <zmq.h>
        #include <stdio.h>
        int main (void)
        {
            printf ("Connecting to hello world server...\n");
            void *context = zmq_ctx_new ();
            void *requester = zmq_socket (context, ZMQ_REQ);
            zmq_connect (requester, "tcp://localhost:5555");
            int request_nbr;
            for (request_nbr = 0; request_nbr != 10; request_nbr++) {
                char buffer [10];
                printf ("Sending Hello %d...\n", request_nbr);
                zmq_send (requester, "Hello", 5, 0);
                zmq_recv (requester, buffer, 10, 0);
                printf ("Received World %d\n", request_nbr);
            }
            zmq_close (requester);
            zmq_ctx_destroy (context);
            return 0;
        }
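
The C examples receive into a fixed 10-byte buffer and never print its contents. If you do want to print a reply, keep in mind that ZeroMQ frames are raw bytes with no trailing NUL, and that `zmq_recv()` returns the full message size even when the message was truncated to fit the buffer. The helper below is a minimal sketch of handling that return value safely. `frame_to_cstring` is a hypothetical name, not part of libzmq.

```c
#include <assert.h>
#include <stddef.h>

/* Turns the result of a zmq_recv()-style call into a C string, in place.
   nbytes is the value the receive call returned (the full message size,
   which may exceed the buffer size); capacity is the size of buffer.
   Returns the resulting string length, or -1 if the receive failed or
   the buffer cannot hold a terminator. Hypothetical helper, not libzmq. */
int frame_to_cstring (char *buffer, size_t capacity, int nbytes)
{
    assert (buffer != NULL);
    if (nbytes < 0 || capacity == 0)
        return -1;
    size_t length = (size_t) nbytes;
    if (length > capacity - 1)
        length = capacity - 1;      /* truncated, or filled the buffer exactly */
    buffer [length] = '\0';
    return (int) length;
}
```

One way to use it, assuming the client above: call `zmq_recv (requester, buffer, sizeof (buffer) - 1, 0)`, pass the returned value to `frame_to_cstring (buffer, sizeof (buffer), rc)`, and only then hand `buffer` to `printf`. Reserving one byte in the receive call keeps the terminator from overwriting message data.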
        
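        3. Publish-Subscribe mode

        C++ style example:

        //  Weather update server in C++
        //  Binds PUB socket to tcp://*:5556 and publishes random weather updates
        #include <zmq.hpp>
        #include <cstdio>
        #include <cstdlib>
        #include <ctime>
        //  Random integer in [0, num), as defined in the zguide examples
        #define within(num) (int) ((float) (num) * random () / (RAND_MAX + 1.0))
        int main () {
            //  Prepare our context and publisher
            zmq::context_t context (1);
            zmq::socket_t publisher (context, zmq::socket_type::pub);
            publisher.bind("tcp://*:5556");
            publisher.bind("ipc://weather.ipc");    // Not usable on Windows.
            //  Initialize random number generator
            srandom ((unsigned) time (NULL));
            while (1) {
                int zipcode, temperature, relhumidity;
                //  Get values that will fool the boss
                zipcode     = within (100000);
                temperature = within (215) - 80;
                relhumidity = within (50) + 10;
                //  Send message to all subscribers
                zmq::message_t message(20);
                snprintf ((char *) message.data(), 20 ,
                    "%05d %d %d", zipcode, temperature, relhumidity);
                publisher.send(message, zmq::send_flags::none);
            }
            return 0;
        }

        //  Weather update client in C++
        //  Connects SUB socket to tcp://localhost:5556, averages the temperature for one zipcode
        #include <zmq.hpp>
        #include <cstring>
        #include <iostream>
        #include <sstream>
        int main (int argc, char *argv[]) {
            zmq::context_t context (1);
            //  Socket to talk to server
            std::cout << "Collecting updates from weather server..." << std::endl;
            zmq::socket_t subscriber (context, zmq::socket_type::sub);
            subscriber.connect("tcp://localhost:5556");
            //  Subscribe to zipcode, default is NYC, 10001
            const char *filter = (argc > 1)? argv [1]: "10001 ";
            subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));
            //  Process 100 updates
            int update_nbr;
            long total_temp = 0;
            for (update_nbr = 0; update_nbr < 100; update_nbr++) {
                zmq::message_t update;
                int zipcode, temperature, relhumidity;
                subscriber.recv(update, zmq::recv_flags::none);
                //  Copy by size: the frame is not guaranteed to be NUL-terminated
                std::istringstream iss(std::string(static_cast<char*>(update.data()), update.size()));
                iss >> zipcode >> temperature >> relhumidity;
                total_temp += temperature;
            }
            std::cout << "Average temperature for zipcode '" << filter << "' was " << (int) (total_temp / update_nbr) << "F" << std::endl;
            return 0;
        }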

        C style example:

        //  Weather update server in C
        //  Binds PUB socket to tcp://*:5556 and publishes random weather updates
        //  zhelpers.h comes from the zguide C examples; it provides s_send, s_recv and randof
        #include "zhelpers.h"
        int main (void)
        {
            //  Prepare our context and publisher
            void *context = zmq_ctx_new ();
            void *publisher = zmq_socket (context, ZMQ_PUB);
            int rc = zmq_bind (publisher, "tcp://*:5556");
            assert (rc == 0);
            //  Initialize random number generator
            srandom ((unsigned) time (NULL));
            while (1) {
                //  Get values that will fool the boss
                int zipcode, temperature, relhumidity;
                zipcode     = randof (100000);
                temperature = randof (215) - 80;
                relhumidity = randof (50) + 10;
                //  Send message to all subscribers
                char update [20];
                snprintf (update, sizeof (update), "%05d %d %d", zipcode, temperature, relhumidity);
                s_send (publisher, update);
            }
            zmq_close (publisher);
            zmq_ctx_destroy (context);
            return 0;
        }
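
The update itself is just a short text line: a zero-padded five-digit zipcode, then the temperature and the relative humidity, separated by spaces. The sketch below isolates that format so it can be checked without a socket. `format_update` and `parse_update` are hypothetical helper names, not part of the zguide examples; the format string is the one used by the publisher.

```c
#include <assert.h>
#include <stdio.h>

/* Formats one update as "zipcode temperature relhumidity" using the
   publisher's "%05d %d %d" layout. Returns 0 on success, -1 if the text
   does not fit into out. Hypothetical helper for illustration. */
int format_update (char *out, size_t size,
                   int zipcode, int temperature, int relhumidity)
{
    assert (out != NULL);
    int n = snprintf (out, size, "%05d %d %d",
                      zipcode, temperature, relhumidity);
    return (n >= 0 && (size_t) n < size) ? 0 : -1;
}

/* Parses the same layout back. Returns 0 only if all three fields
   were read, -1 otherwise. Hypothetical helper for illustration. */
int parse_update (const char *text,
                  int *zipcode, int *temperature, int *relhumidity)
{
    assert (text != NULL);
    int fields = sscanf (text, "%d %d %d",
                         zipcode, temperature, relhumidity);
    return fields == 3 ? 0 : -1;
}
```

Because the zipcode is zero-padded, every update starts with exactly five digits followed by a space, which is what makes a text prefix usable as a subscription filter. Checking the `snprintf` result also catches the case where the buffer is too small, something a plain `sprintf` would silently overrun.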
        
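        //  Weather update client in C
        //  Connects SUB socket to tcp://localhost:5556, averages the temperature for one zipcode
        #include "zhelpers.h"
        int main (int argc, char *argv [])
        {
            //  Socket to talk to server
            printf ("Collecting updates from weather server...\n");
            void *context = zmq_ctx_new ();
            void *subscriber = zmq_socket (context, ZMQ_SUB);
            int rc = zmq_connect (subscriber, "tcp://localhost:5556");
            assert (rc == 0);
            //  Subscribe to zipcode, default is NYC, 10001
            const char *filter = (argc > 1)? argv [1]: "10001 ";
            rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
                                 filter, strlen (filter));
            assert (rc == 0);
            //  Process 100 updates
            int update_nbr;
            long total_temp = 0;
            for (update_nbr = 0; update_nbr < 100; update_nbr++) {
                char *string = s_recv (subscriber);
                int zipcode, temperature, relhumidity;
                sscanf (string, "%d %d %d", &zipcode, &temperature, &relhumidity);
                total_temp += temperature;
                free (string);
            }
            printf ("Average temperature for zipcode '%s' was %dF\n",
                    filter, (int) (total_temp / update_nbr));
            zmq_close (subscriber);
            zmq_ctx_destroy (context);
            return 0;
        }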
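
A `ZMQ_SUBSCRIBE` filter is a plain byte prefix: a message is delivered if it begins with the subscription bytes, and an empty subscription matches everything. The function below is a rough sketch of that rule for NUL-terminated text messages, to make the role of the filter string `"10001 "` easier to see. `matches_subscription` is a hypothetical name, and the real matching happens inside libzmq on raw bytes.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Sketch of the subscription rule: a message passes if it starts with
   the filter. An empty filter matches every message. Hypothetical
   helper that works on C strings; libzmq itself compares raw bytes. */
bool matches_subscription (const char *message, const char *filter)
{
    assert (message != NULL && filter != NULL);
    /* strncmp stops at the message's terminator, so a message shorter
       than the filter can never compare equal. */
    return strncmp (message, filter, strlen (filter)) == 0;
}
```

With this rule, `"10001 72 45"` passes the filter `"10001 "` and `"10002 72 45"` does not. The trailing space in the default filter anchors the match to the whole zipcode field, so a hypothetical message starting with `"100012"` would pass the filter `"10001"` but not `"10001 "`. A SUB socket that never sets a subscription receives nothing at all.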

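Copyright notice: unless otherwise stated, this article is original content of 主机测评. When reposting or copying it, please credit the source with a hyperlink.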
