Fix: destroy streams in consumer del channel
[lttng-tools.git] / src / common / consumer.c
index cbcb2f74d27f8dfb8297b8c37f04b67112f6806e..34751969dbedee0f6f35a08a057f836d32e609d8 100644 (file)
@@ -45,6 +45,7 @@
 
 #include "consumer.h"
 #include "consumer-stream.h"
+#include "consumer-testpoint.h"
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -224,16 +225,6 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
        return channel;
 }
 
-static void free_stream_rcu(struct rcu_head *head)
-{
-       struct lttng_ht_node_u64 *node =
-               caa_container_of(head, struct lttng_ht_node_u64, head);
-       struct lttng_consumer_stream *stream =
-               caa_container_of(node, struct lttng_consumer_stream, node);
-
-       free(stream);
-}
-
 static void free_channel_rcu(struct rcu_head *head)
 {
        struct lttng_ht_node_u64 *node =
@@ -1583,44 +1574,49 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                }
        }
 
-       while (len > 0) {
-               ret = lttng_write(outfd, mmap_base + mmap_offset, len);
-               DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
-               if (ret < len) {
-                       /*
-                        * This is possible if the fd is closed on the other side (outfd)
-                        * or any write problem. It can be verbose a bit for a normal
-                        * execution if for instance the relayd is stopped abruptly. This
-                        * can happen so set this to a DBG statement.
-                        */
-                       DBG("Error in file write mmap");
-                       if (written == 0) {
-                               written = -errno;
-                       }
-                       /* Socket operation failed. We consider the relayd dead */
-                       if (errno == EPIPE || errno == EINVAL) {
-                               relayd_hang_up = 1;
-                               goto write_error;
-                       }
-                       goto end;
-               } else if (ret > len) {
-                       PERROR("Error in file write (ret %zd > len %lu)", ret, len);
-                       written += ret;
-                       goto end;
+       /*
+        * This call guarantee that len or less is returned. It's impossible to
+        * receive a ret value that is bigger than len.
+        */
+       ret = lttng_write(outfd, mmap_base + mmap_offset, len);
+       DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
+       if (ret < 0 || ((size_t) ret != len)) {
+               /*
+                * Report error to caller if nothing was written else at least send the
+                * amount written.
+                */
+               if (ret < 0) {
+                       written = -errno;
                } else {
-                       len -= ret;
-                       mmap_offset += ret;
+                       written = ret;
                }
 
-               /* This call is useless on a socket so better save a syscall. */
-               if (!relayd) {
-                       /* This won't block, but will start writeout asynchronously */
-                       lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
-                                       SYNC_FILE_RANGE_WRITE);
-                       stream->out_fd_offset += ret;
+               /* Socket operation failed. We consider the relayd dead */
+               if (errno == EPIPE || errno == EINVAL) {
+                       /*
+                        * This is possible if the fd is closed on the other side
+                        * (outfd) or any write problem. It can be verbose a bit for a
+                        * normal execution if for instance the relayd is stopped
+                        * abruptly. This can happen so set this to a DBG statement.
+                        */
+                       DBG("Consumer mmap write detected relayd hang up");
+                       relayd_hang_up = 1;
+                       goto write_error;
                }
-               stream->output_written += ret;
-               written += ret;
+
+               /* Unhandled error, print it and stop function right now. */
+               PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
+               goto end;
+       }
+       stream->output_written += ret;
+       written = ret;
+
+       /* This call is useless on a socket so better save a syscall. */
+       if (!relayd) {
+               /* This won't block, but will start writeout asynchronously */
+               lttng_sync_file_range(outfd, stream->out_fd_offset, len,
+                               SYNC_FILE_RANGE_WRITE);
+               stream->out_fd_offset += len;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
 
@@ -1789,11 +1785,11 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                SPLICE_F_MOVE | SPLICE_F_MORE);
                DBG("splice chan to pipe, ret %zd", ret_splice);
                if (ret_splice < 0) {
-                       PERROR("Error in relay splice");
+                       ret = errno;
                        if (written == 0) {
                                written = ret_splice;
                        }
-                       ret = errno;
+                       PERROR("Error in relay splice");
                        goto splice_error;
                }
 
@@ -1819,27 +1815,32 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
                DBG("Consumer splice pipe to file, ret %zd", ret_splice);
                if (ret_splice < 0) {
-                       PERROR("Error in file splice");
+                       ret = errno;
                        if (written == 0) {
                                written = ret_splice;
                        }
                        /* Socket operation failed. We consider the relayd dead */
-                       if (errno == EBADF || errno == EPIPE) {
+                       if (errno == EBADF || errno == EPIPE || errno == ESPIPE) {
                                WARN("Remote relayd disconnected. Stopping");
                                relayd_hang_up = 1;
                                goto write_error;
                        }
-                       ret = errno;
+                       PERROR("Error in file splice");
                        goto splice_error;
                } else if (ret_splice > len) {
-                       errno = EINVAL;
-                       PERROR("Wrote more data than requested %zd (len: %lu)",
-                                       ret_splice, len);
+                       /*
+                        * We don't expect this code path to be executed but you never know
+                        * so this is an extra protection agains a buggy splice().
+                        */
                        written += ret_splice;
                        ret = errno;
+                       PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice,
+                                       len);
                        goto splice_error;
+               } else {
+                       /* All good, update current len and continue. */
+                       len -= ret_splice;
                }
-               len -= ret_splice;
 
                /* This call is useless on a socket so better save a syscall. */
                if (!relayd) {
@@ -1852,9 +1853,6 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                written += ret_splice;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
-
-       ret = ret_splice;
-
        goto end;
 
 write_error:
@@ -1948,7 +1946,7 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
 }
 
-void lttng_consumer_close_metadata(void)
+void lttng_consumer_close_all_metadata(void)
 {
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -1966,7 +1964,7 @@ void lttng_consumer_close_metadata(void)
                 * because at this point we are sure that the metadata producer is
                 * either dead or blocked.
                 */
-               lttng_ustconsumer_close_metadata(metadata_ht);
+               lttng_ustconsumer_close_all_metadata(metadata_ht);
                break;
        default:
                ERR("Unknown consumer_data type");
@@ -1980,10 +1978,7 @@ void lttng_consumer_close_metadata(void)
 void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht)
 {
-       int ret;
-       struct lttng_ht_iter iter;
        struct lttng_consumer_channel *free_chan = NULL;
-       struct consumer_relayd_sock_pair *relayd;
 
        assert(stream);
        /*
@@ -1994,96 +1989,17 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
 
        DBG3("Consumer delete metadata stream %d", stream->wait_fd);
 
-       if (ht == NULL) {
-               /* Means the stream was allocated but not successfully added */
-               goto free_stream_rcu;
-       }
-
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
 
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               if (stream->mmap_base != NULL) {
-                       ret = munmap(stream->mmap_base, stream->mmap_len);
-                       if (ret != 0) {
-                               PERROR("munmap metadata stream");
-                       }
-               }
-               if (stream->wait_fd >= 0) {
-                       ret = close(stream->wait_fd);
-                       if (ret < 0) {
-                               PERROR("close kernel metadata wait_fd");
-                       }
-               }
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               if (stream->monitor) {
-                       /* close the write-side in close_metadata */
-                       ret = close(stream->ust_metadata_poll_pipe[0]);
-                       if (ret < 0) {
-                               PERROR("Close UST metadata read-side poll pipe");
-                       }
-               }
-               lttng_ustconsumer_del_stream(stream);
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               assert(0);
-               goto end;
-       }
-
-       rcu_read_lock();
-       iter.iter.node = &stream->node.node;
-       ret = lttng_ht_del(ht, &iter);
-       assert(!ret);
+       /* Remove any reference to that stream. */
+       consumer_stream_delete(stream, ht);
 
-       iter.iter.node = &stream->node_channel_id.node;
-       ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
-       assert(!ret);
-
-       iter.iter.node = &stream->node_session_id.node;
-       ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
-       assert(!ret);
-       rcu_read_unlock();
-
-       if (stream->out_fd >= 0) {
-               ret = close(stream->out_fd);
-               if (ret) {
-                       PERROR("close");
-               }
-       }
-
-       /* Check and cleanup relayd */
-       rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               uatomic_dec(&relayd->refcount);
-               assert(uatomic_read(&relayd->refcount) >= 0);
-
-               /* Closing streams requires to lock the control socket. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_send_close_stream(&relayd->control_sock,
-                               stream->relayd_stream_id, stream->next_net_seq_num - 1);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       DBG("Unable to close stream on the relayd. Continuing");
-                       /*
-                        * Continue here. There is nothing we can do for the relayd.
-                        * Chances are that the relayd has closed the socket so we just
-                        * continue cleaning up.
-                        */
-               }
-
-               /* Both conditions are met, we destroy the relayd. */
-               if (uatomic_read(&relayd->refcount) == 0 &&
-                               uatomic_read(&relayd->destroy_flag)) {
-                       consumer_destroy_relayd(relayd);
-               }
-       }
-       rcu_read_unlock();
+       /* Close down everything including the relayd if one. */
+       consumer_stream_close(stream);
+       /* Destroy tracer buffers of the stream. */
+       consumer_stream_destroy_buffers(stream);
 
        /* Atomically decrement channel refcount since other threads can use it. */
        if (!uatomic_sub_return(&stream->chan->refcount, 1)
@@ -2092,11 +2008,10 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                free_chan = stream->chan;
        }
 
-end:
        /*
         * Nullify the stream reference so it is not used after deletion. The
-        * channel lock MUST be acquired before being able to check for
-        * a NULL pointer value.
+        * channel lock MUST be acquired before being able to check for a NULL
+        * pointer value.
         */
        stream->chan->metadata_stream = NULL;
 
@@ -2108,8 +2023,7 @@ end:
                consumer_del_channel(free_chan);
        }
 
-free_stream_rcu:
-       call_rcu(&stream->node.head, free_stream_rcu);
+       consumer_stream_free(stream);
 }
 
 /*
@@ -2254,6 +2168,10 @@ void *consumer_thread_metadata_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
 
+       if (testpoint(consumerd_thread_metadata)) {
+               goto error_testpoint;
+       }
+
        health_code_update();
 
        DBG("Thread metadata poll started");
@@ -2342,7 +2260,7 @@ restart:
 
                                        /* Add metadata stream to the global poll events list */
                                        lttng_poll_add(&events, stream->wait_fd,
-                                                       LPOLLIN | LPOLLPRI);
+                                                       LPOLLIN | LPOLLPRI | LPOLLHUP);
                                }
 
                                /* Handle other stream */
@@ -2428,6 +2346,7 @@ end:
 
        lttng_poll_clean(&events);
 end_poll:
+error_testpoint:
        if (err) {
                health_error();
                ERR("Health error occurred in %s", __func__);
@@ -2456,6 +2375,10 @@ void *consumer_thread_data_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
 
+       if (testpoint(consumerd_thread_data)) {
+               goto error_testpoint;
+       }
+
        health_code_update();
 
        local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
@@ -2687,6 +2610,7 @@ end:
         */
        (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
 
+error_testpoint:
        if (err) {
                health_error();
                ERR("Health error occurred in %s", __func__);
@@ -2728,12 +2652,17 @@ void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
                        break;
                case LTTNG_CONSUMER32_UST:
                case LTTNG_CONSUMER64_UST:
-                       /*
-                        * Note: a mutex is taken internally within
-                        * liblttng-ust-ctl to protect timer wakeup_fd
-                        * use from concurrent close.
-                        */
-                       lttng_ustconsumer_close_stream_wakeup(stream);
+                       if (stream->metadata_flag) {
+                               /* Safe and protected by the stream lock. */
+                               lttng_ustconsumer_close_metadata(stream->chan);
+                       } else {
+                               /*
+                                * Note: a mutex is taken internally within
+                                * liblttng-ust-ctl to protect timer wakeup_fd
+                                * use from concurrent close.
+                                */
+                               lttng_ustconsumer_close_stream_wakeup(stream);
+                       }
                        break;
                default:
                        ERR("Unknown consumer_data type");
@@ -2787,6 +2716,10 @@ void *consumer_thread_channel_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
 
+       if (testpoint(consumerd_thread_channel)) {
+               goto error_testpoint;
+       }
+
        health_code_update();
 
        channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
@@ -2886,6 +2819,15 @@ restart:
                                        {
                                                struct lttng_consumer_stream *stream, *stmp;
 
+                                               /*
+                                                * This command should never be called if the channel
+                                                * has streams monitored by either the data or metadata
+                                                * thread. The consumer only notify this thread with a
+                                                * channel del. command if it receives a destroy
+                                                * channel command from the session daemon that send it
+                                                * if a command prior to the GET_CHANNEL failed.
+                                                */
+
                                                rcu_read_lock();
                                                chan = consumer_find_channel(key);
                                                if (!chan) {
@@ -2897,7 +2839,6 @@ restart:
                                                iter.iter.node = &chan->wait_fd_node.node;
                                                ret = lttng_ht_del(channel_ht, &iter);
                                                assert(ret == 0);
-                                               consumer_close_channel_streams(chan);
 
                                                switch (consumer_data.type) {
                                                case LTTNG_CONSUMER_KERNEL:
@@ -2908,12 +2849,9 @@ restart:
                                                        cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
                                                                        send_node) {
                                                                health_code_update();
-
+                                                               /* Remove from the list and destroy it. */
                                                                cds_list_del(&stream->send_node);
-                                                               lttng_ustconsumer_del_stream(stream);
-                                                               uatomic_sub(&stream->chan->refcount, 1);
-                                                               assert(&chan->refcount);
-                                                               free(stream);
+                                                               consumer_stream_destroy(stream, NULL);
                                                        }
                                                        break;
                                                default:
@@ -2967,6 +2905,12 @@ restart:
                                lttng_poll_del(&events, chan->wait_fd);
                                ret = lttng_ht_del(channel_ht, &iter);
                                assert(ret == 0);
+
+                               /*
+                                * This will close the wait fd for each stream associated to
+                                * this channel AND monitored by the data/metadata thread thus
+                                * will be clean by the right thread.
+                                */
                                consumer_close_channel_streams(chan);
 
                                /* Release our own refcount */
@@ -2988,6 +2932,7 @@ end:
 end_poll:
        destroy_channel_ht(channel_ht);
 end_ht:
+error_testpoint:
        DBG("Channel poll thread exiting");
        if (err) {
                health_error();
@@ -3043,6 +2988,10 @@ void *consumer_thread_sessiond_poll(void *data)
 
        health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
 
+       if (testpoint(consumerd_thread_sessiond)) {
+               goto error_testpoint;
+       }
+
        health_code_update();
 
        DBG("Creating command socket %s", ctx->consumer_command_sock_path);
@@ -3147,7 +3096,7 @@ end:
         *
         * NOTE: for now, this only applies to the UST tracer.
         */
-       lttng_consumer_close_metadata();
+       lttng_consumer_close_all_metadata();
 
        /*
         * when all fds have hung up, the polling thread
@@ -3179,6 +3128,7 @@ end:
                }
        }
 
+error_testpoint:
        if (err) {
                health_error();
                ERR("Health error occurred in %s", __func__);
This page took 0.029886 seconds and 4 git commands to generate.