Fix: Add missing call rcu and read side lock
[lttng-tools.git] / src / common / consumer.c
index 16a6c47f4269e4ad9155ba5d242a5e99ed0c8805..b55fd153d7830e1d817ed4144dbd65fdf776b00b 100644 (file)
@@ -69,8 +69,9 @@ static struct lttng_consumer_stream *consumer_find_stream(int key)
        struct lttng_consumer_stream *stream = NULL;
 
        /* Negative keys are lookup failures */
-       if (key < 0)
+       if (key < 0) {
                return NULL;
+       }
 
        rcu_read_lock();
 
@@ -111,8 +112,9 @@ static struct lttng_consumer_channel *consumer_find_channel(int key)
        struct lttng_consumer_channel *channel = NULL;
 
        /* Negative keys are lookup failures */
-       if (key < 0)
+       if (key < 0) {
                return NULL;
+       }
 
        rcu_read_lock();
 
@@ -175,7 +177,7 @@ static void consumer_rcu_free_relayd(struct rcu_head *head)
  *
  * This function MUST be called with the consumer_data lock acquired.
  */
-void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
+static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        int ret;
        struct lttng_ht_iter iter;
@@ -218,7 +220,7 @@ void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
 
        /* Destroy the relayd if refcount is 0 */
        if (uatomic_read(&relayd->refcount) == 0) {
-               consumer_destroy_relayd(relayd);
+               destroy_relayd(relayd);
        }
 }
 
@@ -242,7 +244,7 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
                if (stream->mmap_base != NULL) {
                        ret = munmap(stream->mmap_base, stream->mmap_len);
                        if (ret != 0) {
-                               perror("munmap");
+                               PERROR("munmap");
                        }
                }
                break;
@@ -314,23 +316,25 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
                /* Both conditions are met, we destroy the relayd. */
                if (uatomic_read(&relayd->refcount) == 0 &&
                                uatomic_read(&relayd->destroy_flag)) {
-                       consumer_destroy_relayd(relayd);
+                       destroy_relayd(relayd);
                }
        }
        rcu_read_unlock();
 
-       if (!--stream->chan->refcount) {
+       uatomic_dec(&stream->chan->refcount);
+       if (!uatomic_read(&stream->chan->refcount)
+                       && !uatomic_read(&stream->chan->nb_init_streams)) {
                free_chan = stream->chan;
        }
 
-
        call_rcu(&stream->node.head, consumer_free_stream);
 end:
        consumer_data.need_update = 1;
        pthread_mutex_unlock(&consumer_data.lock);
 
-       if (free_chan)
+       if (free_chan) {
                consumer_del_channel(free_chan);
+       }
 }
 
 struct lttng_consumer_stream *consumer_allocate_stream(
@@ -343,20 +347,28 @@ struct lttng_consumer_stream *consumer_allocate_stream(
                uid_t uid,
                gid_t gid,
                int net_index,
-               int metadata_flag)
+               int metadata_flag,
+               int *alloc_ret)
 {
        struct lttng_consumer_stream *stream;
        int ret;
 
        stream = zmalloc(sizeof(*stream));
        if (stream == NULL) {
-               perror("malloc struct lttng_consumer_stream");
+               PERROR("malloc struct lttng_consumer_stream");
+               *alloc_ret = -ENOMEM;
                goto end;
        }
+
+       /*
+        * Get stream's channel reference. Needed when adding the stream to the
+        * global hash table.
+        */
        stream->chan = consumer_find_channel(channel_key);
        if (!stream->chan) {
-               perror("Unable to find channel key");
-               goto end;
+               *alloc_ret = -ENOENT;
+               ERR("Unable to find channel for stream %d", stream_key);
+               goto error;
        }
        stream->chan->refcount++;
        stream->key = stream_key;
@@ -385,24 +397,38 @@ struct lttng_consumer_stream *consumer_allocate_stream(
                stream->cpu = stream->chan->cpucount++;
                ret = lttng_ustconsumer_allocate_stream(stream);
                if (ret) {
-                       free(stream);
-                       return NULL;
+                       *alloc_ret = -EINVAL;
+                       goto error;
                }
                break;
        default:
                ERR("Unknown consumer_data type");
-               assert(0);
-               goto end;
+               *alloc_ret = -EINVAL;
+               goto error;
+       }
+
+       /*
+        * When nb_init_streams reaches 0, we don't need to trigger any action in
+        * terms of destroying the associated channel, because the action that
+        * causes the count to become 0 also causes a stream to be added. The
+        * channel deletion will thus be triggered by the following removal of this
+        * stream.
+        */
+       if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
+               uatomic_dec(&stream->chan->nb_init_streams);
        }
-       DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
-                       stream->path_name, stream->key,
-                       stream->shm_fd,
-                       stream->wait_fd,
-                       (unsigned long long) stream->mmap_len,
-                       stream->out_fd,
+
+       DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
+                       " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
+                       stream->shm_fd, stream->wait_fd,
+                       (unsigned long long) stream->mmap_len, stream->out_fd,
                        stream->net_seq_idx);
-end:
        return stream;
+
+error:
+       free(stream);
+end:
+       return NULL;
 }
 
 /*
@@ -452,8 +478,7 @@ end:
  * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
  * be acquired before calling this.
  */
-
-int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
+static int add_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        int ret = 0;
        struct lttng_ht_node_ulong *node;
@@ -647,7 +672,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        if (channel->mmap_base != NULL) {
                ret = munmap(channel->mmap_base, channel->mmap_len);
                if (ret != 0) {
-                       perror("munmap");
+                       PERROR("munmap");
                }
        }
        if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
@@ -672,14 +697,15 @@ struct lttng_consumer_channel *consumer_allocate_channel(
                int channel_key,
                int shm_fd, int wait_fd,
                uint64_t mmap_len,
-               uint64_t max_sb_size)
+               uint64_t max_sb_size,
+               unsigned int nb_init_streams)
 {
        struct lttng_consumer_channel *channel;
        int ret;
 
        channel = zmalloc(sizeof(*channel));
        if (channel == NULL) {
-               perror("malloc struct lttng_consumer_channel");
+               PERROR("malloc struct lttng_consumer_channel");
                goto end;
        }
        channel->key = channel_key;
@@ -688,6 +714,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(
        channel->mmap_len = mmap_len;
        channel->max_sb_size = max_sb_size;
        channel->refcount = 0;
+       channel->nb_init_streams = nb_init_streams;
        lttng_ht_node_init_ulong(&channel->node, channel->key);
 
        switch (consumer_data.type) {
@@ -802,7 +829,7 @@ restart:
                if (errno == EINTR) {
                        goto restart;
                }
-               perror("Poll error");
+               PERROR("Poll error");
                goto exit;
        }
        if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
@@ -894,7 +921,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
                ret = write(ctx->consumer_should_quit[1], "4", 1);
        } while (ret < 0 && errno == EINTR);
        if (ret < 0) {
-               perror("write consumer quit");
+               PERROR("write consumer quit");
        }
 }
 
@@ -966,7 +993,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
 
        ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
        if (ctx == NULL) {
-               perror("allocating context");
+               PERROR("allocating context");
                goto error;
        }
 
@@ -979,33 +1006,33 @@ struct lttng_consumer_local_data *lttng_consumer_create(
 
        ret = pipe(ctx->consumer_poll_pipe);
        if (ret < 0) {
-               perror("Error creating poll pipe");
+               PERROR("Error creating poll pipe");
                goto error_poll_pipe;
        }
 
        /* set read end of the pipe to non-blocking */
        ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
        if (ret < 0) {
-               perror("fcntl O_NONBLOCK");
+               PERROR("fcntl O_NONBLOCK");
                goto error_poll_fcntl;
        }
 
        /* set write end of the pipe to non-blocking */
        ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
        if (ret < 0) {
-               perror("fcntl O_NONBLOCK");
+               PERROR("fcntl O_NONBLOCK");
                goto error_poll_fcntl;
        }
 
        ret = pipe(ctx->consumer_should_quit);
        if (ret < 0) {
-               perror("Error creating recv pipe");
+               PERROR("Error creating recv pipe");
                goto error_quit_pipe;
        }
 
        ret = pipe(ctx->consumer_thread_pipe);
        if (ret < 0) {
-               perror("Error creating thread pipe");
+               PERROR("Error creating thread pipe");
                goto error_thread_pipe;
        }
 
@@ -1489,9 +1516,7 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 }
 
 /*
- * Iterate over all stream element of the hashtable and free them. This is race
- * free since the hashtable received MUST be in a race free synchronization
- * state. It's the caller responsability to make sure of that.
+ * Iterate over all streams of the hashtable and free them properly.
  */
 static void destroy_stream_ht(struct lttng_ht *ht)
 {
@@ -1503,12 +1528,14 @@ static void destroy_stream_ht(struct lttng_ht *ht)
                return;
        }
 
+       rcu_read_lock();
        cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
                ret = lttng_ht_del(ht, &iter);
                assert(!ret);
 
-               free(stream);
+               call_rcu(&stream->node.head, consumer_free_stream);
        }
+       rcu_read_unlock();
 
        lttng_ht_destroy(ht);
 }
@@ -1519,7 +1546,6 @@ static void destroy_stream_ht(struct lttng_ht *ht)
 static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
 {
        int ret;
-       struct lttng_consumer_channel *free_chan = NULL;
        struct consumer_relayd_sock_pair *relayd;
 
        assert(stream);
@@ -1594,22 +1620,20 @@ static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
                /* Both conditions are met, we destroy the relayd. */
                if (uatomic_read(&relayd->refcount) == 0 &&
                                uatomic_read(&relayd->destroy_flag)) {
-                       consumer_destroy_relayd(relayd);
+                       destroy_relayd(relayd);
                }
        }
        rcu_read_unlock();
 
        /* Atomically decrement channel refcount since other threads can use it. */
        uatomic_dec(&stream->chan->refcount);
-       if (!uatomic_read(&stream->chan->refcount)) {
-               free_chan = stream->chan;
-       }
-
-       if (free_chan) {
-               consumer_del_channel(free_chan);
+       if (!uatomic_read(&stream->chan->refcount)
+                       && !uatomic_read(&stream->chan->nb_init_streams)) {
+               /* Go for channel deletion! */
+               consumer_del_channel(stream->chan);
        }
 
-       free(stream);
+       call_rcu(&stream->node.head, consumer_free_stream);
 }
 
 /*
@@ -1696,7 +1720,7 @@ restart:
 
                        /* Check the metadata pipe for incoming metadata. */
                        if (pollfd == ctx->consumer_metadata_pipe[0]) {
-                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLNVAL)) {
+                               if (revents & (LPOLLERR | LPOLLHUP )) {
                                        DBG("Metadata thread pipe hung up");
                                        /*
                                         * Remove the pipe from the poll set and continue the loop
@@ -1706,20 +1730,13 @@ restart:
                                        close(ctx->consumer_metadata_pipe[0]);
                                        continue;
                                } else if (revents & LPOLLIN) {
-                                       stream = zmalloc(sizeof(struct lttng_consumer_stream));
-                                       if (stream == NULL) {
-                                               PERROR("zmalloc metadata consumer stream");
-                                               goto error;
-                                       }
-
                                        do {
-                                               /* Get the stream and add it to the local hash table */
-                                               ret = read(pollfd, stream,
-                                                               sizeof(struct lttng_consumer_stream));
+                                               /* Get the stream pointer received */
+                                               ret = read(pollfd, &stream, sizeof(stream));
                                        } while (ret < 0 && errno == EINTR);
-                                       if (ret < 0 || ret < sizeof(struct lttng_consumer_stream)) {
+                                       if (ret < 0 ||
+                                                       ret < sizeof(struct lttng_consumer_stream *)) {
                                                PERROR("read metadata stream");
-                                               free(stream);
                                                /*
                                                 * Let's continue here and hope we can still work
                                                 * without stopping the consumer. XXX: Should we?
@@ -1730,9 +1747,11 @@ restart:
                                        DBG("Adding metadata stream %d to poll set",
                                                        stream->wait_fd);
 
+                                       rcu_read_lock();
                                        /* The node should be init at this point */
                                        lttng_ht_add_unique_ulong(metadata_ht,
                                                        &stream->waitfd_node);
+                                       rcu_read_unlock();
 
                                        /* Add metadata stream to the global poll events list */
                                        lttng_poll_add(&events, stream->wait_fd,
@@ -1747,11 +1766,13 @@ restart:
 
                        /* From here, the event is a metadata wait fd */
 
+                       rcu_read_lock();
                        lttng_ht_lookup(metadata_ht, (void *)((unsigned long) pollfd),
                                        &iter);
                        node = lttng_ht_iter_get_node_ulong(&iter);
                        if (node == NULL) {
                                /* FD not found, continue loop */
+                               rcu_read_unlock();
                                continue;
                        }
 
@@ -1766,6 +1787,7 @@ restart:
                                len = ctx->on_buffer_ready(stream, ctx);
                                /* It's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN) {
+                                       rcu_read_unlock();
                                        goto end;
                                } else if (len > 0) {
                                        stream->data_read = 1;
@@ -1776,7 +1798,7 @@ restart:
                         * Remove the stream from the hash table since there is no data
                         * left on the fd because we previously did a read on the buffer.
                         */
-                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLNVAL)) {
+                       if (revents & (LPOLLERR | LPOLLHUP)) {
                                DBG("Metadata fd %d is hup|err|nval.", pollfd);
                                if (!stream->hangup_flush_done
                                                && (consumer_data.type == LTTNG_CONSUMER32_UST
@@ -1788,15 +1810,18 @@ restart:
                                        len = ctx->on_buffer_ready(stream, ctx);
                                        /* It's ok to have an unavailable sub-buffer */
                                        if (len < 0 && len != -EAGAIN) {
+                                               rcu_read_unlock();
                                                goto end;
                                        }
                                }
 
                                /* Removing it from hash table, poll set and free memory */
                                lttng_ht_del(metadata_ht, &iter);
+
                                lttng_poll_del(&events, stream->wait_fd);
                                consumer_del_metadata_stream(stream);
                        }
+                       rcu_read_unlock();
                }
        }
 
@@ -1864,7 +1889,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        /* allocate for all fds + 1 for the consumer_poll_pipe */
                        pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
-                               perror("pollfd malloc");
+                               PERROR("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
@@ -1873,7 +1898,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        local_stream = zmalloc((consumer_data.stream_count + 1) *
                                        sizeof(struct lttng_consumer_stream));
                        if (local_stream == NULL) {
-                               perror("local_stream malloc");
+                               PERROR("local_stream malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
@@ -1905,7 +1930,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        if (errno == EINTR) {
                                goto restart;
                        }
-                       perror("Poll error");
+                       PERROR("Poll error");
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
                        goto end;
                } else if (num_rdy == 0) {
@@ -2077,7 +2102,7 @@ void *lttng_consumer_thread_receive_fds(void *data)
 
        ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
        if (ret < 0) {
-               perror("fcntl O_NONBLOCK");
+               PERROR("fcntl O_NONBLOCK");
                goto end;
        }
 
@@ -2100,7 +2125,7 @@ void *lttng_consumer_thread_receive_fds(void *data)
        }
        ret = fcntl(sock, F_SETFL, O_NONBLOCK);
        if (ret < 0) {
-               perror("fcntl O_NONBLOCK");
+               PERROR("fcntl O_NONBLOCK");
                goto end;
        }
 
@@ -2290,7 +2315,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
         * Add relayd socket pair to consumer data hashtable. If object already
         * exists or on error, the function gracefully returns.
         */
-       consumer_add_relayd(relayd);
+       add_relayd(relayd);
 
        /* All good! */
        ret = 0;
This page took 0.029889 seconds and 4 git commands to generate.