Fix: steal channel key in the consumer to avoid race
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index 764d616a24e692c77f8e717c37ddcf3841d556fb..06c1c15134cf63af055729b6b28f75d0489962ce 100644 (file)
@@ -62,6 +62,7 @@
 #include "stream.h"
 #include "session.h"
 #include "ctf-trace.h"
+#include "connection.h"
 
 static struct lttng_uri *live_uri;
 
@@ -69,7 +70,7 @@ static struct lttng_uri *live_uri;
  * This pipe is used to inform the worker thread that a command is queued and
  * ready to be processed.
  */
-static int live_relay_cmd_pipe[2] = { -1, -1 };
+static int live_conn_pipe[2] = { -1, -1 };
 
 /* Shared between threads */
 static int live_dispatch_thread_exit;
@@ -84,7 +85,7 @@ static pthread_t live_worker_thread;
  * The live_thread_listener and live_thread_dispatcher communicate with this
  * queue.
  */
-static struct relay_cmd_queue viewer_cmd_queue;
+static struct relay_conn_queue viewer_conn_queue;
 
 static uint64_t last_relay_viewer_session_id;
 
@@ -152,32 +153,33 @@ ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size)
 }
 
 /*
- * Atomically check if new streams got added in the session since the last
- * check and reset the flag to 0.
+ * Atomically check if new streams got added in one of the sessions attached
+ * and reset the flag to 0.
  *
  * Returns 1 if new streams got added, 0 if nothing changed, a negative value
  * on error.
  */
 static
-int check_new_streams(uint64_t session_id, struct lttng_ht *sessions_ht)
+int check_new_streams(struct relay_connection *conn)
 {
-       int ret;
-       unsigned long current_val;
        struct relay_session *session;
+       unsigned long current_val;
+       int ret = 0;
 
-       assert(sessions_ht);
-
-       session = session_find_by_id(sessions_ht, session_id);
-       if (!session) {
-               DBG("Relay session %" PRIu64 " not found", session_id);
-               ret = -1;
-               goto error;
+       if (!conn->viewer_session) {
+               goto end;
+       }
+       cds_list_for_each_entry(session,
+                       &conn->viewer_session->sessions_head,
+                       viewer_session_list) {
+               current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
+               ret = current_val;
+               if (ret == 1) {
+                       goto end;
+               }
        }
 
-       current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
-       ret = current_val;
-
-error:
+end:
        return ret;
 }
 
@@ -348,14 +350,14 @@ void stop_threads(void)
 
        /* Stopping all threads */
        DBG("Terminating all live threads");
-       ret = notify_thread_pipe(live_conn_pipe[1]);
+       ret = notify_thread_pipe(thread_quit_pipe[1]);
        if (ret < 0) {
                ERR("write error on thread quit pipe");
        }
 
        /* Dispatch thread */
        CMM_STORE_SHARED(live_dispatch_thread_exit, 1);
-       futex_nto1_wake(&viewer_cmd_queue.futex);
+       futex_nto1_wake(&viewer_conn_queue.futex);
 }
 
 /*
@@ -377,7 +379,7 @@ int create_thread_poll_set(struct lttng_poll_event *events, int size)
        }
 
        /* Add quit pipe */
-       ret = lttng_poll_add(events, live_conn_pipe[0], LPOLLIN | LPOLLERR);
+       ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
        if (ret < 0) {
                goto error;
        }
@@ -394,9 +396,9 @@ error:
  * Return 1 if it was triggered else 0;
  */
 static
-int check_live_conn_pipe(int fd, uint32_t events)
+int check_thread_quit_pipe(int fd, uint32_t events)
 {
-       if (fd == live_conn_pipe[0] && (events & LPOLLIN)) {
+       if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
                return 1;
        }
 
@@ -451,7 +453,6 @@ static
 void *thread_listener(void *data)
 {
        int i, ret, pollfd, err = -1;
-       int val = 1;
        uint32_t revents, nb_fd;
        struct lttng_poll_event events;
        struct lttcomm_sock *live_control_sock;
@@ -514,7 +515,7 @@ restart:
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
                        /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_live_conn_pipe(pollfd, revents);
+                       ret = check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
                                err = 0;
                                goto exit;
@@ -528,43 +529,41 @@ restart:
                                 * Get allocated in this thread, enqueued to a global queue,
                                 * dequeued and freed in the worker thread.
                                 */
-                               struct relay_command *relay_cmd;
+                               int val = 1;
+                               struct relay_connection *new_conn;
                                struct lttcomm_sock *newsock;
 
-                               relay_cmd = zmalloc(sizeof(*relay_cmd));
-                               if (!relay_cmd) {
-                                       PERROR("relay command zmalloc");
+                               new_conn = connection_create();
+                               if (!new_conn) {
                                        goto error;
                                }
 
-                               assert(pollfd == live_control_sock->fd);
                                newsock = live_control_sock->ops->accept(live_control_sock);
                                if (!newsock) {
                                        PERROR("accepting control sock");
-                                       free(relay_cmd);
+                                       connection_free(new_conn);
                                        goto error;
                                }
                                DBG("Relay viewer connection accepted socket %d", newsock->fd);
+
                                ret = setsockopt(newsock->fd, SOL_SOCKET, SO_REUSEADDR, &val,
-                                               sizeof(int));
+                                               sizeof(val));
                                if (ret < 0) {
                                        PERROR("setsockopt inet");
                                        lttcomm_destroy_sock(newsock);
-                                       free(relay_cmd);
+                                       connection_free(new_conn);
                                        goto error;
                                }
-                               relay_cmd->sock = newsock;
+                               new_conn->sock = newsock;
 
-                               /*
-                                * Lock free enqueue the request.
-                                */
-                               cds_wfq_enqueue(&viewer_cmd_queue.queue, &relay_cmd->node);
+                               /* Enqueue request for the dispatcher thread. */
+                               cds_wfq_enqueue(&viewer_conn_queue.queue, &new_conn->qnode);
 
                                /*
-                                * Wake the dispatch queue futex. Implicit memory
-                                * barrier with the exchange in cds_wfq_enqueue.
+                                * Wake the dispatch queue futex. Implicit memory barrier with
+                                * the exchange in cds_wfq_enqueue.
                                 */
-                               futex_nto1_wake(&viewer_cmd_queue.futex);
+                               futex_nto1_wake(&viewer_conn_queue.futex);
                        }
                }
        }
@@ -602,7 +601,7 @@ void *thread_dispatcher(void *data)
        int err = -1;
        ssize_t ret;
        struct cds_wfq_node *node;
-       struct relay_command *relay_cmd = NULL;
+       struct relay_connection *conn = NULL;
 
        DBG("[thread] Live viewer relay dispatcher started");
 
@@ -618,41 +617,39 @@ void *thread_dispatcher(void *data)
                health_code_update();
 
                /* Atomically prepare the queue futex */
-               futex_nto1_prepare(&viewer_cmd_queue.futex);
+               futex_nto1_prepare(&viewer_conn_queue.futex);
 
                do {
                        health_code_update();
 
                        /* Dequeue commands */
-                       node = cds_wfq_dequeue_blocking(&viewer_cmd_queue.queue);
+                       node = cds_wfq_dequeue_blocking(&viewer_conn_queue.queue);
                        if (node == NULL) {
                                DBG("Woken up but nothing in the live-viewer "
                                                "relay command queue");
                                /* Continue thread execution */
                                break;
                        }
-
-                       relay_cmd = caa_container_of(node, struct relay_command, node);
+                       conn = caa_container_of(node, struct relay_connection, qnode);
                        DBG("Dispatching viewer request waiting on sock %d",
-                                       relay_cmd->sock->fd);
+                                       conn->sock->fd);
 
                        /*
                         * Inform worker thread of the new request. This call is blocking
                         * so we can be assured that the data will be read at some point in
                         * time or wait to the end of the world :)
                         */
-                       ret = lttng_write(live_relay_cmd_pipe[1], relay_cmd,
-                                       sizeof(*relay_cmd));
-                       free(relay_cmd);
-                       if (ret < sizeof(struct relay_command)) {
-                               PERROR("write cmd pipe");
+                       ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn));
+                       if (ret < 0) {
+                               PERROR("write conn pipe");
+                               connection_destroy(conn);
                                goto error;
                        }
                } while (node != NULL);
 
                /* Futex wait on queue. Blocking call on futex() */
                health_poll_entry();
-               futex_nto1_wait(&viewer_cmd_queue.futex);
+               futex_nto1_wait(&viewer_conn_queue.futex);
                health_poll_exit();
        }
 
@@ -677,26 +674,27 @@ error_testpoint:
  * Return 0 on success or else negative value.
  */
 static
-int viewer_connect(struct relay_command *cmd)
+int viewer_connect(struct relay_connection *conn)
 {
        int ret;
        struct lttng_viewer_connect reply, msg;
 
-       assert(cmd);
+       assert(conn);
 
-       cmd->version_check_done = 1;
+       conn->version_check_done = 1;
 
        health_code_update();
 
        DBG("Viewer is establishing a connection to the relayd.");
 
-       ret = recv_request(cmd->sock, &msg, sizeof(msg));
+       ret = recv_request(conn->sock, &msg, sizeof(msg));
        if (ret < 0) {
                goto end;
        }
 
        health_code_update();
 
+       memset(&reply, 0, sizeof(reply));
        reply.major = RELAYD_VERSION_COMM_MAJOR;
        reply.minor = RELAYD_VERSION_COMM_MINOR;
 
@@ -708,18 +706,18 @@ int viewer_connect(struct relay_command *cmd)
                goto end;
        }
 
-       cmd->major = reply.major;
+       conn->major = reply.major;
        /* We adapt to the lowest compatible version */
        if (reply.minor <= be32toh(msg.minor)) {
-               cmd->minor = reply.minor;
+               conn->minor = reply.minor;
        } else {
-               cmd->minor = be32toh(msg.minor);
+               conn->minor = be32toh(msg.minor);
        }
 
-       if (be32toh(msg.type) == VIEWER_CLIENT_COMMAND) {
-               cmd->type = RELAY_VIEWER_COMMAND;
-       } else if (be32toh(msg.type) == VIEWER_CLIENT_NOTIFICATION) {
-               cmd->type = RELAY_VIEWER_NOTIFICATION;
+       if (be32toh(msg.type) == LTTNG_VIEWER_CLIENT_COMMAND) {
+               conn->type = RELAY_VIEWER_COMMAND;
+       } else if (be32toh(msg.type) == LTTNG_VIEWER_CLIENT_NOTIFICATION) {
+               conn->type = RELAY_VIEWER_NOTIFICATION;
        } else {
                ERR("Unknown connection type : %u", be32toh(msg.type));
                ret = -1;
@@ -728,20 +726,20 @@ int viewer_connect(struct relay_command *cmd)
 
        reply.major = htobe32(reply.major);
        reply.minor = htobe32(reply.minor);
-       if (cmd->type == RELAY_VIEWER_COMMAND) {
+       if (conn->type == RELAY_VIEWER_COMMAND) {
                reply.viewer_session_id = htobe64(++last_relay_viewer_session_id);
        }
 
        health_code_update();
 
-       ret = send_response(cmd->sock, &reply, sizeof(reply));
+       ret = send_response(conn->sock, &reply, sizeof(reply));
        if (ret < 0) {
                goto end;
        }
 
        health_code_update();
 
-       DBG("Version check done using protocol %u.%u", cmd->major, cmd->minor);
+       DBG("Version check done using protocol %u.%u", conn->major, conn->minor);
        ret = 0;
 
 end:
@@ -754,8 +752,7 @@ end:
  * Return 0 on success or else a negative value.
  */
 static
-int viewer_list_sessions(struct relay_command *cmd,
-               struct lttng_ht *sessions_ht)
+int viewer_list_sessions(struct relay_connection *conn)
 {
        int ret;
        struct lttng_viewer_list_sessions session_list;
@@ -768,19 +765,20 @@ int viewer_list_sessions(struct relay_command *cmd,
        DBG("List sessions received");
 
        rcu_read_lock();
-       cds_lfht_count_nodes(sessions_ht->ht, &approx_before, &count, &approx_after);
+       cds_lfht_count_nodes(conn->sessions_ht->ht, &approx_before, &count,
+                       &approx_after);
        session_list.sessions_count = htobe32(count);
 
        health_code_update();
 
-       ret = send_response(cmd->sock, &session_list, sizeof(session_list));
+       ret = send_response(conn->sock, &session_list, sizeof(session_list));
        if (ret < 0) {
                goto end_unlock;
        }
 
        health_code_update();
 
-       cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session,
+       cds_lfht_for_each_entry(conn->sessions_ht->ht, &iter.iter, session,
                        session_n.node) {
                health_code_update();
 
@@ -795,7 +793,7 @@ int viewer_list_sessions(struct relay_command *cmd,
 
                health_code_update();
 
-               ret = send_response(cmd->sock, &send_session, sizeof(send_session));
+               ret = send_response(conn->sock, &send_session, sizeof(send_session));
                if (ret < 0) {
                        goto end_unlock;
                }
@@ -813,58 +811,144 @@ end:
        return ret;
 }
 
+/*
+ * Check if a connection is attached to a session.
+ * Return 1 if attached, 0 if not attached, a negative value on error.
+ */
+static
+int session_attached(struct relay_connection *conn, uint64_t session_id)
+{
+       struct relay_session *session;
+       int found = 0;
+
+       if (!conn->viewer_session) {
+               goto end;
+       }
+       cds_list_for_each_entry(session,
+                       &conn->viewer_session->sessions_head,
+                       viewer_session_list) {
+               if (session->id == session_id) {
+                       found = 1;
+                       goto end;
+               }
+       }
+
+end:
+       return found;
+}
+
+/*
+ * Delete all streams for a specific session ID.
+ */
+static void destroy_viewer_streams_by_session(struct relay_session *session)
+{
+       struct relay_viewer_stream *stream;
+       struct lttng_ht_iter iter;
+
+       assert(session);
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, stream,
+                       stream_n.node) {
+               struct ctf_trace *ctf_trace;
+
+               health_code_update();
+               if (stream->session_id != session->id) {
+                       continue;
+               }
+
+               ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
+                               stream->path_name);
+               assert(ctf_trace);
+
+               viewer_stream_delete(stream);
+
+               if (stream->metadata_flag) {
+                       ctf_trace->metadata_sent = 0;
+                       ctf_trace->viewer_metadata_stream = NULL;
+               }
+
+               viewer_stream_destroy(ctf_trace, stream);
+       }
+       rcu_read_unlock();
+}
+
+static void try_destroy_streams(struct relay_session *session)
+{
+       struct ctf_trace *ctf_trace;
+       struct lttng_ht_iter iter;
+
+       assert(session);
+
+       cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
+                       node.node) {
+               /* Attempt to destroy the ctf trace of that session. */
+               ctf_trace_try_destroy(session, ctf_trace);
+       }
+}
+
+/*
+ * Cleanup a session.
+ */
+static void cleanup_session(struct relay_connection *conn,
+               struct relay_session *session)
+{
+       /*
+        * Very important that this is done before destroying the session so we
+        * can put back every viewer stream reference from the ctf_trace.
+        */
+       destroy_viewer_streams_by_session(session);
+       try_destroy_streams(session);
+       cds_list_del(&session->viewer_session_list);
+       session_viewer_try_destroy(conn->sessions_ht, session);
+}
+
 /*
  * Send the viewer the list of current sessions.
  */
 static
-int viewer_get_new_streams(struct relay_command *cmd,
-               struct lttng_ht *sessions_ht)
+int viewer_get_new_streams(struct relay_connection *conn)
 {
        int ret, send_streams = 0;
        uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0;
        struct lttng_viewer_new_streams_request request;
        struct lttng_viewer_new_streams_response response;
        struct relay_session *session;
+       uint64_t session_id;
 
-       assert(cmd);
-       assert(sessions_ht);
+       assert(conn);
 
        DBG("Get new streams received");
 
        health_code_update();
 
        /* Receive the request from the connected client. */
-       ret = recv_request(cmd->sock, &request, sizeof(request));
+       ret = recv_request(conn->sock, &request, sizeof(request));
        if (ret < 0) {
                goto error;
        }
+       session_id = be64toh(request.session_id);
 
        health_code_update();
 
        rcu_read_lock();
-       session = session_find_by_id(sessions_ht, be64toh(request.session_id));
+       session = session_find_by_id(conn->sessions_ht, session_id);
        if (!session) {
-               DBG("Relay session %" PRIu64 " not found",
-                               be64toh(request.session_id));
-               response.status = htobe32(VIEWER_NEW_STREAMS_ERR);
+               DBG("Relay session %" PRIu64 " not found", session_id);
+               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
                goto send_reply;
        }
 
-       if (cmd->session_id == session->id) {
-               /* We confirmed the viewer is asking for the same session. */
-               send_streams = 1;
-               response.status = htobe32(VIEWER_NEW_STREAMS_OK);
-       } else {
+       if (!session_attached(conn, session_id)) {
                send_streams = 0;
-               response.status = htobe32(VIEWER_NEW_STREAMS_ERR);
+               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
                goto send_reply;
        }
 
-       if (!send_streams) {
-               goto send_reply;
-       }
+       send_streams = 1;
+       response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
 
-       ret = make_viewer_streams(session, VIEWER_SEEK_LAST, NULL, &nb_unsent,
+       ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, NULL, &nb_unsent,
                        &nb_created);
        if (ret < 0) {
                goto end_unlock;
@@ -873,9 +957,26 @@ int viewer_get_new_streams(struct relay_command *cmd,
        nb_streams = nb_created + nb_unsent;
        response.streams_count = htobe32(nb_streams);
 
+       /*
+        * If the session is closed and we have no new streams to send,
+        * it means that the viewer has already received the whole trace
+        * for this session and should now close it.
+        */
+       if (nb_streams == 0 && session->close_flag) {
+               send_streams = 0;
+               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
+               /*
+                * Remove the session from the attached list of the connection
+                * and try to destroy it.
+                */
+               cds_list_del(&session->viewer_session_list);
+               cleanup_session(conn, session);
+               goto send_reply;
+       }
+
 send_reply:
        health_code_update();
-       ret = send_response(cmd->sock, &response, sizeof(response));
+       ret = send_response(conn->sock, &response, sizeof(response));
        if (ret < 0) {
                goto end_unlock;
        }
@@ -894,7 +995,7 @@ send_reply:
         * Send stream and *DON'T* ignore the sent flag so every viewer streams
         * that were not sent from that point will be sent to the viewer.
         */
-       ret = send_viewer_streams(cmd->sock, session, 0);
+       ret = send_viewer_streams(conn->sock, session, 0);
        if (ret < 0) {
                goto end_unlock;
        }
@@ -909,8 +1010,7 @@ error:
  * Send the viewer the list of current sessions.
  */
 static
-int viewer_attach_session(struct relay_command *cmd,
-               struct lttng_ht *sessions_ht)
+int viewer_attach_session(struct relay_connection *conn)
 {
        int send_streams = 0;
        ssize_t ret;
@@ -920,25 +1020,31 @@ int viewer_attach_session(struct relay_command *cmd,
        struct lttng_viewer_attach_session_response response;
        struct relay_session *session;
 
-       assert(cmd);
-       assert(sessions_ht);
+       assert(conn);
 
        health_code_update();
 
        /* Receive the request from the connected client. */
-       ret = recv_request(cmd->sock, &request, sizeof(request));
+       ret = recv_request(conn->sock, &request, sizeof(request));
        if (ret < 0) {
                goto error;
        }
 
        health_code_update();
 
+       if (!conn->viewer_session) {
+               DBG("Client trying to attach before creating a live viewer session");
+               response.status = htobe32(LTTNG_VIEWER_ATTACH_NO_SESSION);
+               goto send_reply;
+       }
+
        rcu_read_lock();
-       session = session_find_by_id(sessions_ht, be64toh(request.session_id));
+       session = session_find_by_id(conn->sessions_ht,
+                       be64toh(request.session_id));
        if (!session) {
                DBG("Relay session %" PRIu64 " not found",
                                be64toh(request.session_id));
-               response.status = htobe32(VIEWER_ATTACH_UNK);
+               response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
                goto send_reply;
        }
        session_viewer_attach(session);
@@ -946,36 +1052,32 @@ int viewer_attach_session(struct relay_command *cmd,
 
        if (uatomic_read(&session->viewer_refcount) > 1) {
                DBG("Already a viewer attached");
-               response.status = htobe32(VIEWER_ATTACH_ALREADY);
+               response.status = htobe32(LTTNG_VIEWER_ATTACH_ALREADY);
                session_viewer_detach(session);
                goto send_reply;
        } else if (session->live_timer == 0) {
                DBG("Not live session");
-               response.status = htobe32(VIEWER_ATTACH_NOT_LIVE);
+               response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
                goto send_reply;
        } else {
                send_streams = 1;
-               response.status = htobe32(VIEWER_ATTACH_OK);
-               cmd->session_id = session->id;
-               cmd->session = session;
+               response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
+               cds_list_add(&session->viewer_session_list,
+                               &conn->viewer_session->sessions_head);
        }
 
        switch (be32toh(request.seek)) {
-       case VIEWER_SEEK_BEGINNING:
-       case VIEWER_SEEK_LAST:
+       case LTTNG_VIEWER_SEEK_BEGINNING:
+       case LTTNG_VIEWER_SEEK_LAST:
                seek_type = be32toh(request.seek);
                break;
        default:
                ERR("Wrong seek parameter");
-               response.status = htobe32(VIEWER_ATTACH_SEEK_ERR);
+               response.status = htobe32(LTTNG_VIEWER_ATTACH_SEEK_ERR);
                send_streams = 0;
                goto send_reply;
        }
 
-       if (!send_streams) {
-               goto send_reply;
-       }
-
        ret = make_viewer_streams(session, seek_type, &nb_streams, NULL, NULL);
        if (ret < 0) {
                goto end_unlock;
@@ -984,7 +1086,7 @@ int viewer_attach_session(struct relay_command *cmd,
 
 send_reply:
        health_code_update();
-       ret = send_response(cmd->sock, &response, sizeof(response));
+       ret = send_response(conn->sock, &response, sizeof(response));
        if (ret < 0) {
                goto end_unlock;
        }
@@ -1000,7 +1102,7 @@ send_reply:
        }
 
        /* Send stream and ignore the sent flag. */
-       ret = send_viewer_streams(cmd->sock, session, 1);
+       ret = send_viewer_streams(conn->sock, session, 1);
        if (ret < 0) {
                goto end_unlock;
        }
@@ -1017,8 +1119,7 @@ error:
  * Return 0 on success or else a negative value.
  */
 static
-int viewer_get_next_index(struct relay_command *cmd,
-               struct lttng_ht *sessions_ht)
+int viewer_get_next_index(struct relay_connection *conn)
 {
        int ret;
        struct lttng_viewer_get_next_index request_index;
@@ -1029,28 +1130,27 @@ int viewer_get_next_index(struct relay_command *cmd,
        struct ctf_trace *ctf_trace;
        struct relay_session *session;
 
-       assert(cmd);
-       assert(sessions_ht);
+       assert(conn);
 
        DBG("Viewer get next index");
 
        health_code_update();
 
-       ret = recv_request(cmd->sock, &request_index, sizeof(request_index));
+       ret = recv_request(conn->sock, &request_index, sizeof(request_index));
        if (ret < 0) {
                goto end;
        }
        health_code_update();
 
        rcu_read_lock();
-       session = session_find_by_id(sessions_ht, cmd->session_id);
-       if (!session) {
+       vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id));
+       if (!vstream) {
                ret = -1;
                goto end_unlock;
        }
 
-       vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id));
-       if (!vstream) {
+       session = session_find_by_id(conn->sessions_ht, vstream->session_id);
+       if (!session) {
                ret = -1;
                goto end_unlock;
        }
@@ -1064,7 +1164,7 @@ int viewer_get_next_index(struct relay_command *cmd,
         * The viewer should not ask for index on metadata stream.
         */
        if (vstream->metadata_flag) {
-               viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                goto send_reply;
        }
 
@@ -1077,10 +1177,10 @@ int viewer_get_next_index(struct relay_command *cmd,
                         * The index is created only when the first data packet arrives, it
                         * might not be ready at the beginning of the session
                         */
-                       viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
                        goto send_reply;
                } else if (ret < 0) {
-                       viewer_index.status = htobe32(VIEWER_INDEX_ERR);
+                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
                        goto send_reply;
                }
                vstream->index_read_fd = ret;
@@ -1089,27 +1189,29 @@ int viewer_get_next_index(struct relay_command *cmd,
        rstream = stream_find_by_id(relay_streams_ht, vstream->stream_handle);
        assert(rstream);
 
+       pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
        if (!rstream->close_flag) {
                if (vstream->abort_flag) {
                        /* Rotate on abort (overwrite). */
                        DBG("Viewer rotate because of overwrite");
                        ret = viewer_stream_rotate(vstream, rstream);
                        if (ret < 0) {
+                               pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
                                goto end_unlock;
                        } else if (ret == 1) {
-                               viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+                               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                                viewer_stream_delete(vstream);
                                viewer_stream_destroy(ctf_trace, vstream);
+                               pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
                                goto send_reply;
                        }
                        /* ret == 0 means successful so we continue. */
                }
 
-               pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
                if (rstream->tracefile_count_current == vstream->tracefile_count_current) {
                        if (rstream->beacon_ts_end != -1ULL &&
                                vstream->last_sent_index == rstream->total_index_received) {
-                               viewer_index.status = htobe32(VIEWER_INDEX_INACTIVE);
+                               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
                                viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end);
                                pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
                                goto send_reply;
@@ -1122,28 +1224,29 @@ int viewer_get_next_index(struct relay_command *cmd,
                                 */
                                pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
                                /* No new index to send, retry later. */
-                               viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+                               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
                                goto send_reply;
                        }
                }
-               pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
        } else if (rstream->close_flag && vstream->close_write_flag &&
                        vstream->total_index_received == vstream->last_sent_index) {
                /* Last index sent and current tracefile closed in write */
-               viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                viewer_stream_delete(vstream);
                viewer_stream_destroy(ctf_trace, vstream);
+               pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
                goto send_reply;
        } else {
                vstream->close_write_flag = 1;
        }
+       pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
 
        if (!ctf_trace->metadata_received ||
                        ctf_trace->metadata_received > ctf_trace->metadata_sent) {
                viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
        }
 
-       ret = check_new_streams(vstream->session_id, sessions_ht);
+       ret = check_new_streams(conn);
        if (ret < 0) {
                goto end_unlock;
        } else if (ret == 1) {
@@ -1155,13 +1258,13 @@ int viewer_get_next_index(struct relay_command *cmd,
                /*
                 * The file is being overwritten by the writer, we cannot * use it.
                 */
-               viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
                pthread_mutex_unlock(&vstream->overwrite_lock);
                ret = viewer_stream_rotate(vstream, rstream);
                if (ret < 0) {
                        goto end_unlock;
                } else if (ret == 1) {
-                       viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                        viewer_stream_delete(vstream);
                        viewer_stream_destroy(ctf_trace, vstream);
                        goto send_reply;
@@ -1173,28 +1276,33 @@ int viewer_get_next_index(struct relay_command *cmd,
                        sizeof(packet_index));
        pthread_mutex_unlock(&vstream->overwrite_lock);
        if (ret < sizeof(packet_index)) {
+               unsigned int close_write_flag;
+
+               pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
+               close_write_flag = vstream->close_write_flag;
+               pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
                /*
                 * The tracefile is closed in write, so we read up to EOF.
                 */
-               if (vstream->close_write_flag == 1) {
-                       viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+               if (close_write_flag == 1) {
+                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
                        /* Rotate on normal EOF */
                        ret = viewer_stream_rotate(vstream, rstream);
                        if (ret < 0) {
                                goto end_unlock;
                        } else if (ret == 1) {
-                               viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+                               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                                viewer_stream_delete(vstream);
                                viewer_stream_destroy(ctf_trace, vstream);
                                goto send_reply;
                        }
                } else {
                        PERROR("Relay reading index file %d", vstream->index_read_fd);
-                       viewer_index.status = htobe32(VIEWER_INDEX_ERR);
+                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
                }
                goto send_reply;
        } else {
-               viewer_index.status = htobe32(VIEWER_INDEX_OK);
+               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
                vstream->last_sent_index++;
        }
 
@@ -1213,7 +1321,7 @@ send_reply:
        viewer_index.flags = htobe32(viewer_index.flags);
        health_code_update();
 
-       ret = send_response(cmd->sock, &viewer_index, sizeof(viewer_index));
+       ret = send_response(conn->sock, &viewer_index, sizeof(viewer_index));
        if (ret < 0) {
                goto end_unlock;
        }
@@ -1235,8 +1343,7 @@ end:
  * Return 0 on success or else a negative value.
  */
 static
-int viewer_get_packet(struct relay_command *cmd,
-               struct lttng_ht *sessions_ht)
+int viewer_get_packet(struct relay_connection *conn)
 {
        int ret, send_data = 0;
        char *data = NULL;
@@ -1245,15 +1352,16 @@ int viewer_get_packet(struct relay_command *cmd,
        struct lttng_viewer_get_packet get_packet_info;
        struct lttng_viewer_trace_packet reply;
        struct relay_viewer_stream *stream;
+       struct relay_session *session;
        struct ctf_trace *ctf_trace;
 
-       assert(cmd);
+       assert(conn);
 
        DBG2("Relay get data packet");
 
        health_code_update();
 
-       ret = recv_request(cmd->sock, &get_packet_info, sizeof(get_packet_info));
+       ret = recv_request(conn->sock, &get_packet_info, sizeof(get_packet_info));
        if (ret < 0) {
                goto end;
        }
@@ -1268,7 +1376,13 @@ int viewer_get_packet(struct relay_command *cmd,
                goto error;
        }
 
-       ctf_trace = ctf_trace_find_by_path(cmd->session->ctf_traces_ht,
+       session = session_find_by_id(conn->sessions_ht, stream->session_id);
+       if (!session) {
+               ret = -1;
+               goto error;
+       }
+
+       ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
                        stream->path_name);
        assert(ctf_trace);
 
@@ -1301,16 +1415,16 @@ int viewer_get_packet(struct relay_command *cmd,
 
        if (!ctf_trace->metadata_received ||
                        ctf_trace->metadata_received > ctf_trace->metadata_sent) {
-               reply.status = htobe32(VIEWER_GET_PACKET_ERR);
+               reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
                reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
                goto send_reply;
        }
 
-       ret = check_new_streams(stream->session_id, sessions_ht);
+       ret = check_new_streams(conn);
        if (ret < 0) {
                goto end_unlock;
        } else if (ret == 1) {
-               reply.status = htobe32(VIEWER_GET_PACKET_ERR);
+               reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
                reply.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
                goto send_reply;
        }
@@ -1332,7 +1446,7 @@ int viewer_get_packet(struct relay_command *cmd,
                        PERROR("lseek");
                        goto error;
                }
-               reply.status = htobe32(VIEWER_GET_PACKET_EOF);
+               reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_EOF);
                goto send_reply;
        }
        read_len = lttng_read(stream->read_fd, data, len);
@@ -1347,24 +1461,24 @@ int viewer_get_packet(struct relay_command *cmd,
                                        be64toh(get_packet_info.offset));
                        goto error;
                } else {
-                       reply.status = htobe32(VIEWER_GET_PACKET_EOF);
+                       reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_EOF);
                        goto send_reply;
                }
        }
-       reply.status = htobe32(VIEWER_GET_PACKET_OK);
+       reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
        reply.len = htobe32(len);
        send_data = 1;
        goto send_reply;
 
 error:
-       reply.status = htobe32(VIEWER_GET_PACKET_ERR);
+       reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
 
 send_reply:
        reply.flags = htobe32(reply.flags);
 
        health_code_update();
 
-       ret = send_response(cmd->sock, &reply, sizeof(reply));
+       ret = send_response(conn->sock, &reply, sizeof(reply));
        if (ret < 0) {
                goto end_unlock;
        }
@@ -1372,7 +1486,7 @@ send_reply:
 
        if (send_data) {
                health_code_update();
-               ret = send_response(cmd->sock, data, len);
+               ret = send_response(conn->sock, data, len);
                if (ret < 0) {
                        goto end_unlock;
                }
@@ -1396,7 +1510,7 @@ end:
  * Return 0 on success else a negative value.
  */
 static
-int viewer_get_metadata(struct relay_command *cmd)
+int viewer_get_metadata(struct relay_connection *conn)
 {
        int ret = 0;
        ssize_t read_len;
@@ -1406,14 +1520,15 @@ int viewer_get_metadata(struct relay_command *cmd)
        struct lttng_viewer_metadata_packet reply;
        struct relay_viewer_stream *stream;
        struct ctf_trace *ctf_trace;
+       struct relay_session *session;
 
-       assert(cmd);
+       assert(conn);
 
        DBG("Relay get metadata");
 
        health_code_update();
 
-       ret = recv_request(cmd->sock, &request, sizeof(request));
+       ret = recv_request(conn->sock, &request, sizeof(request));
        if (ret < 0) {
                goto end;
        }
@@ -1426,14 +1541,20 @@ int viewer_get_metadata(struct relay_command *cmd)
                goto error;
        }
 
-       ctf_trace = ctf_trace_find_by_path(cmd->session->ctf_traces_ht,
+       session = session_find_by_id(conn->sessions_ht, stream->session_id);
+       if (!session) {
+               ret = -1;
+               goto error;
+       }
+
+       ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
                        stream->path_name);
        assert(ctf_trace);
        assert(ctf_trace->metadata_sent <= ctf_trace->metadata_received);
 
        len = ctf_trace->metadata_received - ctf_trace->metadata_sent;
        if (len == 0) {
-               reply.status = htobe32(VIEWER_NO_NEW_METADATA);
+               reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
                goto send_reply;
        }
 
@@ -1467,22 +1588,22 @@ int viewer_get_metadata(struct relay_command *cmd)
                goto error;
        }
        ctf_trace->metadata_sent += read_len;
-       reply.status = htobe32(VIEWER_METADATA_OK);
+       reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);
        goto send_reply;
 
 error:
-       reply.status = htobe32(VIEWER_METADATA_ERR);
+       reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
 
 send_reply:
        health_code_update();
-       ret = send_response(cmd->sock, &reply, sizeof(reply));
+       ret = send_response(conn->sock, &reply, sizeof(reply));
        if (ret < 0) {
                goto end_unlock;
        }
        health_code_update();
 
        if (len > 0) {
-               ret = send_response(cmd->sock, data, len);
+               ret = send_response(conn->sock, data, len);
                if (ret < 0) {
                        goto end_unlock;
                }
@@ -1500,16 +1621,52 @@ end:
        return ret;
 }
 
+/*
+ * Create a viewer session.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static
+int viewer_create_session(struct relay_connection *conn)
+{
+       int ret;
+       struct lttng_viewer_create_session_response resp;
+
+       DBG("Viewer create session received");
+
+       resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
+       conn->viewer_session = zmalloc(sizeof(*conn->viewer_session));
+       if (!conn->viewer_session) {
+               ERR("Allocation viewer session");
+               resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_ERR);
+               goto send_reply;
+       }
+       CDS_INIT_LIST_HEAD(&conn->viewer_session->sessions_head);
+
+send_reply:
+       health_code_update();
+       ret = send_response(conn->sock, &resp, sizeof(resp));
+       if (ret < 0) {
+               goto end;
+       }
+       health_code_update();
+       ret = 0;
+
+end:
+       return ret;
+}
+
+
 /*
  * live_relay_unknown_command: send -1 if received unknown command
  */
 static
-void live_relay_unknown_command(struct relay_command *cmd)
+void live_relay_unknown_command(struct relay_connection *conn)
 {
        struct lttcomm_relayd_generic_reply reply;
 
        reply.ret_code = htobe32(LTTNG_ERR_UNK);
-       (void) send_response(cmd->sock, &reply, sizeof(reply));
+       (void) send_response(conn->sock, &reply, sizeof(reply));
 }
 
 /*
@@ -1517,14 +1674,13 @@ void live_relay_unknown_command(struct relay_command *cmd)
  */
 static
 int process_control(struct lttng_viewer_cmd *recv_hdr,
-               struct relay_command *cmd, struct lttng_ht *sessions_ht)
+               struct relay_connection *conn)
 {
        int ret = 0;
        uint32_t msg_value;
 
        assert(recv_hdr);
-       assert(cmd);
-       assert(sessions_ht);
+       assert(conn);
 
        msg_value = be32toh(recv_hdr->cmd);
 
@@ -1532,37 +1688,40 @@ int process_control(struct lttng_viewer_cmd *recv_hdr,
         * Make sure we've done the version check before any command other then a
         * new client connection.
         */
-       if (msg_value != VIEWER_CONNECT && !cmd->version_check_done) {
-               ERR("Viewer cmd value %" PRIu32 " before version check", msg_value);
+       if (msg_value != LTTNG_VIEWER_CONNECT && !conn->version_check_done) {
+               ERR("Viewer conn value %" PRIu32 " before version check", msg_value);
                ret = -1;
                goto end;
        }
 
        switch (msg_value) {
-       case VIEWER_CONNECT:
-               ret = viewer_connect(cmd);
+       case LTTNG_VIEWER_CONNECT:
+               ret = viewer_connect(conn);
                break;
-       case VIEWER_LIST_SESSIONS:
-               ret = viewer_list_sessions(cmd, sessions_ht);
+       case LTTNG_VIEWER_LIST_SESSIONS:
+               ret = viewer_list_sessions(conn);
                break;
-       case VIEWER_ATTACH_SESSION:
-               ret = viewer_attach_session(cmd, sessions_ht);
+       case LTTNG_VIEWER_ATTACH_SESSION:
+               ret = viewer_attach_session(conn);
                break;
-       case VIEWER_GET_NEXT_INDEX:
-               ret = viewer_get_next_index(cmd, sessions_ht);
+       case LTTNG_VIEWER_GET_NEXT_INDEX:
+               ret = viewer_get_next_index(conn);
                break;
-       case VIEWER_GET_PACKET:
-               ret = viewer_get_packet(cmd, sessions_ht);
+       case LTTNG_VIEWER_GET_PACKET:
+               ret = viewer_get_packet(conn);
                break;
-       case VIEWER_GET_METADATA:
-               ret = viewer_get_metadata(cmd);
+       case LTTNG_VIEWER_GET_METADATA:
+               ret = viewer_get_metadata(conn);
                break;
-       case VIEWER_GET_NEW_STREAMS:
-               ret = viewer_get_new_streams(cmd, sessions_ht);
+       case LTTNG_VIEWER_GET_NEW_STREAMS:
+               ret = viewer_get_new_streams(conn);
+               break;
+       case LTTNG_VIEWER_CREATE_SESSION:
+               ret = viewer_create_session(conn);
                break;
        default:
                ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd));
-               live_relay_unknown_command(cmd);
+               live_relay_unknown_command(conn);
                ret = -1;
                goto end;
        }
@@ -1572,13 +1731,13 @@ end:
 }
 
 static
-void cleanup_poll_connection(struct lttng_poll_event *events, int pollfd)
+void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
 {
        int ret;
 
        assert(events);
 
-       lttng_poll_del(events, pollfd);
+       (void) lttng_poll_del(events, pollfd);
 
        ret = close(pollfd);
        if (ret < 0) {
@@ -1587,146 +1746,35 @@ void cleanup_poll_connection(struct lttng_poll_event *events, int pollfd)
 }
 
 /*
- * Create and add connection to the given hash table.
+ * Delete and destroy a connection.
  *
- * Return poll add value or else -1 on error.
+ * RCU read side lock MUST be acquired.
  */
-static
-int add_connection(int fd, struct lttng_poll_event *events,
-               struct lttng_ht *relay_connections_ht)
+static void destroy_connection(struct lttng_ht *relay_connections_ht,
+               struct relay_connection *conn)
 {
-       int ret;
-       struct relay_command *relay_connection;
+       struct relay_session *session, *tmp_session;
 
-       assert(events);
        assert(relay_connections_ht);
+       assert(conn);
 
-       relay_connection = zmalloc(sizeof(struct relay_command));
-       if (relay_connection == NULL) {
-               PERROR("Relay command zmalloc");
-               goto error;
-       }
-
-       ret = lttng_read(fd, relay_connection, sizeof(*relay_connection));
-       if (ret < sizeof(*relay_connection)) {
-               PERROR("read relay cmd pipe");
-               goto error_read;
-       }
-
-       lttng_ht_node_init_ulong(&relay_connection->sock_n,
-                       (unsigned long) relay_connection->sock->fd);
-       rcu_read_lock();
-       lttng_ht_add_unique_ulong(relay_connections_ht,
-                       &relay_connection->sock_n);
-       rcu_read_unlock();
-
-       return lttng_poll_add(events, relay_connection->sock->fd,
-                       LPOLLIN | LPOLLRDHUP);
-
-error_read:
-       free(relay_connection);
-error:
-       return -1;
-}
-
-static
-void deferred_free_connection(struct rcu_head *head)
-{
-       struct relay_command *relay_connection =
-               caa_container_of(head, struct relay_command, rcu_node);
-
-       lttcomm_destroy_sock(relay_connection->sock);
-       free(relay_connection);
-}
-
-/*
- * Delete all streams for a specific session ID.
- */
-static void destroy_viewer_streams_by_session(struct relay_session *session)
-{
-       struct relay_viewer_stream *stream;
-       struct lttng_ht_iter iter;
-
-       assert(session);
-
-       rcu_read_lock();
-       cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, stream,
-                       stream_n.node) {
-               struct ctf_trace *ctf_trace;
-
-               health_code_update();
-               if (stream->session_id != session->id) {
-                       continue;
-               }
+       connection_delete(relay_connections_ht, conn);
 
-               ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
-                               stream->path_name);
-               assert(ctf_trace);
-
-               viewer_stream_delete(stream);
-
-               if (stream->metadata_flag) {
-                       ctf_trace->metadata_sent = 0;
-                       ctf_trace->viewer_metadata_stream = NULL;
-               }
-
-               viewer_stream_destroy(ctf_trace, stream);
-       }
-       rcu_read_unlock();
-}
-
-static void try_destroy_streams(struct relay_session *session)
-{
-       struct ctf_trace *ctf_trace;
-       struct lttng_ht_iter iter;
-
-       assert(session);
-
-       cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
-                       node.node) {
-               /* Attempt to destroy the ctf trace of that session. */
-               ctf_trace_try_destroy(session, ctf_trace);
+       if (!conn->viewer_session) {
+               goto end;
        }
-}
-
-/*
- * Delete and free a connection.
- *
- * RCU read side lock MUST be acquired.
- */
-static
-void del_connection(struct lttng_ht *relay_connections_ht,
-               struct lttng_ht_iter *iter, struct relay_command *relay_connection,
-               struct lttng_ht *sessions_ht)
-{
-       int ret;
-       struct relay_session *session;
-
-       assert(relay_connections_ht);
-       assert(iter);
-       assert(relay_connection);
-       assert(sessions_ht);
-
-       DBG("Cleaning connection of session ID %" PRIu64,
-                       relay_connection->session_id);
 
        rcu_read_lock();
-       ret = lttng_ht_del(relay_connections_ht, iter);
-       assert(!ret);
-
-       session = session_find_by_id(sessions_ht, relay_connection->session_id);
-       if (session) {
-               /*
-                * Very important that this is done before destroying the session so we
-                * can put back every viewer stream reference from the ctf_trace.
-                */
-               destroy_viewer_streams_by_session(session);
-               try_destroy_streams(session);
-               session_viewer_try_destroy(sessions_ht, session);
+       cds_list_for_each_entry_safe(session, tmp_session,
+                       &conn->viewer_session->sessions_head,
+                       viewer_session_list) {
+               DBG("Cleaning connection of session ID %" PRIu64, session->id);
+               cleanup_session(conn, session);
        }
        rcu_read_unlock();
 
-       call_rcu(&relay_connection->rcu_node, deferred_free_connection);
+end:
+       connection_destroy(conn);
 }
 
 /*
@@ -1737,10 +1785,9 @@ void *thread_worker(void *data)
 {
        int ret, err = -1;
        uint32_t nb_fd;
-       struct relay_command *relay_connection;
+       struct relay_connection *conn;
        struct lttng_poll_event events;
        struct lttng_ht *relay_connections_ht;
-       struct lttng_ht_node_ulong *node;
        struct lttng_ht_iter iter;
        struct lttng_viewer_cmd recv_hdr;
        struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
@@ -1767,7 +1814,7 @@ void *thread_worker(void *data)
                goto error_poll_create;
        }
 
-       ret = lttng_poll_add(&events, live_relay_cmd_pipe[0], LPOLLIN | LPOLLRDHUP);
+       ret = lttng_poll_add(&events, live_conn_pipe[0], LPOLLIN | LPOLLRDHUP);
        if (ret < 0) {
                goto error;
        }
@@ -1808,72 +1855,55 @@ restart:
                        health_code_update();
 
                        /* Thread quit pipe has been closed. Killing thread. */
-                       ret = check_live_conn_pipe(pollfd, revents);
+                       ret = check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
                                err = 0;
                                goto exit;
                        }
 
-                       /* Inspect the relay cmd pipe for new connection */
-                       if (pollfd == live_relay_cmd_pipe[0]) {
+                       /* Inspect the relay conn pipe for new connection */
+                       if (pollfd == live_conn_pipe[0]) {
                                if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
                                        ERR("Relay live pipe error");
                                        goto error;
                                } else if (revents & LPOLLIN) {
-                                       DBG("Relay live viewer command received");
-                                       ret = add_connection(live_relay_cmd_pipe[0],
-                                                       &events, relay_connections_ht);
+                                       ret = lttng_read(live_conn_pipe[0], &conn, sizeof(conn));
                                        if (ret < 0) {
                                                goto error;
                                        }
-                               }
-                       } else if (revents) {
-                               rcu_read_lock();
-                               lttng_ht_lookup(relay_connections_ht,
-                                               (void *)((unsigned long) pollfd), &iter);
-                               node = lttng_ht_iter_get_node_ulong(&iter);
-                               if (node == NULL) {
-                                       DBG2("Relay viewer sock %d not found", pollfd);
+                                       conn->sessions_ht = sessions_ht;
+                                       connection_init(conn);
+                                       lttng_poll_add(&events, conn->sock->fd,
+                                                       LPOLLIN | LPOLLRDHUP);
+                                       rcu_read_lock();
+                                       lttng_ht_add_unique_ulong(relay_connections_ht,
+                                                       &conn->sock_n);
                                        rcu_read_unlock();
-                                       goto error;
+                                       DBG("Connection socket %d added", conn->sock->fd);
                                }
-                               relay_connection = caa_container_of(node, struct relay_command,
-                                               sock_n);
-
-                               if (revents & (LPOLLERR)) {
-                                       cleanup_poll_connection(&events, pollfd);
-                                       del_connection(relay_connections_ht, &iter,
-                                                       relay_connection, relay_ctx->sessions_ht);
-                               } else if (revents & (LPOLLHUP | LPOLLRDHUP)) {
-                                       DBG("Viewer socket %d hung up", pollfd);
-                                       cleanup_poll_connection(&events, pollfd);
-                                       del_connection(relay_connections_ht, &iter,
-                                                       relay_connection, relay_ctx->sessions_ht);
+                       } else {
+                               rcu_read_lock();
+                               conn = connection_find_by_sock(relay_connections_ht, pollfd);
+                               /* If not found, there is a synchronization issue. */
+                               assert(conn);
+
+                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       cleanup_connection_pollfd(&events, pollfd);
+                                       destroy_connection(relay_connections_ht, conn);
                                } else if (revents & LPOLLIN) {
-                                       ret = relay_connection->sock->ops->recvmsg(
-                                                       relay_connection->sock, &recv_hdr,
-                                                       sizeof(struct lttng_viewer_cmd),
-                                                       0);
-                                       /* connection closed */
+                                       ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr,
+                                                       sizeof(recv_hdr), 0);
                                        if (ret <= 0) {
-                                               cleanup_poll_connection(&events, pollfd);
-                                               del_connection(relay_connections_ht, &iter,
-                                                               relay_connection, relay_ctx->sessions_ht);
-                                               DBG("Viewer control connection closed with %d",
-                                                               pollfd);
+                                               /* Connection closed */
+                                               cleanup_connection_pollfd(&events, pollfd);
+                                               destroy_connection(relay_connections_ht, conn);
+                                               DBG("Viewer control conn closed with %d", pollfd);
                                        } else {
-                                               if (relay_connection->session) {
-                                                       DBG2("Relay viewer worker receiving data for "
-                                                                       "session: %" PRIu64,
-                                                                       relay_connection->session->id);
-                                               }
-                                               ret = process_control(&recv_hdr, relay_connection,
-                                                               sessions_ht);
+                                               ret = process_control(&recv_hdr, conn);
                                                if (ret < 0) {
                                                        /* Clear the session on error. */
-                                                       cleanup_poll_connection(&events, pollfd);
-                                                       del_connection(relay_connections_ht, &iter,
-                                                                       relay_connection, relay_ctx->sessions_ht);
+                                                       cleanup_connection_pollfd(&events, pollfd);
+                                                       destroy_connection(relay_connections_ht, conn);
                                                        DBG("Viewer connection closed with %d", pollfd);
                                                }
                                        }
@@ -1887,27 +1917,19 @@ exit:
 error:
        lttng_poll_clean(&events);
 
-       /* empty the hash table and free the memory */
+       /* Cleanup reamaining connection object. */
        rcu_read_lock();
-       cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
+       cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, conn,
+                       sock_n.node) {
                health_code_update();
-
-               node = lttng_ht_iter_get_node_ulong(&iter);
-               if (!node) {
-                       continue;
-               }
-
-               relay_connection = caa_container_of(node, struct relay_command,
-                               sock_n);
-               del_connection(relay_connections_ht, &iter, relay_connection,
-                               relay_ctx->sessions_ht);
+               destroy_connection(relay_connections_ht, conn);
        }
        rcu_read_unlock();
 error_poll_create:
        lttng_ht_destroy(relay_connections_ht);
 relay_connections_ht_error:
-       /* Close relay cmd pipes */
-       utils_close_pipe(live_relay_cmd_pipe);
+       /* Close relay conn pipes */
+       utils_close_pipe(live_conn_pipe);
        if (err) {
                DBG("Viewer worker thread exited with error");
        }
@@ -1927,11 +1949,11 @@ error_testpoint:
  * Create the relay command pipe to wake thread_manage_apps.
  * Closed in cleanup().
  */
-static int create_relay_cmd_pipe(void)
+static int create_conn_pipe(void)
 {
        int ret;
 
-       ret = utils_create_pipe_cloexec(live_relay_cmd_pipe);
+       ret = utils_create_pipe_cloexec(live_conn_pipe);
 
        return ret;
 }
@@ -1992,12 +2014,12 @@ int live_start_threads(struct lttng_uri *uri,
        }
 
        /* Setup the thread apps communication pipe. */
-       if ((ret = create_relay_cmd_pipe()) < 0) {
+       if ((ret = create_conn_pipe()) < 0) {
                goto exit;
        }
 
        /* Init relay command queue. */
-       cds_wfq_init(&viewer_cmd_queue.queue);
+       cds_wfq_init(&viewer_conn_queue.queue);
 
        /* Set up max poll set size */
        lttng_poll_set_max_size();
This page took 0.046436 seconds and 4 git commands to generate.