+int lttng_consumer_init(void)
+{
+ consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!consumer_data.channel_ht) {
+ goto error;
+ }
+
+ consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!consumer_data.relayd_ht) {
+ goto error;
+ }
+
+ consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!consumer_data.stream_list_ht) {
+ goto error;
+ }
+
+ consumer_data.stream_per_chan_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!consumer_data.stream_per_chan_id_ht) {
+ goto error;
+ }
+
+ data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!data_ht) {
+ goto error;
+ }
+
+ metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!metadata_ht) {
+ goto error;
+ }
+
+ return 0;
+
+error:
+ return -1;
+}
+
/*
 * Process the ADD_RELAYD command received by a consumer.
 *
 * This will create a relayd socket pair and add it to the relayd hash table.
 * The caller MUST acquire an RCU read side lock before calling it.
 *
 * The control socket is always registered first; a data socket may only be
 * added to an already-existing pair. On success the received fd is owned by
 * the relayd pair; on error it is closed here. Return 0 on success or a
 * negative value on error (the session daemon is notified of the failure
 * via a status message, except on the "nosignal" paths where it is already
 * known to be unreachable).
 */
int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
		struct lttng_consumer_local_data *ctx, int sock,
		struct pollfd *consumer_sockpoll,
		struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
		uint64_t relayd_session_id)
{
	int fd = -1, ret = -1, relayd_created = 0;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct consumer_relayd_sock_pair *relayd = NULL;

	assert(ctx);
	assert(relayd_sock);

	DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);

	/* Get relayd reference if exists. */
	relayd = consumer_find_relayd(net_seq_idx);
	if (relayd == NULL) {
		/* Only a control socket may create a new pair. */
		assert(sock_type == LTTNG_STREAM_CONTROL);
		/* Not found. Allocate one. */
		relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
		if (relayd == NULL) {
			ret = -ENOMEM;
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		} else {
			relayd->sessiond_session_id = sessiond_id;
			relayd_created = 1;
		}

		/*
		 * This code path MUST continue to the consumer send status
		 * message so we can notify the session daemon and continue
		 * our work without killing everything.
		 */
	} else {
		/*
		 * relayd key should never be found for control socket.
		 */
		assert(sock_type != LTTNG_STREAM_CONTROL);
	}

	/* First send a status message before receiving the fds. */
	ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/* Poll on consumer socket. */
	if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
		ret = -EINTR;
		goto error_nosignal;
	}

	/* Get relayd socket from session daemon */
	ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
	if (ret != sizeof(fd)) {
		ret = -1;
		fd = -1;	/* Just in case it gets set with an invalid value. */

		/*
		 * Failing to receive FDs might indicate a major problem such as
		 * reaching a fd limit during the receive where the kernel returns a
		 * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
		 * don't take any chances and stop everything.
		 *
		 * XXX: Feature request #558 will fix that and avoid this possible
		 * issue when reaching the fd limit.
		 */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
		ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
		goto error;
	}

	/* Copy socket information and received FD */
	switch (sock_type) {
	case LTTNG_STREAM_CONTROL:
		/* Copy received lttcomm socket */
		lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
		ret = lttcomm_create_sock(&relayd->control_sock.sock);
		/* Handle create_sock error. */
		if (ret < 0) {
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		}
		/*
		 * Close the socket created internally by
		 * lttcomm_create_sock, so we can replace it by the one
		 * received from sessiond.
		 */
		if (close(relayd->control_sock.sock.fd)) {
			PERROR("close");
		}

		/* Assign new file descriptor */
		relayd->control_sock.sock.fd = fd;
		/* Ownership transferred to the pair; keep fd out of the error path. */
		fd = -1;
		/* Assign version values. */
		relayd->control_sock.major = relayd_sock->major;
		relayd->control_sock.minor = relayd_sock->minor;

		relayd->relayd_session_id = relayd_session_id;

		break;
	case LTTNG_STREAM_DATA:
		/* Copy received lttcomm socket */
		lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
		ret = lttcomm_create_sock(&relayd->data_sock.sock);
		/* Handle create_sock error. */
		if (ret < 0) {
			ret_code = LTTCOMM_CONSUMERD_ENOMEM;
			goto error;
		}
		/*
		 * Close the socket created internally by
		 * lttcomm_create_sock, so we can replace it by the one
		 * received from sessiond.
		 */
		if (close(relayd->data_sock.sock.fd)) {
			PERROR("close");
		}

		/* Assign new file descriptor */
		relayd->data_sock.sock.fd = fd;
		/* Ownership transferred to the pair; keep fd out of the error path. */
		fd = -1;
		/* Assign version values. */
		relayd->data_sock.major = relayd_sock->major;
		relayd->data_sock.minor = relayd_sock->minor;
		break;
	default:
		ERR("Unknown relayd socket type (%d)", sock_type);
		ret = -1;
		ret_code = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}

	/* NOTE: fd is -1 here; the DBG line reports the sentinel, not the socket. */
	DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
			sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
			relayd->net_seq_idx, fd);

	/* We successfully added the socket. Send status back. */
	ret = consumer_send_status_msg(sock, ret_code);
	if (ret < 0) {
		/* Somehow, the session daemon is not responding anymore. */
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
		goto error_nosignal;
	}

	/*
	 * Add relayd socket pair to consumer data hashtable. If object already
	 * exists or on error, the function gracefully returns.
	 */
	add_relayd(relayd);

	/* All good! */
	return 0;

error:
	/* Tell the session daemon why we failed before cleaning up. */
	if (consumer_send_status_msg(sock, ret_code) < 0) {
		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
	}

error_nosignal:
	/* Close received socket if valid. */
	if (fd >= 0) {
		if (close(fd)) {
			PERROR("close received socket");
		}
	}

	/* Only free a pair allocated by this call, never a pre-existing one. */
	if (relayd_created) {
		free(relayd);
	}

	return ret;
}
+
+/*
+ * Try to lock the stream mutex.
+ *
+ * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
+ */
+static int stream_try_lock(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+
+ /*
+ * Try to lock the stream mutex. On failure, we know that the stream is
+ * being used else where hence there is data still being extracted.
+ */
+ ret = pthread_mutex_trylock(&stream->lock);
+ if (ret) {
+ /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
+ ret = 0;
+ goto end;
+ }
+
+ ret = 1;
+
+end:
+ return ret;
+}
+
+/*
+ * Search for a relayd associated to the session id and return the reference.
+ *
+ * A rcu read side lock MUST be acquire before calling this function and locked
+ * until the relayd object is no longer necessary.
+ */
+static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
+{
+ struct lttng_ht_iter iter;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+
+ /* Iterate over all relayd since they are indexed by net_seq_idx. */
+ cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
+ node.node) {
+ /*
+ * Check by sessiond id which is unique here where the relayd session
+ * id might not be when having multiple relayd.
+ */
+ if (relayd->sessiond_session_id == id) {
+ /* Found the relayd. There can be only one per id. */
+ goto found;
+ }
+ }
+
+ return NULL;
+
+found:
+ return relayd;
+}
+
/*
 * Check if, for a given session id, there is still data needed to be
 * extracted from the buffers.
 *
 * Return 1 if data is pending or else 0 meaning ready to be read.
 *
 * Locking: takes the RCU read side lock and consumer_data.lock for the
 * whole check; each stream is inspected only if its own mutex can be
 * acquired without blocking, and relayd queries are serialized on the
 * relayd control socket mutex.
 */
int consumer_data_pending(uint64_t id)
{
	int ret;
	struct lttng_ht_iter iter;
	struct lttng_ht *ht;
	struct lttng_consumer_stream *stream;
	struct consumer_relayd_sock_pair *relayd = NULL;
	int (*data_pending)(struct lttng_consumer_stream *);

	DBG("Consumer data pending command on session id %" PRIu64, id);

	rcu_read_lock();
	pthread_mutex_lock(&consumer_data.lock);

	/* Select the data_pending implementation matching the consumer type. */
	switch (consumer_data.type) {
	case LTTNG_CONSUMER_KERNEL:
		data_pending = lttng_kconsumer_data_pending;
		break;
	case LTTNG_CONSUMER32_UST:
	case LTTNG_CONSUMER64_UST:
		data_pending = lttng_ustconsumer_data_pending;
		break;
	default:
		ERR("Unknown consumer data type");
		assert(0);
	}

	/* Ease our life a bit */
	ht = consumer_data.stream_list_ht;

	relayd = find_relayd_by_session_id(id);
	if (relayd) {
		/* Send init command for data pending. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_begin_data_pending(&relayd->control_sock,
				relayd->relayd_session_id);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			/*
			 * Communication error with the relayd; treat it as no
			 * data pending.
			 */
			goto data_not_pending;
		}
	}

	/* Walk every stream of this session (keyed by session id). */
	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&id, lttng_ht_seed),
			ht->match_fct, &id,
			&iter.iter, stream, node_session_id.node) {
		/* If this call fails, the stream is being used hence data pending. */
		ret = stream_try_lock(stream);
		if (!ret) {
			goto data_pending;
		}

		/*
		 * A removed node from the hash table indicates that the stream has
		 * been deleted thus having a guarantee that the buffers are closed
		 * on the consumer side. However, data can still be transmitted
		 * over the network so don't skip the relayd check.
		 */
		ret = cds_lfht_is_node_deleted(&stream->node.node);
		if (!ret) {
			/*
			 * An empty output file is not valid. We need at least one packet
			 * generated per stream, even if it contains no event, so it
			 * contains at least one packet header.
			 */
			if (stream->output_written == 0) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
			/* Check the stream if there is data in the buffers. */
			ret = data_pending(stream);
			if (ret == 1) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
		}

		/* Relayd check */
		if (relayd) {
			pthread_mutex_lock(&relayd->ctrl_sock_mutex);
			if (stream->metadata_flag) {
				/* Metadata streams report quiescence instead. */
				ret = relayd_quiescent_control(&relayd->control_sock,
						stream->relayd_stream_id);
			} else {
				ret = relayd_data_pending(&relayd->control_sock,
						stream->relayd_stream_id,
						stream->next_net_seq_num - 1);
			}
			pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
			if (ret == 1) {
				pthread_mutex_unlock(&stream->lock);
				goto data_pending;
			}
		}
		pthread_mutex_unlock(&stream->lock);
	}

	if (relayd) {
		unsigned int is_data_inflight = 0;

		/* Send end command for data pending. */
		pthread_mutex_lock(&relayd->ctrl_sock_mutex);
		ret = relayd_end_data_pending(&relayd->control_sock,
				relayd->relayd_session_id, &is_data_inflight);
		pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
		if (ret < 0) {
			goto data_not_pending;
		}
		if (is_data_inflight) {
			goto data_pending;
		}
	}

	/*
	 * Finding _no_ node in the hash table and no inflight data means that the
	 * stream(s) have been removed thus data is guaranteed to be available for
	 * analysis from the trace files.
	 */

data_not_pending:
	/* Data is available to be read by a viewer. */
	pthread_mutex_unlock(&consumer_data.lock);
	rcu_read_unlock();
	return 0;

data_pending:
	/* Data is still being extracted from buffers. */
	pthread_mutex_unlock(&consumer_data.lock);
	rcu_read_unlock();
	return 1;
}
+
+/*
+ * Send a ret code status message to the sessiond daemon.
+ *
+ * Return the sendmsg() return value.
+ */
+int consumer_send_status_msg(int sock, int ret_code)
+{
+ struct lttcomm_consumer_status_msg msg;
+
+ msg.ret_code = ret_code;
+
+ return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
+}
+
+/*
+ * Send a channel status message to the sessiond daemon.
+ *
+ * Return the sendmsg() return value.
+ */
+int consumer_send_status_channel(int sock,
+ struct lttng_consumer_channel *channel)