relayd: move relay_session locking outside of make_viewer_streams
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index 84fa1435196057c6a506b019eedf4bcf477da49b..dc8238c9499b876de333a7c313e1e95135e847aa 100644 (file)
@@ -197,7 +197,7 @@ end:
  */
 static
 ssize_t send_viewer_streams(struct lttcomm_sock *sock,
-               struct relay_session *session, unsigned int ignore_sent_flag)
+               uint64_t session_id, unsigned int ignore_sent_flag)
 {
        ssize_t ret;
        struct lttng_viewer_stream send_stream;
@@ -218,7 +218,7 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock,
 
                pthread_mutex_lock(&vstream->stream->lock);
                /* Ignore if not the same session. */
-               if (vstream->stream->trace->session->id != session->id ||
+               if (vstream->stream->trace->session->id != session_id ||
                                (!ignore_sent_flag && vstream->sent_flag)) {
                        pthread_mutex_unlock(&vstream->stream->lock);
                        viewer_stream_put(vstream);
@@ -271,6 +271,9 @@ end_unlock:
  * viewer stream of the session, the number of unsent stream and the number of
  * stream created. Those counters can be NULL and thus will be ignored.
  *
+ * session must be locked to ensure that we see either none or all initial
+ * streams for a session, but no intermediate state..
+ *
  * Return 0 on success or else a negative value.
  */
 static
@@ -283,12 +286,7 @@ int make_viewer_streams(struct relay_session *session,
        struct ctf_trace *ctf_trace;
 
        assert(session);
-
-       /*
-        * Hold the session lock to ensure that we see either none or
-        * all initial streams for a session, but no intermediate state.
-        */
-       pthread_mutex_lock(&session->lock);
+       ASSERT_LOCKED(session->lock);
 
        if (session->connection_closed) {
                *closed = true;
@@ -376,7 +374,6 @@ int make_viewer_streams(struct relay_session *session,
 
 error_unlock:
        rcu_read_unlock();
-       pthread_mutex_unlock(&session->lock);
        return ret;
 }
 
@@ -946,8 +943,10 @@ int viewer_get_new_streams(struct relay_connection *conn)
        send_streams = 1;
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
 
+       pthread_mutex_lock(&session->lock);
        ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
                        &nb_created, &closed);
+       pthread_mutex_unlock(&session->lock);
        if (ret < 0) {
                goto end_put_session;
        }
@@ -988,7 +987,7 @@ send_reply:
         * streams that were not sent from that point will be sent to
         * the viewer.
         */
-       ret = send_viewer_streams(conn->sock, session, 0);
+       ret = send_viewer_streams(conn->sock, session_id, 0);
        if (ret < 0) {
                goto end_put_session;
        }
@@ -1015,6 +1014,7 @@ int viewer_attach_session(struct relay_connection *conn)
        struct lttng_viewer_attach_session_response response;
        struct relay_session *session = NULL;
        bool closed = false;
+       uint64_t session_id;
 
        assert(conn);
 
@@ -1026,6 +1026,7 @@ int viewer_attach_session(struct relay_connection *conn)
                goto error;
        }
 
+       session_id = be64toh(request.session_id);
        health_code_update();
 
        memset(&response, 0, sizeof(response));
@@ -1036,16 +1037,15 @@ int viewer_attach_session(struct relay_connection *conn)
                goto send_reply;
        }
 
-       session = session_get_by_id(be64toh(request.session_id));
+       session = session_get_by_id(session_id);
        if (!session) {
-               DBG("Relay session %" PRIu64 " not found",
-                               (uint64_t) be64toh(request.session_id));
+               DBG("Relay session %" PRIu64 " not found", session_id);
                response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
                goto send_reply;
        }
-       DBG("Attach session ID %" PRIu64 " received",
-               (uint64_t) be64toh(request.session_id));
+       DBG("Attach session ID %" PRIu64 " received", session_id);
 
+       pthread_mutex_lock(&session->lock);
        if (session->live_timer == 0) {
                DBG("Not live session");
                response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
@@ -1078,8 +1078,12 @@ int viewer_attach_session(struct relay_connection *conn)
        if (ret < 0) {
                goto end_put_session;
        }
-       response.streams_count = htobe32(nb_streams);
 
+       pthread_mutex_unlock(&session->lock);
+       session_put(session);
+       session = NULL;
+
+       response.streams_count = htobe32(nb_streams);
        /*
         * If the session is closed when the viewer is attaching, it
         * means some of the streams may have been concurrently removed,
@@ -1111,13 +1115,14 @@ send_reply:
        }
 
        /* Send stream and ignore the sent flag. */
-       ret = send_viewer_streams(conn->sock, session, 1);
+       ret = send_viewer_streams(conn->sock, session_id, 1);
        if (ret < 0) {
                goto end_put_session;
        }
 
 end_put_session:
        if (session) {
+               pthread_mutex_unlock(&session->lock);
                session_put(session);
        }
 error:
This page took 0.02465 seconds and 4 git commands to generate.