X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=liblttng-consumer%2Flttng-consumer.c;h=0d8dd00143fb6d75b52b0e53c81088e853abaccc;hb=8da9ba32e8e815bf83113f8785c3f51e74593ad9;hp=54338e800619a728f70ca44673c6f346a0ebecc0;hpb=d056b47720cf547dd8c4ca59076ffcd215d58f2c;p=lttng-tools.git diff --git a/liblttng-consumer/lttng-consumer.c b/liblttng-consumer/lttng-consumer.c index 54338e800..0d8dd0014 100644 --- a/liblttng-consumer/lttng-consumer.c +++ b/liblttng-consumer/lttng-consumer.c @@ -63,6 +63,9 @@ static struct lttng_consumer_stream *consumer_find_stream(int key) { struct lttng_consumer_stream *iter; + /* Negative keys are lookup failures */ + if (key < 0) + return NULL; cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) { if (iter->key == key) { DBG("Found stream key %d", key); @@ -72,10 +75,22 @@ static struct lttng_consumer_stream *consumer_find_stream(int key) return NULL; } +static void consumer_steal_stream_key(int key) +{ + struct lttng_consumer_stream *stream; + + stream = consumer_find_stream(key); + if (stream) + stream->key = -1; +} + static struct lttng_consumer_channel *consumer_find_channel(int key) { struct lttng_consumer_channel *iter; + /* Negative keys are lookup failures */ + if (key < 0) + return NULL; cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) { if (iter->key == key) { DBG("Found channel key %d", key); @@ -85,6 +100,15 @@ static struct lttng_consumer_channel *consumer_find_channel(int key) return NULL; } +static void consumer_steal_channel_key(int key) +{ + struct lttng_consumer_channel *channel; + + channel = consumer_find_channel(key); + if (channel) + channel->key = -1; +} + /* * Remove a stream from the global list protected by a mutex. This * function is also responsible for freeing its data structures. @@ -154,7 +178,7 @@ struct lttng_consumer_stream *consumer_allocate_stream( struct lttng_consumer_stream *stream; int ret; - stream = malloc(sizeof(*stream)); + stream = zmalloc(sizeof(*stream)); if (stream == NULL) { perror("malloc struct lttng_consumer_stream"); goto end; @@ -211,11 +235,8 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) int ret = 0; pthread_mutex_lock(&consumer_data.lock); - /* Check if already exist */ - if (consumer_find_stream(stream->key)) { - ret = -1; - goto end; - } + /* Steal stream identifier, for UST */ + consumer_steal_stream_key(stream->key); cds_list_add(&stream->list, &consumer_data.stream_list.head); consumer_data.stream_count++; consumer_data.need_update = 1; @@ -305,7 +326,7 @@ struct lttng_consumer_channel *consumer_allocate_channel( struct lttng_consumer_channel *channel; int ret; - channel = malloc(sizeof(*channel)); + channel = zmalloc(sizeof(*channel)); if (channel == NULL) { perror("malloc struct lttng_consumer_channel"); goto end; @@ -350,18 +371,12 @@ end: */ int consumer_add_channel(struct lttng_consumer_channel *channel) { - int ret = 0; - pthread_mutex_lock(&consumer_data.lock); - /* Check if already exist */ - if (consumer_find_channel(channel->key)) { - ret = -1; - goto end; - } + /* Steal channel identifier, for UST */ + consumer_steal_channel_key(channel->key); cds_list_add(&channel->list, &consumer_data.channel_list.head); -end: pthread_mutex_unlock(&consumer_data.lock); - return ret; + return 0; } /* @@ -559,7 +574,7 @@ struct lttng_consumer_local_data *lttng_consumer_create( consumer_data.type == type); consumer_data.type = type; - ctx = malloc(sizeof(struct lttng_consumer_local_data)); + ctx = zmalloc(sizeof(struct lttng_consumer_local_data)); if (ctx == NULL) { perror("allocating context"); goto error; @@ -745,7 +760,7 @@ void *lttng_consumer_thread_poll_fds(void *data) int tmp2; struct lttng_consumer_local_data *ctx = data; - local_stream = malloc(sizeof(struct lttng_consumer_stream)); + local_stream = zmalloc(sizeof(struct lttng_consumer_stream)); while (1) { high_prio = 0; @@ -767,7 +782,7 @@ void *lttng_consumer_thread_poll_fds(void *data) } /* allocate for all fds + 1 for the consumer_poll_pipe */ - pollfd = malloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); + pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); if (pollfd == NULL) { perror("pollfd malloc"); pthread_mutex_unlock(&consumer_data.lock); @@ -775,7 +790,7 @@ void *lttng_consumer_thread_poll_fds(void *data) } /* allocate for all fds + 1 for the consumer_poll_pipe */ - local_stream = malloc((consumer_data.stream_count + 1) * + local_stream = zmalloc((consumer_data.stream_count + 1) * sizeof(struct lttng_consumer_stream)); if (local_stream == NULL) { perror("local_stream malloc"); @@ -852,12 +867,10 @@ void *lttng_consumer_thread_poll_fds(void *data) pollfd[i].fd); if (!local_stream[i]->hangup_flush_done) { lttng_ustconsumer_on_stream_hangup(local_stream[i]); - /* try reading after flush */ - ret = ctx->on_buffer_ready(local_stream[i], ctx); - /* it's ok to have an unavailable sub-buffer */ - if (ret == EAGAIN) { - ret = 0; - } + /* read after flush */ + do { + ret = ctx->on_buffer_ready(local_stream[i], ctx); + } while (ret == EAGAIN); } } else { DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);