X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=liblttng-consumer%2Flttng-consumer.c;h=617282d9c38b3690f5969f883690c7922c478b2b;hp=de15dd3b72379276cd0d934e79f2435ef67dbb8a;hb=7bd3904781fa78155c13952f5d5cb9ab42f44a08;hpb=1c3c14ace23b807f049d036b5719093b3bdd6aa6 diff --git a/liblttng-consumer/lttng-consumer.c b/liblttng-consumer/lttng-consumer.c index de15dd3b7..617282d9c 100644 --- a/liblttng-consumer/lttng-consumer.c +++ b/liblttng-consumer/lttng-consumer.c @@ -37,8 +37,6 @@ #include struct lttng_consumer_global_data consumer_data = { - .stream_list.head = CDS_LIST_HEAD_INIT(consumer_data.stream_list.head), - .channel_list.head = CDS_LIST_HEAD_INIT(consumer_data.channel_list.head), .stream_count = 0, .need_update = 1, .type = LTTNG_CONSUMER_UNKNOWN, @@ -61,28 +59,68 @@ volatile int consumer_quit = 0; */ static struct lttng_consumer_stream *consumer_find_stream(int key) { - struct lttng_consumer_stream *iter; + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; + struct lttng_consumer_stream *stream = NULL; - cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) { - if (iter->key == key) { - DBG("Found stream key %d", key); - return iter; - } + /* Negative keys are lookup failures */ + if (key < 0) + return NULL; + + rcu_read_lock(); + + lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key), + &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL) { + stream = caa_container_of(node, struct lttng_consumer_stream, node); } - return NULL; + + rcu_read_unlock(); + + return stream; +} + +static void consumer_steal_stream_key(int key) +{ + struct lttng_consumer_stream *stream; + + stream = consumer_find_stream(key); + if (stream) + stream->key = -1; } static struct lttng_consumer_channel *consumer_find_channel(int key) { - struct lttng_consumer_channel *iter; + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; + struct lttng_consumer_channel *channel = NULL; - cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) { - if (iter->key == key) { - DBG("Found channel key %d", key); - return iter; - } + /* Negative keys are lookup failures */ + if (key < 0) + return NULL; + + rcu_read_lock(); + + lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key), + &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL) { + channel = caa_container_of(node, struct lttng_consumer_channel, node); } - return NULL; + + rcu_read_unlock(); + + return channel; +} + +static void consumer_steal_channel_key(int key) +{ + struct lttng_consumer_channel *channel; + + channel = consumer_find_channel(key); + if (channel) + channel->key = -1; } /* @@ -92,6 +130,7 @@ static struct lttng_consumer_channel *consumer_find_channel(int key) void consumer_del_stream(struct lttng_consumer_stream *stream) { int ret; + struct lttng_ht_iter iter; struct lttng_consumer_channel *free_chan = NULL; pthread_mutex_lock(&consumer_data.lock); @@ -105,7 +144,8 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) } } break; - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: lttng_ustconsumer_del_stream(stream); break; default: @@ -114,7 +154,17 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) goto end; } - cds_list_del(&stream->list); + rcu_read_lock(); + + /* Get stream node from hash table */ + lttng_ht_lookup(consumer_data.stream_ht, + (void *)((unsigned long) stream->key), &iter); + /* Remove stream node from hash table */ + ret = lttng_ht_del(consumer_data.stream_ht, &iter); + assert(!ret); + + rcu_read_unlock(); + if (consumer_data.stream_count <= 0) { goto end; } @@ -143,18 +193,30 @@ end: consumer_del_channel(free_chan); } +static void consumer_del_stream_rcu(struct rcu_head *head) +{ + struct lttng_ht_node_ulong *node = + caa_container_of(head, struct lttng_ht_node_ulong, head); + struct lttng_consumer_stream *stream = + caa_container_of(node, struct lttng_consumer_stream, node); + + consumer_del_stream(stream); +} + struct lttng_consumer_stream *consumer_allocate_stream( int channel_key, int stream_key, int shm_fd, int wait_fd, enum lttng_consumer_stream_state state, uint64_t mmap_len, enum lttng_event_output output, - const char *path_name) + const char *path_name, + uid_t uid, + gid_t gid) { struct lttng_consumer_stream *stream; int ret; - stream = malloc(sizeof(*stream)); + stream = zmalloc(sizeof(*stream)); if (stream == NULL) { perror("malloc struct lttng_consumer_stream"); goto end; @@ -174,13 +236,17 @@ struct lttng_consumer_stream *consumer_allocate_stream( stream->mmap_len = mmap_len; stream->mmap_base = NULL; stream->output = output; + stream->uid = uid; + stream->gid = gid; strncpy(stream->path_name, path_name, PATH_MAX - 1); stream->path_name[PATH_MAX - 1] = '\0'; + lttng_ht_node_init_ulong(&stream->node, stream->key); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: stream->cpu = stream->chan->cpucount++; ret = lttng_ustconsumer_allocate_stream(stream); if (ret) { @@ -211,19 +277,19 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) int ret = 0; pthread_mutex_lock(&consumer_data.lock); - /* Check if already exist */ - if (consumer_find_stream(stream->key)) { - ret = -1; - goto end; - } - cds_list_add(&stream->list, &consumer_data.stream_list.head); + /* Steal stream identifier, for UST */ + consumer_steal_stream_key(stream->key); + rcu_read_lock(); + lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node); + rcu_read_unlock(); consumer_data.stream_count++; consumer_data.need_update = 1; switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: /* Streams are in CPU number order (we rely on this) */ stream->cpu = stream->chan->nr_streams++; break; @@ -262,13 +328,15 @@ void consumer_change_stream_state(int stream_key, void consumer_del_channel(struct lttng_consumer_channel *channel) { int ret; + struct lttng_ht_iter iter; pthread_mutex_lock(&consumer_data.lock); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: lttng_ustconsumer_del_channel(channel); break; default: @@ -277,7 +345,15 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) goto end; } - cds_list_del(&channel->list); + rcu_read_lock(); + + lttng_ht_lookup(consumer_data.channel_ht, + (void *)((unsigned long) channel->key), &iter); + ret = lttng_ht_del(consumer_data.channel_ht, &iter); + assert(!ret); + + rcu_read_unlock(); + if (channel->mmap_base != NULL) { ret = munmap(channel->mmap_base, channel->mmap_len); if (ret != 0) { @@ -296,6 +372,16 @@ end: pthread_mutex_unlock(&consumer_data.lock); } +static void consumer_del_channel_rcu(struct rcu_head *head) +{ + struct lttng_ht_node_ulong *node = + caa_container_of(head, struct lttng_ht_node_ulong, head); + struct lttng_consumer_channel *channel= + caa_container_of(node, struct lttng_consumer_channel, node); + + consumer_del_channel(channel); +} + struct lttng_consumer_channel *consumer_allocate_channel( int channel_key, int shm_fd, int wait_fd, @@ -305,7 +391,7 @@ struct lttng_consumer_channel *consumer_allocate_channel( struct lttng_consumer_channel *channel; int ret; - channel = malloc(sizeof(*channel)); + channel = zmalloc(sizeof(*channel)); if (channel == NULL) { perror("malloc struct lttng_consumer_channel"); goto end; @@ -317,13 +403,15 @@ struct lttng_consumer_channel *consumer_allocate_channel( channel->max_sb_size = max_sb_size; channel->refcount = 0; channel->nr_streams = 0; + lttng_ht_node_init_ulong(&channel->node, channel->key); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: channel->mmap_base = NULL; channel->mmap_len = 0; break; - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: ret = lttng_ustconsumer_allocate_channel(channel); if (ret) { free(channel); @@ -350,18 +438,14 @@ end: */ int consumer_add_channel(struct lttng_consumer_channel *channel) { - int ret = 0; - pthread_mutex_lock(&consumer_data.lock); - /* Check if already exist */ - if (consumer_find_channel(channel->key)) { - ret = -1; - goto end; - } - cds_list_add(&channel->list, &consumer_data.channel_list.head); -end: + /* Steal channel identifier, for UST */ + consumer_steal_channel_key(channel->key); + rcu_read_lock(); + lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node); + rcu_read_unlock(); pthread_mutex_unlock(&consumer_data.lock); - return ret; + return 0; } /* @@ -375,18 +459,20 @@ int consumer_update_poll_array( struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, struct lttng_consumer_stream **local_stream) { - struct lttng_consumer_stream *iter; int i = 0; + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; DBG("Updating poll fd array"); - cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) { - if (iter->state != LTTNG_CONSUMER_ACTIVE_STREAM) { + cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream, + node.node) { + if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) { continue; } - DBG("Active FD %d", iter->wait_fd); - (*pollfd)[i].fd = iter->wait_fd; + DBG("Active FD %d", stream->wait_fd); + (*pollfd)[i].fd = stream->wait_fd; (*pollfd)[i].events = POLLIN | POLLPRI; - local_stream[i] = iter; + local_stream[i] = stream; i++; } @@ -462,22 +548,31 @@ int lttng_consumer_send_error( */ void lttng_consumer_cleanup(void) { - struct lttng_consumer_stream *iter, *tmp; - struct lttng_consumer_channel *citer, *ctmp; + int ret; + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; + + rcu_read_lock(); /* - * close all outfd. Called when there are no more threads - * running (after joining on the threads), no need to protect - * list iteration with mutex. + * close all outfd. Called when there are no more threads running (after + * joining on the threads), no need to protect list iteration with mutex. */ - cds_list_for_each_entry_safe(iter, tmp, - &consumer_data.stream_list.head, list) { - consumer_del_stream(iter); + cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node, + node) { + ret = lttng_ht_del(consumer_data.stream_ht, &iter); + assert(!ret); + call_rcu(&node->head, consumer_del_stream_rcu); } - cds_list_for_each_entry_safe(citer, ctmp, - &consumer_data.channel_list.head, list) { - consumer_del_channel(citer); + + cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node, + node) { + ret = lttng_ht_del(consumer_data.channel_ht, &iter); + assert(!ret); + call_rcu(&node->head, consumer_del_channel_rcu); } + + rcu_read_unlock(); } /* @@ -559,7 +654,7 @@ struct lttng_consumer_local_data *lttng_consumer_create( consumer_data.type == type); consumer_data.type = type; - ctx = malloc(sizeof(struct lttng_consumer_local_data)); + ctx = zmalloc(sizeof(struct lttng_consumer_local_data)); if (ctx == NULL) { perror("allocating context"); goto error; @@ -641,7 +736,8 @@ int lttng_consumer_on_read_subbuffer_mmap( switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len); - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len); default: ERR("Unknown consumer_data type"); @@ -661,7 +757,8 @@ int lttng_consumer_on_read_subbuffer_splice( switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len); - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: return -ENOSYS; default: ERR("Unknown consumer_data type"); @@ -682,7 +779,8 @@ int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx, switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: return lttng_kconsumer_take_snapshot(ctx, stream); - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: return lttng_ustconsumer_take_snapshot(ctx, stream); default: ERR("Unknown consumer_data type"); @@ -705,7 +803,8 @@ int lttng_consumer_get_produced_snapshot( switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos); - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos); default: ERR("Unknown consumer_data type"); @@ -720,7 +819,8 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll); - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll); default: ERR("Unknown consumer_data type"); @@ -730,7 +830,7 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, } /* - * This thread polls the fds in the ltt_fd_list to consume the data and write + * This thread polls the fds in the set to consume the data and write * it to tracefile if necessary. */ void *lttng_consumer_thread_poll_fds(void *data) @@ -745,14 +845,14 @@ void *lttng_consumer_thread_poll_fds(void *data) int tmp2; struct lttng_consumer_local_data *ctx = data; - local_stream = malloc(sizeof(struct lttng_consumer_stream)); + local_stream = zmalloc(sizeof(struct lttng_consumer_stream)); while (1) { high_prio = 0; num_hup = 0; /* - * the ltt_fd_list has been updated, we need to update our + * the fds set has been updated, we need to update our * local array as well */ pthread_mutex_lock(&consumer_data.lock); @@ -767,7 +867,7 @@ void *lttng_consumer_thread_poll_fds(void *data) } /* allocate for all fds + 1 for the consumer_poll_pipe */ - pollfd = malloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); + pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); if (pollfd == NULL) { perror("pollfd malloc"); pthread_mutex_unlock(&consumer_data.lock); @@ -775,7 +875,7 @@ void *lttng_consumer_thread_poll_fds(void *data) } /* allocate for all fds + 1 for the consumer_poll_pipe */ - local_stream = malloc((consumer_data.stream_count + 1) * + local_stream = zmalloc((consumer_data.stream_count + 1) * sizeof(struct lttng_consumer_stream)); if (local_stream == NULL) { perror("local_stream malloc"); @@ -839,16 +939,35 @@ void *lttng_consumer_thread_poll_fds(void *data) } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); - consumer_del_stream(local_stream[i]); + rcu_read_lock(); + consumer_del_stream_rcu(&local_stream[i]->node.head); + rcu_read_unlock(); num_hup++; } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); - consumer_del_stream(local_stream[i]); + rcu_read_lock(); + consumer_del_stream_rcu(&local_stream[i]->node.head); + rcu_read_unlock(); num_hup++; } else if ((pollfd[i].revents & POLLHUP) && !(pollfd[i].revents & POLLIN)) { - DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); - consumer_del_stream(local_stream[i]); + if (consumer_data.type == LTTNG_CONSUMER32_UST + || consumer_data.type == LTTNG_CONSUMER64_UST) { + DBG("Polling fd %d tells it has hung up. Attempting flush and read.", + pollfd[i].fd); + if (!local_stream[i]->hangup_flush_done) { + lttng_ustconsumer_on_stream_hangup(local_stream[i]); + /* read after flush */ + do { + ret = ctx->on_buffer_ready(local_stream[i], ctx); + } while (ret == EAGAIN); + } + } else { + DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); + } + rcu_read_lock(); + consumer_del_stream_rcu(&local_stream[i]->node.head); + rcu_read_unlock(); num_hup++; } } @@ -1007,7 +1126,8 @@ int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: return lttng_kconsumer_read_subbuffer(stream, ctx); - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: return lttng_ustconsumer_read_subbuffer(stream, ctx); default: ERR("Unknown consumer_data type"); @@ -1021,7 +1141,8 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream) switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: return lttng_kconsumer_on_recv_stream(stream); - case LTTNG_CONSUMER_UST: + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: return lttng_ustconsumer_on_recv_stream(stream); default: ERR("Unknown consumer_data type"); @@ -1029,3 +1150,13 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream) return -ENOSYS; } } + +/* + * Allocate and set consumer data hash tables. + */ +void lttng_consumer_init(void) +{ + consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); +} +