+ case LTTNG_CONSUMER_ASK_CHANNEL_CREATION:
+ {
+ int ret;
+ struct ustctl_consumer_channel_attr attr;
+
+ /* Create a plain object and reserve a channel key. */
+ channel = allocate_channel(msg.u.ask_channel.session_id,
+ msg.u.ask_channel.pathname, msg.u.ask_channel.name,
+ msg.u.ask_channel.uid, msg.u.ask_channel.gid,
+ msg.u.ask_channel.relayd_id, msg.u.ask_channel.key,
+ (enum lttng_event_output) msg.u.ask_channel.output);
+ if (!channel) {
+ goto end_channel_error;
+ }
+
+ /* Build channel attributes from received message. */
+ attr.subbuf_size = msg.u.ask_channel.subbuf_size;
+ attr.num_subbuf = msg.u.ask_channel.num_subbuf;
+ attr.overwrite = msg.u.ask_channel.overwrite;
+ attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
+ attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
+ memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
+
+ /* Translate the event output type to UST. */
+ switch (channel->output) {
+ case LTTNG_EVENT_SPLICE:
+ /* Splice not supported so fallback on mmap(). */
+ case LTTNG_EVENT_MMAP:
+ default:
+ attr.output = CONSUMER_CHANNEL_MMAP;
+ break;
+ };
+
+ /* Translate and save channel type. */
+ switch (msg.u.ask_channel.type) {
+ case LTTNG_UST_CHAN_PER_CPU:
+ channel->type = CONSUMER_CHANNEL_TYPE_DATA;
+ attr.type = LTTNG_UST_CHAN_PER_CPU;
+ break;
+ case LTTNG_UST_CHAN_METADATA:
+ channel->type = CONSUMER_CHANNEL_TYPE_METADATA;
+ attr.type = LTTNG_UST_CHAN_METADATA;
+ break;
+ default:
+ assert(0);
+ goto error_fatal;
+ };
+
+ ret = ask_channel(ctx, sock, channel, &attr);
+ if (ret < 0) {
+ goto end_channel_error;
+ }
+
+ /*
+ * Add the channel to the internal state AFTER all streams were created
+ * and successfully sent to session daemon. This way, all streams must
+ * be ready before this channel is visible to the threads.
+ */
+ ret = add_channel(channel, ctx);
+ if (ret < 0) {
+ goto end_channel_error;
+ }
+
+ /*
+ * Channel and streams are now created. Inform the session daemon that
+ * everything went well and should wait to receive the channel and
+ * streams with ustctl API.
+ */
+ ret = consumer_send_status_channel(sock, channel);
+ if (ret < 0) {
+ /*
+ * There is probably a problem on the socket so the poll will get
+ * it and clean everything up.
+ */
+ goto end_nosignal;
+ }
+
+ break;
+ }
+ case LTTNG_CONSUMER_GET_CHANNEL:
+ {
+ int ret, relayd_err = 0;
+ uint64_t key = msg.u.get_channel.key;
+ struct lttng_consumer_channel *channel;
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("UST consumer get channel key %lu not found", key);
+ ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto end_msg_sessiond;
+ }
+
+ /* Inform sessiond that we are about to send channel and streams. */
+ ret = consumer_send_status_msg(sock, LTTNG_OK);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
+ /* Send everything to sessiond. */
+ ret = send_sessiond_channel(sock, channel, ctx, &relayd_err);
+ if (ret < 0) {
+ if (relayd_err) {
+ /*
+ * We were unable to send to the relayd the stream so avoid
+ * sending back a fatal error to the thread since this is OK
+ * and the consumer can continue its work.
+ */
+ ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
+ goto end_msg_sessiond;
+ }
+ /*
+ * The communicaton was broken hence there is a bad state between
+ * the consumer and sessiond so stop everything.
+ */
+ goto error_fatal;
+ }
+
+ ret = send_streams_to_thread(channel, ctx);
+ if (ret < 0) {
+ /*
+ * If we are unable to send the stream to the thread, there is
+ * a big problem so just stop everything.
+ */
+ goto error_fatal;
+ }
+ /* List MUST be empty after or else it could be reused. */
+ assert(cds_list_empty(&channel->streams.head));
+
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_DESTROY_CHANNEL:
+ {
+ uint64_t key = msg.u.destroy_channel.key;
+ struct lttng_consumer_channel *channel;
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("UST consumer get channel key %lu not found", key);
+ ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ goto end_msg_sessiond;
+ }
+
+ destroy_channel(channel);
+
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_CLOSE_METADATA:
+ {
+ int ret;
+
+ ret = close_metadata(msg.u.close_metadata.key);
+ if (ret != 0) {
+ ret_code = ret;
+ }
+
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_PUSH_METADATA:
+ {
+ int ret;
+ uint64_t len = msg.u.push_metadata.len;
+ uint64_t target_offset = msg.u.push_metadata.target_offset;
+ uint64_t key = msg.u.push_metadata.key;
+ struct lttng_consumer_channel *channel;
+ char *metadata_str;
+
+ DBG("UST consumer push metadata key %lu of len %lu", key, len);
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("UST consumer push metadata %lu not found", key);
+ ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ }
+
+ metadata_str = zmalloc(len * sizeof(char));
+ if (!metadata_str) {
+ PERROR("zmalloc metadata string");
+ ret_code = LTTCOMM_CONSUMERD_ENOMEM;
+ goto end_msg_sessiond;
+ }
+
+ /* Tell session daemon we are ready to receive the metadata. */
+ ret = consumer_send_status_msg(sock, LTTNG_OK);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error_fatal;
+ }
+
+ /* Wait for more data. */
+ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ goto end_nosignal;
+ }
+
+ /* Receive metadata string. */
+ ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
+ if (ret < 0) {
+ /* Session daemon is dead so return gracefully. */
+ goto end_nosignal;
+ }
+
+ ret = push_metadata(channel, metadata_str, target_offset, len);
+ free(metadata_str);
+ if (ret < 0) {
+ /* Unable to handle metadata. Notify session daemon. */
+ ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ goto end_msg_sessiond;
+ }
+
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_SETUP_METADATA:
+ {
+ int ret;
+
+ ret = setup_metadata(ctx, msg.u.setup_metadata.key);
+ if (ret) {
+ ret_code = ret;
+ }
+ goto end_msg_sessiond;
+ }