X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=962fe0fd14a340c9e903f6410d09b1b6f386a3de;hb=87250ba19aec78f36e301494a03f5678fcb6fbb4;hp=7d6dc1bc20a03a5af6cafe1e5ef170be1b7f5cb8;hpb=9edaf114d28249f4740de16bc9f58c43cfe8042e;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 7d6dc1bc2..962fe0fd1 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -281,16 +281,9 @@ static int make_viewer_streams(struct relay_session *relay_session, struct ctf_trace *ctf_trace; struct relay_stream *relay_stream = NULL; - assert(relay_session); + LTTNG_ASSERT(relay_session); ASSERT_LOCKED(relay_session->lock); - if (!viewer_session->current_trace_chunk) { - ERR("Internal error: viewer session associated with session \"%s\" has a NULL trace chunk", - relay_session->session_name); - ret = -1; - goto error; - } - if (relay_session->connection_closed) { *closed = true; } @@ -361,7 +354,7 @@ static int make_viewer_streams(struct relay_session *relay_session, viewer_stream = viewer_stream_get_by_id( relay_stream->stream_handle); if (!viewer_stream) { - struct lttng_trace_chunk *viewer_stream_trace_chunk; + struct lttng_trace_chunk *viewer_stream_trace_chunk = NULL; /* * Save that we sent the metadata stream to the @@ -391,12 +384,42 @@ static int make_viewer_streams(struct relay_session *relay_session, goto error_unlock; } } else { - const bool reference_acquired = lttng_trace_chunk_get( - viewer_session->current_trace_chunk); + /* + * Transition the viewer session into the newest trace chunk available. + */ + if (!lttng_trace_chunk_ids_equal(viewer_session->current_trace_chunk, + relay_stream->trace_chunk)) { + + ret = viewer_session_set_trace_chunk_copy( + viewer_session, + relay_stream->trace_chunk); + if (ret) { + ret = -1; + ctf_trace_put(ctf_trace); + goto error_unlock; + } + } - assert(reference_acquired); - viewer_stream_trace_chunk = - viewer_session->current_trace_chunk; + if (relay_stream->trace_chunk) { + /* + * If the corresponding relay + * stream's trace chunk is set, + * the viewer stream will be + * created under it. + * + * Note that a relay stream can + * have a NULL output trace + * chunk (for instance, after a + * clear against a stopped + * session). + */ + const bool reference_acquired = lttng_trace_chunk_get( + viewer_session->current_trace_chunk); + + LTTNG_ASSERT(reference_acquired); + viewer_stream_trace_chunk = + viewer_session->current_trace_chunk; + } } viewer_stream = viewer_stream_create( @@ -460,7 +483,7 @@ static int make_viewer_streams(struct relay_session *relay_session, error_unlock: rcu_read_unlock(); -error: + if (relay_stream) { pthread_mutex_unlock(&relay_stream->lock); stream_put(relay_stream); @@ -1018,14 +1041,6 @@ int viewer_list_sessions(struct relay_connection *conn) /* Skip closed session */ goto next_session; } - if (!session->current_trace_chunk) { - /* - * Skip un-attachable session. It is either - * being destroyed or has not had a trace - * chunk created against it yet. - */ - goto next_session; - } if (count >= buf_count) { struct lttng_viewer_session *newbuf; @@ -1111,7 +1126,7 @@ int viewer_get_new_streams(struct relay_connection *conn) uint64_t session_id; bool closed = false; - assert(conn); + LTTNG_ASSERT(conn); DBG("Get new streams received"); @@ -1140,10 +1155,19 @@ int viewer_get_new_streams(struct relay_connection *conn) goto send_reply; } + /* + * For any new stream, create it with LTTNG_VIEWER_SEEK_BEGINNING since + * that at this point the client is already attached to the session.Aany + * initial stream will have been created with the seek type at attach + * time (for now most readers use the LTTNG_VIEWER_SEEK_LAST on attach). + * Otherwise any event happening in a new stream between the attach and + * a call to viewer_get_new_streams will be "lost" (never received) from + * the viewer's point of view. + */ pthread_mutex_lock(&session->lock); ret = make_viewer_streams(session, conn->viewer_session, - LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent, + LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent, &nb_created, &closed); if (ret < 0) { goto error_unlock_session; @@ -1224,7 +1248,7 @@ int viewer_attach_session(struct relay_connection *conn) bool closed = false; uint64_t session_id; - assert(conn); + LTTNG_ASSERT(conn); health_code_update(); @@ -1254,15 +1278,6 @@ int viewer_attach_session(struct relay_connection *conn) DBG("Attach session ID %" PRIu64 " received", session_id); pthread_mutex_lock(&session->lock); - if (!session->current_trace_chunk) { - /* - * Session is either being destroyed or it never had a trace - * chunk created against it. - */ - DBG("Session requested by live client has no current trace chunk, returning unknown session"); - response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK); - goto send_reply; - } if (session->live_timer == 0) { DBG("Not live session"); response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE); @@ -1371,10 +1386,12 @@ static int try_open_index(struct relay_viewer_stream *vstream, /* * First time, we open the index file and at least one index is ready. */ - if (rstream->index_received_seqcount == 0) { + if (rstream->index_received_seqcount == 0 || + !vstream->stream_file.trace_chunk) { ret = -ENOENT; goto end; } + chunk_status = lttng_index_file_create_from_trace_chunk_read_only( vstream->stream_file.trace_chunk, rstream->path_name, rstream->channel_name, rstream->tracefile_size, @@ -1514,7 +1531,7 @@ static int check_index_status(struct relay_viewer_stream *vstream, vstream->stream->stream_handle); goto index_ready; } - assert(tracefile_array_seq_in_file(rstream->tfa, + LTTNG_ASSERT(tracefile_array_seq_in_file(rstream->tfa, vstream->current_tracefile_id, vstream->index_sent_seqcount)); } @@ -1528,6 +1545,24 @@ index_ready: return 1; } +static +void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream, + struct lttng_trace_chunk *new_trace_chunk) +{ + lttng_trace_chunk_put(vstream->stream_file.trace_chunk); + + if (new_trace_chunk) { + const bool acquired_reference = lttng_trace_chunk_get( + new_trace_chunk); + + LTTNG_ASSERT(acquired_reference); + } + + vstream->stream_file.trace_chunk = new_trace_chunk; + viewer_stream_sync_tracefile_array_tail(vstream); + viewer_stream_close_files(vstream); +} + /* * Send the next index for a stream. * @@ -1545,7 +1580,7 @@ int viewer_get_next_index(struct relay_connection *conn) struct ctf_trace *ctf_trace = NULL; struct relay_viewer_stream *metadata_viewer_stream = NULL; - assert(conn); + LTTNG_ASSERT(conn); DBG("Viewer get next index"); @@ -1596,7 +1631,10 @@ int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } - if (rstream->trace_chunk && !lttng_trace_chunk_ids_equal( + /* + * Transition the viewer session into the newest trace chunk available. + */ + if (!lttng_trace_chunk_ids_equal( conn->viewer_session->current_trace_chunk, rstream->trace_chunk)) { DBG("Relay stream and viewer chunk ids differ"); @@ -1609,21 +1647,28 @@ int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } } - if (conn->viewer_session->current_trace_chunk != - vstream->stream_file.trace_chunk) { - bool acquired_reference; + /* + * Transition the viewer stream into the latest trace chunk available. + * + * Note that the stream must _not_ rotate in one precise condition: + * the relay stream has rotated to a NULL trace chunk and the viewer + * stream is consuming the trace chunk that was active just before + * that rotation to NULL. + * + * This allows clients to consume all the packets of a trace chunk + * after a session's destruction. + */ + if (conn->viewer_session->current_trace_chunk != vstream->stream_file.trace_chunk && + !(rstream->completed_rotation_count == vstream->last_seen_rotation_count + 1 && !rstream->trace_chunk)) { DBG("Viewer session and viewer stream chunk differ: " "vsession chunk %p vstream chunk %p", conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk); - lttng_trace_chunk_put(vstream->stream_file.trace_chunk); - acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk); - assert(acquired_reference); - vstream->stream_file.trace_chunk = - conn->viewer_session->current_trace_chunk; - viewer_stream_sync_tracefile_array_tail(vstream); - viewer_stream_close_files(vstream); + viewer_stream_rotate_to_trace_chunk(vstream, + conn->viewer_session->current_trace_chunk); + vstream->last_seen_rotation_count = + rstream->completed_rotation_count; } ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index); @@ -1637,7 +1682,7 @@ int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } /* At this point, ret is 0 thus we will be able to read the index. */ - assert(!ret); + LTTNG_ASSERT(!ret); /* Try to open an index if one is needed for that stream. */ ret = try_open_index(vstream, rstream); @@ -1854,6 +1899,8 @@ int viewer_get_packet(struct relay_connection *conn) goto send_reply; error: + /* No payload to send on error. */ + reply_size = sizeof(reply_header); reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); send_reply: @@ -1907,7 +1954,7 @@ int viewer_get_metadata(struct relay_connection *conn) struct lttng_viewer_metadata_packet reply; struct relay_viewer_stream *vstream = NULL; - assert(conn); + LTTNG_ASSERT(conn); DBG("Relay get metadata"); @@ -1982,8 +2029,9 @@ int viewer_get_metadata(struct relay_connection *conn) } } - if (conn->viewer_session->current_trace_chunk != - vstream->stream_file.trace_chunk) { + if (conn->viewer_session->current_trace_chunk && + conn->viewer_session->current_trace_chunk != + vstream->stream_file.trace_chunk) { bool acquired_reference; DBG("Viewer session and viewer stream chunk differ: " @@ -1992,7 +2040,7 @@ int viewer_get_metadata(struct relay_connection *conn) vstream->stream_file.trace_chunk); lttng_trace_chunk_put(vstream->stream_file.trace_chunk); acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk); - assert(acquired_reference); + LTTNG_ASSERT(acquired_reference); vstream->stream_file.trace_chunk = conn->viewer_session->current_trace_chunk; viewer_stream_close_files(vstream); @@ -2000,11 +2048,16 @@ int viewer_get_metadata(struct relay_connection *conn) len = vstream->stream->metadata_received - vstream->metadata_sent; - /* - * Either this is the first time the metadata file is read, or a - * rotation of the corresponding relay stream has occured. - */ - if (!vstream->stream_file.handle && len > 0) { + if (!vstream->stream_file.trace_chunk) { + reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA); + len = 0; + goto send_reply; + } else if (vstream->stream_file.trace_chunk && + !vstream->stream_file.handle && len > 0) { + /* + * Either this is the first time the metadata file is read, or a + * rotation of the corresponding relay stream has occurred. + */ struct fs_handle *fs_handle; char file_path[LTTNG_PATH_MAX]; enum lttng_trace_chunk_status status; @@ -2204,7 +2257,7 @@ int viewer_detach_session(struct relay_connection *conn) DBG("Viewer detach session received"); - assert(conn); + LTTNG_ASSERT(conn); health_code_update();