Fix: increment channel refcount on add_stream
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index 196deee9a153e521447703915d9ee9d2790502a4..f47c498777531ef6702b41d34b153abba3320e1e 100644 (file)
@@ -34,7 +34,9 @@
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/sessiond-comm/relayd.h>
 #include <common/compat/fcntl.h>
+#include <common/pipe.h>
 #include <common/relayd/relayd.h>
+#include <common/utils.h>
 
 #include "kernel-consumer.h"
 
@@ -47,8 +49,7 @@ extern volatile int consumer_quit;
  *
  * Returns 0 on success, < 0 on error
  */
-int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream)
+int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
 {
        int ret = 0;
        int infd = stream->wait_fd;
@@ -67,9 +68,7 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx,
  *
  * Returns 0 on success, < 0 on error
  */
-int lttng_kconsumer_get_produced_snapshot(
-               struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream,
+int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
                unsigned long *pos)
 {
        int ret;
@@ -88,6 +87,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll)
 {
        ssize_t ret;
+       enum lttng_error_code ret_code = LTTNG_OK;
        struct lttcomm_consumer_msg msg;
 
        ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
@@ -96,6 +96,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                return ret;
        }
        if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
+               /*
+                * Notify the session daemon that the command is completed.
+                *
+                * On transport layer error, the function call will print an error
+                * message so handling the returned code is a bit useless since we
+                * return an error code anyway.
+                */
+               (void) consumer_send_status_msg(sock, ret_code);
                return -ENOENT;
        }
 
@@ -105,44 +113,99 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        switch (msg.cmd_type) {
        case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
        {
+               /* Session daemon status message are handled in the following call. */
                ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
                                msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
-                               &msg.u.relayd_sock.sock);
+                               &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
                goto end_nosignal;
        }
        case LTTNG_CONSUMER_ADD_CHANNEL:
        {
                struct lttng_consumer_channel *new_channel;
+               int ret_recv;
 
-               DBG("consumer_add_channel %d", msg.u.channel.channel_key);
+               /* First send a status message before receiving the fds. */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+               DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
-                               -1, -1,
-                               msg.u.channel.mmap_len,
-                               msg.u.channel.max_sb_size,
-                               msg.u.channel.nb_init_streams);
+                               msg.u.channel.session_id, msg.u.channel.pathname,
+                               msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid,
+                               msg.u.channel.relayd_id, msg.u.channel.output,
+                               msg.u.channel.tracefile_size,
+                               msg.u.channel.tracefile_count);
                if (new_channel == NULL) {
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto end_nosignal;
                }
+               new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
+
+               /* Translate and save channel type. */
+               switch (msg.u.channel.type) {
+               case CONSUMER_CHANNEL_TYPE_DATA:
+               case CONSUMER_CHANNEL_TYPE_METADATA:
+                       new_channel->type = msg.u.channel.type;
+                       break;
+               default:
+                       assert(0);
+                       goto end_nosignal;
+               };
+
                if (ctx->on_recv_channel != NULL) {
-                       ret = ctx->on_recv_channel(new_channel);
-                       if (ret == 0) {
-                               consumer_add_channel(new_channel);
-                       } else if (ret < 0) {
+                       ret_recv = ctx->on_recv_channel(new_channel);
+                       if (ret_recv == 0) {
+                               ret = consumer_add_channel(new_channel, ctx);
+                       } else if (ret_recv < 0) {
                                goto end_nosignal;
                        }
                } else {
-                       consumer_add_channel(new_channel);
+                       ret = consumer_add_channel(new_channel, ctx);
+               }
+
+               /* If we received an error in add_channel, we need to report it. */
+               if (ret != 0) {
+                       consumer_send_status_msg(sock, ret);
+                       goto end_nosignal;
                }
+
                goto end_nosignal;
        }
        case LTTNG_CONSUMER_ADD_STREAM:
        {
-               int fd, stream_pipe;
+               int fd;
+               struct lttng_pipe *stream_pipe;
                struct consumer_relayd_sock_pair *relayd = NULL;
                struct lttng_consumer_stream *new_stream;
+               struct lttng_consumer_channel *channel;
                int alloc_ret = 0;
 
+               /*
+                * Get stream's channel reference. Needed when adding the stream to the
+                * global hash table.
+                */
+               channel = consumer_find_channel(msg.u.stream.channel_key);
+               if (!channel) {
+                       /*
+                        * We could not find the channel. Can happen if cpu hotplug
+                        * happens while tearing down.
+                        */
+                       ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
+                       ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
+               }
+
+               /* First send a status message before receiving the fds. */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0 || ret_code != LTTNG_OK) {
+                       /*
+                        * Somehow, the session daemon is not responding anymore or the
+                        * channel was not found.
+                        */
+                       goto end_nosignal;
+               }
+
                /* block */
                if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
                        rcu_read_unlock();
@@ -157,19 +220,28 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        return ret;
                }
 
-               new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
-                               msg.u.stream.stream_key,
-                               fd, fd,
-                               msg.u.stream.state,
-                               msg.u.stream.mmap_len,
-                               msg.u.stream.output,
-                               msg.u.stream.path_name,
-                               msg.u.stream.uid,
-                               msg.u.stream.gid,
-                               msg.u.stream.net_index,
-                               msg.u.stream.metadata_flag,
-                               msg.u.stream.session_id,
-                               &alloc_ret);
+               /*
+                * Send status code to session daemon only if the recv works. If the
+                * above recv() failed, the session daemon is notified through the
+                * error socket and the teardown is eventually done.
+                */
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+
+               new_stream = consumer_allocate_stream(channel->key,
+                               fd,
+                               LTTNG_CONSUMER_ACTIVE_STREAM,
+                               channel->name,
+                               channel->uid,
+                               channel->gid,
+                               channel->relayd_id,
+                               channel->session_id,
+                               msg.u.stream.cpu,
+                               &alloc_ret,
+                               channel->type);
                if (new_stream == NULL) {
                        switch (alloc_ret) {
                        case -ENOMEM:
@@ -177,16 +249,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        default:
                                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                                break;
-                       case -ENOENT:
-                               /*
-                                * We could not find the channel. Can happen if cpu hotplug
-                                * happens while tearing down.
-                                */
-                               DBG3("Could not find channel");
-                               break;
                        }
                        goto end_nosignal;
                }
+               new_stream->chan = channel;
+               new_stream->wait_fd = fd;
+
+               /* Metadata chan refcount is increment in add_metadata_stream */
+               if (new_stream->chan->type != CONSUMER_CHANNEL_TYPE_METADATA) {
+                       /* Update channel refcount */
+                       uatomic_inc(&new_stream->chan->refcount);
+               }
 
                /*
                 * The buffer flush is done on the session daemon side for the kernel
@@ -198,21 +271,23 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                new_stream->hangup_flush_done = 0;
 
                /* The stream is not metadata. Get relayd reference if exists. */
-               relayd = consumer_find_relayd(msg.u.stream.net_index);
+               relayd = consumer_find_relayd(new_stream->net_seq_idx);
                if (relayd != NULL) {
                        /* Add stream on the relayd */
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                        ret = relayd_add_stream(&relayd->control_sock,
-                                       msg.u.stream.name, msg.u.stream.path_name,
-                                       &new_stream->relayd_stream_id);
+                                       new_stream->name, new_stream->chan->pathname,
+                                       &new_stream->relayd_stream_id,
+                                       new_stream->chan->tracefile_size,
+                                       new_stream->chan->tracefile_count);
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret < 0) {
                                consumer_del_stream(new_stream, NULL);
                                goto end_nosignal;
                        }
-               } else if (msg.u.stream.net_index != -1) {
-                       ERR("Network sequence index %d unknown. Not adding stream.",
-                                       msg.u.stream.net_index);
+               } else if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
+                       ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
+                                       new_stream->net_seq_idx);
                        consumer_del_stream(new_stream, NULL);
                        goto end_nosignal;
                }
@@ -227,24 +302,22 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
-                       stream_pipe = ctx->consumer_metadata_pipe[1];
+                       stream_pipe = ctx->consumer_metadata_pipe;
                } else {
-                       stream_pipe = ctx->consumer_data_pipe[1];
+                       stream_pipe = ctx->consumer_data_pipe;
                }
 
-               do {
-                       ret = write(stream_pipe, &new_stream, sizeof(new_stream));
-               } while (ret < 0 && errno == EINTR);
+               ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
                if (ret < 0) {
-                       PERROR("Consumer write %s stream to pipe %d",
+                       ERR("Consumer write %s stream to pipe %d",
                                        new_stream->metadata_flag ? "metadata" : "data",
-                                       stream_pipe);
+                                       lttng_pipe_get_writefd(stream_pipe));
                        consumer_del_stream(new_stream, NULL);
                        goto end_nosignal;
                }
 
                DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64,
-                               msg.u.stream.path_name, fd, new_stream->relayd_stream_id);
+                               new_stream->name, fd, new_stream->relayd_stream_id);
                break;
        }
        case LTTNG_CONSUMER_UPDATE_STREAM:
@@ -262,8 +335,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                /* Get relayd reference if exists. */
                relayd = consumer_find_relayd(index);
                if (relayd == NULL) {
-                       ERR("Unable to find relayd %" PRIu64, index);
-                       goto end_nosignal;
+                       DBG("Unable to find relayd %" PRIu64, index);
+                       ret_code = LTTNG_ERR_NO_CONSUMER;
                }
 
                /*
@@ -276,24 +349,37 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 *
                 * The destroy can happen either here or when a stream fd hangs up.
                 */
-               consumer_flag_relayd_for_destroy(relayd);
+               if (relayd) {
+                       consumer_flag_relayd_for_destroy(relayd);
+               }
+
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
 
                goto end_nosignal;
        }
-       case LTTNG_CONSUMER_DATA_AVAILABLE:
+       case LTTNG_CONSUMER_DATA_PENDING:
        {
                int32_t ret;
-               uint64_t id = msg.u.data_available.session_id;
+               uint64_t id = msg.u.data_pending.session_id;
 
-               DBG("Kernel consumer data available command for id %" PRIu64, id);
+               DBG("Kernel consumer data pending command for id %" PRIu64, id);
 
-               ret = consumer_data_available(id);
+               ret = consumer_data_pending(id);
 
                /* Send back returned value to session daemon */
                ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
                if (ret < 0) {
-                       PERROR("send data available ret code");
+                       PERROR("send data pending ret code");
                }
+
+               /*
+                * No need to send back a status message since the data pending
+                * returned value is the response.
+                */
                break;
        }
        default:
@@ -346,7 +432,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                goto end;
        }
 
-       switch (stream->output) {
+       switch (stream->chan->output) {
        case LTTNG_EVENT_SPLICE:
 
                /*
@@ -396,8 +482,8 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                 * network streaming or the full padding (len) size when we are _not_
                 * streaming.
                 */
-               if ((ret != subbuf_size && stream->net_seq_idx != -1) ||
-                               (ret != len && stream->net_seq_idx == -1)) {
+               if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
+                               (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
                        /*
                         * Display the error but continue processing to try to release the
                         * subbuffer
@@ -434,18 +520,18 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
 {
        int ret;
 
-       /* Opening the tracefile in write mode */
-       if (strlen(stream->path_name) > 0 && stream->net_seq_idx == -1) {
-               ret = run_as_open(stream->path_name,
-                               O_WRONLY|O_CREAT|O_TRUNC,
-                               S_IRWXU|S_IRWXG|S_IRWXO,
+       assert(stream);
+
+       /* Don't create anything if this is set for streaming. */
+       if (stream->net_seq_idx == (uint64_t) -1ULL) {
+               ret = utils_create_stream_file(stream->chan->pathname, stream->name,
+                               stream->chan->tracefile_size, stream->tracefile_count_current,
                                stream->uid, stream->gid);
                if (ret < 0) {
-                       ERR("Opening %s", stream->path_name);
-                       perror("open");
                        goto error;
                }
                stream->out_fd = ret;
+               stream->tracefile_size_current = 0;
        }
 
        if (stream->output == LTTNG_EVENT_MMAP) {
@@ -455,15 +541,15 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
                ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
                if (ret != 0) {
                        errno = -ret;
-                       perror("kernctl_get_mmap_len");
+                       PERROR("kernctl_get_mmap_len");
                        goto error_close_fd;
                }
                stream->mmap_len = (size_t) mmap_len;
 
-               stream->mmap_base = mmap(NULL, stream->mmap_len,
-                               PROT_READ, MAP_PRIVATE, stream->wait_fd, 0);
+               stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ,
+                               MAP_PRIVATE, stream->wait_fd, 0);
                if (stream->mmap_base == MAP_FAILED) {
-                       perror("Error mmaping");
+                       PERROR("Error mmaping");
                        ret = -1;
                        goto error_close_fd;
                }
@@ -488,10 +574,10 @@ error:
  * stream. Consumer data lock MUST be acquired before calling this function
  * and the stream lock.
  *
- * Return 0 if the traced data are still getting read else 1 meaning that the
+ * Return 1 if the traced data are still getting read else 0 meaning that the
  * data is available for trace viewer reading.
  */
-int lttng_kconsumer_data_available(struct lttng_consumer_stream *stream)
+int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
 {
        int ret;
 
@@ -502,11 +588,12 @@ int lttng_kconsumer_data_available(struct lttng_consumer_stream *stream)
                /* There is still data so let's put back this subbuffer. */
                ret = kernctl_put_subbuf(stream->wait_fd);
                assert(ret == 0);
+               ret = 1;   /* Data is pending */
                goto end;
        }
 
-       /* Data is available to be read for this stream. */
-       ret = 1;
+       /* Data is NOT pending and ready to be read. */
+       ret = 0;
 
 end:
        return ret;
This page took 0.029313 seconds and 4 git commands to generate.