Add lttng hash table support to liblttng-consumer
authorDavid Goulet <dgoulet@efficios.com>
Wed, 11 Jan 2012 17:03:07 +0000 (12:03 -0500)
committerDavid Goulet <dgoulet@efficios.com>
Wed, 11 Jan 2012 17:03:07 +0000 (12:03 -0500)
Remove linked list usage from liblttng-consumer and replace them by
lockless RCU hash tables.

Note that there is still a mutex lock protecting those hash tables and
no RCU lock mechanism used. For now, it's OK and a very small
performance hit.

Signed-off-by: David Goulet <dgoulet@efficios.com>
include/lttng/lttng-consumer.h
liblttng-consumer/Makefile.am
liblttng-consumer/lttng-consumer.c
lttng-consumerd/lttng-consumerd.c

index 81fd83e0ff4845574f4ddf3762129a2935bf7138..bba72ee69ec39f82a4b56cf69c65e84c7939b4ab 100644 (file)
@@ -23,8 +23,9 @@
 #include <limits.h>
 #include <poll.h>
 #include <unistd.h>
 #include <limits.h>
 #include <poll.h>
 #include <unistd.h>
-#include <urcu/list.h>
+
 #include <lttng/lttng.h>
 #include <lttng/lttng.h>
+#include <lttng-ht.h>
 
 /*
  * When the receiving thread dies, we need to have a way to make the polling
 
 /*
  * When the receiving thread dies, we need to have a way to make the polling
@@ -58,14 +59,6 @@ enum lttng_consumer_stream_state {
        LTTNG_CONSUMER_DELETE_STREAM,
 };
 
        LTTNG_CONSUMER_DELETE_STREAM,
 };
 
-struct lttng_consumer_channel_list {
-       struct cds_list_head head;
-};
-
-struct lttng_consumer_stream_list {
-       struct cds_list_head head;
-};
-
 enum lttng_consumer_type {
        LTTNG_CONSUMER_UNKNOWN = 0,
        LTTNG_CONSUMER_KERNEL,
 enum lttng_consumer_type {
        LTTNG_CONSUMER_UNKNOWN = 0,
        LTTNG_CONSUMER_KERNEL,
@@ -74,7 +67,7 @@ enum lttng_consumer_type {
 };
 
 struct lttng_consumer_channel {
 };
 
 struct lttng_consumer_channel {
-       struct cds_list_head list;
+       struct lttng_ht_node_ulong node;
        int key;
        uint64_t max_sb_size; /* the subbuffer size for this channel */
        int refcount; /* Number of streams referencing this channel */
        int key;
        uint64_t max_sb_size; /* the subbuffer size for this channel */
        int refcount; /* Number of streams referencing this channel */
@@ -98,7 +91,7 @@ struct lttng_ust_lib_ring_buffer;
  * uniquely a stream.
  */
 struct lttng_consumer_stream {
  * uniquely a stream.
  */
 struct lttng_consumer_stream {
-       struct cds_list_head list;
+       struct lttng_ht_node_ulong node;
        struct lttng_consumer_channel *chan;    /* associated channel */
        /*
         * key is the key used by the session daemon to refer to the
        struct lttng_consumer_channel *chan;    /* associated channel */
        /*
         * key is the key used by the session daemon to refer to the
@@ -187,26 +180,25 @@ struct lttng_consumer_local_data {
  * Library-level data. One instance per process.
  */
 struct lttng_consumer_global_data {
  * Library-level data. One instance per process.
  */
 struct lttng_consumer_global_data {
+
        /*
        /*
-        * consumer_data.lock protects consumer_data.fd_list,
-        * consumer_data.stream_count, and consumer_data.need_update. It
-        * ensures the count matches the number of items in the fd_list.
-        * It ensures the list updates *always* trigger an fd_array
-        * update (therefore need to make list update vs
-        * consumer_data.need_update flag update atomic, and also flag
-        * read, fd array and flag clear atomic).
+        * At this time, this lock is used to ensure coherence between the count
+        * and number of element in the hash table. It's also a protection for
+        * concurrent read/write between threads. Although hash table used are
+        * lockless data structure, appropriate RCU lock mechanism are not yet
+        * implemented in the consumer.
         */
        pthread_mutex_t lock;
         */
        pthread_mutex_t lock;
+
        /*
        /*
-        * Number of streams in the list below. Protected by
-        * consumer_data.lock.
+        * Number of streams in the hash table. Protected by consumer_data.lock.
         */
        int stream_count;
        /*
         */
        int stream_count;
        /*
-        * Lists of streams and channels. Protected by consumer_data.lock.
+        * Hash tables of streams and channels. Protected by consumer_data.lock.
         */
         */
-       struct lttng_consumer_stream_list stream_list;
-       struct lttng_consumer_channel_list channel_list;
+       struct lttng_ht *stream_ht;
+       struct lttng_ht *channel_ht;
        /*
         * Flag specifying if the local array of FDs needs update in the
         * poll function. Protected by consumer_data.lock.
        /*
         * Flag specifying if the local array of FDs needs update in the
         * poll function. Protected by consumer_data.lock.
@@ -215,6 +207,11 @@ struct lttng_consumer_global_data {
        enum lttng_consumer_type type;
 };
 
        enum lttng_consumer_type type;
 };
 
+/*
+ * Init consumer data structures.
+ */
+extern void lttng_consumer_init(void);
+
 /*
  * Set the error socket for communication with a session daemon.
  */
 /*
  * Set the error socket for communication with a session daemon.
  */
index 80f5843dad5f45490969bba43646f87fbd9020d8..aadba52b5067dbbaed915671ec755ef4b712bfa2 100644 (file)
@@ -6,8 +6,8 @@ liblttng_consumer_la_SOURCES = lttng-consumer.c
 
 liblttng_consumer_la_LIBADD = \
                $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la \
 
 liblttng_consumer_la_LIBADD = \
                $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la \
-               $(top_builddir)/liblttng-kconsumer/liblttng-kconsumer.la
-
+               $(top_builddir)/liblttng-kconsumer/liblttng-kconsumer.la \
+               $(top_builddir)/liblttng-ht/liblttng-ht.la
 
 if HAVE_LIBLTTNG_UST_CTL
 liblttng_consumer_la_LIBADD += \
 
 if HAVE_LIBLTTNG_UST_CTL
 liblttng_consumer_la_LIBADD += \
index 0811e68ca8e8368c7695f92f37a44c906b42b7c0..f4af47404c6053af72f90107a5ea6acf9c3650c2 100644 (file)
@@ -37,8 +37,6 @@
 #include <lttngerr.h>
 
 struct lttng_consumer_global_data consumer_data = {
 #include <lttngerr.h>
 
 struct lttng_consumer_global_data consumer_data = {
-       .stream_list.head = CDS_LIST_HEAD_INIT(consumer_data.stream_list.head),
-       .channel_list.head = CDS_LIST_HEAD_INIT(consumer_data.channel_list.head),
        .stream_count = 0,
        .need_update = 1,
        .type = LTTNG_CONSUMER_UNKNOWN,
        .stream_count = 0,
        .need_update = 1,
        .type = LTTNG_CONSUMER_UNKNOWN,
@@ -61,18 +59,22 @@ volatile int consumer_quit = 0;
  */
 static struct lttng_consumer_stream *consumer_find_stream(int key)
 {
  */
 static struct lttng_consumer_stream *consumer_find_stream(int key)
 {
-       struct lttng_consumer_stream *iter;
+       struct lttng_ht_iter iter;
+       struct lttng_ht_node_ulong *node;
+       struct lttng_consumer_stream *stream = NULL;
 
        /* Negative keys are lookup failures */
        if (key < 0)
                return NULL;
 
        /* Negative keys are lookup failures */
        if (key < 0)
                return NULL;
-       cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
-               if (iter->key == key) {
-                       DBG("Found stream key %d", key);
-                       return iter;
-               }
+
+       lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
+                       &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       if (node != NULL) {
+               stream = caa_container_of(node, struct lttng_consumer_stream, node);
        }
        }
-       return NULL;
+
+       return stream;
 }
 
 static void consumer_steal_stream_key(int key)
 }
 
 static void consumer_steal_stream_key(int key)
@@ -86,18 +88,22 @@ static void consumer_steal_stream_key(int key)
 
 static struct lttng_consumer_channel *consumer_find_channel(int key)
 {
 
 static struct lttng_consumer_channel *consumer_find_channel(int key)
 {
-       struct lttng_consumer_channel *iter;
+       struct lttng_ht_iter iter;
+       struct lttng_ht_node_ulong *node;
+       struct lttng_consumer_channel *channel = NULL;
 
        /* Negative keys are lookup failures */
        if (key < 0)
                return NULL;
 
        /* Negative keys are lookup failures */
        if (key < 0)
                return NULL;
-       cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) {
-               if (iter->key == key) {
-                       DBG("Found channel key %d", key);
-                       return iter;
-               }
+
+       lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
+                       &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       if (node != NULL) {
+               channel = caa_container_of(node, struct lttng_consumer_channel, node);
        }
        }
-       return NULL;
+
+       return channel;
 }
 
 static void consumer_steal_channel_key(int key)
 }
 
 static void consumer_steal_channel_key(int key)
@@ -116,6 +122,7 @@ static void consumer_steal_channel_key(int key)
 void consumer_del_stream(struct lttng_consumer_stream *stream)
 {
        int ret;
 void consumer_del_stream(struct lttng_consumer_stream *stream)
 {
        int ret;
+       struct lttng_ht_iter iter;
        struct lttng_consumer_channel *free_chan = NULL;
 
        pthread_mutex_lock(&consumer_data.lock);
        struct lttng_consumer_channel *free_chan = NULL;
 
        pthread_mutex_lock(&consumer_data.lock);
@@ -139,7 +146,13 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
                goto end;
        }
 
                goto end;
        }
 
-       cds_list_del(&stream->list);
+       /* Get stream node from hash table */
+       lttng_ht_lookup(consumer_data.stream_ht,
+                       (void *)((unsigned long) stream->key), &iter);
+       /* Remove stream node from hash table */
+       ret = lttng_ht_del(consumer_data.stream_ht, &iter);
+       assert(!ret);
+
        if (consumer_data.stream_count <= 0) {
                goto end;
        }
        if (consumer_data.stream_count <= 0) {
                goto end;
        }
@@ -205,6 +218,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(
        stream->gid = gid;
        strncpy(stream->path_name, path_name, PATH_MAX - 1);
        stream->path_name[PATH_MAX - 1] = '\0';
        stream->gid = gid;
        strncpy(stream->path_name, path_name, PATH_MAX - 1);
        stream->path_name[PATH_MAX - 1] = '\0';
+       lttng_ht_node_init_ulong(&stream->node, stream->key);
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -243,7 +257,7 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
        pthread_mutex_lock(&consumer_data.lock);
        /* Steal stream identifier, for UST */
        consumer_steal_stream_key(stream->key);
        pthread_mutex_lock(&consumer_data.lock);
        /* Steal stream identifier, for UST */
        consumer_steal_stream_key(stream->key);
-       cds_list_add(&stream->list, &consumer_data.stream_list.head);
+       lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
        consumer_data.stream_count++;
        consumer_data.need_update = 1;
 
        consumer_data.stream_count++;
        consumer_data.need_update = 1;
 
@@ -290,6 +304,7 @@ void consumer_change_stream_state(int stream_key,
 void consumer_del_channel(struct lttng_consumer_channel *channel)
 {
        int ret;
 void consumer_del_channel(struct lttng_consumer_channel *channel)
 {
        int ret;
+       struct lttng_ht_iter iter;
 
        pthread_mutex_lock(&consumer_data.lock);
 
 
        pthread_mutex_lock(&consumer_data.lock);
 
@@ -306,7 +321,11 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                goto end;
        }
 
                goto end;
        }
 
-       cds_list_del(&channel->list);
+       lttng_ht_lookup(consumer_data.channel_ht,
+                       (void *)((unsigned long) channel->key), &iter);
+       ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+       assert(!ret);
+
        if (channel->mmap_base != NULL) {
                ret = munmap(channel->mmap_base, channel->mmap_len);
                if (ret != 0) {
        if (channel->mmap_base != NULL) {
                ret = munmap(channel->mmap_base, channel->mmap_len);
                if (ret != 0) {
@@ -346,6 +365,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(
        channel->max_sb_size = max_sb_size;
        channel->refcount = 0;
        channel->nr_streams = 0;
        channel->max_sb_size = max_sb_size;
        channel->refcount = 0;
        channel->nr_streams = 0;
+       lttng_ht_node_init_ulong(&channel->node, channel->key);
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -383,7 +403,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel)
        pthread_mutex_lock(&consumer_data.lock);
        /* Steal channel identifier, for UST */
        consumer_steal_channel_key(channel->key);
        pthread_mutex_lock(&consumer_data.lock);
        /* Steal channel identifier, for UST */
        consumer_steal_channel_key(channel->key);
-       cds_list_add(&channel->list, &consumer_data.channel_list.head);
+       lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
        pthread_mutex_unlock(&consumer_data.lock);
        return 0;
 }
        pthread_mutex_unlock(&consumer_data.lock);
        return 0;
 }
@@ -399,18 +419,20 @@ int consumer_update_poll_array(
                struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
                struct lttng_consumer_stream **local_stream)
 {
                struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
                struct lttng_consumer_stream **local_stream)
 {
-       struct lttng_consumer_stream *iter;
        int i = 0;
        int i = 0;
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
 
        DBG("Updating poll fd array");
 
        DBG("Updating poll fd array");
-       cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
-               if (iter->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
+       cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
+                       node.node) {
+               if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
                        continue;
                }
                        continue;
                }
-               DBG("Active FD %d", iter->wait_fd);
-               (*pollfd)[i].fd = iter->wait_fd;
+               DBG("Active FD %d", stream->wait_fd);
+               (*pollfd)[i].fd = stream->wait_fd;
                (*pollfd)[i].events = POLLIN | POLLPRI;
                (*pollfd)[i].events = POLLIN | POLLPRI;
-               local_stream[i] = iter;
+               local_stream[i] = stream;
                i++;
        }
 
                i++;
        }
 
@@ -486,21 +508,28 @@ int lttng_consumer_send_error(
  */
 void lttng_consumer_cleanup(void)
 {
  */
 void lttng_consumer_cleanup(void)
 {
-       struct lttng_consumer_stream *iter, *tmp;
-       struct lttng_consumer_channel *citer, *ctmp;
+       int ret;
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream;
+       struct lttng_consumer_channel *channel;
 
        /*
         * close all outfd. Called when there are no more threads
         * running (after joining on the threads), no need to protect
         * list iteration with mutex.
         */
 
        /*
         * close all outfd. Called when there are no more threads
         * running (after joining on the threads), no need to protect
         * list iteration with mutex.
         */
-       cds_list_for_each_entry_safe(iter, tmp,
-                       &consumer_data.stream_list.head, list) {
-               consumer_del_stream(iter);
+       cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
+                       node.node) {
+               ret = lttng_ht_del(consumer_data.stream_ht, &iter);
+               assert(!ret);
+               consumer_del_stream(stream);
        }
        }
-       cds_list_for_each_entry_safe(citer, ctmp,
-                       &consumer_data.channel_list.head, list) {
-               consumer_del_channel(citer);
+
+       cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
+                       node.node) {
+               ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+               assert(!ret);
+               consumer_del_channel(channel);
        }
 }
 
        }
 }
 
@@ -759,7 +788,7 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 }
 
 /*
 }
 
 /*
- * This thread polls the fds in the ltt_fd_list to consume the data and write
+ * This thread polls the fds in the set to consume the data and write
  * it to tracefile if necessary.
  */
 void *lttng_consumer_thread_poll_fds(void *data)
  * it to tracefile if necessary.
  */
 void *lttng_consumer_thread_poll_fds(void *data)
@@ -781,7 +810,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                num_hup = 0;
 
                /*
                num_hup = 0;
 
                /*
-                * the ltt_fd_list has been updated, we need to update our
+                * the fds set has been updated, we need to update our
                 * local array as well
                 */
                pthread_mutex_lock(&consumer_data.lock);
                 * local array as well
                 */
                pthread_mutex_lock(&consumer_data.lock);
@@ -1073,3 +1102,13 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
                return -ENOSYS;
        }
 }
                return -ENOSYS;
        }
 }
+
+/*
+ * Allocate and set consumer data hash tables.
+ */
+void lttng_consumer_init(void)
+{
+       consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+}
+
index cf515a9df7cae5090f4f2537b1fda39342fcfffc..e8e511702d1fefb755239d890d61d3b35975c070 100644 (file)
@@ -267,6 +267,10 @@ int main(int argc, char **argv)
                        goto error;
                }
        }
                        goto error;
                }
        }
+
+       /* Init */
+       lttng_consumer_init();
+
        /* create the consumer instance with and assign the callbacks */
        ctx = lttng_consumer_create(opt_type, lttng_consumer_read_subbuffer,
                NULL, lttng_consumer_on_recv_stream, NULL);
        /* create the consumer instance with and assign the callbacks */
        ctx = lttng_consumer_create(opt_type, lttng_consumer_read_subbuffer,
                NULL, lttng_consumer_on_recv_stream, NULL);
This page took 0.030429 seconds and 4 git commands to generate.