X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fviewer-stream.c;h=67c452eec7d4264139f1134b1fcdd4bc2aa2d49c;hp=37486293f8e8b5df965b2054978cee3aa3b46566;hb=80516611b6f19201b1e173fb448935aca7a9e668;hpb=b272577e27626d210d5e3aa45f7e9d05670682b8 diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c index 37486293f..67c452eec 100644 --- a/src/bin/lttng-relayd/viewer-stream.c +++ b/src/bin/lttng-relayd/viewer-stream.c @@ -1,53 +1,48 @@ /* - * Copyright (C) 2013 - Julien Desfossez - * David Goulet + * Copyright (C) 2013 Julien Desfossez + * Copyright (C) 2013 David Goulet + * Copyright (C) 2015 Mathieu Desnoyers * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License, version 2 only, as - * published by the Free Software Foundation. + * SPDX-License-Identifier: GPL-2.0-only * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along with - * this program; if not, write to the Free Software Foundation, Inc., 51 - * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE #define _LGPL_SOURCE #include #include +#include +#include +#include +#include +#include #include "lttng-relayd.h" #include "viewer-stream.h" -static void free_stream(struct relay_viewer_stream *stream) +static void viewer_stream_destroy(struct relay_viewer_stream *vstream) { - assert(stream); - - free(stream->path_name); - free(stream->channel_name); - free(stream); + lttng_trace_chunk_put(vstream->stream_file.trace_chunk); + free(vstream->path_name); + free(vstream->channel_name); + free(vstream); } -static void deferred_free_viewer_stream(struct rcu_head *head) +static void viewer_stream_destroy_rcu(struct rcu_head *head) { - struct relay_viewer_stream *stream = + struct relay_viewer_stream *vstream = caa_container_of(head, struct relay_viewer_stream, rcu_node); - free_stream(stream); + viewer_stream_destroy(vstream); } +/* Relay stream's lock must be held by the caller. */ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, - enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace) + struct lttng_trace_chunk *trace_chunk, + enum lttng_viewer_seek seek_t) { - struct relay_viewer_stream *vstream; + struct relay_viewer_stream *vstream = NULL; - assert(stream); - assert(ctf_trace); + ASSERT_LOCKED(stream->lock); vstream = zmalloc(sizeof(*vstream)); if (!vstream) { @@ -55,229 +50,340 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, goto error; } - vstream->session_id = stream->session_id; - vstream->stream_handle = stream->stream_handle; - vstream->path_name = strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX); + if (trace_chunk) { + const bool acquired_reference = lttng_trace_chunk_get( + trace_chunk); + + assert(acquired_reference); + } + + vstream->stream_file.trace_chunk = trace_chunk; + vstream->path_name = lttng_strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX); if (vstream->path_name == NULL) { PERROR("relay viewer path_name alloc"); goto error; } - vstream->channel_name = strndup(stream->channel_name, + vstream->channel_name = lttng_strndup(stream->channel_name, LTTNG_VIEWER_NAME_MAX); if (vstream->channel_name == NULL) { PERROR("relay viewer channel_name alloc"); goto error; } - vstream->tracefile_count = stream->tracefile_count; - vstream->metadata_flag = stream->metadata_flag; - vstream->tracefile_count_last = -1ULL; + + if (!stream_get(stream)) { + ERR("Cannot get stream"); + goto error; + } + vstream->stream = stream; + + if (stream->is_metadata && stream->trace->viewer_metadata_stream) { + ERR("Cannot attach viewer metadata stream to trace (busy)."); + goto error; + } switch (seek_t) { case LTTNG_VIEWER_SEEK_BEGINNING: - vstream->tracefile_count_current = stream->oldest_tracefile_id; + { + uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa); + + if (seq_tail == -1ULL) { + /* + * Tail may not be initialized yet. Nonetheless, we know + * we want to send the first index once it becomes + * available. + */ + seq_tail = 0; + } + vstream->current_tracefile_id = + tracefile_array_get_file_index_tail(stream->tfa); + vstream->index_sent_seqcount = seq_tail; break; + } case LTTNG_VIEWER_SEEK_LAST: - vstream->tracefile_count_current = stream->tracefile_count_current; + vstream->current_tracefile_id = + tracefile_array_get_read_file_index_head(stream->tfa); + /* + * We seek at the very end of each stream, awaiting for + * a future packet to eventually come in. + * + * We don't need to check the head position for -1ULL since the + * increment will set it to 0. + */ + vstream->index_sent_seqcount = + tracefile_array_get_seq_head(stream->tfa) + 1; break; default: - assert(0); goto error; } - if (vstream->metadata_flag) { - ctf_trace->viewer_metadata_stream = vstream; - } - - /* Globally visible after the add unique. */ - lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle); - lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n); - - vstream->index_read_fd = -1; - vstream->read_fd = -1; - /* - * This is to avoid a race between the initialization of this object and - * the close of the given stream. If the stream is unable to find this - * viewer stream when closing, this copy will at least take the latest - * value. We also need that for the seek_last. + * If we never received an index for the current stream, delay + * the opening of the index, otherwise open it right now. */ - vstream->total_index_received = stream->total_index_received; + if (stream->index_file == NULL) { + vstream->index_file = NULL; + } else { + const uint32_t connection_major = stream->trace->session->major; + const uint32_t connection_minor = stream->trace->session->minor; + enum lttng_trace_chunk_status chunk_status; + + chunk_status = lttng_index_file_create_from_trace_chunk_read_only( + vstream->stream_file.trace_chunk, + stream->path_name, + stream->channel_name, stream->tracefile_size, + vstream->current_tracefile_id, + lttng_to_index_major(connection_major, + connection_minor), + lttng_to_index_minor(connection_major, + connection_minor), + true, &vstream->index_file); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) { + vstream->index_file = NULL; + } else { + goto error; + } + } + } /* - * If we never received an index for the current stream, delay the opening - * of the index, otherwise open it right now. + * If we never received a data file for the current stream, delay the + * opening, otherwise open it right now. */ - if (vstream->tracefile_count_current == stream->tracefile_count_current - && vstream->total_index_received == 0) { - vstream->index_read_fd = -1; - } else { - int read_fd; + if (stream->file) { + int ret; + char file_path[LTTNG_PATH_MAX]; + enum lttng_trace_chunk_status status; + + ret = utils_stream_file_path(stream->path_name, + stream->channel_name, stream->tracefile_size, + vstream->current_tracefile_id, NULL, file_path, + sizeof(file_path)); + if (ret < 0) { + goto error; + } - read_fd = index_open(vstream->path_name, vstream->channel_name, - vstream->tracefile_count, vstream->tracefile_count_current); - if (read_fd < 0) { + status = lttng_trace_chunk_open_fs_handle( + vstream->stream_file.trace_chunk, file_path, + O_RDONLY, 0, &vstream->stream_file.handle, + true); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { goto error; } - vstream->index_read_fd = read_fd; } - if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_read_fd >= 0) { + if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_file) { off_t lseek_ret; - lseek_ret = lseek(vstream->index_read_fd, - vstream->total_index_received * sizeof(struct ctf_packet_index), - SEEK_CUR); + lseek_ret = fs_handle_seek( + vstream->index_file->file, 0, SEEK_END); if (lseek_ret < 0) { goto error; } - vstream->last_sent_index = vstream->total_index_received; } + if (stream->is_metadata) { + rcu_assign_pointer(stream->trace->viewer_metadata_stream, + vstream); + } + + vstream->last_seen_rotation_count = stream->completed_rotation_count; + + /* Globally visible after the add unique. */ + lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle); + urcu_ref_init(&vstream->ref); + lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n); return vstream; error: if (vstream) { - free_stream(vstream); + viewer_stream_destroy(vstream); } return NULL; } -void viewer_stream_delete(struct relay_viewer_stream *stream) +static void viewer_stream_unpublish(struct relay_viewer_stream *vstream) { int ret; struct lttng_ht_iter iter; - iter.iter.node = &stream->stream_n.node; + iter.iter.node = &vstream->stream_n.node; ret = lttng_ht_del(viewer_streams_ht, &iter); assert(!ret); } -void viewer_stream_destroy(struct ctf_trace *ctf_trace, - struct relay_viewer_stream *stream) +static void viewer_stream_release(struct urcu_ref *ref) { - int ret; - - assert(stream); + struct relay_viewer_stream *vstream = caa_container_of(ref, + struct relay_viewer_stream, ref); - if (ctf_trace) { - ctf_trace_put_ref(ctf_trace); + if (vstream->stream->is_metadata) { + rcu_assign_pointer(vstream->stream->trace->viewer_metadata_stream, NULL); } - if (stream->read_fd >= 0) { - ret = close(stream->read_fd); - if (ret < 0) { - PERROR("close read_fd"); - } + viewer_stream_unpublish(vstream); + + if (vstream->stream_file.handle) { + fs_handle_close(vstream->stream_file.handle); + vstream->stream_file.handle = NULL; } - if (stream->index_read_fd >= 0) { - ret = close(stream->index_read_fd); - if (ret < 0) { - PERROR("close index_read_fd"); - } + if (vstream->index_file) { + lttng_index_file_put(vstream->index_file); + vstream->index_file = NULL; + } + if (vstream->stream) { + stream_put(vstream->stream); + vstream->stream = NULL; } + lttng_trace_chunk_put(vstream->stream_file.trace_chunk); + vstream->stream_file.trace_chunk = NULL; + call_rcu(&vstream->rcu_node, viewer_stream_destroy_rcu); +} - call_rcu(&stream->rcu_node, deferred_free_viewer_stream); +/* Must be called with RCU read-side lock held. */ +bool viewer_stream_get(struct relay_viewer_stream *vstream) +{ + return urcu_ref_get_unless_zero(&vstream->ref); } /* - * Find viewer stream by id. RCU read side lock MUST be acquired. + * Get viewer stream by id. * - * Return stream if found else NULL. + * Return viewer stream if found else NULL. */ -struct relay_viewer_stream *viewer_stream_find_by_id(uint64_t id) +struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id) { struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; - struct relay_viewer_stream *stream = NULL; + struct relay_viewer_stream *vstream = NULL; + rcu_read_lock(); lttng_ht_lookup(viewer_streams_ht, &id, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (!node) { DBG("Relay viewer stream %" PRIu64 " not found", id); goto end; } - stream = caa_container_of(node, struct relay_viewer_stream, stream_n); - + vstream = caa_container_of(node, struct relay_viewer_stream, stream_n); + if (!viewer_stream_get(vstream)) { + vstream = NULL; + } end: - return stream; + rcu_read_unlock(); + return vstream; +} + +void viewer_stream_put(struct relay_viewer_stream *vstream) +{ + rcu_read_lock(); + urcu_ref_put(&vstream->ref, viewer_stream_release); + rcu_read_unlock(); +} + +void viewer_stream_close_files(struct relay_viewer_stream *vstream) +{ + if (vstream->index_file) { + lttng_index_file_put(vstream->index_file); + vstream->index_file = NULL; + } + if (vstream->stream_file.handle) { + fs_handle_close(vstream->stream_file.handle); + vstream->stream_file.handle = NULL; + } +} + +void viewer_stream_sync_tracefile_array_tail(struct relay_viewer_stream *vstream) +{ + const struct relay_stream *stream = vstream->stream; + uint64_t seq_tail; + + vstream->current_tracefile_id = tracefile_array_get_file_index_tail(stream->tfa); + seq_tail = tracefile_array_get_seq_tail(stream->tfa); + if (seq_tail == -1ULL) { + seq_tail = 0; + } + vstream->index_sent_seqcount = seq_tail; } /* * Rotate a stream to the next tracefile. * - * Must be called with viewer_stream_rotation_lock held. - * Returns 0 on success, 1 on EOF, a negative value on error. + * Must be called with the rstream lock held. + * Returns 0 on success, 1 on EOF. */ -int viewer_stream_rotate(struct relay_viewer_stream *vstream, - struct relay_stream *stream) +int viewer_stream_rotate(struct relay_viewer_stream *vstream) { int ret; - uint64_t tracefile_id; + uint64_t new_id; + const struct relay_stream *stream = vstream->stream; - assert(vstream); - assert(stream); - - if (vstream->tracefile_count == 0) { - /* Ignore rotation, there is none to do. */ - ret = 0; + /* Detect the last tracefile to open. */ + if (stream->index_received_seqcount + == vstream->index_sent_seqcount + && stream->trace->session->connection_closed) { + ret = 1; goto end; } - tracefile_id = (vstream->tracefile_count_current + 1) % - vstream->tracefile_count; - - /* Detect the last tracefile to open. */ - if (vstream->tracefile_count_last != -1ULL && - vstream->tracefile_count_last == - vstream->tracefile_count_current) { - ret = 1; + if (stream->tracefile_count == 0) { + /* Ignore rotation, there is none to do. */ + ret = 0; goto end; } /* - * The writer and the reader are not working in the same tracefile, we can - * read up to EOF, we don't care about the total_index_received. + * Try to move to the next file. */ - if (stream->close_flag || (stream->tracefile_count_current != tracefile_id)) { - vstream->close_write_flag = 1; + new_id = (vstream->current_tracefile_id + 1) + % stream->tracefile_count; + if (tracefile_array_seq_in_file(stream->tfa, new_id, + vstream->index_sent_seqcount)) { + vstream->current_tracefile_id = new_id; } else { + uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa); + /* - * We are opening a file that is still open in write, make sure we - * limit our reading to the number of indexes received. + * This can only be reached on overwrite, which implies there + * has been data written at some point, which will have set the + * tail. */ - vstream->close_write_flag = 0; - if (stream->close_flag) { - vstream->total_index_received = stream->total_index_received; - } + assert(seq_tail != -1ULL); + /* + * We need to resync because we lag behind tail. + */ + vstream->current_tracefile_id = + tracefile_array_get_file_index_tail(stream->tfa); + vstream->index_sent_seqcount = seq_tail; } - vstream->tracefile_count_current = tracefile_id; + viewer_stream_close_files(vstream); + ret = 0; +end: + return ret; +} - ret = close(vstream->index_read_fd); - if (ret < 0) { - PERROR("close index file %d", vstream->index_read_fd); - } - vstream->index_read_fd = -1; +void print_viewer_streams(void) +{ + struct lttng_ht_iter iter; + struct relay_viewer_stream *vstream; - ret = close(vstream->read_fd); - if (ret < 0) { - PERROR("close tracefile %d", vstream->read_fd); + if (!viewer_streams_ht) { + return; } - vstream->read_fd = -1; - - pthread_mutex_lock(&vstream->overwrite_lock); - vstream->abort_flag = 0; - pthread_mutex_unlock(&vstream->overwrite_lock); - ret = index_open(vstream->path_name, vstream->channel_name, - vstream->tracefile_count, vstream->tracefile_count_current); - if (ret < 0) { - goto error; + rcu_read_lock(); + cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream, + stream_n.node) { + if (!viewer_stream_get(vstream)) { + continue; + } + DBG("vstream %p refcount %ld stream %" PRIu64 " trace %" PRIu64 + " session %" PRIu64, + vstream, + vstream->ref.refcount, + vstream->stream->stream_handle, + vstream->stream->trace->id, + vstream->stream->trace->session->id); + viewer_stream_put(vstream); } - vstream->index_read_fd = ret; - - ret = 0; - -end: -error: - return ret; + rcu_read_unlock(); }