X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=liblttng-consumer%2Flttng-consumer.c;h=54338e800619a728f70ca44673c6f346a0ebecc0;hp=081d6142b2d76505f3f4de4520654380d3c220cc;hb=d056b47720cf547dd8c4ca59076ffcd215d58f2c;hpb=322585731ced1adba36cddcb8bdd5d997d1b2e3e diff --git a/liblttng-consumer/lttng-consumer.c b/liblttng-consumer/lttng-consumer.c index 081d6142b..54338e800 100644 --- a/liblttng-consumer/lttng-consumer.c +++ b/liblttng-consumer/lttng-consumer.c @@ -125,10 +125,11 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) if (stream->out_fd >= 0) { close(stream->out_fd); } - if (stream->wait_fd >= 0) { + if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) { close(stream->wait_fd); } - if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) { + if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd + && !stream->shm_fd_is_copy) { close(stream->shm_fd); } if (!--stream->chan->refcount) @@ -180,6 +181,7 @@ struct lttng_consumer_stream *consumer_allocate_stream( case LTTNG_CONSUMER_KERNEL: break; case LTTNG_CONSUMER_UST: + stream->cpu = stream->chan->cpucount++; ret = lttng_ustconsumer_allocate_stream(stream); if (ret) { free(stream); @@ -282,10 +284,11 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) perror("munmap"); } } - if (channel->wait_fd >= 0) { + if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) { close(channel->wait_fd); } - if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) { + if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd + && !channel->shm_fd_is_copy) { close(channel->shm_fd); } free(channel); @@ -543,7 +546,8 @@ void lttng_consumer_sync_trace_file( */ struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, - int (*buffer_ready)(struct lttng_consumer_stream *stream), + int (*buffer_ready)(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx), int (*recv_channel)(struct lttng_consumer_channel *channel), int (*recv_stream)(struct lttng_consumer_stream *stream), int (*update_stream)(int stream_key, uint32_t state)) @@ -803,7 +807,7 @@ void *lttng_consumer_thread_poll_fds(void *data) goto end; } - /* No FDs and consumer_quit, kconsumer_cleanup the thread */ + /* No FDs and consumer_quit, consumer_cleanup the thread */ if (nb_fd == 0 && consumer_quit == 1) { goto end; } @@ -814,42 +818,52 @@ void *lttng_consumer_thread_poll_fds(void *data) * array. We want to prioritize array update over * low-priority reads. */ - if (pollfd[nb_fd].revents == POLLIN) { + if (pollfd[nb_fd].revents & POLLIN) { DBG("consumer_poll_pipe wake up"); tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1); if (tmp2 < 0) { - perror("read kconsumer poll"); + perror("read consumer poll"); } continue; } /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { - switch(pollfd[i].revents) { - case POLLERR: + if (pollfd[i].revents & POLLPRI) { + DBG("Urgent read on fd %d", pollfd[i].fd); + high_prio = 1; + ret = ctx->on_buffer_ready(local_stream[i], ctx); + /* it's ok to have an unavailable sub-buffer */ + if (ret == EAGAIN) { + ret = 0; + } + } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); consumer_del_stream(local_stream[i]); num_hup++; - break; - case POLLHUP: - DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); - consumer_del_stream(local_stream[i]); - num_hup++; - break; - case POLLNVAL: + } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); consumer_del_stream(local_stream[i]); num_hup++; - break; - case POLLPRI: - DBG("Urgent read on fd %d", pollfd[i].fd); - high_prio = 1; - ret = ctx->on_buffer_ready(local_stream[i]); - /* it's ok to have an unavailable sub-buffer */ - if (ret == EAGAIN) { - ret = 0; + } else if ((pollfd[i].revents & POLLHUP) && + !(pollfd[i].revents & POLLIN)) { + if (consumer_data.type == LTTNG_CONSUMER_UST) { + DBG("Polling fd %d tells it has hung up. Attempting flush and read.", + pollfd[i].fd); + if (!local_stream[i]->hangup_flush_done) { + lttng_ustconsumer_on_stream_hangup(local_stream[i]); + /* try reading after flush */ + ret = ctx->on_buffer_ready(local_stream[i], ctx); + /* it's ok to have an unavailable sub-buffer */ + if (ret == EAGAIN) { + ret = 0; + } + } + } else { + DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); } - break; + consumer_del_stream(local_stream[i]); + num_hup++; } } @@ -865,9 +879,9 @@ void *lttng_consumer_thread_poll_fds(void *data) /* Take care of low priority channels. */ if (high_prio == 0) { for (i = 0; i < nb_fd; i++) { - if (pollfd[i].revents == POLLIN) { + if (pollfd[i].revents & POLLIN) { DBG("Normal read on fd %d", pollfd[i].fd); - ret = ctx->on_buffer_ready(local_stream[i]); + ret = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable subbuffer */ if (ret == EAGAIN) { ret = 0; @@ -1000,3 +1014,32 @@ end: } return NULL; } + +int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_read_subbuffer(stream, ctx); + case LTTNG_CONSUMER_UST: + return lttng_ustconsumer_read_subbuffer(stream, ctx); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +} + +int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_on_recv_stream(stream); + case LTTNG_CONSUMER_UST: + return lttng_ustconsumer_on_recv_stream(stream); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +}