Fix: relayd: live connection fails to open file during clear
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index d5f04582419dffec1466c60c4de449cf34948a03..7d6dc1bc20a03a5af6cafe1e5ef170be1b7f5cb8 100644 (file)
@@ -268,8 +268,8 @@ end_unlock:
  *
  * Return 0 on success or else a negative value.
  */
-static int make_viewer_streams(struct relay_session *session,
-               struct lttng_trace_chunk *viewer_trace_chunk,
+static int make_viewer_streams(struct relay_session *relay_session,
+               struct relay_viewer_session *viewer_session,
                enum lttng_viewer_seek seek_t,
                uint32_t *nb_total,
                uint32_t *nb_unsent,
@@ -279,18 +279,19 @@ static int make_viewer_streams(struct relay_session *session,
        int ret;
        struct lttng_ht_iter iter;
        struct ctf_trace *ctf_trace;
+       struct relay_stream *relay_stream = NULL;
 
-       assert(session);
-       ASSERT_LOCKED(session->lock);
+       assert(relay_session);
+       ASSERT_LOCKED(relay_session->lock);
 
-       if (!viewer_trace_chunk) {
+       if (!viewer_session->current_trace_chunk) {
                ERR("Internal error: viewer session associated with session \"%s\" has a NULL trace chunk",
-                               session->session_name);
+                               relay_session->session_name);
                ret = -1;
                goto error;
        }
 
-       if (session->connection_closed) {
+       if (relay_session->connection_closed) {
                *closed = true;
        }
 
@@ -299,10 +300,9 @@ static int make_viewer_streams(struct relay_session *session,
         * used for a the given session id only.
         */
        rcu_read_lock();
-       cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
-                       node.node) {
+       cds_lfht_for_each_entry (relay_session->ctf_traces_ht->ht, &iter.iter,
+                       ctf_trace, node.node) {
                bool trace_has_metadata_stream = false;
-               struct relay_stream *stream;
 
                health_code_update();
 
@@ -314,15 +314,23 @@ static int make_viewer_streams(struct relay_session *session,
                 * Iterate over all the streams of the trace to see if we have a
                 * metadata stream.
                 */
-               cds_list_for_each_entry_rcu(
-                               stream, &ctf_trace->stream_list, stream_node)
+               cds_list_for_each_entry_rcu(relay_stream,
+                               &ctf_trace->stream_list, stream_node)
                {
-                       if (stream->is_metadata) {
+                       bool is_metadata_stream;
+
+                       pthread_mutex_lock(&relay_stream->lock);
+                       is_metadata_stream = relay_stream->is_metadata;
+                       pthread_mutex_unlock(&relay_stream->lock);
+
+                       if (is_metadata_stream) {
                                trace_has_metadata_stream = true;
                                break;
                        }
                }
 
+               relay_stream = NULL;
+
                /*
                 * If there is no metadata stream in this trace at the moment
                 * and we never sent one to the viewer, skip the trace. We
@@ -334,35 +342,72 @@ static int make_viewer_streams(struct relay_session *session,
                        continue;
                }
 
-               cds_list_for_each_entry_rcu(stream, &ctf_trace->stream_list, stream_node) {
-                       struct relay_viewer_stream *vstream;
+               cds_list_for_each_entry_rcu(relay_stream,
+                               &ctf_trace->stream_list, stream_node)
+               {
+                       struct relay_viewer_stream *viewer_stream;
 
-                       if (!stream_get(stream)) {
+                       if (!stream_get(relay_stream)) {
                                continue;
                        }
+
+                       pthread_mutex_lock(&relay_stream->lock);
                        /*
                         * stream published is protected by the session lock.
                         */
-                       if (!stream->published) {
+                       if (!relay_stream->published) {
                                goto next;
                        }
-                       vstream = viewer_stream_get_by_id(stream->stream_handle);
-                       if (!vstream) {
+                       viewer_stream = viewer_stream_get_by_id(
+                                       relay_stream->stream_handle);
+                       if (!viewer_stream) {
+                               struct lttng_trace_chunk *viewer_stream_trace_chunk;
+
                                /*
                                 * Save that we sent the metadata stream to the
                                 * viewer. So that we know what trace the viewer
                                 * is aware of.
                                 */
-                               if (stream->is_metadata) {
-                                       ctf_trace->metadata_stream_sent_to_viewer =
-                                                       true;
+                               if (relay_stream->is_metadata) {
+                                       ctf_trace->metadata_stream_sent_to_viewer = true;
                                }
-                               vstream = viewer_stream_create(stream,
-                                               viewer_trace_chunk, seek_t);
-                               if (!vstream) {
+
+                               /*
+                                * If a rotation is ongoing, use a copy of the
+                                * relay stream's chunk to ensure the stream
+                                * files exist.
+                                *
+                                * Otherwise, the viewer session's current trace
+                                * chunk can be used safely.
+                                */
+                               if ((relay_stream->ongoing_rotation.is_set ||
+                                                   relay_session->ongoing_rotation) &&
+                                               relay_stream->trace_chunk) {
+                                       viewer_stream_trace_chunk = lttng_trace_chunk_copy(
+                                                       relay_stream->trace_chunk);
+                                       if (!viewer_stream_trace_chunk) {
+                                               ret = -1;
+                                               ctf_trace_put(ctf_trace);
+                                               goto error_unlock;
+                                       }
+                               } else {
+                                       const bool reference_acquired = lttng_trace_chunk_get(
+                                                       viewer_session->current_trace_chunk);
+
+                                       assert(reference_acquired);
+                                       viewer_stream_trace_chunk =
+                                                       viewer_session->current_trace_chunk;
+                               }
+
+                               viewer_stream = viewer_stream_create(
+                                               relay_stream,
+                                               viewer_stream_trace_chunk,
+                                               seek_t);
+                               lttng_trace_chunk_put(viewer_stream_trace_chunk);
+                               viewer_stream_trace_chunk = NULL;
+                               if (!viewer_stream) {
                                        ret = -1;
                                        ctf_trace_put(ctf_trace);
-                                       stream_put(stream);
                                        goto error_unlock;
                                }
 
@@ -374,36 +419,40 @@ static int make_viewer_streams(struct relay_session *session,
                                 * Ensure a self-reference is preserved even
                                 * after we have put our local reference.
                                 */
-                               if (!viewer_stream_get(vstream)) {
+                               if (!viewer_stream_get(viewer_stream)) {
                                        ERR("Unable to get self-reference on viewer stream, logic error.");
                                        abort();
                                }
                        } else {
-                               if (!vstream->sent_flag && nb_unsent) {
+                               if (!viewer_stream->sent_flag && nb_unsent) {
                                        /* Update number of unsent stream counter. */
                                        (*nb_unsent)++;
                                }
                        }
                        /* Update number of total stream counter. */
                        if (nb_total) {
-                               if (stream->is_metadata) {
-                                       if (!stream->closed ||
-                                                       stream->metadata_received > vstream->metadata_sent) {
+                               if (relay_stream->is_metadata) {
+                                       if (!relay_stream->closed ||
+                                                       relay_stream->metadata_received >
+                                                                       viewer_stream->metadata_sent) {
                                                (*nb_total)++;
                                        }
                                } else {
-                                       if (!stream->closed ||
-                                               !(((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) >= 0)) {
-
+                                       if (!relay_stream->closed ||
+                                                       !(((int64_t)(relay_stream->prev_data_seq -
+                                                                         relay_stream->last_net_seq_num)) >=
+                                                                       0)) {
                                                (*nb_total)++;
                                        }
                                }
                        }
                        /* Put local reference. */
-                       viewer_stream_put(vstream);
+                       viewer_stream_put(viewer_stream);
                next:
-                       stream_put(stream);
+                       pthread_mutex_unlock(&relay_stream->lock);
+                       stream_put(relay_stream);
                }
+               relay_stream = NULL;
                ctf_trace_put(ctf_trace);
        }
 
@@ -412,6 +461,11 @@ static int make_viewer_streams(struct relay_session *session,
 error_unlock:
        rcu_read_unlock();
 error:
+       if (relay_stream) {
+               pthread_mutex_unlock(&relay_stream->lock);
+               stream_put(relay_stream);
+       }
+
        return ret;
 }
 
@@ -1088,7 +1142,7 @@ int viewer_get_new_streams(struct relay_connection *conn)
 
        pthread_mutex_lock(&session->lock);
        ret = make_viewer_streams(session,
-                       conn->viewer_session->current_trace_chunk,
+                       conn->viewer_session,
                        LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
                        &nb_created, &closed);
        if (ret < 0) {
@@ -1237,7 +1291,7 @@ int viewer_attach_session(struct relay_connection *conn)
        }
 
        ret = make_viewer_streams(session,
-                       conn->viewer_session->current_trace_chunk, seek_type,
+                       conn->viewer_session, seek_type,
                        &nb_streams, NULL, NULL, &closed);
        if (ret < 0) {
                goto end_put_session;
This page took 0.026946 seconds and 4 git commands to generate.