+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &discarded_events, sizeof(discarded_events));
+ if (ret < 0) {
+ PERROR("send discarded events");
+ goto error_fatal;
+ }
+
+ break;
+ }
+ case LTTNG_CONSUMER_LOST_PACKETS:
+ {
+ int ret;
+ uint64_t lost_packets;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht;
+ struct lttng_consumer_stream *stream;
+ uint64_t id = msg.u.lost_packets.session_id;
+ uint64_t key = msg.u.lost_packets.channel_key;
+
+ DBG("UST consumer lost packets command for session id %"
+ PRIu64, id);
+ rcu_read_lock();
+ pthread_mutex_lock(&consumer_data.lock);
+
+ ht = consumer_data.stream_list_ht;
+
+ /*
+ * We only need a reference to the channel, but they are not
+ * directly indexed, so we just use the first matching stream
+ * to extract the information we need, we default to 0 if not
+ * found (no packets lost if the channel is not yet in use).
+ */
+ lost_packets = 0;
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct, &id,
+ &iter.iter, stream, node_session_id.node) {
+ if (stream->chan->key == key) {
+ lost_packets = stream->chan->lost_packets;
+ break;
+ }
+ }
+ pthread_mutex_unlock(&consumer_data.lock);
+ rcu_read_unlock();
+
+ DBG("UST consumer lost packets command for session id %"
+ PRIu64 ", channel key %" PRIu64, id, key);
+
+ health_code_update();
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &lost_packets,
+ sizeof(lost_packets));
+ if (ret < 0) {
+ PERROR("send lost packets");
+ goto error_fatal;
+ }
+
+ break;
+ }
+ case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
+ {
+ int channel_monitor_pipe;
+
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ /* Successfully received the command's type. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ goto error_fatal;
+ }
+
+ ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe,
+ 1);
+ if (ret != sizeof(channel_monitor_pipe)) {
+ ERR("Failed to receive channel monitor pipe");
+ goto error_fatal;
+ }
+
+ DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
+ ret = consumer_timer_thread_set_channel_monitor_pipe(
+ channel_monitor_pipe);
+ if (!ret) {
+ int flags;
+
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ /* Set the pipe as non-blocking. */
+ ret = fcntl(channel_monitor_pipe, F_GETFL, 0);
+ if (ret == -1) {
+ PERROR("fcntl get flags of the channel monitoring pipe");
+ goto error_fatal;
+ }
+ flags = ret;
+
+ ret = fcntl(channel_monitor_pipe, F_SETFL,
+ flags | O_NONBLOCK);
+ if (ret == -1) {
+ PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
+ goto error_fatal;
+ }
+ DBG("Channel monitor pipe set as non-blocking");
+ } else {
+ ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
+ }
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_ROTATE_RENAME:
+ {
+ DBG("Consumer rename session %" PRIu64 " after rotation",
+ msg.u.rotate_rename.session_id);
+ ret = lttng_consumer_rotate_rename(msg.u.rotate_rename.old_path,
+ msg.u.rotate_rename.new_path,
+ msg.u.rotate_rename.uid,
+ msg.u.rotate_rename.gid,
+ msg.u.rotate_rename.relayd_id);
+ if (ret < 0) {
+ ERR("Rotate rename failed");
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+ break;
+ }
+ case LTTNG_CONSUMER_MKDIR:
+ {
+ DBG("Consumer mkdir %s in session %" PRIu64,
+ msg.u.mkdir.path,
+ msg.u.mkdir.session_id);
+ ret = lttng_consumer_mkdir(msg.u.mkdir.path,
+ msg.u.mkdir.uid,
+ msg.u.mkdir.gid,
+ msg.u.mkdir.relayd_id);
+ if (ret < 0) {
+ ERR("consumer mkdir failed");
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+ break;
+ }
+ default:
+ break;
+ }
+
+end_nosignal:
+ rcu_read_unlock();
+
+ health_code_update();
+
+ /*
+ * Return 1 to indicate success since the 0 value can be a socket
+ * shutdown during the recv() or send() call.
+ */
+ return 1;
+
+end_msg_sessiond:
+ /*
+ * The returned value here is not useful since either way we'll return 1 to
+ * the caller because the session daemon socket management is done
+ * elsewhere. Returning a negative code or 0 will shutdown the consumer.
+ */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ goto error_fatal;
+ }
+ rcu_read_unlock();
+
+ health_code_update();
+
+ return 1;
+end_channel_error:
+ if (channel) {
+ /*
+ * Free channel here since no one has a reference to it. We don't
+ * free after that because a stream can store this pointer.
+ */
+ destroy_channel(channel);
+ }
+ /* We have to send a status channel message indicating an error. */
+ ret = consumer_send_status_channel(sock, NULL);
+ if (ret < 0) {
+ /* Stop everything if session daemon can not be notified. */
+ goto error_fatal;
+ }
+ rcu_read_unlock();
+
+ health_code_update();
+
+ return 1;
+error_fatal:
+ rcu_read_unlock();
+ /* This will issue a consumer stop. */
+ return -1;
+}
+
+/*
+ * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
+ * compiled out, we isolate it in this library.
+ */
+int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream,
+ unsigned long *off)
+{
+ assert(stream);
+ assert(stream->ustream);
+
+ return ustctl_get_mmap_read_offset(stream->ustream, off);
+}
+
+/*
+ * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
+ * compiled out, we isolate it in this library.
+ */