+ /* Session daemon status message are handled in the following call. */
+ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
+ msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
+ &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
+ msg.u.relayd_sock.relayd_session_id);
+ goto end_nosignal;
+ }
+ case LTTNG_CONSUMER_ADD_CHANNEL:
+ {
+ struct lttng_consumer_channel *new_channel;
+ int ret_recv;
+
+ health_code_update();
+
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error_fatal;
+ }
+
+ health_code_update();
+
+ DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
+ new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
+ msg.u.channel.session_id, msg.u.channel.pathname,
+ msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid,
+ msg.u.channel.relayd_id, msg.u.channel.output,
+ msg.u.channel.tracefile_size,
+ msg.u.channel.tracefile_count, 0,
+ msg.u.channel.monitor,
+ msg.u.channel.live_timer_interval,
+ NULL, NULL);
+ if (new_channel == NULL) {
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+ goto end_nosignal;
+ }
+ new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
+ switch (msg.u.channel.output) {
+ case LTTNG_EVENT_SPLICE:
+ new_channel->output = CONSUMER_CHANNEL_SPLICE;
+ break;
+ case LTTNG_EVENT_MMAP:
+ new_channel->output = CONSUMER_CHANNEL_MMAP;
+ break;
+ default:
+ ERR("Channel output unknown %d", msg.u.channel.output);
+ goto end_nosignal;
+ }
+
+ /* Translate and save channel type. */
+ switch (msg.u.channel.type) {
+ case CONSUMER_CHANNEL_TYPE_DATA:
+ case CONSUMER_CHANNEL_TYPE_METADATA:
+ new_channel->type = msg.u.channel.type;
+ break;
+ default:
+ assert(0);
+ goto end_nosignal;
+ };
+
+ health_code_update();
+
+ if (ctx->on_recv_channel != NULL) {
+ ret_recv = ctx->on_recv_channel(new_channel);
+ if (ret_recv == 0) {
+ ret = consumer_add_channel(new_channel, ctx);
+ } else if (ret_recv < 0) {
+ goto end_nosignal;
+ }
+ } else {
+ ret = consumer_add_channel(new_channel, ctx);
+ }
+ if (CONSUMER_CHANNEL_TYPE_DATA) {
+ consumer_timer_live_start(new_channel,
+ msg.u.channel.live_timer_interval);
+ }
+
+ health_code_update();
+
+ /* If we received an error in add_channel, we need to report it. */
+ if (ret < 0) {
+ ret = consumer_send_status_msg(sock, ret);
+ if (ret < 0) {
+ goto error_fatal;
+ }
+ goto end_nosignal;
+ }
+
+ goto end_nosignal;
+ }
+ case LTTNG_CONSUMER_ADD_STREAM:
+ {
+ int fd;
+ struct lttng_pipe *stream_pipe;
+ struct lttng_consumer_stream *new_stream;
+ struct lttng_consumer_channel *channel;
+ int alloc_ret = 0;
+
+ /*
+ * Get stream's channel reference. Needed when adding the stream to the
+ * global hash table.
+ */
+ channel = consumer_find_channel(msg.u.stream.channel_key);
+ if (!channel) {
+ /*
+ * We could not find the channel. Can happen if cpu hotplug
+ * happens while tearing down.
+ */
+ ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ health_code_update();
+
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error_fatal;
+ }
+
+ health_code_update();
+
+ if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
+ /* Channel was not found. */
+ goto end_nosignal;
+ }
+
+ /* Blocking call */
+ health_poll_entry();
+ ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ health_poll_exit();
+ if (ret) {
+ goto error_fatal;
+ }
+
+ health_code_update();
+
+ /* Get stream file descriptor from socket */
+ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
+ if (ret != sizeof(fd)) {
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
+ rcu_read_unlock();
+ return ret;
+ }
+
+ health_code_update();
+
+ /*
+ * Send status code to session daemon only if the recv works. If the
+ * above recv() failed, the session daemon is notified through the
+ * error socket and the teardown is eventually done.
+ */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
+ health_code_update();
+
+ new_stream = consumer_allocate_stream(channel->key,
+ fd,
+ LTTNG_CONSUMER_ACTIVE_STREAM,
+ channel->name,
+ channel->uid,
+ channel->gid,
+ channel->relayd_id,
+ channel->session_id,
+ msg.u.stream.cpu,
+ &alloc_ret,
+ channel->type,
+ channel->monitor);
+ if (new_stream == NULL) {
+ switch (alloc_ret) {
+ case -ENOMEM:
+ case -EINVAL:
+ default:
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+ break;
+ }
+ goto end_nosignal;
+ }
+
+ new_stream->chan = channel;
+ new_stream->wait_fd = fd;
+ switch (channel->output) {
+ case CONSUMER_CHANNEL_SPLICE:
+ new_stream->output = LTTNG_EVENT_SPLICE;
+ ret = utils_create_pipe(new_stream->splice_pipe);
+ if (ret < 0) {
+ goto end_nosignal;
+ }
+ break;
+ case CONSUMER_CHANNEL_MMAP:
+ new_stream->output = LTTNG_EVENT_MMAP;
+ break;
+ default:
+ ERR("Stream output unknown %d", channel->output);
+ goto end_nosignal;
+ }
+
+ /*
+ * We've just assigned the channel to the stream so increment the
+ * refcount right now. We don't need to increment the refcount for
+ * streams in no monitor because we handle manually the cleanup of
+ * those. It is very important to make sure there is NO prior
+ * consumer_del_stream() calls or else the refcount will be unbalanced.
+ */
+ if (channel->monitor) {
+ uatomic_inc(&new_stream->chan->refcount);
+ }
+
+ /*
+ * The buffer flush is done on the session daemon side for the kernel
+ * so no need for the stream "hangup_flush_done" variable to be
+ * tracked. This is important for a kernel stream since we don't rely
+ * on the flush state of the stream to read data. It's not the case for
+ * user space tracing.
+ */
+ new_stream->hangup_flush_done = 0;
+
+ health_code_update();
+
+ if (ctx->on_recv_stream) {
+ ret = ctx->on_recv_stream(new_stream);
+ if (ret < 0) {
+ consumer_stream_free(new_stream);
+ goto end_nosignal;
+ }
+ }
+
+ health_code_update();
+
+ if (new_stream->metadata_flag) {
+ channel->metadata_stream = new_stream;
+ }
+
+ /* Do not monitor this stream. */
+ if (!channel->monitor) {
+ DBG("Kernel consumer add stream %s in no monitor mode with "
+ "relayd id %" PRIu64, new_stream->name,
+ new_stream->net_seq_idx);
+ cds_list_add(&new_stream->send_node, &channel->streams.head);
+ break;
+ }
+
+ /* Send stream to relayd if the stream has an ID. */
+ if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
+ ret = consumer_send_relayd_stream(new_stream,
+ new_stream->chan->pathname);
+ if (ret < 0) {
+ consumer_stream_free(new_stream);
+ goto end_nosignal;
+ }
+ }
+
+ /* Get the right pipe where the stream will be sent. */
+ if (new_stream->metadata_flag) {
+ ret = consumer_add_metadata_stream(new_stream);
+ if (ret) {
+ ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing",
+ new_stream->key);
+ consumer_stream_free(new_stream);
+ goto end_nosignal;
+ }
+ stream_pipe = ctx->consumer_metadata_pipe;
+ } else {
+ ret = consumer_add_data_stream(new_stream);
+ if (ret) {
+ ERR("Consumer add stream %" PRIu64 " failed. Continuing",
+ new_stream->key);
+ consumer_stream_free(new_stream);
+ goto end_nosignal;
+ }
+ stream_pipe = ctx->consumer_data_pipe;
+ }
+
+ /* Vitible to other threads */
+ new_stream->globally_visible = 1;
+
+ health_code_update();
+
+ ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
+ if (ret < 0) {
+ ERR("Consumer write %s stream to pipe %d",
+ new_stream->metadata_flag ? "metadata" : "data",
+ lttng_pipe_get_writefd(stream_pipe));
+ if (new_stream->metadata_flag) {
+ consumer_del_stream_for_metadata(new_stream);
+ } else {
+ consumer_del_stream_for_data(new_stream);
+ }
+ goto end_nosignal;
+ }
+
+ DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64,
+ new_stream->name, fd, new_stream->relayd_stream_id);
+ break;
+ }
+ case LTTNG_CONSUMER_STREAMS_SENT:
+ {
+ struct lttng_consumer_channel *channel;
+
+ /*
+ * Get stream's channel reference. Needed when adding the stream to the
+ * global hash table.
+ */
+ channel = consumer_find_channel(msg.u.sent_streams.channel_key);
+ if (!channel) {
+ /*
+ * We could not find the channel. Can happen if cpu hotplug
+ * happens while tearing down.
+ */
+ ERR("Unable to find channel key %" PRIu64,
+ msg.u.sent_streams.channel_key);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ health_code_update();
+
+ /*
+ * Send status code to session daemon.
+ */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
+ health_code_update();
+
+ /*
+ * We should not send this message if we don't monitor the
+ * streams in this channel.
+ */
+ if (!channel->monitor) {
+ break;
+ }
+
+ health_code_update();
+ /* Send stream to relayd if the stream has an ID. */
+ if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
+ ret = consumer_send_relayd_streams_sent(
+ msg.u.sent_streams.net_seq_idx);
+ if (ret < 0) {
+ goto end_nosignal;
+ }
+ }
+ break;
+ }
+ case LTTNG_CONSUMER_UPDATE_STREAM:
+ {
+ rcu_read_unlock();
+ return -ENOSYS;
+ }
+ case LTTNG_CONSUMER_DESTROY_RELAYD:
+ {
+ uint64_t index = msg.u.destroy_relayd.net_seq_idx;
+ struct consumer_relayd_sock_pair *relayd;
+
+ DBG("Kernel consumer destroying relayd %" PRIu64, index);
+
+ /* Get relayd reference if exists. */
+ relayd = consumer_find_relayd(index);
+ if (relayd == NULL) {
+ DBG("Unable to find relayd %" PRIu64, index);
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+ }
+
+ /*
+ * Each relayd socket pair has a refcount of stream attached to it
+ * which tells if the relayd is still active or not depending on the
+ * refcount value.
+ *
+ * This will set the destroy flag of the relayd object and destroy it
+ * if the refcount reaches zero when called.
+ *
+ * The destroy can happen either here or when a stream fd hangs up.
+ */
+ if (relayd) {
+ consumer_flag_relayd_for_destroy(relayd);
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error_fatal;
+ }
+
+ goto end_nosignal;
+ }
+ case LTTNG_CONSUMER_DATA_PENDING:
+ {
+ int32_t ret;
+ uint64_t id = msg.u.data_pending.session_id;
+
+ DBG("Kernel consumer data pending command for id %" PRIu64, id);
+
+ ret = consumer_data_pending(id);
+
+ health_code_update();
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ if (ret < 0) {
+ PERROR("send data pending ret code");
+ goto error_fatal;
+ }
+
+ /*
+ * No need to send back a status message since the data pending
+ * returned value is the response.
+ */
+ break;
+ }
+ case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
+ {
+ if (msg.u.snapshot_channel.metadata == 1) {
+ ret = lttng_kconsumer_snapshot_metadata(msg.u.snapshot_channel.key,
+ msg.u.snapshot_channel.pathname,
+ msg.u.snapshot_channel.relayd_id, ctx);
+ if (ret < 0) {
+ ERR("Snapshot metadata failed");
+ ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ }
+ } else {
+ ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key,
+ msg.u.snapshot_channel.pathname,
+ msg.u.snapshot_channel.relayd_id,
+ msg.u.snapshot_channel.nb_packets_per_stream,
+ ctx);
+ if (ret < 0) {
+ ERR("Snapshot channel failed");
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;