+ rcu_read_unlock();
+ return -ENOSYS;
+ }
+ case LTTNG_CONSUMER_DESTROY_RELAYD:
+ {
+ uint64_t index = msg.u.destroy_relayd.net_seq_idx;
+ struct consumer_relayd_sock_pair *relayd;
+ int ret_send_status;
+
+ DBG("Kernel consumer destroying relayd %" PRIu64, index);
+
+ /* Get relayd reference if exists. */
+ relayd = consumer_find_relayd(index);
+ if (relayd == NULL) {
+ DBG("Unable to find relayd %" PRIu64, index);
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+ }
+
+ /*
+ * Each relayd socket pair has a refcount of stream attached to it
+ * which tells if the relayd is still active or not depending on the
+ * refcount value.
+ *
+ * This will set the destroy flag of the relayd object and destroy it
+ * if the refcount reaches zero when called.
+ *
+ * The destroy can happen either here or when a stream fd hangs up.
+ */
+ if (relayd) {
+ consumer_flag_relayd_for_destroy(relayd);
+ }
+
+ health_code_update();
+
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error_fatal;
+ }
+
+ goto end_nosignal;
+ }
+ case LTTNG_CONSUMER_DATA_PENDING:
+ {
+ int32_t ret_data_pending;
+ uint64_t id = msg.u.data_pending.session_id;
+ ssize_t ret_send;
+
+ DBG("Kernel consumer data pending command for id %" PRIu64, id);
+
+ ret_data_pending = consumer_data_pending(id);
+
+ health_code_update();
+
+ /* Send back returned value to session daemon */
+ ret_send = lttcomm_send_unix_sock(sock, &ret_data_pending,
+ sizeof(ret_data_pending));
+ if (ret_send < 0) {
+ PERROR("send data pending ret code");
+ goto error_fatal;
+ }
+
+ /*
+ * No need to send back a status message since the data pending
+ * returned value is the response.
+ */
+ break;
+ }
+ case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
+ {
+ struct lttng_consumer_channel *channel;
+ uint64_t key = msg.u.snapshot_channel.key;
+ int ret_send_status;
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("Channel %" PRIu64 " not found", key);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ } else {
+ pthread_mutex_lock(&channel->lock);
+ if (msg.u.snapshot_channel.metadata == 1) {
+ int ret_snapshot;
+
+ ret_snapshot = lttng_kconsumer_snapshot_metadata(
+ channel, key,
+ msg.u.snapshot_channel.pathname,
+ msg.u.snapshot_channel.relayd_id,
+ ctx);
+ if (ret_snapshot < 0) {
+ ERR("Snapshot metadata failed");
+ ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
+ }
+ } else {
+ int ret_snapshot;
+
+ ret_snapshot = lttng_kconsumer_snapshot_channel(
+ channel, key,
+ msg.u.snapshot_channel.pathname,
+ msg.u.snapshot_channel.relayd_id,
+ msg.u.snapshot_channel
+ .nb_packets_per_stream,
+ ctx);
+ if (ret_snapshot < 0) {
+ ERR("Snapshot channel failed");
+ ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
+ }
+ }
+ pthread_mutex_unlock(&channel->lock);
+ }
+ health_code_update();
+
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+ break;
+ }
+ case LTTNG_CONSUMER_DESTROY_CHANNEL:
+ {
+ uint64_t key = msg.u.destroy_channel.key;
+ struct lttng_consumer_channel *channel;
+ int ret_send_status;
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("Kernel consumer destroy channel %" PRIu64 " not found", key);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ health_code_update();
+
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_destroy_channel;
+ }
+
+ health_code_update();
+
+ /* Stop right now if no channel was found. */
+ if (!channel) {
+ goto end_destroy_channel;
+ }
+
+ /*
+ * This command should ONLY be issued for channel with streams set in
+ * no monitor mode.
+ */
+ assert(!channel->monitor);
+
+ /*
+ * The refcount should ALWAYS be 0 in the case of a channel in no
+ * monitor mode.
+ */
+ assert(!uatomic_sub_return(&channel->refcount, 1));
+
+ consumer_del_channel(channel);
+end_destroy_channel:
+ goto end_nosignal;
+ }
+ case LTTNG_CONSUMER_DISCARDED_EVENTS:
+ {
+ ssize_t ret;
+ uint64_t count;
+ struct lttng_consumer_channel *channel;
+ uint64_t id = msg.u.discarded_events.session_id;
+ uint64_t key = msg.u.discarded_events.channel_key;
+
+ DBG("Kernel consumer discarded events command for session id %"
+ PRIu64 ", channel key %" PRIu64, id, key);
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("Kernel consumer discarded events channel %"
+ PRIu64 " not found", key);
+ count = 0;
+ } else {
+ count = channel->discarded_events;
+ }
+
+ health_code_update();
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
+ if (ret < 0) {
+ PERROR("send discarded events");
+ goto error_fatal;
+ }
+
+ break;
+ }
+ case LTTNG_CONSUMER_LOST_PACKETS:
+ {
+ ssize_t ret;
+ uint64_t count;
+ struct lttng_consumer_channel *channel;
+ uint64_t id = msg.u.lost_packets.session_id;
+ uint64_t key = msg.u.lost_packets.channel_key;
+
+ DBG("Kernel consumer lost packets command for session id %"
+ PRIu64 ", channel key %" PRIu64, id, key);
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("Kernel consumer lost packets channel %"
+ PRIu64 " not found", key);
+ count = 0;
+ } else {
+ count = channel->lost_packets;
+ }
+
+ health_code_update();
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
+ if (ret < 0) {
+ PERROR("send lost packets");
+ goto error_fatal;
+ }
+
+ break;
+ }
+ case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
+ {
+ int channel_monitor_pipe;
+ int ret_send_status, ret_set_channel_monitor_pipe;
+ ssize_t ret_recv;
+
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ /* Successfully received the command's type. */
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
+ goto error_fatal;
+ }
+
+ ret_recv = lttcomm_recv_fds_unix_sock(
+ sock, &channel_monitor_pipe, 1);
+ if (ret_recv != sizeof(channel_monitor_pipe)) {
+ ERR("Failed to receive channel monitor pipe");
+ goto error_fatal;
+ }
+
+ DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
+ ret_set_channel_monitor_pipe =
+ consumer_timer_thread_set_channel_monitor_pipe(
+ channel_monitor_pipe);
+ if (!ret_set_channel_monitor_pipe) {
+ int flags;
+ int ret_fcntl;
+
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ /* Set the pipe as non-blocking. */
+ ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
+ if (ret_fcntl == -1) {
+ PERROR("fcntl get flags of the channel monitoring pipe");
+ goto error_fatal;
+ }
+ flags = ret_fcntl;
+
+ ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL,
+ flags | O_NONBLOCK);
+ if (ret_fcntl == -1) {
+ PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
+ goto error_fatal;