- viewer_stream->session_id = stream->session->id;
- viewer_stream->stream_handle = stream->stream_handle;
- viewer_stream->path_name = strndup(stream->path_name,
- LTTNG_VIEWER_PATH_MAX);
- viewer_stream->channel_name = strndup(stream->channel_name,
- LTTNG_VIEWER_NAME_MAX);
- viewer_stream->tracefile_count = stream->tracefile_count;
- viewer_stream->metadata_flag = stream->metadata_flag;
- viewer_stream->tracefile_count_last = -1ULL;
- if (seek_last) {
- viewer_stream->tracefile_count_current =
- stream->tracefile_count_current;
- } else {
- viewer_stream->tracefile_count_current =
- stream->oldest_tracefile_id;
- }
-
- /*
- * The deletion of this ctf_trace object is only done in a call RCU of the
- * relay stream making it valid as long as we have the read side lock.
- */
- viewer_stream->ctf_trace = stream->ctf_trace;
- uatomic_inc(&viewer_stream->ctf_trace->refcount);
-
- lttng_ht_node_init_u64(&viewer_stream->stream_n, stream->stream_handle);
- lttng_ht_add_unique_u64(viewer_streams_ht, &viewer_stream->stream_n);
-
- viewer_stream->index_read_fd = -1;
- viewer_stream->read_fd = -1;
-
- /*
- * This is to avoid a race between the initialization of this object and
- * the close of the given stream. If the stream is unable to find this
- * viewer stream when closing, this copy will at least take the latest
- * value.
- * We also need that for the seek_last.
- */
- viewer_stream->total_index_received = stream->total_index_received;
-
- /*
- * If we never received an index for the current stream, delay
- * the opening of the index, otherwise open it right now.
- */
- if (viewer_stream->tracefile_count_current ==
- stream->tracefile_count_current &&
- viewer_stream->total_index_received == 0) {
- viewer_stream->index_read_fd = -1;
- } else {
- ret = open_index(viewer_stream);
- if (ret < 0) {
- goto error;
- }
- }
-
- if (seek_last && viewer_stream->index_read_fd > 0) {
- ret = lseek(viewer_stream->index_read_fd,
- viewer_stream->total_index_received *
- sizeof(struct ctf_packet_index),
- SEEK_CUR);
- if (ret < 0) {
- goto error;
- }
- viewer_stream->last_sent_index =
- viewer_stream->total_index_received;
- }
-
- ret = 0;
-
-error:
- return ret;
-}
-
-/*
- * Rotate a stream to the next tracefile.
- *
- * Returns 0 on success, 1 on EOF, a negative value on error.
- */
-static
-int rotate_viewer_stream(struct relay_viewer_stream *viewer_stream,
- struct relay_stream *stream)
-{
- int ret;
- uint64_t tracefile_id;
-
- assert(viewer_stream);
-
- tracefile_id = (viewer_stream->tracefile_count_current + 1) %
- viewer_stream->tracefile_count;
- /*
- * Detect the last tracefile to open.
- */
- if (viewer_stream->tracefile_count_last != -1ULL &&
- viewer_stream->tracefile_count_last ==
- viewer_stream->tracefile_count_current) {
- ret = 1;
- goto end;
- }
-
- if (stream) {
- pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
- }
- /*
- * The writer and the reader are not working in the same
- * tracefile, we can read up to EOF, we don't care about the
- * total_index_received.
- */
- if (!stream || (stream->tracefile_count_current != tracefile_id)) {
- viewer_stream->close_write_flag = 1;
- } else {
- /*
- * We are opening a file that is still open in write, make
- * sure we limit our reading to the number of indexes
- * received.
- */
- viewer_stream->close_write_flag = 0;
- if (stream) {
- viewer_stream->total_index_received =
- stream->total_index_received;
- }
- }
- viewer_stream->tracefile_count_current = tracefile_id;
-
- ret = close(viewer_stream->index_read_fd);
- if (ret < 0) {
- PERROR("close index file %d",
- viewer_stream->index_read_fd);
- }
- viewer_stream->index_read_fd = -1;
- ret = close(viewer_stream->read_fd);
- if (ret < 0) {
- PERROR("close tracefile %d",
- viewer_stream->read_fd);
- }
- viewer_stream->read_fd = -1;
-
- pthread_mutex_lock(&viewer_stream->overwrite_lock);
- viewer_stream->abort_flag = 0;
- pthread_mutex_unlock(&viewer_stream->overwrite_lock);
-
- viewer_stream->index_read_fd = -1;
- viewer_stream->read_fd = -1;
-
- if (stream) {
- pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
- }
- ret = open_index(viewer_stream);
- if (ret < 0) {
- goto error;
- }
-
- ret = 0;
-
-end:
-error:
- return ret;
-}
-
-/*
- * Send the viewer the list of current sessions.
- */
-static
-int viewer_attach_session(struct relay_command *cmd,
- struct lttng_ht *sessions_ht)
-{
- int ret, send_streams = 0, nb_streams = 0;
- struct lttng_viewer_attach_session_request request;
- struct lttng_viewer_attach_session_response response;
- struct lttng_viewer_stream send_stream;
- struct relay_stream *stream;
- struct relay_viewer_stream *viewer_stream;
- struct lttng_ht_node_ulong *node;
- struct lttng_ht_node_u64 *node64;
- struct lttng_ht_iter iter;
- struct relay_session *session;
- int seek_last = 0;
-
- assert(cmd);
- assert(sessions_ht);
-
- DBG("Attach session received");
-
- if (cmd->version_check_done == 0) {
- ERR("Trying to attach session before version check");
- ret = -1;
- goto end_no_session;
- }