Fix possible NULL UST session on start trace
[lttng-tools.git] / liblttng-consumer / lttng-consumer.c
index 2fcb39a27290ea2d1f9a060948df6252698deaf8..c7c7b7a7e4db220f8cbb512cb976746213e24dbe 100644 (file)
@@ -305,7 +305,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(
        struct lttng_consumer_channel *channel;
        int ret;
 
-       channel = malloc(sizeof(*channel));
+       channel = zmalloc(sizeof(*channel));
        if (channel == NULL) {
                perror("malloc struct lttng_consumer_channel");
                goto end;
@@ -546,7 +546,8 @@ void lttng_consumer_sync_trace_file(
  */
 struct lttng_consumer_local_data *lttng_consumer_create(
                enum lttng_consumer_type type,
-               int (*buffer_ready)(struct lttng_consumer_stream *stream),
+               int (*buffer_ready)(struct lttng_consumer_stream *stream,
+                       struct lttng_consumer_local_data *ctx),
                int (*recv_channel)(struct lttng_consumer_channel *channel),
                int (*recv_stream)(struct lttng_consumer_stream *stream),
                int (*update_stream)(int stream_key, uint32_t state))
@@ -806,7 +807,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        goto end;
                }
 
-               /* No FDs and consumer_quit, kconsumer_cleanup the thread */
+               /* No FDs and consumer_quit, consumer_cleanup the thread */
                if (nb_fd == 0 && consumer_quit == 1) {
                        goto end;
                }
@@ -817,42 +818,52 @@ void *lttng_consumer_thread_poll_fds(void *data)
                 * array. We want to prioritize array update over
                 * low-priority reads.
                 */
-               if (pollfd[nb_fd].revents == POLLIN) {
+               if (pollfd[nb_fd].revents & POLLIN) {
                        DBG("consumer_poll_pipe wake up");
                        tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
                        if (tmp2 < 0) {
-                               perror("read kconsumer poll");
+                               perror("read consumer poll");
                        }
                        continue;
                }
 
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
-                       switch(pollfd[i].revents) {
-                       case POLLERR:
+                       if (pollfd[i].revents & POLLPRI) {
+                               DBG("Urgent read on fd %d", pollfd[i].fd);
+                               high_prio = 1;
+                               ret = ctx->on_buffer_ready(local_stream[i], ctx);
+                               /* it's ok to have an unavailable sub-buffer */
+                               if (ret == EAGAIN) {
+                                       ret = 0;
+                               }
+                       } else if (pollfd[i].revents & POLLERR) {
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
                                consumer_del_stream(local_stream[i]);
                                num_hup++;
-                               break;
-                       case POLLHUP:
-                               DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
-                               consumer_del_stream(local_stream[i]);
-                               num_hup++;
-                               break;
-                       case POLLNVAL:
+                       } else if (pollfd[i].revents & POLLNVAL) {
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
                                consumer_del_stream(local_stream[i]);
                                num_hup++;
-                               break;
-                       case POLLPRI:
-                               DBG("Urgent read on fd %d", pollfd[i].fd);
-                               high_prio = 1;
-                               ret = ctx->on_buffer_ready(local_stream[i]);
-                               /* it's ok to have an unavailable sub-buffer */
-                               if (ret == EAGAIN) {
-                                       ret = 0;
+                       } else if ((pollfd[i].revents & POLLHUP) &&
+                                       !(pollfd[i].revents & POLLIN)) {
+                               if (consumer_data.type == LTTNG_CONSUMER_UST) {
+                                       DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
+                                               pollfd[i].fd);
+                                       if (!local_stream[i]->hangup_flush_done) {
+                                               lttng_ustconsumer_on_stream_hangup(local_stream[i]);
+                                               /* try reading after flush */
+                                               ret = ctx->on_buffer_ready(local_stream[i], ctx);
+                                               /* it's ok to have an unavailable sub-buffer */
+                                               if (ret == EAGAIN) {
+                                                       ret = 0;
+                                               }
+                                       }
+                               } else {
+                                       DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
                                }
-                               break;
+                               consumer_del_stream(local_stream[i]);
+                               num_hup++;
                        }
                }
 
@@ -868,9 +879,9 @@ void *lttng_consumer_thread_poll_fds(void *data)
                /* Take care of low priority channels. */
                if (high_prio == 0) {
                        for (i = 0; i < nb_fd; i++) {
-                               if (pollfd[i].revents == POLLIN) {
+                               if (pollfd[i].revents & POLLIN) {
                                        DBG("Normal read on fd %d", pollfd[i].fd);
-                                       ret = ctx->on_buffer_ready(local_stream[i]);
+                                       ret = ctx->on_buffer_ready(local_stream[i], ctx);
                                        /* it's ok to have an unavailable subbuffer */
                                        if (ret == EAGAIN) {
                                                ret = 0;
@@ -1003,3 +1014,32 @@ end:
        }
        return NULL;
 }
+
+int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               return lttng_kconsumer_read_subbuffer(stream, ctx);
+       case LTTNG_CONSUMER_UST:
+               return lttng_ustconsumer_read_subbuffer(stream, ctx);
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               return -ENOSYS;
+       }
+}
+
+int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               return lttng_kconsumer_on_recv_stream(stream);
+       case LTTNG_CONSUMER_UST:
+               return lttng_ustconsumer_on_recv_stream(stream);
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               return -ENOSYS;
+       }
+}
This page took 0.024809 seconds and 4 git commands to generate.