Fix flush buffer after wait quiescent
[lttng-tools.git] / liblttng-consumer / lttng-consumer.c
index c7c7b7a7e4db220f8cbb512cb976746213e24dbe..893df720882b7afd6798e789ca7fd90c967836fd 100644 (file)
@@ -63,6 +63,9 @@ static struct lttng_consumer_stream *consumer_find_stream(int key)
 {
        struct lttng_consumer_stream *iter;
 
+       /* Negative keys are lookup failures */
+       if (key < 0)
+               return NULL;
        cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
                if (iter->key == key) {
                        DBG("Found stream key %d", key);
@@ -72,10 +75,22 @@ static struct lttng_consumer_stream *consumer_find_stream(int key)
        return NULL;
 }
 
+static void consumer_steal_stream_key(int key)
+{
+       struct lttng_consumer_stream *stream;
+
+       stream = consumer_find_stream(key);
+       if (stream)
+               stream->key = -1;
+}
+
 static struct lttng_consumer_channel *consumer_find_channel(int key)
 {
        struct lttng_consumer_channel *iter;
 
+       /* Negative keys are lookup failures */
+       if (key < 0)
+               return NULL;
        cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) {
                if (iter->key == key) {
                        DBG("Found channel key %d", key);
@@ -85,6 +100,15 @@ static struct lttng_consumer_channel *consumer_find_channel(int key)
        return NULL;
 }
 
+static void consumer_steal_channel_key(int key)
+{
+       struct lttng_consumer_channel *channel;
+
+       channel = consumer_find_channel(key);
+       if (channel)
+               channel->key = -1;
+}
+
 /*
  * Remove a stream from the global list protected by a mutex. This
  * function is also responsible for freeing its data structures.
@@ -105,7 +129,8 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
                        }
                }
                break;
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                lttng_ustconsumer_del_stream(stream);
                break;
        default:
@@ -154,7 +179,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(
        struct lttng_consumer_stream *stream;
        int ret;
 
-       stream = malloc(sizeof(*stream));
+       stream = zmalloc(sizeof(*stream));
        if (stream == NULL) {
                perror("malloc struct lttng_consumer_stream");
                goto end;
@@ -180,7 +205,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                break;
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                stream->cpu = stream->chan->cpucount++;
                ret = lttng_ustconsumer_allocate_stream(stream);
                if (ret) {
@@ -211,11 +237,8 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
        int ret = 0;
 
        pthread_mutex_lock(&consumer_data.lock);
-       /* Check if already exist */
-       if (consumer_find_stream(stream->key)) {
-               ret = -1;
-               goto end;
-       }
+       /* Steal stream identifier, for UST */
+       consumer_steal_stream_key(stream->key);
        cds_list_add(&stream->list, &consumer_data.stream_list.head);
        consumer_data.stream_count++;
        consumer_data.need_update = 1;
@@ -223,7 +246,8 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                break;
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                /* Streams are in CPU number order (we rely on this) */
                stream->cpu = stream->chan->nr_streams++;
                break;
@@ -268,7 +292,8 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                break;
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                lttng_ustconsumer_del_channel(channel);
                break;
        default:
@@ -323,7 +348,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(
                channel->mmap_base = NULL;
                channel->mmap_len = 0;
                break;
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                ret = lttng_ustconsumer_allocate_channel(channel);
                if (ret) {
                        free(channel);
@@ -350,18 +376,12 @@ end:
  */
 int consumer_add_channel(struct lttng_consumer_channel *channel)
 {
-       int ret = 0;
-
        pthread_mutex_lock(&consumer_data.lock);
-       /* Check if already exist */
-       if (consumer_find_channel(channel->key)) {
-               ret = -1;
-               goto end;
-       }
+       /* Steal channel identifier, for UST */
+       consumer_steal_channel_key(channel->key);
        cds_list_add(&channel->list, &consumer_data.channel_list.head);
-end:
        pthread_mutex_unlock(&consumer_data.lock);
-       return ret;
+       return 0;
 }
 
 /*
@@ -559,7 +579,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                consumer_data.type == type);
        consumer_data.type = type;
 
-       ctx = malloc(sizeof(struct lttng_consumer_local_data));
+       ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
        if (ctx == NULL) {
                perror("allocating context");
                goto error;
@@ -641,7 +661,8 @@ int lttng_consumer_on_read_subbuffer_mmap(
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
        default:
                ERR("Unknown consumer_data type");
@@ -661,7 +682,8 @@ int lttng_consumer_on_read_subbuffer_splice(
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                return -ENOSYS;
        default:
                ERR("Unknown consumer_data type");
@@ -682,7 +704,8 @@ int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                return lttng_kconsumer_take_snapshot(ctx, stream);
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                return lttng_ustconsumer_take_snapshot(ctx, stream);
        default:
                ERR("Unknown consumer_data type");
@@ -705,7 +728,8 @@ int lttng_consumer_get_produced_snapshot(
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
        default:
                ERR("Unknown consumer_data type");
@@ -720,7 +744,8 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
        default:
                ERR("Unknown consumer_data type");
@@ -745,7 +770,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
        int tmp2;
        struct lttng_consumer_local_data *ctx = data;
 
-       local_stream = malloc(sizeof(struct lttng_consumer_stream));
+       local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
 
        while (1) {
                high_prio = 0;
@@ -767,7 +792,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        }
 
                        /* allocate for all fds + 1 for the consumer_poll_pipe */
-                       pollfd = malloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
+                       pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                perror("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
@@ -775,7 +800,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        }
 
                        /* allocate for all fds + 1 for the consumer_poll_pipe */
-                       local_stream = malloc((consumer_data.stream_count + 1) *
+                       local_stream = zmalloc((consumer_data.stream_count + 1) *
                                        sizeof(struct lttng_consumer_stream));
                        if (local_stream == NULL) {
                                perror("local_stream malloc");
@@ -847,17 +872,16 @@ void *lttng_consumer_thread_poll_fds(void *data)
                                num_hup++;
                        } else if ((pollfd[i].revents & POLLHUP) &&
                                        !(pollfd[i].revents & POLLIN)) {
-                               if (consumer_data.type == LTTNG_CONSUMER_UST) {
+                               if (consumer_data.type == LTTNG_CONSUMER32_UST
+                                               || consumer_data.type == LTTNG_CONSUMER64_UST) {
                                        DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
                                                pollfd[i].fd);
                                        if (!local_stream[i]->hangup_flush_done) {
                                                lttng_ustconsumer_on_stream_hangup(local_stream[i]);
-                                               /* try reading after flush */
-                                               ret = ctx->on_buffer_ready(local_stream[i], ctx);
-                                               /* it's ok to have an unavailable sub-buffer */
-                                               if (ret == EAGAIN) {
-                                                       ret = 0;
-                                               }
+                                               /* read after flush */
+                                               do {
+                                                       ret = ctx->on_buffer_ready(local_stream[i], ctx);
+                                               } while (ret == EAGAIN);
                                        }
                                } else {
                                        DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
@@ -1021,7 +1045,8 @@ int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                return lttng_kconsumer_read_subbuffer(stream, ctx);
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                return lttng_ustconsumer_read_subbuffer(stream, ctx);
        default:
                ERR("Unknown consumer_data type");
@@ -1035,7 +1060,8 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                return lttng_kconsumer_on_recv_stream(stream);
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                return lttng_ustconsumer_on_recv_stream(stream);
        default:
                ERR("Unknown consumer_data type");
This page took 0.026193 seconds and 4 git commands to generate.