Fix: consumer add channel return value was positive on error
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index d8aec492f774a4c3a2fb1b6bcc0bf3f64a587b3e..11830fc05a087f4f601a3156560ac5b7a6735365 100644 (file)
@@ -83,6 +83,11 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
        return ret;
 }
 
+/*
+ * Receive command from session daemon and process it.
+ *
+ * Return 1 on success else a negative value or 0.
+ */
 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll)
 {
@@ -93,6 +98,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
        if (ret != sizeof(msg)) {
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
+               if (ret > 0) {
+                       ret = -1;
+               }
                return ret;
        }
        if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
@@ -122,14 +130,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_ADD_CHANNEL:
        {
                struct lttng_consumer_channel *new_channel;
+               int ret_recv;
 
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
-                       goto end_nosignal;
+                       goto error_fatal;
                }
-
                DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
                                msg.u.channel.session_id, msg.u.channel.pathname,
@@ -155,20 +163,31 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                };
 
                if (ctx->on_recv_channel != NULL) {
-                       ret = ctx->on_recv_channel(new_channel);
-                       if (ret == 0) {
-                               consumer_add_channel(new_channel, ctx);
-                       } else if (ret < 0) {
+                       ret_recv = ctx->on_recv_channel(new_channel);
+                       if (ret_recv == 0) {
+                               ret = consumer_add_channel(new_channel, ctx);
+                       } else if (ret_recv < 0) {
                                goto end_nosignal;
                        }
                } else {
-                       consumer_add_channel(new_channel, ctx);
+                       ret = consumer_add_channel(new_channel, ctx);
+               }
+
+               /* If we received an error in add_channel, we need to report it. */
+               if (ret < 0) {
+                       ret = consumer_send_status_msg(sock, ret);
+                       if (ret < 0) {
+                               goto error_fatal;
+                       }
+                       goto end_nosignal;
                }
+
                goto end_nosignal;
        }
        case LTTNG_CONSUMER_ADD_STREAM:
        {
-               int fd, stream_pipe;
+               int fd;
+               struct lttng_pipe *stream_pipe;
                struct consumer_relayd_sock_pair *relayd = NULL;
                struct lttng_consumer_stream *new_stream;
                struct lttng_consumer_channel *channel;
@@ -190,10 +209,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
-               if (ret < 0 || ret_code != LTTNG_OK) {
+               if (ret < 0) {
+                       /*
+                        * Somehow, the session daemon is not responding
+                        * anymore.
+                        */
+                       goto error_fatal;
+               }
+               if (ret_code != LTTNG_OK) {
                        /*
-                        * Somehow, the session daemon is not responding anymore or the
-                        * channel was not found.
+                        * Channel was not found.
                         */
                        goto end_nosignal;
                }
@@ -247,6 +272,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                new_stream->chan = channel;
                new_stream->wait_fd = fd;
 
+               /*
+                * We've just assigned the channel to the stream so increment the
+                * refcount right now.
+                */
+               uatomic_inc(&new_stream->chan->refcount);
+
                /*
                 * The buffer flush is done on the session daemon side for the kernel
                 * so no need for the stream "hangup_flush_done" variable to be
@@ -288,18 +319,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
-                       stream_pipe = ctx->consumer_metadata_pipe[1];
+                       stream_pipe = ctx->consumer_metadata_pipe;
                } else {
-                       stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
+                       stream_pipe = ctx->consumer_data_pipe;
                }
 
-               do {
-                       ret = write(stream_pipe, &new_stream, sizeof(new_stream));
-               } while (ret < 0 && errno == EINTR);
+               ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
                if (ret < 0) {
-                       PERROR("Consumer write %s stream to pipe %d",
+                       ERR("Consumer write %s stream to pipe %d",
                                        new_stream->metadata_flag ? "metadata" : "data",
-                                       stream_pipe);
+                                       lttng_pipe_get_writefd(stream_pipe));
                        consumer_del_stream(new_stream, NULL);
                        goto end_nosignal;
                }
@@ -344,7 +373,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
-                       goto end_nosignal;
+                       goto error_fatal;
                }
 
                goto end_nosignal;
@@ -362,6 +391,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
                if (ret < 0) {
                        PERROR("send data pending ret code");
+                       goto error_fatal;
                }
 
                /*
@@ -382,6 +412,11 @@ end_nosignal:
         * shutdown during the recv() or send() call.
         */
        return 1;
+
+error_fatal:
+       rcu_read_unlock();
+       /* This will issue a consumer stop. */
+       return -1;
 }
 
 /*
This page took 0.025683 seconds and 4 git commands to generate.