X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=13078026b220b7e3f9e6c3e1c8a0e3009e3e7766;hb=a0377dfefe40662ba7d68617bce6ff467114136c;hp=5877467c0d5598f0cbdc7481f08c144a0b7580b1;hpb=80516611b6f19201b1e173fb448935aca7a9e668;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 5877467c0..13078026b 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -281,7 +281,7 @@ static int make_viewer_streams(struct relay_session *relay_session, struct ctf_trace *ctf_trace; struct relay_stream *relay_stream = NULL; - assert(relay_session); + LTTNG_ASSERT(relay_session); ASSERT_LOCKED(relay_session->lock); if (relay_session->connection_closed) { @@ -404,7 +404,7 @@ static int make_viewer_streams(struct relay_session *relay_session, reference_acquired = lttng_trace_chunk_get( viewer_session->current_trace_chunk); - assert(reference_acquired); + LTTNG_ASSERT(reference_acquired); viewer_stream_trace_chunk = viewer_session->current_trace_chunk; } @@ -1113,7 +1113,7 @@ int viewer_get_new_streams(struct relay_connection *conn) uint64_t session_id; bool closed = false; - assert(conn); + LTTNG_ASSERT(conn); DBG("Get new streams received"); @@ -1142,10 +1142,19 @@ int viewer_get_new_streams(struct relay_connection *conn) goto send_reply; } + /* + * For any new stream, create it with LTTNG_VIEWER_SEEK_BEGINNING since + * that at this point the client is already attached to the session.Aany + * initial stream will have been created with the seek type at attach + * time (for now most readers use the LTTNG_VIEWER_SEEK_LAST on attach). + * Otherwise any event happening in a new stream between the attach and + * a call to viewer_get_new_streams will be "lost" (never received) from + * the viewer's point of view. + */ pthread_mutex_lock(&session->lock); ret = make_viewer_streams(session, conn->viewer_session, - LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent, + LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent, &nb_created, &closed); if (ret < 0) { goto error_unlock_session; @@ -1226,7 +1235,7 @@ int viewer_attach_session(struct relay_connection *conn) bool closed = false; uint64_t session_id; - assert(conn); + LTTNG_ASSERT(conn); health_code_update(); @@ -1509,7 +1518,7 @@ static int check_index_status(struct relay_viewer_stream *vstream, vstream->stream->stream_handle); goto index_ready; } - assert(tracefile_array_seq_in_file(rstream->tfa, + LTTNG_ASSERT(tracefile_array_seq_in_file(rstream->tfa, vstream->current_tracefile_id, vstream->index_sent_seqcount)); } @@ -1533,7 +1542,7 @@ void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream, const bool acquired_reference = lttng_trace_chunk_get( new_trace_chunk); - assert(acquired_reference); + LTTNG_ASSERT(acquired_reference); } vstream->stream_file.trace_chunk = new_trace_chunk; @@ -1558,7 +1567,7 @@ int viewer_get_next_index(struct relay_connection *conn) struct ctf_trace *ctf_trace = NULL; struct relay_viewer_stream *metadata_viewer_stream = NULL; - assert(conn); + LTTNG_ASSERT(conn); DBG("Viewer get next index"); @@ -1660,7 +1669,7 @@ int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } /* At this point, ret is 0 thus we will be able to read the index. */ - assert(!ret); + LTTNG_ASSERT(!ret); /* Try to open an index if one is needed for that stream. */ ret = try_open_index(vstream, rstream); @@ -1932,7 +1941,7 @@ int viewer_get_metadata(struct relay_connection *conn) struct lttng_viewer_metadata_packet reply; struct relay_viewer_stream *vstream = NULL; - assert(conn); + LTTNG_ASSERT(conn); DBG("Relay get metadata"); @@ -2017,7 +2026,7 @@ int viewer_get_metadata(struct relay_connection *conn) vstream->stream_file.trace_chunk); lttng_trace_chunk_put(vstream->stream_file.trace_chunk); acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk); - assert(acquired_reference); + LTTNG_ASSERT(acquired_reference); vstream->stream_file.trace_chunk = conn->viewer_session->current_trace_chunk; viewer_stream_close_files(vstream); @@ -2229,7 +2238,7 @@ int viewer_detach_session(struct relay_connection *conn) DBG("Viewer detach session received"); - assert(conn); + LTTNG_ASSERT(conn); health_code_update();