+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ /* Set the pipe as non-blocking. */
+ ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
+ if (ret_fcntl == -1) {
+ PERROR("fcntl get flags of the channel monitoring pipe");
+ goto error_fatal;
+ }
+ flags = ret_fcntl;
+
+ ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL,
+ flags | O_NONBLOCK);
+ if (ret_fcntl == -1) {
+ PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
+ goto error_fatal;
+ }
+ DBG("Channel monitor pipe set as non-blocking");
+ } else {
+ ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
+ }
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_ROTATE_CHANNEL:
+ {
+ struct lttng_consumer_channel *found_channel;
+ uint64_t key = msg.u.rotate_channel.key;
+ int ret_send_status;
+
+ found_channel = consumer_find_channel(key);
+ if (!found_channel) {
+ DBG("Channel %" PRIu64 " not found", key);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ } else {
+ int rotate_channel;
+
+ /*
+ * Sample the rotate position of all the streams in
+ * this channel.
+ */
+ rotate_channel = lttng_consumer_rotate_channel(
+ found_channel, key,
+ msg.u.rotate_channel.relayd_id,
+ msg.u.rotate_channel.metadata, ctx);
+ if (rotate_channel < 0) {
+ ERR("Rotate channel failed");
+ ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
+ }
+
+ health_code_update();
+ }
+
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_rotate_channel_nosignal;
+ }
+
+ /*
+ * Rotate the streams that are ready right now.
+ * FIXME: this is a second consecutive iteration over the
+ * streams in a channel, there is probably a better way to
+ * handle this, but it needs to be after the
+ * consumer_send_status_msg() call.
+ */
+ if (found_channel) {
+ int ret_rotate_read_streams;
+
+ ret_rotate_read_streams =
+ lttng_consumer_rotate_ready_streams(
+ found_channel, key,
+ ctx);
+ if (ret_rotate_read_streams < 0) {
+ ERR("Rotate channel failed");
+ }
+ }
+ break;
+end_rotate_channel_nosignal:
+ goto end_nosignal;
+ }
+ case LTTNG_CONSUMER_CLEAR_CHANNEL:
+ {
+ struct lttng_consumer_channel *found_channel;
+ uint64_t key = msg.u.clear_channel.key;
+ int ret_send_status;
+
+ found_channel = consumer_find_channel(key);
+ if (!found_channel) {
+ DBG("Channel %" PRIu64 " not found", key);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ } else {
+ int ret_clear_channel;
+
+ ret_clear_channel = lttng_consumer_clear_channel(
+ found_channel);
+ if (ret_clear_channel) {
+ ERR("Clear channel failed key %" PRIu64, key);
+ ret_code = ret_clear_channel;
+ }
+
+ health_code_update();
+ }
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+ break;
+ }
+ case LTTNG_CONSUMER_INIT:
+ {
+ int ret_send_status;
+
+ ret_code = lttng_consumer_init_command(ctx,
+ msg.u.init.sessiond_uuid);
+ health_code_update();
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+ break;
+ }
+ case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
+ {
+ const struct lttng_credentials credentials = {
+ .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
+ .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
+ };
+ const bool is_local_trace =
+ !msg.u.create_trace_chunk.relayd_id.is_set;
+ const uint64_t relayd_id =
+ msg.u.create_trace_chunk.relayd_id.value;
+ const char *chunk_override_name =
+ *msg.u.create_trace_chunk.override_name ?
+ msg.u.create_trace_chunk.override_name :
+ NULL;
+ struct lttng_directory_handle *chunk_directory_handle = NULL;
+
+ /*
+ * The session daemon will only provide a chunk directory file
+ * descriptor for local traces.
+ */
+ if (is_local_trace) {
+ int chunk_dirfd;
+ int ret_send_status;
+ ssize_t ret_recv;
+
+ /* Acnowledge the reception of the command. */
+ ret_send_status = consumer_send_status_msg(
+ sock, LTTCOMM_CONSUMERD_SUCCESS);
+ if (ret_send_status < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
+ /*
+ * Receive trace chunk domain dirfd.
+ */
+ ret_recv = lttcomm_recv_fds_unix_sock(
+ sock, &chunk_dirfd, 1);
+ if (ret_recv != sizeof(chunk_dirfd)) {
+ ERR("Failed to receive trace chunk domain directory file descriptor");
+ goto error_fatal;
+ }
+
+ DBG("Received trace chunk domain directory fd (%d)",
+ chunk_dirfd);
+ chunk_directory_handle = lttng_directory_handle_create_from_dirfd(
+ chunk_dirfd);
+ if (!chunk_directory_handle) {
+ ERR("Failed to initialize chunk domain directory handle from directory file descriptor");
+ if (close(chunk_dirfd)) {
+ PERROR("Failed to close chunk directory file descriptor");
+ }
+ goto error_fatal;
+ }
+ }
+
+ ret_code = lttng_consumer_create_trace_chunk(
+ !is_local_trace ? &relayd_id : NULL,
+ msg.u.create_trace_chunk.session_id,
+ msg.u.create_trace_chunk.chunk_id,
+ (time_t) msg.u.create_trace_chunk
+ .creation_timestamp,
+ chunk_override_name,
+ msg.u.create_trace_chunk.credentials.is_set ?
+ &credentials :
+ NULL,
+ chunk_directory_handle);
+ lttng_directory_handle_put(chunk_directory_handle);
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
+ {
+ enum lttng_trace_chunk_command_type close_command =
+ msg.u.close_trace_chunk.close_command.value;
+ const uint64_t relayd_id =
+ msg.u.close_trace_chunk.relayd_id.value;
+ struct lttcomm_consumer_close_trace_chunk_reply reply;
+ char closed_trace_chunk_path[LTTNG_PATH_MAX];
+ int ret;
+
+ ret_code = lttng_consumer_close_trace_chunk(
+ msg.u.close_trace_chunk.relayd_id.is_set ?
+ &relayd_id :
+ NULL,
+ msg.u.close_trace_chunk.session_id,
+ msg.u.close_trace_chunk.chunk_id,
+ (time_t) msg.u.close_trace_chunk.close_timestamp,
+ msg.u.close_trace_chunk.close_command.is_set ?
+ &close_command :
+ NULL, closed_trace_chunk_path);
+ reply.ret_code = ret_code;
+ reply.path_length = strlen(closed_trace_chunk_path) + 1;
+ ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
+ if (ret != sizeof(reply)) {
+ goto error_fatal;
+ }
+ ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path,
+ reply.path_length);
+ if (ret != reply.path_length) {
+ goto error_fatal;
+ }
+ goto end_nosignal;
+ }
+ case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
+ {
+ const uint64_t relayd_id =
+ msg.u.trace_chunk_exists.relayd_id.value;
+
+ ret_code = lttng_consumer_trace_chunk_exists(
+ msg.u.trace_chunk_exists.relayd_id.is_set ?
+ &relayd_id : NULL,
+ msg.u.trace_chunk_exists.session_id,
+ msg.u.trace_chunk_exists.chunk_id);
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
+ {
+ const uint64_t key = msg.u.open_channel_packets.key;
+ struct lttng_consumer_channel *found_channel =
+ consumer_find_channel(key);
+
+ if (found_channel) {
+ pthread_mutex_lock(&found_channel->lock);
+ ret_code = lttng_consumer_open_channel_packets(
+ found_channel);
+ pthread_mutex_unlock(&found_channel->lock);
+ } else {
+ /*
+ * The channel could have disappeared in per-pid
+ * buffering mode.
+ */
+ DBG("Channel %" PRIu64 " not found", key);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ health_code_update();
+ goto end_msg_sessiond;
+ }
+ default:
+ break;
+ }
+
+end_nosignal:
+ /*
+ * Return 1 to indicate success since the 0 value can be a socket