X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fviewer-stream.c;h=acd488b61846202813e8a7efe07898c1f67cfef7;hb=f3fe2a9216e46a1a10490e7457fabce7872bf5b7;hp=8a3b09a92060ef25e9f9f8f50b2d36d15c2de159;hpb=f8f3885cc52af9d3c951da78989d6f4a25270411;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c index 8a3b09a92..acd488b61 100644 --- a/src/bin/lttng-relayd/viewer-stream.c +++ b/src/bin/lttng-relayd/viewer-stream.c @@ -41,9 +41,16 @@ static void viewer_stream_destroy_rcu(struct rcu_head *head) } struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, + struct lttng_trace_chunk *viewer_trace_chunk, enum lttng_viewer_seek seek_t) { - struct relay_viewer_stream *vstream; + struct relay_viewer_stream *vstream = NULL; + const bool acquired_reference = lttng_trace_chunk_get( + viewer_trace_chunk); + + if (!acquired_reference) { + goto error; + } vstream = zmalloc(sizeof(*vstream)); if (!vstream) { @@ -51,6 +58,8 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, goto error; } + vstream->stream_file.trace_chunk = viewer_trace_chunk; + viewer_trace_chunk = NULL; vstream->path_name = lttng_strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX); if (vstream->path_name == NULL) { PERROR("relay viewer path_name alloc"); @@ -96,7 +105,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, } case LTTNG_VIEWER_SEEK_LAST: vstream->current_tracefile_id = - tracefile_array_get_file_index_head(stream->tfa); + tracefile_array_get_read_file_index_head(stream->tfa); /* * We seek at the very end of each stream, awaiting for * a future packet to eventually come in. @@ -118,10 +127,18 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, if (stream->index_received_seqcount == 0) { vstream->index_file = NULL; } else { - vstream->index_file = lttng_index_file_open(vstream->path_name, - vstream->channel_name, - stream->tracefile_count, - vstream->current_tracefile_id); + const uint32_t connection_major = stream->trace->session->major; + const uint32_t connection_minor = stream->trace->session->minor; + + vstream->index_file = lttng_index_file_create_from_trace_chunk_read_only( + vstream->stream_file.trace_chunk, + stream->path_name, + stream->channel_name, stream->tracefile_size, + vstream->current_tracefile_id, + lttng_to_index_major(connection_major, + connection_minor), + lttng_to_index_minor(connection_major, + connection_minor)); if (!vstream->index_file) { goto error_unlock; } @@ -143,10 +160,8 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, /* Globally visible after the add unique. */ lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle); - lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n); - - pthread_mutex_init(&vstream->reflock, NULL); urcu_ref_init(&vstream->ref); + lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n); return vstream; @@ -156,6 +171,9 @@ error: if (vstream) { viewer_stream_destroy(vstream); } + if (viewer_trace_chunk && acquired_reference) { + lttng_trace_chunk_put(viewer_trace_chunk); + } return NULL; } @@ -180,9 +198,9 @@ static void viewer_stream_release(struct urcu_ref *ref) viewer_stream_unpublish(vstream); - if (vstream->stream_fd) { - stream_fd_put(vstream->stream_fd); - vstream->stream_fd = NULL; + if (vstream->stream_file.fd) { + stream_fd_put(vstream->stream_file.fd); + vstream->stream_file.fd = NULL; } if (vstream->index_file) { lttng_index_file_put(vstream->index_file); @@ -192,22 +210,15 @@ static void viewer_stream_release(struct urcu_ref *ref) stream_put(vstream->stream); vstream->stream = NULL; } + lttng_trace_chunk_put(vstream->stream_file.trace_chunk); + vstream->stream_file.trace_chunk = NULL; call_rcu(&vstream->rcu_node, viewer_stream_destroy_rcu); } /* Must be called with RCU read-side lock held. */ bool viewer_stream_get(struct relay_viewer_stream *vstream) { - bool has_ref = false; - - pthread_mutex_lock(&vstream->reflock); - if (vstream->ref.refcount != 0) { - has_ref = true; - urcu_ref_get(&vstream->ref); - } - pthread_mutex_unlock(&vstream->reflock); - - return has_ref; + return urcu_ref_get_unless_zero(&vstream->ref); } /* @@ -240,9 +251,7 @@ end: void viewer_stream_put(struct relay_viewer_stream *vstream) { rcu_read_lock(); - pthread_mutex_lock(&vstream->reflock); urcu_ref_put(&vstream->ref, viewer_stream_release); - pthread_mutex_unlock(&vstream->reflock); rcu_read_unlock(); } @@ -255,8 +264,10 @@ void viewer_stream_put(struct relay_viewer_stream *vstream) int viewer_stream_rotate(struct relay_viewer_stream *vstream) { int ret; - struct relay_stream *stream = vstream->stream; uint64_t new_id; + const struct relay_stream *stream = vstream->stream; + const uint32_t connection_major = stream->trace->session->major; + const uint32_t connection_minor = stream->trace->session->minor; /* Detect the last tracefile to open. */ if (stream->index_received_seqcount @@ -301,15 +312,21 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream) lttng_index_file_put(vstream->index_file); vstream->index_file = NULL; } - if (vstream->stream_fd) { - stream_fd_put(vstream->stream_fd); - vstream->stream_fd = NULL; + if (vstream->stream_file.fd) { + stream_fd_put(vstream->stream_file.fd); + vstream->stream_file.fd = NULL; } - - vstream->index_file = lttng_index_file_open(vstream->path_name, - vstream->channel_name, - stream->tracefile_count, - vstream->current_tracefile_id); + vstream->index_file = + lttng_index_file_create_from_trace_chunk_read_only( + vstream->stream_file.trace_chunk, + stream->path_name, + stream->channel_name, + stream->tracefile_size, + vstream->current_tracefile_id, + lttng_to_index_major(connection_major, + connection_minor), + lttng_to_index_minor(connection_major, + connection_minor)); if (!vstream->index_file) { ret = -1; goto end;