Add hash table argument to helper functions
[lttng-tools.git] / src / common / consumer.c
index 53806b08d8ee4a0dcd54b80ee71edaca221fd784..161bf7e324ba38abcc6976f91f219c9f2257d4af 100644 (file)
@@ -62,20 +62,23 @@ volatile int consumer_quit = 0;
  * Find a stream. The consumer_data.lock must be locked during this
  * call.
  */
-static struct lttng_consumer_stream *consumer_find_stream(int key)
+static struct lttng_consumer_stream *consumer_find_stream(int key,
+               struct lttng_ht *ht)
 {
        struct lttng_ht_iter iter;
        struct lttng_ht_node_ulong *node;
        struct lttng_consumer_stream *stream = NULL;
 
+       assert(ht);
+
        /* Negative keys are lookup failures */
-       if (key < 0)
+       if (key < 0) {
                return NULL;
+       }
 
        rcu_read_lock();
 
-       lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
-                       &iter);
+       lttng_ht_lookup(ht, (void *)((unsigned long) key), &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
        if (node != NULL) {
                stream = caa_container_of(node, struct lttng_consumer_stream, node);
@@ -86,12 +89,12 @@ static struct lttng_consumer_stream *consumer_find_stream(int key)
        return stream;
 }
 
-static void consumer_steal_stream_key(int key)
+static void consumer_steal_stream_key(int key, struct lttng_ht *ht)
 {
        struct lttng_consumer_stream *stream;
 
        rcu_read_lock();
-       stream = consumer_find_stream(key);
+       stream = consumer_find_stream(key, ht);
        if (stream) {
                stream->key = -1;
                /*
@@ -111,8 +114,9 @@ static struct lttng_consumer_channel *consumer_find_channel(int key)
        struct lttng_consumer_channel *channel = NULL;
 
        /* Negative keys are lookup failures */
-       if (key < 0)
+       if (key < 0) {
                return NULL;
+       }
 
        rcu_read_lock();
 
@@ -242,7 +246,7 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
                if (stream->mmap_base != NULL) {
                        ret = munmap(stream->mmap_base, stream->mmap_len);
                        if (ret != 0) {
-                               perror("munmap");
+                               PERROR("munmap");
                        }
                }
                break;
@@ -353,13 +357,19 @@ struct lttng_consumer_stream *consumer_allocate_stream(
 
        stream = zmalloc(sizeof(*stream));
        if (stream == NULL) {
-               perror("malloc struct lttng_consumer_stream");
+               PERROR("malloc struct lttng_consumer_stream");
                *alloc_ret = -ENOMEM;
-               return NULL;
+               goto end;
        }
+
+       /*
+        * Get stream's channel reference. Needed when adding the stream to the
+        * global hash table.
+        */
        stream->chan = consumer_find_channel(channel_key);
        if (!stream->chan) {
                *alloc_ret = -ENOENT;
+               ERR("Unable to find channel for stream %d", stream_key);
                goto error;
        }
        stream->chan->refcount++;
@@ -419,6 +429,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(
 
 error:
        free(stream);
+end:
        return NULL;
 }
 
@@ -434,7 +445,7 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
 
        pthread_mutex_lock(&consumer_data.lock);
        /* Steal stream identifier, for UST */
-       consumer_steal_stream_key(stream->key);
+       consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
 
        rcu_read_lock();
        lttng_ht_lookup(consumer_data.stream_ht,
@@ -611,7 +622,7 @@ void consumer_change_stream_state(int stream_key,
        struct lttng_consumer_stream *stream;
 
        pthread_mutex_lock(&consumer_data.lock);
-       stream = consumer_find_stream(stream_key);
+       stream = consumer_find_stream(stream_key, consumer_data.stream_ht);
        if (stream) {
                stream->state = state;
        }
@@ -663,7 +674,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        if (channel->mmap_base != NULL) {
                ret = munmap(channel->mmap_base, channel->mmap_len);
                if (ret != 0) {
-                       perror("munmap");
+                       PERROR("munmap");
                }
        }
        if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
@@ -696,7 +707,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(
 
        channel = zmalloc(sizeof(*channel));
        if (channel == NULL) {
-               perror("malloc struct lttng_consumer_channel");
+               PERROR("malloc struct lttng_consumer_channel");
                goto end;
        }
        channel->key = channel_key;
@@ -820,7 +831,7 @@ restart:
                if (errno == EINTR) {
                        goto restart;
                }
-               perror("Poll error");
+               PERROR("Poll error");
                goto exit;
        }
        if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
@@ -912,7 +923,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
                ret = write(ctx->consumer_should_quit[1], "4", 1);
        } while (ret < 0 && errno == EINTR);
        if (ret < 0) {
-               perror("write consumer quit");
+               PERROR("write consumer quit");
        }
 }
 
@@ -984,7 +995,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
 
        ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
        if (ctx == NULL) {
-               perror("allocating context");
+               PERROR("allocating context");
                goto error;
        }
 
@@ -997,33 +1008,33 @@ struct lttng_consumer_local_data *lttng_consumer_create(
 
        ret = pipe(ctx->consumer_poll_pipe);
        if (ret < 0) {
-               perror("Error creating poll pipe");
+               PERROR("Error creating poll pipe");
                goto error_poll_pipe;
        }
 
        /* set read end of the pipe to non-blocking */
        ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
        if (ret < 0) {
-               perror("fcntl O_NONBLOCK");
+               PERROR("fcntl O_NONBLOCK");
                goto error_poll_fcntl;
        }
 
        /* set write end of the pipe to non-blocking */
        ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
        if (ret < 0) {
-               perror("fcntl O_NONBLOCK");
+               PERROR("fcntl O_NONBLOCK");
                goto error_poll_fcntl;
        }
 
        ret = pipe(ctx->consumer_should_quit);
        if (ret < 0) {
-               perror("Error creating recv pipe");
+               PERROR("Error creating recv pipe");
                goto error_quit_pipe;
        }
 
        ret = pipe(ctx->consumer_thread_pipe);
        if (ret < 0) {
-               perror("Error creating thread pipe");
+               PERROR("Error creating thread pipe");
                goto error_thread_pipe;
        }
 
@@ -1507,9 +1518,7 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 }
 
 /*
- * Iterate over all stream element of the hashtable and free them. This is race
- * free since the hashtable received MUST be in a race free synchronization
- * state. It's the caller responsability to make sure of that.
+ * Iterate over all streams of the hashtable and free them properly.
  */
 static void destroy_stream_ht(struct lttng_ht *ht)
 {
@@ -1526,7 +1535,7 @@ static void destroy_stream_ht(struct lttng_ht *ht)
                ret = lttng_ht_del(ht, &iter);
                assert(!ret);
 
-               free(stream);
+               call_rcu(&stream->node.head, consumer_free_stream);
        }
        rcu_read_unlock();
 
@@ -1626,7 +1635,7 @@ static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
                consumer_del_channel(stream->chan);
        }
 
-       free(stream);
+       call_rcu(&stream->node.head, consumer_free_stream);
 }
 
 /*
@@ -1882,7 +1891,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        /* allocate for all fds + 1 for the consumer_poll_pipe */
                        pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
-                               perror("pollfd malloc");
+                               PERROR("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
@@ -1891,7 +1900,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        local_stream = zmalloc((consumer_data.stream_count + 1) *
                                        sizeof(struct lttng_consumer_stream));
                        if (local_stream == NULL) {
-                               perror("local_stream malloc");
+                               PERROR("local_stream malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
@@ -1923,7 +1932,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        if (errno == EINTR) {
                                goto restart;
                        }
-                       perror("Poll error");
+                       PERROR("Poll error");
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
                        goto end;
                } else if (num_rdy == 0) {
@@ -2095,7 +2104,7 @@ void *lttng_consumer_thread_receive_fds(void *data)
 
        ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
        if (ret < 0) {
-               perror("fcntl O_NONBLOCK");
+               PERROR("fcntl O_NONBLOCK");
                goto end;
        }
 
@@ -2118,7 +2127,7 @@ void *lttng_consumer_thread_receive_fds(void *data)
        }
        ret = fcntl(sock, F_SETFL, O_NONBLOCK);
        if (ret < 0) {
-               perror("fcntl O_NONBLOCK");
+               PERROR("fcntl O_NONBLOCK");
                goto end;
        }
 
This page took 0.029233 seconds and 4 git commands to generate.