Fix: cleanup relayd on any I/O error in read subbuffer mmap
[lttng-tools.git] / src / common / consumer.c
index 9d5a36970551fd19a5cb2288ca15026f1bf8dc70..ffbecf7163ef394b4ffce3b8bf6874d3b5b4375f 100644 (file)
@@ -150,6 +150,31 @@ error:
        return (int) ret;
 }
 
+/*
+ * Cleanup the stream list of a channel. Those streams are not yet globally
+ * visible
+ */
+static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
+{
+       struct lttng_consumer_stream *stream, *stmp;
+
+       assert(channel);
+
+       /* Delete streams that might have been left in the stream list. */
+       cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+                       send_node) {
+               cds_list_del(&stream->send_node);
+               /*
+                * Once a stream is added to this list, the buffers were created so we
+                * have a guarantee that this call will succeed. Setting the monitor
+                * mode to 0 so we don't lock nor try to delete the stream from the
+                * global hash table.
+                */
+               stream->monitor = 0;
+               consumer_stream_destroy(stream, NULL);
+       }
+}
+
 /*
  * Find a stream. The consumer_data.lock must be locked during this
  * call.
@@ -292,23 +317,14 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
 {
        int ret;
        struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream, *stmp;
 
        DBG("Consumer delete channel key %" PRIu64, channel->key);
 
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&channel->lock);
 
-       /* Delete streams that might have been left in the stream list. */
-       cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
-                       send_node) {
-               cds_list_del(&stream->send_node);
-               /*
-                * Once a stream is added to this list, the buffers were created so
-                * we have a guarantee that this call will succeed.
-                */
-               consumer_stream_destroy(stream, NULL);
-       }
+       /* Destroy streams that might have been left in the stream list. */
+       clean_channel_stream_list(channel);
 
        if (channel->live_timer_enabled == 1) {
                consumer_timer_live_stop(channel);
@@ -1441,7 +1457,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
 {
        unsigned long mmap_offset;
        void *mmap_base;
-       ssize_t ret = 0, written = 0;
+       ssize_t ret = 0;
        off_t orig_offset = stream->out_fd_offset;
        /* Default is on the disk */
        int outfd = stream->out_fd;
@@ -1465,9 +1481,9 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        case LTTNG_CONSUMER_KERNEL:
                mmap_base = stream->mmap_base;
                ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
-               if (ret != 0) {
+               if (ret < 0) {
+                       ret = -errno;
                        PERROR("tracer ctl get_mmap_read_offset");
-                       written = -errno;
                        goto end;
                }
                break;
@@ -1476,13 +1492,13 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                mmap_base = lttng_ustctl_get_mmap_base(stream);
                if (!mmap_base) {
                        ERR("read mmap get mmap base for stream %s", stream->name);
-                       written = -EPERM;
+                       ret = -EPERM;
                        goto end;
                }
                ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
                if (ret != 0) {
                        PERROR("tracer ctl get_mmap_read_offset");
-                       written = ret;
+                       ret = -EINVAL;
                        goto end;
                }
                break;
@@ -1506,30 +1522,20 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                }
 
                ret = write_relayd_stream_header(stream, netlen, padding, relayd);
-               if (ret >= 0) {
-                       /* Use the returned socket. */
-                       outfd = ret;
+               if (ret < 0) {
+                       relayd_hang_up = 1;
+                       goto write_error;
+               }
+               /* Use the returned socket. */
+               outfd = ret;
 
-                       /* Write metadata stream id before payload */
-                       if (stream->metadata_flag) {
-                               ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
-                               if (ret < 0) {
-                                       written = ret;
-                                       /* Socket operation failed. We consider the relayd dead */
-                                       if (ret == -EPIPE || ret == -EINVAL) {
-                                               relayd_hang_up = 1;
-                                               goto write_error;
-                                       }
-                                       goto end;
-                               }
-                       }
-               } else {
-                       /* Socket operation failed. We consider the relayd dead */
-                       if (ret == -EPIPE || ret == -EINVAL) {
+               /* Write metadata stream id before payload */
+               if (stream->metadata_flag) {
+                       ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
+                       if (ret < 0) {
                                relayd_hang_up = 1;
                                goto write_error;
                        }
-                       /* Else, use the default set before which is the filesystem. */
                }
        } else {
                /* No streaming, we have to set the len with the full padding */
@@ -1586,13 +1592,12 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                 * amount written.
                 */
                if (ret < 0) {
-                       written = -errno;
-               } else {
-                       written = ret;
+                       ret = -errno;
                }
+               relayd_hang_up = 1;
 
                /* Socket operation failed. We consider the relayd dead */
-               if (errno == EPIPE || errno == EINVAL) {
+               if (errno == EPIPE || errno == EINVAL || errno == EBADF) {
                        /*
                         * This is possible if the fd is closed on the other side
                         * (outfd) or any write problem. It can be verbose a bit for a
@@ -1600,16 +1605,13 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                         * abruptly. This can happen so set this to a DBG statement.
                         */
                        DBG("Consumer mmap write detected relayd hang up");
-                       relayd_hang_up = 1;
-                       goto write_error;
+               } else {
+                       /* Unhandled error, print it and stop function right now. */
+                       PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
                }
-
-               /* Unhandled error, print it and stop function right now. */
-               PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
-               goto end;
+               goto write_error;
        }
        stream->output_written += ret;
-       written = ret;
 
        /* This call is useless on a socket so better save a syscall. */
        if (!relayd) {
@@ -1636,7 +1638,7 @@ end:
        }
 
        rcu_read_unlock();
-       return written;
+       return ret;
 }
 
 /*
@@ -2652,12 +2654,17 @@ void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
                        break;
                case LTTNG_CONSUMER32_UST:
                case LTTNG_CONSUMER64_UST:
-                       /*
-                        * Note: a mutex is taken internally within
-                        * liblttng-ust-ctl to protect timer wakeup_fd
-                        * use from concurrent close.
-                        */
-                       lttng_ustconsumer_close_stream_wakeup(stream);
+                       if (stream->metadata_flag) {
+                               /* Safe and protected by the stream lock. */
+                               lttng_ustconsumer_close_metadata(stream->chan);
+                       } else {
+                               /*
+                                * Note: a mutex is taken internally within
+                                * liblttng-ust-ctl to protect timer wakeup_fd
+                                * use from concurrent close.
+                                */
+                               lttng_ustconsumer_close_stream_wakeup(stream);
+                       }
                        break;
                default:
                        ERR("Unknown consumer_data type");
@@ -2812,7 +2819,14 @@ restart:
                                                break;
                                        case CONSUMER_CHANNEL_DEL:
                                        {
-                                               struct lttng_consumer_stream *stream, *stmp;
+                                               /*
+                                                * This command should never be called if the channel
+                                                * has streams monitored by either the data or metadata
+                                                * thread. The consumer only notify this thread with a
+                                                * channel del. command if it receives a destroy
+                                                * channel command from the session daemon that send it
+                                                * if a command prior to the GET_CHANNEL failed.
+                                                */
 
                                                rcu_read_lock();
                                                chan = consumer_find_channel(key);
@@ -2825,24 +2839,15 @@ restart:
                                                iter.iter.node = &chan->wait_fd_node.node;
                                                ret = lttng_ht_del(channel_ht, &iter);
                                                assert(ret == 0);
-                                               consumer_close_channel_streams(chan);
 
                                                switch (consumer_data.type) {
                                                case LTTNG_CONSUMER_KERNEL:
                                                        break;
                                                case LTTNG_CONSUMER32_UST:
                                                case LTTNG_CONSUMER64_UST:
-                                                       /* Delete streams that might have been left in the stream list. */
-                                                       cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
-                                                                       send_node) {
-                                                               health_code_update();
-
-                                                               cds_list_del(&stream->send_node);
-                                                               lttng_ustconsumer_del_stream(stream);
-                                                               uatomic_sub(&stream->chan->refcount, 1);
-                                                               assert(&chan->refcount);
-                                                               free(stream);
-                                                       }
+                                                       health_code_update();
+                                                       /* Destroy streams that might have been left in the stream list. */
+                                                       clean_channel_stream_list(chan);
                                                        break;
                                                default:
                                                        ERR("Unknown consumer_data type");
@@ -2895,6 +2900,12 @@ restart:
                                lttng_poll_del(&events, chan->wait_fd);
                                ret = lttng_ht_del(channel_ht, &iter);
                                assert(ret == 0);
+
+                               /*
+                                * This will close the wait fd for each stream associated to
+                                * this channel AND monitored by the data/metadata thread thus
+                                * will be clean by the right thread.
+                                */
                                consumer_close_channel_streams(chan);
 
                                /* Release our own refcount */
This page took 0.026055 seconds and 4 git commands to generate.