X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fviewer-stream.c;h=7c59cd05d88ea3797569e87d8d964d67aa61e562;hp=5ab7be790de867f42676f8c6c84bd2e86bfb7083;hb=ce3f3ba3aee62c0a317b448c2f19578ab7f057e4;hpb=56611b069e7d0a64148cd990f88b4090298a9a0f diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c index 5ab7be790..7c59cd05d 100644 --- a/src/bin/lttng-relayd/viewer-stream.c +++ b/src/bin/lttng-relayd/viewer-stream.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,259 +17,345 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE +#define _LGPL_SOURCE #include #include +#include #include "lttng-relayd.h" #include "viewer-stream.h" -static void free_stream(struct relay_viewer_stream *stream) +static void viewer_stream_destroy(struct relay_viewer_stream *vstream) { - assert(stream); - - free(stream->path_name); - free(stream->channel_name); - free(stream); + free(vstream->path_name); + free(vstream->channel_name); + free(vstream); } -static void deferred_free_viewer_stream(struct rcu_head *head) +static void viewer_stream_destroy_rcu(struct rcu_head *head) { - struct relay_viewer_stream *stream = + struct relay_viewer_stream *vstream = caa_container_of(head, struct relay_viewer_stream, rcu_node); - free_stream(stream); + viewer_stream_destroy(vstream); } struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, - enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace) + enum lttng_viewer_seek seek_t) { struct relay_viewer_stream *vstream; - assert(stream); - assert(ctf_trace); - vstream = zmalloc(sizeof(*vstream)); if (!vstream) { PERROR("relay viewer stream zmalloc"); goto error; } - vstream->session_id = stream->session_id; - vstream->stream_handle = stream->stream_handle; - vstream->path_name = strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX); - vstream->channel_name = strndup(stream->channel_name, + vstream->path_name = lttng_strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX); + if (vstream->path_name == NULL) { + PERROR("relay viewer path_name alloc"); + goto error; + } + vstream->channel_name = lttng_strndup(stream->channel_name, LTTNG_VIEWER_NAME_MAX); - vstream->tracefile_count = stream->tracefile_count; - vstream->metadata_flag = stream->metadata_flag; - vstream->tracefile_count_last = -1ULL; + if (vstream->channel_name == NULL) { + PERROR("relay viewer channel_name alloc"); + goto error; + } + + if (!stream_get(stream)) { + ERR("Cannot get stream"); + goto error; + } + vstream->stream = stream; + + pthread_mutex_lock(&stream->lock); + + if (stream->is_metadata && stream->trace->viewer_metadata_stream) { + ERR("Cannot attach viewer metadata stream to trace (busy)."); + goto error_unlock; + } switch (seek_t) { case LTTNG_VIEWER_SEEK_BEGINNING: - vstream->tracefile_count_current = stream->oldest_tracefile_id; + { + uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa); + + if (seq_tail == -1ULL) { + /* + * Tail may not be initialized yet. Nonetheless, we know + * we want to send the first index once it becomes + * available. + */ + seq_tail = 0; + } + vstream->current_tracefile_id = + tracefile_array_get_file_index_tail(stream->tfa); + vstream->index_sent_seqcount = seq_tail; break; + } case LTTNG_VIEWER_SEEK_LAST: - vstream->tracefile_count_current = stream->tracefile_count_current; + vstream->current_tracefile_id = + tracefile_array_get_file_index_head(stream->tfa); + /* + * We seek at the very end of each stream, awaiting for + * a future packet to eventually come in. + * + * We don't need to check the head position for -1ULL since the + * increment will set it to 0. + */ + vstream->index_sent_seqcount = + tracefile_array_get_seq_head(stream->tfa) + 1; break; default: - assert(0); - goto error; + goto error_unlock; } - if (vstream->metadata_flag) { - ctf_trace->viewer_metadata_stream = vstream; - } - - /* Globally visible after the add unique. */ - lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle); - lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n); - - vstream->index_read_fd = -1; - vstream->read_fd = -1; - - /* - * This is to avoid a race between the initialization of this object and - * the close of the given stream. If the stream is unable to find this - * viewer stream when closing, this copy will at least take the latest - * value. We also need that for the seek_last. - */ - vstream->total_index_received = stream->total_index_received; - /* - * If we never received an index for the current stream, delay the opening - * of the index, otherwise open it right now. + * If we never received an index for the current stream, delay + * the opening of the index, otherwise open it right now. */ - if (vstream->tracefile_count_current == stream->tracefile_count_current - && vstream->total_index_received == 0) { - vstream->index_read_fd = -1; + if (stream->index_received_seqcount == 0) { + vstream->index_fd = NULL; } else { int read_fd; read_fd = index_open(vstream->path_name, vstream->channel_name, - vstream->tracefile_count, vstream->tracefile_count_current); + stream->tracefile_count, + vstream->current_tracefile_id); if (read_fd < 0) { - goto error; + goto error_unlock; + } + vstream->index_fd = stream_fd_create(read_fd); + if (!vstream->index_fd) { + if (close(read_fd)) { + PERROR("close"); + } + goto error_unlock; } - vstream->index_read_fd = read_fd; } - if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_read_fd >= 0) { + if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_fd) { off_t lseek_ret; - lseek_ret = lseek(vstream->index_read_fd, - vstream->total_index_received * sizeof(struct ctf_packet_index), - SEEK_CUR); + lseek_ret = lseek(vstream->index_fd->fd, 0, SEEK_END); if (lseek_ret < 0) { - goto error; + goto error_unlock; } - vstream->last_sent_index = vstream->total_index_received; } + if (stream->is_metadata) { + rcu_assign_pointer(stream->trace->viewer_metadata_stream, + vstream); + } + pthread_mutex_unlock(&stream->lock); + + /* Globally visible after the add unique. */ + lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle); + lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n); + + pthread_mutex_init(&vstream->reflock, NULL); + urcu_ref_init(&vstream->ref); return vstream; +error_unlock: + pthread_mutex_unlock(&stream->lock); error: if (vstream) { - free_stream(vstream); + viewer_stream_destroy(vstream); } return NULL; } -void viewer_stream_delete(struct relay_viewer_stream *stream) +static void viewer_stream_unpublish(struct relay_viewer_stream *vstream) { int ret; struct lttng_ht_iter iter; - iter.iter.node = &stream->stream_n.node; + iter.iter.node = &vstream->stream_n.node; ret = lttng_ht_del(viewer_streams_ht, &iter); assert(!ret); } -void viewer_stream_destroy(struct ctf_trace *ctf_trace, - struct relay_viewer_stream *stream) +static void viewer_stream_release(struct urcu_ref *ref) { - int ret; - - assert(stream); + struct relay_viewer_stream *vstream = caa_container_of(ref, + struct relay_viewer_stream, ref); - if (ctf_trace) { - ctf_trace_put_ref(ctf_trace); + if (vstream->stream->is_metadata) { + rcu_assign_pointer(vstream->stream->trace->viewer_metadata_stream, NULL); } - if (stream->read_fd >= 0) { - ret = close(stream->read_fd); - if (ret < 0) { - PERROR("close read_fd"); - } + viewer_stream_unpublish(vstream); + + if (vstream->stream_fd) { + stream_fd_put(vstream->stream_fd); + vstream->stream_fd = NULL; } - if (stream->index_read_fd >= 0) { - ret = close(stream->index_read_fd); - if (ret < 0) { - PERROR("close index_read_fd"); - } + if (vstream->index_fd) { + stream_fd_put(vstream->index_fd); + vstream->index_fd = NULL; + } + if (vstream->stream) { + stream_put(vstream->stream); + vstream->stream = NULL; + } + call_rcu(&vstream->rcu_node, viewer_stream_destroy_rcu); +} + +/* Must be called with RCU read-side lock held. */ +bool viewer_stream_get(struct relay_viewer_stream *vstream) +{ + bool has_ref = false; + + pthread_mutex_lock(&vstream->reflock); + if (vstream->ref.refcount != 0) { + has_ref = true; + urcu_ref_get(&vstream->ref); } + pthread_mutex_unlock(&vstream->reflock); - call_rcu(&stream->rcu_node, deferred_free_viewer_stream); + return has_ref; } /* - * Find viewer stream by id. RCU read side lock MUST be acquired. + * Get viewer stream by id. * - * Return stream if found else NULL. + * Return viewer stream if found else NULL. */ -struct relay_viewer_stream *viewer_stream_find_by_id(uint64_t id) +struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id) { struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; - struct relay_viewer_stream *stream = NULL; + struct relay_viewer_stream *vstream = NULL; + rcu_read_lock(); lttng_ht_lookup(viewer_streams_ht, &id, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (!node) { DBG("Relay viewer stream %" PRIu64 " not found", id); goto end; } - stream = caa_container_of(node, struct relay_viewer_stream, stream_n); - + vstream = caa_container_of(node, struct relay_viewer_stream, stream_n); + if (!viewer_stream_get(vstream)) { + vstream = NULL; + } end: - return stream; + rcu_read_unlock(); + return vstream; +} + +void viewer_stream_put(struct relay_viewer_stream *vstream) +{ + rcu_read_lock(); + pthread_mutex_lock(&vstream->reflock); + urcu_ref_put(&vstream->ref, viewer_stream_release); + pthread_mutex_unlock(&vstream->reflock); + rcu_read_unlock(); } /* * Rotate a stream to the next tracefile. * - * Must be called with viewer_stream_rotation_lock held. + * Must be called with the rstream lock held. * Returns 0 on success, 1 on EOF, a negative value on error. */ -int viewer_stream_rotate(struct relay_viewer_stream *vstream, - struct relay_stream *stream) +int viewer_stream_rotate(struct relay_viewer_stream *vstream) { int ret; - uint64_t tracefile_id; - - assert(vstream); - assert(stream); + struct relay_stream *stream = vstream->stream; + uint64_t new_id; - if (vstream->tracefile_count == 0) { - /* Ignore rotation, there is none to do. */ - ret = 0; + /* Detect the last tracefile to open. */ + if (stream->index_received_seqcount + == vstream->index_sent_seqcount + && stream->trace->session->connection_closed) { + ret = 1; goto end; } - tracefile_id = (vstream->tracefile_count_current + 1) % - vstream->tracefile_count; - - /* Detect the last tracefile to open. */ - if (vstream->tracefile_count_last != -1ULL && - vstream->tracefile_count_last == - vstream->tracefile_count_current) { - ret = 1; + if (stream->tracefile_count == 0) { + /* Ignore rotation, there is none to do. */ + ret = 0; goto end; } /* - * The writer and the reader are not working in the same tracefile, we can - * read up to EOF, we don't care about the total_index_received. + * Try to move to the next file. */ - if (stream->close_flag || (stream->tracefile_count_current != tracefile_id)) { - vstream->close_write_flag = 1; + new_id = (vstream->current_tracefile_id + 1) + % stream->tracefile_count; + if (tracefile_array_seq_in_file(stream->tfa, new_id, + vstream->index_sent_seqcount)) { + vstream->current_tracefile_id = new_id; } else { + uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa); + /* - * We are opening a file that is still open in write, make sure we - * limit our reading to the number of indexes received. + * This can only be reached on overwrite, which implies there + * has been data written at some point, which will have set the + * tail. */ - vstream->close_write_flag = 0; - if (stream->close_flag) { - vstream->total_index_received = stream->total_index_received; - } + assert(seq_tail != -1ULL); + /* + * We need to resync because we lag behind tail. + */ + vstream->current_tracefile_id = + tracefile_array_get_file_index_tail(stream->tfa); + vstream->index_sent_seqcount = seq_tail; } - vstream->tracefile_count_current = tracefile_id; - ret = close(vstream->index_read_fd); - if (ret < 0) { - PERROR("close index file %d", vstream->index_read_fd); + if (vstream->index_fd) { + stream_fd_put(vstream->index_fd); + vstream->index_fd = NULL; } - vstream->index_read_fd = -1; - - ret = close(vstream->read_fd); - if (ret < 0) { - PERROR("close tracefile %d", vstream->read_fd); + if (vstream->stream_fd) { + stream_fd_put(vstream->stream_fd); + vstream->stream_fd = NULL; } - vstream->read_fd = -1; - - pthread_mutex_lock(&vstream->overwrite_lock); - vstream->abort_flag = 0; - pthread_mutex_unlock(&vstream->overwrite_lock); ret = index_open(vstream->path_name, vstream->channel_name, - vstream->tracefile_count, vstream->tracefile_count_current); + stream->tracefile_count, + vstream->current_tracefile_id); if (ret < 0) { - goto error; + goto end; + } + vstream->index_fd = stream_fd_create(ret); + if (vstream->index_fd) { + ret = 0; + } else { + if (close(ret)) { + PERROR("close"); + } + ret = -1; } - vstream->index_read_fd = ret; - - ret = 0; - end: -error: return ret; } + +void print_viewer_streams(void) +{ + struct lttng_ht_iter iter; + struct relay_viewer_stream *vstream; + + if (!viewer_streams_ht) { + return; + } + + rcu_read_lock(); + cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream, + stream_n.node) { + if (!viewer_stream_get(vstream)) { + continue; + } + DBG("vstream %p refcount %ld stream %" PRIu64 " trace %" PRIu64 + " session %" PRIu64, + vstream, + vstream->ref.refcount, + vstream->stream->stream_handle, + vstream->stream->trace->id, + vstream->stream->trace->session->id); + viewer_stream_put(vstream); + } + rcu_read_unlock(); +}