X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fviewer-stream.c;h=5c19fb9747e4e124e393e872b15d042810a7aff5;hp=1d02ee32945200b63142d934565ab83154513953;hb=b0d240a2e2204087ff1634f0bd265660c0582f33;hpb=7591bab11eceedc6a0d1e02fd6f85592267a63b5 diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c index 1d02ee329..5c19fb974 100644 --- a/src/bin/lttng-relayd/viewer-stream.c +++ b/src/bin/lttng-relayd/viewer-stream.c @@ -17,10 +17,14 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE #define _LGPL_SOURCE #include #include +#include +#include +#include +#include +#include #include "lttng-relayd.h" #include "viewer-stream.h" @@ -41,9 +45,16 @@ static void viewer_stream_destroy_rcu(struct rcu_head *head) } struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, + struct lttng_trace_chunk *viewer_trace_chunk, enum lttng_viewer_seek seek_t) { - struct relay_viewer_stream *vstream; + struct relay_viewer_stream *vstream = NULL; + const bool acquired_reference = lttng_trace_chunk_get( + viewer_trace_chunk); + + if (!acquired_reference) { + goto error; + } vstream = zmalloc(sizeof(*vstream)); if (!vstream) { @@ -51,82 +62,149 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, goto error; } - vstream->path_name = strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX); + vstream->stream_file.trace_chunk = viewer_trace_chunk; + viewer_trace_chunk = NULL; + vstream->path_name = lttng_strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX); if (vstream->path_name == NULL) { PERROR("relay viewer path_name alloc"); goto error; } - vstream->channel_name = strndup(stream->channel_name, + vstream->channel_name = lttng_strndup(stream->channel_name, LTTNG_VIEWER_NAME_MAX); if (vstream->channel_name == NULL) { PERROR("relay viewer channel_name alloc"); goto error; } + if (!stream_get(stream)) { + ERR("Cannot get stream"); + goto error; + } + vstream->stream = stream; + + pthread_mutex_lock(&stream->lock); + + if (stream->is_metadata && stream->trace->viewer_metadata_stream) { + ERR("Cannot attach viewer metadata stream to trace (busy)."); + goto error_unlock; + } + switch (seek_t) { case LTTNG_VIEWER_SEEK_BEGINNING: - vstream->current_tracefile_id = stream->oldest_tracefile_id; + { + uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa); + + if (seq_tail == -1ULL) { + /* + * Tail may not be initialized yet. Nonetheless, we know + * we want to send the first index once it becomes + * available. + */ + seq_tail = 0; + } + vstream->current_tracefile_id = + tracefile_array_get_file_index_tail(stream->tfa); + vstream->index_sent_seqcount = seq_tail; break; + } case LTTNG_VIEWER_SEEK_LAST: - vstream->current_tracefile_id = stream->current_tracefile_id; + vstream->current_tracefile_id = + tracefile_array_get_read_file_index_head(stream->tfa); + /* + * We seek at the very end of each stream, awaiting for + * a future packet to eventually come in. + * + * We don't need to check the head position for -1ULL since the + * increment will set it to 0. + */ + vstream->index_sent_seqcount = + tracefile_array_get_seq_head(stream->tfa) + 1; break; default: - goto error; + goto error_unlock; } - if (!stream_get(stream)) { - ERR("Cannot get stream"); - goto error; - } - vstream->stream = stream; - pthread_mutex_lock(&stream->lock); /* - * If we never received an index for the current stream, delay the opening - * of the index, otherwise open it right now. + * If we never received an index for the current stream, delay + * the opening of the index, otherwise open it right now. */ - if (vstream->current_tracefile_id == stream->current_tracefile_id - && stream->total_index_received == 0) { - vstream->index_fd = NULL; + if (stream->index_file == NULL) { + vstream->index_file = NULL; } else { - int read_fd; + const uint32_t connection_major = stream->trace->session->major; + const uint32_t connection_minor = stream->trace->session->minor; + enum lttng_trace_chunk_status chunk_status; + + chunk_status = lttng_index_file_create_from_trace_chunk_read_only( + vstream->stream_file.trace_chunk, + stream->path_name, + stream->channel_name, stream->tracefile_size, + vstream->current_tracefile_id, + lttng_to_index_major(connection_major, + connection_minor), + lttng_to_index_minor(connection_major, + connection_minor), + true, &vstream->index_file); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) { + vstream->index_file = NULL; + } else { + goto error_unlock; + } + } + } + + /* + * If we never received a data file for the current stream, delay the + * opening, otherwise open it right now. + */ + if (stream->stream_fd) { + int fd, ret; + char file_path[LTTNG_PATH_MAX]; + enum lttng_trace_chunk_status status; + + ret = utils_stream_file_path(stream->path_name, + stream->channel_name, stream->tracefile_size, + vstream->current_tracefile_id, NULL, file_path, + sizeof(file_path)); + if (ret < 0) { + goto error_unlock; + } - read_fd = index_open(vstream->path_name, vstream->channel_name, - stream->tracefile_count, - vstream->current_tracefile_id); - if (read_fd < 0) { + status = lttng_trace_chunk_open_file( + vstream->stream_file.trace_chunk, + file_path, O_RDONLY, 0, &fd, true); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { goto error_unlock; } - vstream->index_fd = stream_fd_create(read_fd); - if (!vstream->index_fd) { - if (close(read_fd)) { - PERROR("close"); + vstream->stream_file.fd = stream_fd_create(fd); + if (!vstream->stream_file.fd) { + if (close(fd)) { + PERROR("Failed to close viewer %sfile", + stream->is_metadata ? "metadata " : ""); } goto error_unlock; } } - if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_fd) { + if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_file) { off_t lseek_ret; - lseek_ret = lseek(vstream->index_fd->fd, 0, SEEK_END); + lseek_ret = lseek(vstream->index_file->fd, 0, SEEK_END); if (lseek_ret < 0) { goto error_unlock; } - vstream->last_sent_index = stream->total_index_received; } - pthread_mutex_unlock(&stream->lock); - if (stream->is_metadata) { rcu_assign_pointer(stream->trace->viewer_metadata_stream, vstream); } + pthread_mutex_unlock(&stream->lock); /* Globally visible after the add unique. */ lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle); - lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n); - - pthread_mutex_init(&vstream->reflock, NULL); urcu_ref_init(&vstream->ref); + lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n); return vstream; @@ -136,6 +214,9 @@ error: if (vstream) { viewer_stream_destroy(vstream); } + if (viewer_trace_chunk && acquired_reference) { + lttng_trace_chunk_put(viewer_trace_chunk); + } return NULL; } @@ -160,34 +241,27 @@ static void viewer_stream_release(struct urcu_ref *ref) viewer_stream_unpublish(vstream); - if (vstream->stream_fd) { - stream_fd_put(vstream->stream_fd); - vstream->stream_fd = NULL; + if (vstream->stream_file.fd) { + stream_fd_put(vstream->stream_file.fd); + vstream->stream_file.fd = NULL; } - if (vstream->index_fd) { - stream_fd_put(vstream->index_fd); - vstream->index_fd = NULL; + if (vstream->index_file) { + lttng_index_file_put(vstream->index_file); + vstream->index_file = NULL; } if (vstream->stream) { stream_put(vstream->stream); vstream->stream = NULL; } + lttng_trace_chunk_put(vstream->stream_file.trace_chunk); + vstream->stream_file.trace_chunk = NULL; call_rcu(&vstream->rcu_node, viewer_stream_destroy_rcu); } /* Must be called with RCU read-side lock held. */ bool viewer_stream_get(struct relay_viewer_stream *vstream) { - bool has_ref = false; - - pthread_mutex_lock(&vstream->reflock); - if (vstream->ref.refcount != 0) { - has_ref = true; - urcu_ref_get(&vstream->ref); - } - pthread_mutex_unlock(&vstream->reflock); - - return has_ref; + return urcu_ref_get_unless_zero(&vstream->ref); } /* @@ -220,45 +294,50 @@ end: void viewer_stream_put(struct relay_viewer_stream *vstream) { rcu_read_lock(); - pthread_mutex_lock(&vstream->reflock); urcu_ref_put(&vstream->ref, viewer_stream_release); - pthread_mutex_unlock(&vstream->reflock); rcu_read_unlock(); } -/* - * Returns whether the current tracefile is readable. If not, it has - * been overwritten. - * Must be called with rstream lock held. - */ -bool viewer_stream_is_tracefile_seq_readable(struct relay_viewer_stream *vstream, - uint64_t seq) +void viewer_stream_close_files(struct relay_viewer_stream *vstream) { - struct relay_stream *stream = vstream->stream; + if (vstream->index_file) { + lttng_index_file_put(vstream->index_file); + vstream->index_file = NULL; + } + if (vstream->stream_file.fd) { + stream_fd_put(vstream->stream_file.fd); + vstream->stream_file.fd = NULL; + } +} - if (seq >= stream->oldest_tracefile_seq - && seq <= stream->current_tracefile_seq) { - /* seq is a readable file. */ - return true; - } else { - /* seq is not readable. */ - return false; +void viewer_stream_sync_tracefile_array_tail(struct relay_viewer_stream *vstream) +{ + const struct relay_stream *stream = vstream->stream; + uint64_t seq_tail; + + vstream->current_tracefile_id = tracefile_array_get_file_index_tail(stream->tfa); + seq_tail = tracefile_array_get_seq_tail(stream->tfa); + if (seq_tail == -1ULL) { + seq_tail = 0; } + vstream->index_sent_seqcount = seq_tail; } /* * Rotate a stream to the next tracefile. * * Must be called with the rstream lock held. - * Returns 0 on success, 1 on EOF, a negative value on error. + * Returns 0 on success, 1 on EOF. */ int viewer_stream_rotate(struct relay_viewer_stream *vstream) { int ret; - struct relay_stream *stream = vstream->stream; + uint64_t new_id; + const struct relay_stream *stream = vstream->stream; /* Detect the last tracefile to open. */ - if (stream->total_index_received == vstream->last_sent_index + if (stream->index_received_seqcount + == vstream->index_sent_seqcount && stream->trace->session->connection_closed) { ret = 1; goto end; @@ -270,43 +349,32 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream) goto end; } - if (!viewer_stream_is_tracefile_seq_readable(vstream, - vstream->current_tracefile_seq + 1)) { - vstream->current_tracefile_id = - stream->oldest_tracefile_id; - vstream->current_tracefile_seq = - stream->oldest_tracefile_seq; + /* + * Try to move to the next file. + */ + new_id = (vstream->current_tracefile_id + 1) + % stream->tracefile_count; + if (tracefile_array_seq_in_file(stream->tfa, new_id, + vstream->index_sent_seqcount)) { + vstream->current_tracefile_id = new_id; } else { + uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa); + + /* + * This can only be reached on overwrite, which implies there + * has been data written at some point, which will have set the + * tail. + */ + assert(seq_tail != -1ULL); + /* + * We need to resync because we lag behind tail. + */ vstream->current_tracefile_id = - (vstream->current_tracefile_id + 1) - % stream->tracefile_count; - vstream->current_tracefile_seq++; - } - - if (vstream->index_fd) { - stream_fd_put(vstream->index_fd); - vstream->index_fd = NULL; - } - if (vstream->stream_fd) { - stream_fd_put(vstream->stream_fd); - vstream->stream_fd = NULL; - } - - ret = index_open(vstream->path_name, vstream->channel_name, - stream->tracefile_count, - vstream->current_tracefile_id); - if (ret < 0) { - goto end; - } - vstream->index_fd = stream_fd_create(ret); - if (vstream->index_fd) { - ret = 0; - } else { - if (close(ret)) { - PERROR("close"); - } - ret = -1; + tracefile_array_get_file_index_tail(stream->tfa); + vstream->index_sent_seqcount = seq_tail; } + viewer_stream_close_files(vstream); + ret = 0; end: return ret; } @@ -316,6 +384,10 @@ void print_viewer_streams(void) struct lttng_ht_iter iter; struct relay_viewer_stream *vstream; + if (!viewer_streams_ht) { + return; + } + rcu_read_lock(); cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) {