Add cleanup function for UST app
[lttng-tools.git] / liblttng-consumer / lttng-consumer.c
index f031d5a677c4675aa659fcaaf90695e6d5571677..c7c7b7a7e4db220f8cbb512cb976746213e24dbe 100644 (file)
@@ -125,10 +125,11 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
        if (stream->out_fd >= 0) {
                close(stream->out_fd);
        }
-       if (stream->wait_fd >= 0) {
+       if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
                close(stream->wait_fd);
        }
-       if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
+       if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd
+                       && !stream->shm_fd_is_copy) {
                close(stream->shm_fd);
        }
        if (!--stream->chan->refcount)
@@ -180,6 +181,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(
        case LTTNG_CONSUMER_KERNEL:
                break;
        case LTTNG_CONSUMER_UST:
+               stream->cpu = stream->chan->cpucount++;
                ret = lttng_ustconsumer_allocate_stream(stream);
                if (ret) {
                        free(stream);
@@ -282,10 +284,11 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                        perror("munmap");
                }
        }
-       if (channel->wait_fd >= 0) {
+       if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
                close(channel->wait_fd);
        }
-       if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
+       if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd
+                       && !channel->shm_fd_is_copy) {
                close(channel->shm_fd);
        }
        free(channel);
@@ -302,7 +305,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(
        struct lttng_consumer_channel *channel;
        int ret;
 
-       channel = malloc(sizeof(*channel));
+       channel = zmalloc(sizeof(*channel));
        if (channel == NULL) {
                perror("malloc struct lttng_consumer_channel");
                goto end;
@@ -543,7 +546,8 @@ void lttng_consumer_sync_trace_file(
  */
 struct lttng_consumer_local_data *lttng_consumer_create(
                enum lttng_consumer_type type,
-               int (*buffer_ready)(struct lttng_consumer_stream *stream),
+               int (*buffer_ready)(struct lttng_consumer_stream *stream,
+                       struct lttng_consumer_local_data *ctx),
                int (*recv_channel)(struct lttng_consumer_channel *channel),
                int (*recv_stream)(struct lttng_consumer_stream *stream),
                int (*update_stream)(int stream_key, uint32_t state))
@@ -803,7 +807,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        goto end;
                }
 
-               /* No FDs and consumer_quit, kconsumer_cleanup the thread */
+               /* No FDs and consumer_quit, consumer_cleanup the thread */
                if (nb_fd == 0 && consumer_quit == 1) {
                        goto end;
                }
@@ -814,42 +818,52 @@ void *lttng_consumer_thread_poll_fds(void *data)
                 * array. We want to prioritize array update over
                 * low-priority reads.
                 */
-               if (pollfd[nb_fd].revents == POLLIN) {
+               if (pollfd[nb_fd].revents & POLLIN) {
                        DBG("consumer_poll_pipe wake up");
                        tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
                        if (tmp2 < 0) {
-                               perror("read kconsumer poll");
+                               perror("read consumer poll");
                        }
                        continue;
                }
 
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
-                       switch(pollfd[i].revents) {
-                       case POLLERR:
+                       if (pollfd[i].revents & POLLPRI) {
+                               DBG("Urgent read on fd %d", pollfd[i].fd);
+                               high_prio = 1;
+                               ret = ctx->on_buffer_ready(local_stream[i], ctx);
+                               /* it's ok to have an unavailable sub-buffer */
+                               if (ret == EAGAIN) {
+                                       ret = 0;
+                               }
+                       } else if (pollfd[i].revents & POLLERR) {
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
                                consumer_del_stream(local_stream[i]);
                                num_hup++;
-                               break;
-                       case POLLHUP:
-                               DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
-                               consumer_del_stream(local_stream[i]);
-                               num_hup++;
-                               break;
-                       case POLLNVAL:
+                       } else if (pollfd[i].revents & POLLNVAL) {
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
                                consumer_del_stream(local_stream[i]);
                                num_hup++;
-                               break;
-                       case POLLPRI:
-                               DBG("Urgent read on fd %d", pollfd[i].fd);
-                               high_prio = 1;
-                               ret = ctx->on_buffer_ready(local_stream[i]);
-                               /* it's ok to have an unavailable sub-buffer */
-                               if (ret == EAGAIN) {
-                                       ret = 0;
+                       } else if ((pollfd[i].revents & POLLHUP) &&
+                                       !(pollfd[i].revents & POLLIN)) {
+                               if (consumer_data.type == LTTNG_CONSUMER_UST) {
+                                       DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
+                                               pollfd[i].fd);
+                                       if (!local_stream[i]->hangup_flush_done) {
+                                               lttng_ustconsumer_on_stream_hangup(local_stream[i]);
+                                               /* try reading after flush */
+                                               ret = ctx->on_buffer_ready(local_stream[i], ctx);
+                                               /* it's ok to have an unavailable sub-buffer */
+                                               if (ret == EAGAIN) {
+                                                       ret = 0;
+                                               }
+                                       }
+                               } else {
+                                       DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
                                }
-                               break;
+                               consumer_del_stream(local_stream[i]);
+                               num_hup++;
                        }
                }
 
@@ -865,9 +879,9 @@ void *lttng_consumer_thread_poll_fds(void *data)
                /* Take care of low priority channels. */
                if (high_prio == 0) {
                        for (i = 0; i < nb_fd; i++) {
-                               if (pollfd[i].revents == POLLIN) {
+                               if (pollfd[i].revents & POLLIN) {
                                        DBG("Normal read on fd %d", pollfd[i].fd);
-                                       ret = ctx->on_buffer_ready(local_stream[i]);
+                                       ret = ctx->on_buffer_ready(local_stream[i], ctx);
                                        /* it's ok to have an unavailable subbuffer */
                                        if (ret == EAGAIN) {
                                                ret = 0;
@@ -916,11 +930,11 @@ void *lttng_consumer_thread_receive_fds(void *data)
                goto end;
        }
 
-       DBG("Sending ready command to ltt-sessiond");
+       DBG("Sending ready command to lttng-sessiond");
        ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY);
        /* return < 0 on error, but == 0 is not fatal */
        if (ret < 0) {
-               ERR("Error sending ready command to ltt-sessiond");
+               ERR("Error sending ready command to lttng-sessiond");
                goto end;
        }
 
@@ -1000,3 +1014,32 @@ end:
        }
        return NULL;
 }
+
+int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               return lttng_kconsumer_read_subbuffer(stream, ctx);
+       case LTTNG_CONSUMER_UST:
+               return lttng_ustconsumer_read_subbuffer(stream, ctx);
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               return -ENOSYS;
+       }
+}
+
+int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               return lttng_kconsumer_on_recv_stream(stream);
+       case LTTNG_CONSUMER_UST:
+               return lttng_ustconsumer_on_recv_stream(stream);
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               return -ENOSYS;
+       }
+}
This page took 0.025545 seconds and 4 git commands to generate.