X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=f00f07db1b8e63fc280c5a868379da2a47a68ade;hp=e480f86e483400874c2240b78dfe74b46524222f;hb=3d7708ff51bfae7a01b7062d1f2567b2854ec374;hpb=79a4298098da8ec2d118e00ca2a08e7b542e7287 diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index e480f86e4..f00f07db1 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -291,6 +291,13 @@ static int make_viewer_streams(struct relay_session *session, assert(session); ASSERT_LOCKED(session->lock); + if (!viewer_trace_chunk) { + ERR("Internal error: viewer session associated with session \"%s\" has a NULL trace chunk", + session->session_name); + ret = -1; + goto error; + } + if (session->connection_closed) { *closed = true; } @@ -302,6 +309,7 @@ static int make_viewer_streams(struct relay_session *session, rcu_read_lock(); cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace, node.node) { + bool trace_has_metadata_stream = false; struct relay_stream *stream; health_code_update(); @@ -310,6 +318,30 @@ static int make_viewer_streams(struct relay_session *session, continue; } + /* + * Iterate over all the streams of the trace to see if we have a + * metadata stream. + */ + cds_list_for_each_entry_rcu( + stream, &ctf_trace->stream_list, stream_node) + { + if (stream->is_metadata) { + trace_has_metadata_stream = true; + break; + } + } + + /* + * If there is no metadata stream in this trace at the moment + * and we never sent one to the viewer, skip the trace. We + * accept that the viewer will not see this trace at all. + */ + if (!trace_has_metadata_stream && + !ctf_trace->metadata_stream_sent_to_viewer) { + ctf_trace_put(ctf_trace); + continue; + } + cds_list_for_each_entry_rcu(stream, &ctf_trace->stream_list, stream_node) { struct relay_viewer_stream *vstream; @@ -324,6 +356,15 @@ static int make_viewer_streams(struct relay_session *session, } vstream = viewer_stream_get_by_id(stream->stream_handle); if (!vstream) { + /* + * Save that we sent the metadata stream to the + * viewer. So that we know what trace the viewer + * is aware of. + */ + if (stream->is_metadata) { + ctf_trace->metadata_stream_sent_to_viewer = + true; + } vstream = viewer_stream_create(stream, viewer_trace_chunk, seek_t); if (!vstream) { @@ -378,6 +419,7 @@ static int make_viewer_streams(struct relay_session *session, error_unlock: rcu_read_unlock(); +error: return ret; } @@ -954,23 +996,11 @@ int viewer_get_new_streams(struct relay_connection *conn) } if (!viewer_session_is_attached(conn->viewer_session, session)) { - send_streams = 0; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR); goto send_reply; } - send_streams = 1; - response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); - pthread_mutex_lock(&session->lock); - if (!conn->viewer_session->current_trace_chunk && - session->current_trace_chunk) { - ret = viewer_session_set_trace_chunk(conn->viewer_session, - session->current_trace_chunk); - if (ret) { - goto error_unlock_session; - } - } ret = make_viewer_streams(session, conn->viewer_session->current_trace_chunk, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent, @@ -978,7 +1008,8 @@ int viewer_get_new_streams(struct relay_connection *conn) if (ret < 0) { goto error_unlock_session; } - pthread_mutex_unlock(&session->lock); + send_streams = 1; + response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); /* Only send back the newly created streams with the unsent ones. */ nb_streams = nb_created + nb_unsent; @@ -992,8 +1023,10 @@ int viewer_get_new_streams(struct relay_connection *conn) send_streams = 0; response.streams_count = 0; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP); - goto send_reply; + goto send_reply_unlock; } +send_reply_unlock: + pthread_mutex_unlock(&session->lock); send_reply: health_code_update(); @@ -1047,6 +1080,7 @@ int viewer_attach_session(struct relay_connection *conn) struct lttng_viewer_attach_session_request request; struct lttng_viewer_attach_session_response response; struct relay_session *session = NULL; + enum lttng_viewer_attach_return_code viewer_attach_status; bool closed = false; uint64_t session_id; @@ -1096,10 +1130,10 @@ int viewer_attach_session(struct relay_connection *conn) } send_streams = 1; - ret = viewer_session_attach(conn->viewer_session, session); - if (ret) { - DBG("Already a viewer attached"); - response.status = htobe32(LTTNG_VIEWER_ATTACH_ALREADY); + viewer_attach_status = viewer_session_attach(conn->viewer_session, + session); + if (viewer_attach_status != LTTNG_VIEWER_ATTACH_OK) { + response.status = htobe32(viewer_attach_status); goto send_reply; } @@ -1116,14 +1150,6 @@ int viewer_attach_session(struct relay_connection *conn) goto send_reply; } - if (!conn->viewer_session->current_trace_chunk && - session->current_trace_chunk) { - ret = viewer_session_set_trace_chunk(conn->viewer_session, - session->current_trace_chunk); - if (ret) { - goto end_put_session; - } - } ret = make_viewer_streams(session, conn->viewer_session->current_trace_chunk, seek_type, &nb_streams, NULL, NULL, &closed);