static struct lttng_ht *metadata_ht;
static struct lttng_ht *data_ht;
+/*
+ * This hash table contains the mapping between the session id of the sessiond
+ * and the relayd session id. Element of the ht are indexed by sessiond_id.
+ *
+ * Node can be added when a relayd communication is opened in the sessiond
+ * thread.
+ *
+ * Note that a session id of the session daemon is unique to a tracing session
+ * and not to a domain session. However, a domain session has one consumer
+ * which forces the 1-1 mapping between a consumer and a domain session (ex:
+ * UST). This means that we can't have duplicate in this ht.
+ */
+static struct lttng_ht *relayd_session_id_ht;
+
/*
* Notify a thread pipe to poll back again. This usually means that some global
* state has changed so we just send back the thread in a poll wait call.
{
int ret;
struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
if (relayd == NULL) {
return;
DBG("Consumer destroy and close relayd socket pair");
+ lttng_ht_lookup(relayd_session_id_ht,
+ (void *)((unsigned long) relayd->session_id), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node != NULL) {
+ /* We assume the relayd is being or is destroyed */
+ return;
+ }
+
+ ret = lttng_ht_del(relayd_session_id_ht, &iter);
+ if (ret != 0) {
+ /* We assume the relayd is being or is destroyed */
+ return;
+ }
+
iter.iter.node = &relayd->node.node;
ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
if (ret != 0) {
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ relayd_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
}
/*
*/
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
- struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
+ struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
+ unsigned int sessiond_id)
{
int fd = -1, ret = -1;
+ enum lttng_error_code ret_code = LTTNG_OK;
struct consumer_relayd_sock_pair *relayd;
+ struct consumer_relayd_session_id *relayd_id_node;
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error;
+ }
+
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
if (relayd == NULL) {
goto error;
}
+ /* We have the fds without error. Send status back. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error;
+ }
+
/* Copy socket information and received FD */
switch (sock_type) {
case LTTNG_STREAM_CONTROL:
/* Assign new file descriptor */
relayd->control_sock.fd = fd;
+
+ /*
+ * Create a session on the relayd and store the returned id. No need to
+ * grab the socket lock since the relayd object is not yet visible.
+ */
+ ret = relayd_create_session(&relayd->control_sock,
+ &relayd->session_id);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Set up a relayd session id node. */
+ relayd_id_node = zmalloc(sizeof(struct consumer_relayd_session_id));
+ if (!relayd_id_node) {
+ PERROR("zmalloc relayd id node");
+ goto error;
+ }
+
+ relayd_id_node->relayd_id = relayd->session_id;
+ relayd_id_node->sessiond_id = (uint64_t) sessiond_id;
+
+ /* Indexed by session id of the session daemon. */
+ lttng_ht_node_init_ulong(&relayd_id_node->node,
+ relayd_id_node->sessiond_id);
+ rcu_read_lock();
+ lttng_ht_add_unique_ulong(relayd_session_id_ht, &relayd_id_node->node);
+ rcu_read_unlock();
+
break;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
rcu_read_unlock();
return 1;
}
+
+/*
+ * Send a ret code status message to the sessiond daemon.
+ *
+ * Return the sendmsg() return value.
+ */
+int consumer_send_status_msg(int sock, int ret_code)
+{
+ struct lttcomm_consumer_status_msg msg;
+
+ msg.ret_code = ret_code;
+
+ return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
+}