Add the relayd create session command
[lttng-tools.git] / src / common / consumer.c
index c561b9bd2c935e5e275972aa8c8b9965b950671b..f4cfa82ce8dcc1179843a806a4d969f432b5391e 100644 (file)
@@ -234,6 +234,27 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
        call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
 }
 
+/*
+ * Iterate over the relayd hash table and destroy each element. Finally,
+ * destroy the whole hash table.
+ */
+static void cleanup_relayd_ht(void)
+{
+       struct lttng_ht_iter iter;
+       struct consumer_relayd_sock_pair *relayd;
+
+       rcu_read_lock();
+
+       cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
+                       node.node) {
+               destroy_relayd(relayd);
+       }
+
+       lttng_ht_destroy(consumer_data.relayd_ht);
+
+       rcu_read_unlock();
+}
+
 /*
  * Update the end point status of all streams having the given network sequence
  * index (relayd index).
@@ -1003,8 +1024,8 @@ int lttng_consumer_send_error(
 }
 
 /*
- * Close all the tracefiles and stream fds, should be called when all instances
- * are destroyed.
+ * Close all the tracefiles and stream fds and MUST be called when all
+ * instances are destroyed i.e. when all threads were joined and are ended.
  */
 void lttng_consumer_cleanup(void)
 {
@@ -1023,6 +1044,15 @@ void lttng_consumer_cleanup(void)
        rcu_read_unlock();
 
        lttng_ht_destroy(consumer_data.channel_ht);
+
+       cleanup_relayd_ht();
+
+       /*
+        * This HT contains streams that are freed by either the metadata thread or
+        * the data thread so we do *nothing* on the hash table and simply destroy
+        * it.
+        */
+       lttng_ht_destroy(consumer_data.stream_list_ht);
 }
 
 /*
@@ -2649,10 +2679,18 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
 {
        int fd = -1, ret = -1;
+       enum lttng_error_code ret_code = LTTNG_OK;
        struct consumer_relayd_sock_pair *relayd;
 
        DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
 
+       /* First send a status message before receiving the fds. */
+       ret = consumer_send_status_msg(sock, ret_code);
+       if (ret < 0) {
+               /* Somehow, the session daemon is not responding anymore. */
+               goto error;
+       }
+
        /* Get relayd reference if exists. */
        relayd = consumer_find_relayd(net_seq_idx);
        if (relayd == NULL) {
@@ -2679,6 +2717,13 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                goto error;
        }
 
+       /* We have the fds without error. Send status back. */
+       ret = consumer_send_status_msg(sock, ret_code);
+       if (ret < 0) {
+               /* Somehow, the session daemon is not responding anymore. */
+               goto error;
+       }
+
        /* Copy socket information and received FD */
        switch (sock_type) {
        case LTTNG_STREAM_CONTROL:
@@ -2698,6 +2743,17 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
 
                /* Assign new file descriptor */
                relayd->control_sock.fd = fd;
+
+               /*
+                * Create a session on the relayd and store the returned id. No need to
+                * grab the socket lock since the relayd object is not yet visible.
+                */
+               ret = relayd_create_session(&relayd->control_sock,
+                               &relayd->session_id);
+               if (ret < 0) {
+                       goto error;
+               }
+
                break;
        case LTTNG_STREAM_DATA:
                /* Copy received lttcomm socket */
@@ -2883,3 +2939,17 @@ data_not_pending:
        rcu_read_unlock();
        return 1;
 }
+
+/*
+ * Send a ret code status message to the sessiond daemon.
+ *
+ * Return the sendmsg() return value.
+ */
+int consumer_send_status_msg(int sock, int ret_code)
+{
+       struct lttcomm_consumer_status_msg msg;
+
+       msg.ret_code = ret_code;
+
+       return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
+}
This page took 0.024322 seconds and 4 git commands to generate.