call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
}
+/*
+ * Iterate over the relayd hash table and destroy each element. Finally,
+ * destroy the whole hash table.
+ */
+static void cleanup_relayd_ht(void)
+{
+ struct lttng_ht_iter iter;
+ struct consumer_relayd_sock_pair *relayd;
+
+ rcu_read_lock();
+
+ cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
+ node.node) {
+ destroy_relayd(relayd);
+ }
+
+ lttng_ht_destroy(consumer_data.relayd_ht);
+
+ rcu_read_unlock();
+}
+
/*
* Update the end point status of all streams having the given network sequence
* index (relayd index).
}
/*
- * Close all the tracefiles and stream fds, should be called when all instances
- * are destroyed.
+ * Close all the tracefiles and stream fds and MUST be called when all
+ * instances are destroyed i.e. when all threads were joined and are ended.
*/
void lttng_consumer_cleanup(void)
{
rcu_read_unlock();
lttng_ht_destroy(consumer_data.channel_ht);
+
+ cleanup_relayd_ht();
+
+ /*
+ * This HT contains streams that are freed by either the metadata thread or
+ * the data thread so we do *nothing* on the hash table and simply destroy
+ * it.
+ */
+ lttng_ht_destroy(consumer_data.stream_list_ht);
}
/*
struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
{
int fd = -1, ret = -1;
+ enum lttng_error_code ret_code = LTTNG_OK;
struct consumer_relayd_sock_pair *relayd;
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error;
+ }
+
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
if (relayd == NULL) {
goto error;
}
+ /* We have the fds without error. Send status back. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error;
+ }
+
/* Copy socket information and received FD */
switch (sock_type) {
case LTTNG_STREAM_CONTROL:
/* Assign new file descriptor */
relayd->control_sock.fd = fd;
+
+ /*
+ * Create a session on the relayd and store the returned id. No need to
+ * grab the socket lock since the relayd object is not yet visible.
+ */
+ ret = relayd_create_session(&relayd->control_sock,
+ &relayd->session_id);
+ if (ret < 0) {
+ goto error;
+ }
+
break;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
rcu_read_unlock();
return 1;
}
+
+/*
+ * Send a ret code status message to the sessiond daemon.
+ *
+ * Return the sendmsg() return value.
+ */
+int consumer_send_status_msg(int sock, int ret_code)
+{
+ struct lttcomm_consumer_status_msg msg;
+
+ msg.ret_code = ret_code;
+
+ return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
+}