1. 程式人生 > ZeroMQ介面函式之 :zmq_socket_monitor

ZeroMQ介面函式之 :zmq_socket_monitor

1 #include <stdio.h> 2 #include <zmq.h> 3 #include <pthread.h> 4 #include <string.h> 5 #include <assert.h> 6 7 static int read_msg(void* s, zmq_event_t* event, char* ep) 8 { 9 int rc ; 10 zmq_msg_t msg1; // binary part 11 zmq_msg_init (&msg1);
12 zmq_msg_t msg2; // address part 13 zmq_msg_init (&msg2); 14 rc = zmq_msg_recv (&msg1, s, 0); 15 if (rc == -1 && zmq_errno() == ETERM) 16 return 1 ; 17 assert (rc != -1); 18 assert (zmq_msg_more(&msg1) != 0); 19 rc = zmq_msg_recv (&msg2, s, 0
); 20 if (rc == -1 && zmq_errno() == ETERM) 21 return 1; 22 assert (rc != -1); 23 assert (zmq_msg_more(&msg2) == 0); 24 // copy binary data to event struct 25 const char* data = (char*)zmq_msg_data(&msg1); 26 memcpy(&(event->event), data, sizeof
(event->event)); 27 memcpy(&(event->value), data+sizeof(event->event), sizeof(event->value)); 28 // copy address part 29 const size_t len = zmq_msg_size(&msg2) ; 30 ep = memcpy(ep, zmq_msg_data(&msg2), len); 31 *(ep + len) = 0 ; 32 return 0 ; 33 } 34 35 // REP socket monitor thread 36 static void *rep_socket_monitor (void *ctx) 37 { 38 zmq_event_t event; 39 static char addr[1025] ; 40 int rc; 41 printf("starting monitor...\n"); 42 void *s = zmq_socket (ctx, ZMQ_PAIR); 43 assert (s); 44 rc = zmq_connect (s, "inproc://monitor.rep"); 45 assert (rc == 0); 46 while (!read_msg(s, &event, addr)) { 47 switch (event.event) { 48 case ZMQ_EVENT_LISTENING: 49 printf ("listening socket descriptor %d\n", event.value); 50 printf ("listening socket address %s\n", addr); 51 break; 52 case ZMQ_EVENT_ACCEPTED: 53 printf ("accepted socket descriptor %d\n", event.value); 54 printf ("accepted socket address %s\n", addr); 55 break; 56 case ZMQ_EVENT_CLOSE_FAILED: 57 printf ("socket close failure error code %d\n", event.value); 58 printf ("socket address %s\n", addr); 59 break; 60 case ZMQ_EVENT_CLOSED: 61 printf ("closed socket descriptor %d\n", event.value); 62 printf ("closed socket address %s\n", addr); 63 break; 64 case ZMQ_EVENT_DISCONNECTED: 65 printf ("disconnected socket descriptor %d\n", event.value); 66 printf ("disconnected socket address %s\n", addr); 67 break; 68 } 69 } 70 zmq_close (s); 71 return NULL; 72 } 73 74 int main() 75 { 76 const char* addr = "tcp://127.0.0.1:6666" ; 77 pthread_t thread ; 78 // Create the infrastructure 79 void *ctx = zmq_init (1); 80 assert (ctx); 81 // REP socket 82 void* rep = zmq_socket (ctx, ZMQ_REP); 83 assert (rep); 84 // REP socket monitor, all events 85 int rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL); 86 assert (rc == 0); 87 rc = pthread_create (&thread, NULL, rep_socket_monitor, ctx); 88 assert (rc == 0); 89 rc = zmq_bind (rep, addr); 90 assert (rc == 0); 91 // Allow some time for event detection 92 zmq_sleep (1); 
93 // Close the REP socket 94 rc = zmq_close (rep); 95 assert (rc == 0); 96 zmq_term (ctx); 97 return 0 ; 98 }