X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=liblttng-consumer%2Flttng-consumer.c;h=0811e68ca8e8368c7695f92f37a44c906b42b7c0;hp=f031d5a677c4675aa659fcaaf90695e6d5571677;hb=f2ca2e251d8f49b0dbbcca529dd61b3562c1147f;hpb=3bd1e0819b577ffcb44acd7c2f8e02ff09654b7b diff --git a/liblttng-consumer/lttng-consumer.c b/liblttng-consumer/lttng-consumer.c index f031d5a67..0811e68ca 100644 --- a/liblttng-consumer/lttng-consumer.c +++ b/liblttng-consumer/lttng-consumer.c @@ -63,6 +63,9 @@ static struct lttng_consumer_stream *consumer_find_stream(int key) { struct lttng_consumer_stream *iter; + /* Negative keys are lookup failures */ + if (key < 0) + return NULL; cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) { if (iter->key == key) { DBG("Found stream key %d", key); @@ -72,10 +75,22 @@ static struct lttng_consumer_stream *consumer_find_stream(int key) return NULL; } +static void consumer_steal_stream_key(int key) +{ + struct lttng_consumer_stream *stream; + + stream = consumer_find_stream(key); + if (stream) + stream->key = -1; +} + static struct lttng_consumer_channel *consumer_find_channel(int key) { struct lttng_consumer_channel *iter; + /* Negative keys are lookup failures */ + if (key < 0) + return NULL; cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) { if (iter->key == key) { DBG("Found channel key %d", key); @@ -85,6 +100,15 @@ static struct lttng_consumer_channel *consumer_find_channel(int key) return NULL; } +static void consumer_steal_channel_key(int key) +{ + struct lttng_consumer_channel *channel; + + channel = consumer_find_channel(key); + if (channel) + channel->key = -1; +} + /* * Remove a stream from the global list protected by a mutex. This * function is also responsible for freeing its data structures. @@ -105,7 +129,8 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) } } break; - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: lttng_ustconsumer_del_stream(stream); break; default: @@ -125,10 +150,11 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) if (stream->out_fd >= 0) { close(stream->out_fd); } - if (stream->wait_fd >= 0) { + if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) { close(stream->wait_fd); } - if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) { + if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd + && !stream->shm_fd_is_copy) { close(stream->shm_fd); } if (!--stream->chan->refcount) @@ -148,12 +174,14 @@ struct lttng_consumer_stream *consumer_allocate_stream( enum lttng_consumer_stream_state state, uint64_t mmap_len, enum lttng_event_output output, - const char *path_name) + const char *path_name, + uid_t uid, + gid_t gid) { struct lttng_consumer_stream *stream; int ret; - stream = malloc(sizeof(*stream)); + stream = zmalloc(sizeof(*stream)); if (stream == NULL) { perror("malloc struct lttng_consumer_stream"); goto end; @@ -173,13 +201,17 @@ struct lttng_consumer_stream *consumer_allocate_stream( stream->mmap_len = mmap_len; stream->mmap_base = NULL; stream->output = output; + stream->uid = uid; + stream->gid = gid; strncpy(stream->path_name, path_name, PATH_MAX - 1); stream->path_name[PATH_MAX - 1] = '\0'; switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + stream->cpu = stream->chan->cpucount++; ret = lttng_ustconsumer_allocate_stream(stream); if (ret) { free(stream); @@ -209,11 +241,8 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) int ret = 0; pthread_mutex_lock(&consumer_data.lock); - /* Check if already exist */ - if (consumer_find_stream(stream->key)) { - ret = -1; - goto end; - } + /* Steal stream identifier, for UST */ + consumer_steal_stream_key(stream->key); cds_list_add(&stream->list, &consumer_data.stream_list.head); consumer_data.stream_count++; consumer_data.need_update = 1; @@ -221,7 +250,8 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: /* Streams are in CPU number order (we rely on this) */ stream->cpu = stream->chan->nr_streams++; break; @@ -266,7 +296,8 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: lttng_ustconsumer_del_channel(channel); break; default: @@ -282,10 +313,11 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) perror("munmap"); } } - if (channel->wait_fd >= 0) { + if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) { close(channel->wait_fd); } - if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) { + if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd + && !channel->shm_fd_is_copy) { close(channel->shm_fd); } free(channel); @@ -302,7 +334,7 @@ struct lttng_consumer_channel *consumer_allocate_channel( struct lttng_consumer_channel *channel; int ret; - channel = malloc(sizeof(*channel)); + channel = zmalloc(sizeof(*channel)); if (channel == NULL) { perror("malloc struct lttng_consumer_channel"); goto end; @@ -320,7 +352,8 @@ struct lttng_consumer_channel *consumer_allocate_channel( channel->mmap_base = NULL; channel->mmap_len = 0; break; - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: ret = lttng_ustconsumer_allocate_channel(channel); if (ret) { free(channel); @@ -347,18 +380,12 @@ end: */ int consumer_add_channel(struct lttng_consumer_channel *channel) { - int ret = 0; - pthread_mutex_lock(&consumer_data.lock); - /* Check if already exist */ - if (consumer_find_channel(channel->key)) { - ret = -1; - goto end; - } + /* Steal channel identifier, for UST */ + consumer_steal_channel_key(channel->key); cds_list_add(&channel->list, &consumer_data.channel_list.head); -end: pthread_mutex_unlock(&consumer_data.lock); - return ret; + return 0; } /* @@ -543,7 +570,8 @@ void lttng_consumer_sync_trace_file( */ struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, - int (*buffer_ready)(struct lttng_consumer_stream *stream), + int (*buffer_ready)(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx), int (*recv_channel)(struct lttng_consumer_channel *channel), int (*recv_stream)(struct lttng_consumer_stream *stream), int (*update_stream)(int stream_key, uint32_t state)) @@ -555,7 +583,7 @@ struct lttng_consumer_local_data *lttng_consumer_create( consumer_data.type == type); consumer_data.type = type; - ctx = malloc(sizeof(struct lttng_consumer_local_data)); + ctx = zmalloc(sizeof(struct lttng_consumer_local_data)); if (ctx == NULL) { perror("allocating context"); goto error; @@ -637,7 +665,8 @@ int lttng_consumer_on_read_subbuffer_mmap( switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len); - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len); default: ERR("Unknown consumer_data type"); @@ -657,7 +686,8 @@ int lttng_consumer_on_read_subbuffer_splice( switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len); - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: return -ENOSYS; default: ERR("Unknown consumer_data type"); @@ -678,7 +708,8 @@ int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx, switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: return lttng_kconsumer_take_snapshot(ctx, stream); - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: return lttng_ustconsumer_take_snapshot(ctx, stream); default: ERR("Unknown consumer_data type"); @@ -701,7 +732,8 @@ int lttng_consumer_get_produced_snapshot( switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos); - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos); default: ERR("Unknown consumer_data type"); @@ -716,7 +748,8 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll); - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll); default: ERR("Unknown consumer_data type"); @@ -741,7 +774,7 @@ void *lttng_consumer_thread_poll_fds(void *data) int tmp2; struct lttng_consumer_local_data *ctx = data; - local_stream = malloc(sizeof(struct lttng_consumer_stream)); + local_stream = zmalloc(sizeof(struct lttng_consumer_stream)); while (1) { high_prio = 0; @@ -763,7 +796,7 @@ void *lttng_consumer_thread_poll_fds(void *data) } /* allocate for all fds + 1 for the consumer_poll_pipe */ - pollfd = malloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); + pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); if (pollfd == NULL) { perror("pollfd malloc"); pthread_mutex_unlock(&consumer_data.lock); @@ -771,7 +804,7 @@ void *lttng_consumer_thread_poll_fds(void *data) } /* allocate for all fds + 1 for the consumer_poll_pipe */ - local_stream = malloc((consumer_data.stream_count + 1) * + local_stream = zmalloc((consumer_data.stream_count + 1) * sizeof(struct lttng_consumer_stream)); if (local_stream == NULL) { perror("local_stream malloc"); @@ -803,7 +836,7 @@ void *lttng_consumer_thread_poll_fds(void *data) goto end; } - /* No FDs and consumer_quit, kconsumer_cleanup the thread */ + /* No FDs and consumer_quit, consumer_cleanup the thread */ if (nb_fd == 0 && consumer_quit == 1) { goto end; } @@ -814,42 +847,51 @@ void *lttng_consumer_thread_poll_fds(void *data) * array. We want to prioritize array update over * low-priority reads. */ - if (pollfd[nb_fd].revents == POLLIN) { + if (pollfd[nb_fd].revents & POLLIN) { DBG("consumer_poll_pipe wake up"); tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1); if (tmp2 < 0) { - perror("read kconsumer poll"); + perror("read consumer poll"); } continue; } /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { - switch(pollfd[i].revents) { - case POLLERR: + if (pollfd[i].revents & POLLPRI) { + DBG("Urgent read on fd %d", pollfd[i].fd); + high_prio = 1; + ret = ctx->on_buffer_ready(local_stream[i], ctx); + /* it's ok to have an unavailable sub-buffer */ + if (ret == EAGAIN) { + ret = 0; + } + } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); consumer_del_stream(local_stream[i]); num_hup++; - break; - case POLLHUP: - DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); - consumer_del_stream(local_stream[i]); - num_hup++; - break; - case POLLNVAL: + } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); consumer_del_stream(local_stream[i]); num_hup++; - break; - case POLLPRI: - DBG("Urgent read on fd %d", pollfd[i].fd); - high_prio = 1; - ret = ctx->on_buffer_ready(local_stream[i]); - /* it's ok to have an unavailable sub-buffer */ - if (ret == EAGAIN) { - ret = 0; + } else if ((pollfd[i].revents & POLLHUP) && + !(pollfd[i].revents & POLLIN)) { + if (consumer_data.type == LTTNG_CONSUMER32_UST + || consumer_data.type == LTTNG_CONSUMER64_UST) { + DBG("Polling fd %d tells it has hung up. Attempting flush and read.", + pollfd[i].fd); + if (!local_stream[i]->hangup_flush_done) { + lttng_ustconsumer_on_stream_hangup(local_stream[i]); + /* read after flush */ + do { + ret = ctx->on_buffer_ready(local_stream[i], ctx); + } while (ret == EAGAIN); + } + } else { + DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); } - break; + consumer_del_stream(local_stream[i]); + num_hup++; } } @@ -865,9 +907,9 @@ void *lttng_consumer_thread_poll_fds(void *data) /* Take care of low priority channels. */ if (high_prio == 0) { for (i = 0; i < nb_fd; i++) { - if (pollfd[i].revents == POLLIN) { + if (pollfd[i].revents & POLLIN) { DBG("Normal read on fd %d", pollfd[i].fd); - ret = ctx->on_buffer_ready(local_stream[i]); + ret = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable subbuffer */ if (ret == EAGAIN) { ret = 0; @@ -916,11 +958,11 @@ void *lttng_consumer_thread_receive_fds(void *data) goto end; } - DBG("Sending ready command to ltt-sessiond"); + DBG("Sending ready command to lttng-sessiond"); ret = lttng_consumer_send_error(ctx, CONSUMERD_COMMAND_SOCK_READY); /* return < 0 on error, but == 0 is not fatal */ if (ret < 0) { - ERR("Error sending ready command to ltt-sessiond"); + ERR("Error sending ready command to lttng-sessiond"); goto end; } @@ -1000,3 +1042,34 @@ end: } return NULL; } + +int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_read_subbuffer(stream, ctx); + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + return lttng_ustconsumer_read_subbuffer(stream, ctx); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +} + +int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream) +{ + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + return lttng_kconsumer_on_recv_stream(stream); + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + return lttng_ustconsumer_on_recv_stream(stream); + default: + ERR("Unknown consumer_data type"); + assert(0); + return -ENOSYS; + } +}