Fix: increment channel refcount on add_stream
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index 385af875a2d9b69aeeff6c58f5dcf8017338e16d..f47c498777531ef6702b41d34b153abba3320e1e 100644 (file)
@@ -34,7 +34,9 @@
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/sessiond-comm/relayd.h>
 #include <common/compat/fcntl.h>
+#include <common/pipe.h>
 #include <common/relayd/relayd.h>
+#include <common/utils.h>
 
 #include "kernel-consumer.h"
 
@@ -120,6 +122,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_ADD_CHANNEL:
        {
                struct lttng_consumer_channel *new_channel;
+               int ret_recv;
 
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
@@ -127,7 +130,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        /* Somehow, the session daemon is not responding anymore. */
                        goto end_nosignal;
                }
-
                DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
                                msg.u.channel.session_id, msg.u.channel.pathname,
@@ -153,20 +155,28 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                };
 
                if (ctx->on_recv_channel != NULL) {
-                       ret = ctx->on_recv_channel(new_channel);
-                       if (ret == 0) {
-                               consumer_add_channel(new_channel, ctx);
-                       } else if (ret < 0) {
+                       ret_recv = ctx->on_recv_channel(new_channel);
+                       if (ret_recv == 0) {
+                               ret = consumer_add_channel(new_channel, ctx);
+                       } else if (ret_recv < 0) {
                                goto end_nosignal;
                        }
                } else {
-                       consumer_add_channel(new_channel, ctx);
+                       ret = consumer_add_channel(new_channel, ctx);
+               }
+
+               /* If we received an error in add_channel, we need to report it. */
+               if (ret != 0) {
+                       consumer_send_status_msg(sock, ret);
+                       goto end_nosignal;
                }
+
                goto end_nosignal;
        }
        case LTTNG_CONSUMER_ADD_STREAM:
        {
-               int fd, stream_pipe;
+               int fd;
+               struct lttng_pipe *stream_pipe;
                struct consumer_relayd_sock_pair *relayd = NULL;
                struct lttng_consumer_stream *new_stream;
                struct lttng_consumer_channel *channel;
@@ -245,6 +255,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                new_stream->chan = channel;
                new_stream->wait_fd = fd;
 
+               /* Metadata chan refcount is increment in add_metadata_stream */
+               if (new_stream->chan->type != CONSUMER_CHANNEL_TYPE_METADATA) {
+                       /* Update channel refcount */
+                       uatomic_inc(&new_stream->chan->refcount);
+               }
+
                /*
                 * The buffer flush is done on the session daemon side for the kernel
                 * so no need for the stream "hangup_flush_done" variable to be
@@ -261,7 +277,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                        ret = relayd_add_stream(&relayd->control_sock,
                                        new_stream->name, new_stream->chan->pathname,
-                                       &new_stream->relayd_stream_id);
+                                       &new_stream->relayd_stream_id,
+                                       new_stream->chan->tracefile_size,
+                                       new_stream->chan->tracefile_count);
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret < 0) {
                                consumer_del_stream(new_stream, NULL);
@@ -284,18 +302,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
-                       stream_pipe = ctx->consumer_metadata_pipe[1];
+                       stream_pipe = ctx->consumer_metadata_pipe;
                } else {
-                       stream_pipe = ctx->consumer_data_pipe[1];
+                       stream_pipe = ctx->consumer_data_pipe;
                }
 
-               do {
-                       ret = write(stream_pipe, &new_stream, sizeof(new_stream));
-               } while (ret < 0 && errno == EINTR);
+               ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
                if (ret < 0) {
-                       PERROR("Consumer write %s stream to pipe %d",
+                       ERR("Consumer write %s stream to pipe %d",
                                        new_stream->metadata_flag ? "metadata" : "data",
-                                       stream_pipe);
+                                       lttng_pipe_get_writefd(stream_pipe));
                        consumer_del_stream(new_stream, NULL);
                        goto end_nosignal;
                }
@@ -506,10 +522,16 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
 
        assert(stream);
 
-       ret = lttng_create_output_file(stream);
-       if (ret < 0) {
-               ERR("Creating output file");
-               goto error;
+       /* Don't create anything if this is set for streaming. */
+       if (stream->net_seq_idx == (uint64_t) -1ULL) {
+               ret = utils_create_stream_file(stream->chan->pathname, stream->name,
+                               stream->chan->tracefile_size, stream->tracefile_count_current,
+                               stream->uid, stream->gid);
+               if (ret < 0) {
+                       goto error;
+               }
+               stream->out_fd = ret;
+               stream->tracefile_size_current = 0;
        }
 
        if (stream->output == LTTNG_EVENT_MMAP) {
This page took 0.024505 seconds and 4 git commands to generate.