Fix trace UST destroy channel
[lttng-tools.git] / liblttng-consumer / lttng-consumer.c
index de15dd3b72379276cd0d934e79f2435ef67dbb8a..617282d9c38b3690f5969f883690c7922c478b2b 100644 (file)
@@ -37,8 +37,6 @@
 #include <lttngerr.h>
 
 struct lttng_consumer_global_data consumer_data = {
-       .stream_list.head = CDS_LIST_HEAD_INIT(consumer_data.stream_list.head),
-       .channel_list.head = CDS_LIST_HEAD_INIT(consumer_data.channel_list.head),
        .stream_count = 0,
        .need_update = 1,
        .type = LTTNG_CONSUMER_UNKNOWN,
@@ -61,28 +59,68 @@ volatile int consumer_quit = 0;
  */
 static struct lttng_consumer_stream *consumer_find_stream(int key)
 {
-       struct lttng_consumer_stream *iter;
+       struct lttng_ht_iter iter;
+       struct lttng_ht_node_ulong *node;
+       struct lttng_consumer_stream *stream = NULL;
 
-       cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
-               if (iter->key == key) {
-                       DBG("Found stream key %d", key);
-                       return iter;
-               }
+       /* Negative keys are lookup failures */
+       if (key < 0)
+               return NULL;
+
+       rcu_read_lock();
+
+       lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
+                       &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       if (node != NULL) {
+               stream = caa_container_of(node, struct lttng_consumer_stream, node);
        }
-       return NULL;
+
+       rcu_read_unlock();
+
+       return stream;
+}
+
+static void consumer_steal_stream_key(int key)
+{
+       struct lttng_consumer_stream *stream;
+
+       stream = consumer_find_stream(key);
+       if (stream)
+               stream->key = -1;
 }
 
 static struct lttng_consumer_channel *consumer_find_channel(int key)
 {
-       struct lttng_consumer_channel *iter;
+       struct lttng_ht_iter iter;
+       struct lttng_ht_node_ulong *node;
+       struct lttng_consumer_channel *channel = NULL;
 
-       cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) {
-               if (iter->key == key) {
-                       DBG("Found channel key %d", key);
-                       return iter;
-               }
+       /* Negative keys are lookup failures */
+       if (key < 0)
+               return NULL;
+
+       rcu_read_lock();
+
+       lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
+                       &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       if (node != NULL) {
+               channel = caa_container_of(node, struct lttng_consumer_channel, node);
        }
-       return NULL;
+
+       rcu_read_unlock();
+
+       return channel;
+}
+
+static void consumer_steal_channel_key(int key)
+{
+       struct lttng_consumer_channel *channel;
+
+       channel = consumer_find_channel(key);
+       if (channel)
+               channel->key = -1;
 }
 
 /*
@@ -92,6 +130,7 @@ static struct lttng_consumer_channel *consumer_find_channel(int key)
 void consumer_del_stream(struct lttng_consumer_stream *stream)
 {
        int ret;
+       struct lttng_ht_iter iter;
        struct lttng_consumer_channel *free_chan = NULL;
 
        pthread_mutex_lock(&consumer_data.lock);
@@ -105,7 +144,8 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
                        }
                }
                break;
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                lttng_ustconsumer_del_stream(stream);
                break;
        default:
@@ -114,7 +154,17 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
                goto end;
        }
 
-       cds_list_del(&stream->list);
+       rcu_read_lock();
+
+       /* Get stream node from hash table */
+       lttng_ht_lookup(consumer_data.stream_ht,
+                       (void *)((unsigned long) stream->key), &iter);
+       /* Remove stream node from hash table */
+       ret = lttng_ht_del(consumer_data.stream_ht, &iter);
+       assert(!ret);
+
+       rcu_read_unlock();
+
        if (consumer_data.stream_count <= 0) {
                goto end;
        }
@@ -143,18 +193,30 @@ end:
                consumer_del_channel(free_chan);
 }
 
+static void consumer_del_stream_rcu(struct rcu_head *head)
+{
+       struct lttng_ht_node_ulong *node =
+               caa_container_of(head, struct lttng_ht_node_ulong, head);
+       struct lttng_consumer_stream *stream =
+               caa_container_of(node, struct lttng_consumer_stream, node);
+
+       consumer_del_stream(stream);
+}
+
 struct lttng_consumer_stream *consumer_allocate_stream(
                int channel_key, int stream_key,
                int shm_fd, int wait_fd,
                enum lttng_consumer_stream_state state,
                uint64_t mmap_len,
                enum lttng_event_output output,
-               const char *path_name)
+               const char *path_name,
+               uid_t uid,
+               gid_t gid)
 {
        struct lttng_consumer_stream *stream;
        int ret;
 
-       stream = malloc(sizeof(*stream));
+       stream = zmalloc(sizeof(*stream));
        if (stream == NULL) {
                perror("malloc struct lttng_consumer_stream");
                goto end;
@@ -174,13 +236,17 @@ struct lttng_consumer_stream *consumer_allocate_stream(
        stream->mmap_len = mmap_len;
        stream->mmap_base = NULL;
        stream->output = output;
+       stream->uid = uid;
+       stream->gid = gid;
        strncpy(stream->path_name, path_name, PATH_MAX - 1);
        stream->path_name[PATH_MAX - 1] = '\0';
+       lttng_ht_node_init_ulong(&stream->node, stream->key);
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                break;
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                stream->cpu = stream->chan->cpucount++;
                ret = lttng_ustconsumer_allocate_stream(stream);
                if (ret) {
@@ -211,19 +277,19 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
        int ret = 0;
 
        pthread_mutex_lock(&consumer_data.lock);
-       /* Check if already exist */
-       if (consumer_find_stream(stream->key)) {
-               ret = -1;
-               goto end;
-       }
-       cds_list_add(&stream->list, &consumer_data.stream_list.head);
+       /* Steal stream identifier, for UST */
+       consumer_steal_stream_key(stream->key);
+       rcu_read_lock();
+       lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
+       rcu_read_unlock();
        consumer_data.stream_count++;
        consumer_data.need_update = 1;
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                break;
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                /* Streams are in CPU number order (we rely on this) */
                stream->cpu = stream->chan->nr_streams++;
                break;
@@ -262,13 +328,15 @@ void consumer_change_stream_state(int stream_key,
 void consumer_del_channel(struct lttng_consumer_channel *channel)
 {
        int ret;
+       struct lttng_ht_iter iter;
 
        pthread_mutex_lock(&consumer_data.lock);
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                break;
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                lttng_ustconsumer_del_channel(channel);
                break;
        default:
@@ -277,7 +345,15 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                goto end;
        }
 
-       cds_list_del(&channel->list);
+       rcu_read_lock();
+
+       lttng_ht_lookup(consumer_data.channel_ht,
+                       (void *)((unsigned long) channel->key), &iter);
+       ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+       assert(!ret);
+
+       rcu_read_unlock();
+
        if (channel->mmap_base != NULL) {
                ret = munmap(channel->mmap_base, channel->mmap_len);
                if (ret != 0) {
@@ -296,6 +372,16 @@ end:
        pthread_mutex_unlock(&consumer_data.lock);
 }
 
+static void consumer_del_channel_rcu(struct rcu_head *head)
+{
+       struct lttng_ht_node_ulong *node =
+               caa_container_of(head, struct lttng_ht_node_ulong, head);
+       struct lttng_consumer_channel *channel=
+               caa_container_of(node, struct lttng_consumer_channel, node);
+
+       consumer_del_channel(channel);
+}
+
 struct lttng_consumer_channel *consumer_allocate_channel(
                int channel_key,
                int shm_fd, int wait_fd,
@@ -305,7 +391,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(
        struct lttng_consumer_channel *channel;
        int ret;
 
-       channel = malloc(sizeof(*channel));
+       channel = zmalloc(sizeof(*channel));
        if (channel == NULL) {
                perror("malloc struct lttng_consumer_channel");
                goto end;
@@ -317,13 +403,15 @@ struct lttng_consumer_channel *consumer_allocate_channel(
        channel->max_sb_size = max_sb_size;
        channel->refcount = 0;
        channel->nr_streams = 0;
+       lttng_ht_node_init_ulong(&channel->node, channel->key);
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                channel->mmap_base = NULL;
                channel->mmap_len = 0;
                break;
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                ret = lttng_ustconsumer_allocate_channel(channel);
                if (ret) {
                        free(channel);
@@ -350,18 +438,14 @@ end:
  */
 int consumer_add_channel(struct lttng_consumer_channel *channel)
 {
-       int ret = 0;
-
        pthread_mutex_lock(&consumer_data.lock);
-       /* Check if already exist */
-       if (consumer_find_channel(channel->key)) {
-               ret = -1;
-               goto end;
-       }
-       cds_list_add(&channel->list, &consumer_data.channel_list.head);
-end:
+       /* Steal channel identifier, for UST */
+       consumer_steal_channel_key(channel->key);
+       rcu_read_lock();
+       lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
+       rcu_read_unlock();
        pthread_mutex_unlock(&consumer_data.lock);
-       return ret;
+       return 0;
 }
 
 /*
@@ -375,18 +459,20 @@ int consumer_update_poll_array(
                struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
                struct lttng_consumer_stream **local_stream)
 {
-       struct lttng_consumer_stream *iter;
        int i = 0;
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
 
        DBG("Updating poll fd array");
-       cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
-               if (iter->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
+       cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
+                       node.node) {
+               if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
                        continue;
                }
-               DBG("Active FD %d", iter->wait_fd);
-               (*pollfd)[i].fd = iter->wait_fd;
+               DBG("Active FD %d", stream->wait_fd);
+               (*pollfd)[i].fd = stream->wait_fd;
                (*pollfd)[i].events = POLLIN | POLLPRI;
-               local_stream[i] = iter;
+               local_stream[i] = stream;
                i++;
        }
 
@@ -462,22 +548,31 @@ int lttng_consumer_send_error(
  */
 void lttng_consumer_cleanup(void)
 {
-       struct lttng_consumer_stream *iter, *tmp;
-       struct lttng_consumer_channel *citer, *ctmp;
+       int ret;
+       struct lttng_ht_iter iter;
+       struct lttng_ht_node_ulong *node;
+
+       rcu_read_lock();
 
        /*
-        * close all outfd. Called when there are no more threads
-        * running (after joining on the threads), no need to protect
-        * list iteration with mutex.
+        * close all outfd. Called when there are no more threads running (after
+        * joining on the threads), no need to protect list iteration with mutex.
         */
-       cds_list_for_each_entry_safe(iter, tmp,
-                       &consumer_data.stream_list.head, list) {
-               consumer_del_stream(iter);
+       cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
+                       node) {
+               ret = lttng_ht_del(consumer_data.stream_ht, &iter);
+               assert(!ret);
+               call_rcu(&node->head, consumer_del_stream_rcu);
        }
-       cds_list_for_each_entry_safe(citer, ctmp,
-                       &consumer_data.channel_list.head, list) {
-               consumer_del_channel(citer);
+
+       cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
+                       node) {
+               ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+               assert(!ret);
+               call_rcu(&node->head, consumer_del_channel_rcu);
        }
+
+       rcu_read_unlock();
 }
 
 /*
@@ -559,7 +654,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                consumer_data.type == type);
        consumer_data.type = type;
 
-       ctx = malloc(sizeof(struct lttng_consumer_local_data));
+       ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
        if (ctx == NULL) {
                perror("allocating context");
                goto error;
@@ -641,7 +736,8 @@ int lttng_consumer_on_read_subbuffer_mmap(
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                return lttng_kconsumer_on_read_subbuffer_mmap(ctx, stream, len);
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                return lttng_ustconsumer_on_read_subbuffer_mmap(ctx, stream, len);
        default:
                ERR("Unknown consumer_data type");
@@ -661,7 +757,8 @@ int lttng_consumer_on_read_subbuffer_splice(
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                return lttng_kconsumer_on_read_subbuffer_splice(ctx, stream, len);
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                return -ENOSYS;
        default:
                ERR("Unknown consumer_data type");
@@ -682,7 +779,8 @@ int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                return lttng_kconsumer_take_snapshot(ctx, stream);
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                return lttng_ustconsumer_take_snapshot(ctx, stream);
        default:
                ERR("Unknown consumer_data type");
@@ -705,7 +803,8 @@ int lttng_consumer_get_produced_snapshot(
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
        default:
                ERR("Unknown consumer_data type");
@@ -720,7 +819,8 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                return lttng_kconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
        default:
                ERR("Unknown consumer_data type");
@@ -730,7 +830,7 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 }
 
 /*
- * This thread polls the fds in the ltt_fd_list to consume the data and write
+ * This thread polls the fds in the set to consume the data and write
  * it to tracefile if necessary.
  */
 void *lttng_consumer_thread_poll_fds(void *data)
@@ -745,14 +845,14 @@ void *lttng_consumer_thread_poll_fds(void *data)
        int tmp2;
        struct lttng_consumer_local_data *ctx = data;
 
-       local_stream = malloc(sizeof(struct lttng_consumer_stream));
+       local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
 
        while (1) {
                high_prio = 0;
                num_hup = 0;
 
                /*
-                * the ltt_fd_list has been updated, we need to update our
+                * the fds set has been updated, we need to update our
                 * local array as well
                 */
                pthread_mutex_lock(&consumer_data.lock);
@@ -767,7 +867,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        }
 
                        /* allocate for all fds + 1 for the consumer_poll_pipe */
-                       pollfd = malloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
+                       pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                perror("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
@@ -775,7 +875,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        }
 
                        /* allocate for all fds + 1 for the consumer_poll_pipe */
-                       local_stream = malloc((consumer_data.stream_count + 1) *
+                       local_stream = zmalloc((consumer_data.stream_count + 1) *
                                        sizeof(struct lttng_consumer_stream));
                        if (local_stream == NULL) {
                                perror("local_stream malloc");
@@ -839,16 +939,35 @@ void *lttng_consumer_thread_poll_fds(void *data)
                                }
                        } else if (pollfd[i].revents & POLLERR) {
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
-                               consumer_del_stream(local_stream[i]);
+                               rcu_read_lock();
+                               consumer_del_stream_rcu(&local_stream[i]->node.head);
+                               rcu_read_unlock();
                                num_hup++;
                        } else if (pollfd[i].revents & POLLNVAL) {
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
-                               consumer_del_stream(local_stream[i]);
+                               rcu_read_lock();
+                               consumer_del_stream_rcu(&local_stream[i]->node.head);
+                               rcu_read_unlock();
                                num_hup++;
                        } else if ((pollfd[i].revents & POLLHUP) &&
                                        !(pollfd[i].revents & POLLIN)) {
-                               DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
-                               consumer_del_stream(local_stream[i]);
+                               if (consumer_data.type == LTTNG_CONSUMER32_UST
+                                               || consumer_data.type == LTTNG_CONSUMER64_UST) {
+                                       DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
+                                               pollfd[i].fd);
+                                       if (!local_stream[i]->hangup_flush_done) {
+                                               lttng_ustconsumer_on_stream_hangup(local_stream[i]);
+                                               /* read after flush */
+                                               do {
+                                                       ret = ctx->on_buffer_ready(local_stream[i], ctx);
+                                               } while (ret == EAGAIN);
+                                       }
+                               } else {
+                                       DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
+                               }
+                               rcu_read_lock();
+                               consumer_del_stream_rcu(&local_stream[i]->node.head);
+                               rcu_read_unlock();
                                num_hup++;
                        }
                }
@@ -1007,7 +1126,8 @@ int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                return lttng_kconsumer_read_subbuffer(stream, ctx);
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                return lttng_ustconsumer_read_subbuffer(stream, ctx);
        default:
                ERR("Unknown consumer_data type");
@@ -1021,7 +1141,8 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                return lttng_kconsumer_on_recv_stream(stream);
-       case LTTNG_CONSUMER_UST:
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
                return lttng_ustconsumer_on_recv_stream(stream);
        default:
                ERR("Unknown consumer_data type");
@@ -1029,3 +1150,13 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
                return -ENOSYS;
        }
 }
+
+/*
+ * Allocate and set consumer data hash tables.
+ */
+void lttng_consumer_init(void)
+{
+       consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+}
+
This page took 0.030121 seconds and 4 git commands to generate.