Cygwin: Fix handling of wait pipe hangup by properly detecting EOF
[lttng-tools.git] / src / common / consumer.c
index 2604f3d060574c1c7677047fcc16581a5c54818d..c511494356369f59727467d835ef08e82bc99a19 100644 (file)
@@ -636,6 +636,9 @@ void lttng_consumer_cleanup(void)
        }
 
        rcu_read_unlock();
+
+       lttng_ht_destroy(consumer_data.stream_ht);
+       lttng_ht_destroy(consumer_data.channel_ht);
 }
 
 /*
@@ -736,6 +739,20 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_poll_pipe;
        }
 
+       /* set read end of the pipe to non-blocking */
+       ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
+       if (ret < 0) {
+               perror("fcntl O_NONBLOCK");
+               goto error_poll_fcntl;
+       }
+
+       /* set write end of the pipe to non-blocking */
+       ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
+       if (ret < 0) {
+               perror("fcntl O_NONBLOCK");
+               goto error_poll_fcntl;
+       }
+
        ret = pipe(ctx->consumer_should_quit);
        if (ret < 0) {
                perror("Error creating recv pipe");
@@ -760,6 +777,7 @@ error_thread_pipe:
                        PERROR("close");
                }
        }
+error_poll_fcntl:
 error_quit_pipe:
        for (i = 0; i < 2; i++) {
                int err;
@@ -933,8 +951,6 @@ void *lttng_consumer_thread_poll_fds(void *data)
        struct lttng_consumer_stream **local_stream = NULL;
        /* local view of consumer_data.fds_count */
        int nb_fd = 0;
-       char tmp;
-       int tmp2;
        struct lttng_consumer_local_data *ctx = data;
 
        rcu_register_thread();
@@ -1019,38 +1035,56 @@ void *lttng_consumer_thread_poll_fds(void *data)
                 * low-priority reads.
                 */
                if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
+                       size_t pipe_readlen;
+                       char tmp;
+
                        DBG("consumer_poll_pipe wake up");
-                       tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
-                       if (tmp2 < 0) {
-                               perror("read consumer poll");
-                       }
+                       /* Consume 1 byte of pipe data */
+                       do {
+                               pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
+                       } while (pipe_readlen == -1 && errno == EINTR);
                        continue;
                }
 
-               /* Take care of high priority channels first. */
+
+               /* Check if each pipe has data. hack for cygwin. */
                for (i = 0; i < nb_fd; i++) {
-                       if (pollfd[i].revents & POLLPRI) {
-                               ssize_t len;
+                       if ((pollfd[i].revents & POLLIN) ||
+                                       local_stream[i]->hangup_flush_done) {
+                               int check_ret;
 
-                               DBG("Urgent read on fd %d", pollfd[i].fd);
-                               high_prio = 1;
-                               len = ctx->on_buffer_ready(local_stream[i], ctx);
-                               /* it's ok to have an unavailable sub-buffer */
-                               if (len < 0 && len != -EAGAIN) {
-                                       goto end;
-                               } else if (len > 0) {
-                                       local_stream[i]->data_read = 1;
+                               check_ret = lttng_consumer_check_pipe(local_stream[i], ctx);
+                               if (check_ret != 0) {
+                                       pollfd[i].revents |= POLLHUP;
                                }
                        }
                }
 
-               /*
-                * If we read high prio channel in this loop, try again
-                * for more high prio data.
-                */
-               if (high_prio) {
-                       continue;
-               }
+               /* Take care of high priority channels first. */
+               /* for (i = 0; i < nb_fd; i++) { */
+               /*      DBG("!!! POLL FLAGS: %d", pollfd[i].revents); */
+               /*      if (pollfd[i].revents & POLLPRI) { */
+               /*              ssize_t len; */
+
+               /*              DBG("Urgent read on fd %d", pollfd[i].fd); */
+               /*              high_prio = 1; */
+               /*              len = ctx->on_buffer_ready(local_stream[i], ctx); */
+               /*              /\* it's ok to have an unavailable sub-buffer *\/ */
+               /*              if (len < 0 && len != -EAGAIN) { */
+               /*                      goto end; */
+               /*              } else if (len > 0) { */
+               /*                      local_stream[i]->data_read = 1; */
+               /*              } */
+               /*      } */
+               /* } */
+
+               /* /\* */
+               /*  * If we read high prio channel in this loop, try again */
+               /*  * for more high prio data. */
+               /*  *\/ */
+               /* if (high_prio) { */
+               /*      continue; */
+               /* } */
 
                /* Take care of low priority channels. */
                for (i = 0; i < nb_fd; i++) {
@@ -1202,8 +1236,12 @@ void *lttng_consumer_thread_receive_fds(void *data)
                        DBG("Received STOP command");
                        goto end;
                }
-               if (ret < 0) {
-                       ERR("Communication interrupted on command socket");
+               if (ret <= 0) {
+                       /*
+                        * This could simply be a session daemon quitting. Don't output
+                        * ERR() here.
+                        */
+                       DBG("Communication interrupted on command socket");
                        goto end;
                }
                if (consumer_quit) {
@@ -1228,11 +1266,20 @@ end:
         */
        consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
 
-       /* wake up the polling thread */
-       ret = write(ctx->consumer_poll_pipe[1], "4", 1);
-       if (ret < 0) {
-               perror("poll pipe write");
-       }
+       /*
+        * Wake-up the other end by writing a null byte in the pipe
+        * (non-blocking). Important note: Because writing into the
+        * pipe is non-blocking (and therefore we allow dropping wakeup
+        * data, as long as there is wakeup data present in the pipe
+        * buffer to wake up the other end), the other end should
+        * perform the following sequence for waiting:
+        * 1) empty the pipe (reads).
+        * 2) perform update operation.
+        * 3) wait on the pipe (poll).
+        */
+       do {
+               ret = write(ctx->consumer_poll_pipe[1], "", 1);
+       } while (ret == -1UL && errno == EINTR);
        rcu_unregister_thread();
        return NULL;
 }
@@ -1253,6 +1300,23 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
        }
 }
 
+int lttng_consumer_check_pipe(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               assert(0);
+               return -ENOSYS;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               return lttng_ustconsumer_check_pipe(stream, ctx);
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               return -ENOSYS;
+       }
+}
+
 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
 {
        switch (consumer_data.type) {
This page took 0.030166 seconds and 4 git commands to generate.