Fix: make ust consumer posix compliant for poll flags
[lttng-tools.git] / src / common / consumer.c
index 2e5ec5c35de08459d0b77b2d224f14ae90632bb3..ae59b6b602909a2752dc142d857dbe1fb503c703 100644 (file)
@@ -646,7 +646,7 @@ void lttng_consumer_sync_trace_file(
  */
 struct lttng_consumer_local_data *lttng_consumer_create(
                enum lttng_consumer_type type,
-               int (*buffer_ready)(struct lttng_consumer_stream *stream,
+               ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
                        struct lttng_consumer_local_data *ctx),
                int (*recv_channel)(struct lttng_consumer_channel *channel),
                int (*recv_stream)(struct lttng_consumer_stream *stream),
@@ -734,7 +734,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
  *
  * Returns the number of bytes written
  */
-int lttng_consumer_on_read_subbuffer_mmap(
+ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len)
 {
@@ -757,7 +757,7 @@ int lttng_consumer_on_read_subbuffer_mmap(
  *
  * Returns the number of bytes spliced.
  */
-int lttng_consumer_on_read_subbuffer_splice(
+ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len)
 {
@@ -903,6 +903,10 @@ void *lttng_consumer_thread_poll_fds(void *data)
                }
                pthread_mutex_unlock(&consumer_data.lock);
 
+               /* No FDs and consumer_quit, consumer_cleanup the thread */
+               if (nb_fd == 0 && consumer_quit == 1) {
+                       goto end;
+               }
                /* poll on the array of fds */
        restart:
                DBG("polling on %d fd", nb_fd + 1);
@@ -923,11 +927,6 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        goto end;
                }
 
-               /* No FDs and consumer_quit, consumer_cleanup the thread */
-               if (nb_fd == 0 && consumer_quit == 1) {
-                       goto end;
-               }
-
                /*
                 * If the consumer_poll_pipe triggered poll go
                 * directly to the beginning of the loop to update the
@@ -946,69 +945,90 @@ void *lttng_consumer_thread_poll_fds(void *data)
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
                        if (pollfd[i].revents & POLLPRI) {
+                               ssize_t len;
+
                                DBG("Urgent read on fd %d", pollfd[i].fd);
                                high_prio = 1;
-                               ret = ctx->on_buffer_ready(local_stream[i], ctx);
+                               len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
-                               if (ret == EAGAIN) {
-                                       ret = 0;
+                               if (len < 0 && len != -EAGAIN) {
+                                       goto end;
+                               } else if (len > 0) {
+                                       local_stream[i]->data_read = 1;
                                }
-                       } else if (pollfd[i].revents & POLLERR) {
-                               ERR("Error returned in polling fd %d.", pollfd[i].fd);
-                               rcu_read_lock();
-                               consumer_del_stream_rcu(&local_stream[i]->node.head);
-                               rcu_read_unlock();
-                               num_hup++;
-                       } else if (pollfd[i].revents & POLLNVAL) {
-                               ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
-                               rcu_read_lock();
-                               consumer_del_stream_rcu(&local_stream[i]->node.head);
-                               rcu_read_unlock();
-                               num_hup++;
-                       } else if ((pollfd[i].revents & POLLHUP) &&
-                                       !(pollfd[i].revents & POLLIN)) {
-                               if (consumer_data.type == LTTNG_CONSUMER32_UST
-                                               || consumer_data.type == LTTNG_CONSUMER64_UST) {
-                                       DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
-                                               pollfd[i].fd);
-                                       if (!local_stream[i]->hangup_flush_done) {
-                                               lttng_ustconsumer_on_stream_hangup(local_stream[i]);
-                                               /* read after flush */
-                                               do {
-                                                       ret = ctx->on_buffer_ready(local_stream[i], ctx);
-                                               } while (ret == EAGAIN);
-                                       }
-                               } else {
-                                       DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
-                               }
-                               rcu_read_lock();
-                               consumer_del_stream_rcu(&local_stream[i]->node.head);
-                               rcu_read_unlock();
-                               num_hup++;
                        }
                }
 
-               /* If every buffer FD has hung up, we end the read loop here */
-               if (nb_fd > 0 && num_hup == nb_fd) {
-                       DBG("every buffer FD has hung up\n");
-                       if (consumer_quit == 1) {
-                               goto end;
-                       }
+               /*
+                * If we read high prio channel in this loop, try again
+                * for more high prio data.
+                */
+               if (high_prio) {
                        continue;
                }
 
                /* Take care of low priority channels. */
-               if (high_prio == 0) {
-                       for (i = 0; i < nb_fd; i++) {
-                               if (pollfd[i].revents & POLLIN) {
-                                       DBG("Normal read on fd %d", pollfd[i].fd);
-                                       ret = ctx->on_buffer_ready(local_stream[i], ctx);
-                                       /* it's ok to have an unavailable subbuffer */
-                                       if (ret == EAGAIN) {
-                                               ret = 0;
-                                       }
+               for (i = 0; i < nb_fd; i++) {
+                       if ((pollfd[i].revents & POLLIN) ||
+                                       local_stream[i]->hangup_flush_done) {
+                               ssize_t len;
+
+                               assert(!(pollfd[i].revents & POLLERR));
+                               assert(!(pollfd[i].revents & POLLNVAL));
+                               DBG("Normal read on fd %d", pollfd[i].fd);
+                               len = ctx->on_buffer_ready(local_stream[i], ctx);
+                               /* it's ok to have an unavailable sub-buffer */
+                               if (len < 0 && len != -EAGAIN) {
+                                       goto end;
+                               } else if (len > 0) {
+                                       local_stream[i]->data_read = 1;
+                               }
+                       }
+               }
+
+               /* Handle hangup and errors */
+               for (i = 0; i < nb_fd; i++) {
+                       if (!local_stream[i]->hangup_flush_done
+                                       && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
+                                       && (consumer_data.type == LTTNG_CONSUMER32_UST
+                                               || consumer_data.type == LTTNG_CONSUMER64_UST)) {
+                               DBG("fd %d is hup|err|nval. Attempting flush and read.",
+                                       pollfd[i].fd);
+                               lttng_ustconsumer_on_stream_hangup(local_stream[i]);
+                               /* Attempt read again, for the data we just flushed. */
+                               local_stream[i]->data_read = 1;
+                       }
+                       /*
+                        * If the poll flag is HUP/ERR/NVAL and we have
+                        * read no data in this pass, we can remove the
+                        * stream from its hash table.
+                        */
+                       if ((pollfd[i].revents & POLLHUP)) {
+                               DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
+                               if (!local_stream[i]->data_read) {
+                                       rcu_read_lock();
+                                       consumer_del_stream_rcu(&local_stream[i]->node.head);
+                                       rcu_read_unlock();
+                                       num_hup++;
+                               }
+                       } else if (pollfd[i].revents & POLLERR) {
+                               ERR("Error returned in polling fd %d.", pollfd[i].fd);
+                               if (!local_stream[i]->data_read) {
+                                       rcu_read_lock();
+                                       consumer_del_stream_rcu(&local_stream[i]->node.head);
+                                       rcu_read_unlock();
+                                       num_hup++;
+                               }
+                       } else if (pollfd[i].revents & POLLNVAL) {
+                               ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
+                               if (!local_stream[i]->data_read) {
+                                       rcu_read_lock();
+                                       consumer_del_stream_rcu(&local_stream[i]->node.head);
+                                       rcu_read_unlock();
+                                       num_hup++;
                                }
                        }
+                       local_stream[i]->data_read = 0;
                }
        }
 end:
@@ -1140,7 +1160,7 @@ end:
        return NULL;
 }
 
-int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        switch (consumer_data.type) {
This page took 0.025671 seconds and 4 git commands to generate.