X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fstream.c;h=335a1cf5f79de497f3002beb3ae0a2a983da355f;hp=fff8065b0da3d269eee76a11b73115f36602effe;hb=ce3f3ba3aee62c0a317b448c2f19578ab7f057e4;hpb=2a174661a1e0ab551b41ff1cae7191688525fc1f diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index fff8065b0..335a1cf5f 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,138 +17,454 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE +#define _LGPL_SOURCE #include +#include +#include +#include +#include +#include "lttng-relayd.h" #include "index.h" #include "stream.h" #include "viewer-stream.h" -static void rcu_destroy_stream(struct rcu_head *head) +/* Should be called with RCU read-side lock held. */ +bool stream_get(struct relay_stream *stream) { - struct relay_stream *stream = - caa_container_of(head, struct relay_stream, rcu_node); + bool has_ref = false; - free(stream->path_name); - free(stream->channel_name); - free(stream); + pthread_mutex_lock(&stream->reflock); + if (stream->ref.refcount != 0) { + has_ref = true; + urcu_ref_get(&stream->ref); + } + pthread_mutex_unlock(&stream->reflock); + + return has_ref; } /* - * Get stream from stream id from the given hash table. Return stream if found - * else NULL. - * - * Need to be called with RCU read-side lock held. + * Get stream from stream id from the streams hash table. Return stream + * if found else NULL. A stream reference is taken when a stream is + * returned. stream_put() must be called on that stream. */ -struct relay_stream *stream_find_by_id(struct lttng_ht *ht, - uint64_t stream_id) +struct relay_stream *stream_get_by_id(uint64_t stream_id) { struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; struct relay_stream *stream = NULL; - assert(ht); - - lttng_ht_lookup(ht, &stream_id, &iter); + rcu_read_lock(); + lttng_ht_lookup(relay_streams_ht, &stream_id, &iter); node = lttng_ht_iter_get_node_u64(&iter); - if (node == NULL) { + if (!node) { DBG("Relay stream %" PRIu64 " not found", stream_id); goto end; } stream = caa_container_of(node, struct relay_stream, node); - + if (!stream_get(stream)) { + stream = NULL; + } end: + rcu_read_unlock(); return stream; } /* - * Close a given stream. If an assosiated viewer stream exists it is updated. - * - * RCU read side lock MUST be acquired. - * - * Return 0 if close was successful or 1 if already closed. + * We keep ownership of path_name and channel_name. */ -int stream_close(struct relay_session *session, struct relay_stream *stream) +struct relay_stream *stream_create(struct ctf_trace *trace, + uint64_t stream_handle, char *path_name, + char *channel_name, uint64_t tracefile_size, + uint64_t tracefile_count) { - int delret, ret; - struct relay_viewer_stream *vstream; - struct ctf_trace *ctf_trace; + int ret; + struct relay_stream *stream = NULL; + struct relay_session *session = trace->session; - assert(stream); + stream = zmalloc(sizeof(struct relay_stream)); + if (stream == NULL) { + PERROR("relay stream zmalloc"); + ret = -1; + goto error_no_alloc; + } - pthread_mutex_lock(&stream->lock); + stream->stream_handle = stream_handle; + stream->prev_seq = -1ULL; + stream->last_net_seq_num = -1ULL; + stream->ctf_stream_id = -1ULL; + stream->tracefile_size = tracefile_size; + stream->tracefile_count = tracefile_count; + stream->path_name = path_name; + stream->channel_name = channel_name; + lttng_ht_node_init_u64(&stream->node, stream->stream_handle); + pthread_mutex_init(&stream->lock, NULL); + pthread_mutex_init(&stream->reflock, NULL); + urcu_ref_init(&stream->ref); + ctf_trace_get(trace); + stream->trace = trace; - if (stream->terminated_flag) { - /* This stream is already closed. Ignore. */ - ret = 1; - goto end_unlock; + stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!stream->indexes_ht) { + ERR("Cannot created indexes_ht"); + ret = -1; + goto end; } - DBG("Closing stream id %" PRIu64, stream->stream_handle); + ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG, + -1, -1); + if (ret < 0) { + ERR("relay creating output directory"); + goto end; + } - if (stream->fd >= 0) { - delret = close(stream->fd); - if (delret < 0) { - PERROR("close stream"); + /* + * No need to use run_as API here because whatever we receive, + * the relayd uses its own credentials for the stream files. + */ + ret = utils_create_stream_file(stream->path_name, stream->channel_name, + stream->tracefile_size, 0, -1, -1, NULL); + if (ret < 0) { + ERR("Create output file"); + goto end; + } + stream->stream_fd = stream_fd_create(ret); + if (!stream->stream_fd) { + if (close(ret)) { + PERROR("Error closing file %d", ret); } + ret = -1; + goto end; + } + stream->tfa = tracefile_array_create(stream->tracefile_count); + if (!stream->tfa) { + ret = -1; + goto end; + } + if (stream->tracefile_size) { + DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name); + } else { + DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name); } - if (stream->index_fd >= 0) { - delret = close(stream->index_fd); - if (delret < 0) { - PERROR("close stream index_fd"); + if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, LTTNG_NAME_MAX)) { + stream->is_metadata = 1; + } + + stream->in_recv_list = true; + + /* + * Add the stream in the recv list of the session. Once the end stream + * message is received, all session streams are published. + */ + pthread_mutex_lock(&session->recv_list_lock); + cds_list_add_rcu(&stream->recv_node, &session->recv_list); + session->stream_count++; + pthread_mutex_unlock(&session->recv_list_lock); + + /* + * Both in the ctf_trace object and the global stream ht since the data + * side of the relayd does not have the concept of session. + */ + lttng_ht_add_unique_u64(relay_streams_ht, &stream->node); + stream->in_stream_ht = true; + + DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name, + stream->stream_handle); + ret = 0; + +end: + if (ret) { + if (stream->stream_fd) { + stream_fd_put(stream->stream_fd); + stream->stream_fd = NULL; } + stream_put(stream); + stream = NULL; + } + return stream; + +error_no_alloc: + /* + * path_name and channel_name need to be freed explicitly here + * because we cannot rely on stream_put(). + */ + free(path_name); + free(channel_name); + return NULL; +} + +/* + * Called with the session lock held. + */ +void stream_publish(struct relay_stream *stream) +{ + struct relay_session *session; + + pthread_mutex_lock(&stream->lock); + if (stream->published) { + goto unlock; + } + + session = stream->trace->session; + + pthread_mutex_lock(&session->recv_list_lock); + if (stream->in_recv_list) { + cds_list_del_rcu(&stream->recv_node); + stream->in_recv_list = false; + } + pthread_mutex_unlock(&session->recv_list_lock); + + pthread_mutex_lock(&stream->trace->stream_list_lock); + cds_list_add_rcu(&stream->stream_node, &stream->trace->stream_list); + pthread_mutex_unlock(&stream->trace->stream_list_lock); + + stream->published = true; +unlock: + pthread_mutex_unlock(&stream->lock); +} + +/* + * Stream must be protected by holding the stream lock or by virtue of being + * called from stream_destroy, in which case it is guaranteed to be accessed + * from a single thread by the reflock. + */ +static void stream_unpublish(struct relay_stream *stream) +{ + if (stream->in_stream_ht) { + struct lttng_ht_iter iter; + int ret; + + iter.iter.node = &stream->node.node; + ret = lttng_ht_del(relay_streams_ht, &iter); + assert(!ret); + stream->in_stream_ht = false; + } + if (stream->published) { + pthread_mutex_lock(&stream->trace->stream_list_lock); + cds_list_del_rcu(&stream->stream_node); + pthread_mutex_unlock(&stream->trace->stream_list_lock); + stream->published = false; } +} - vstream = viewer_stream_find_by_id(stream->stream_handle); - if (vstream) { +static void stream_destroy(struct relay_stream *stream) +{ + if (stream->indexes_ht) { /* - * Set the last good value into the viewer stream. This is done - * right before the stream gets deleted from the hash table. The - * lookup failure on the live thread side of a stream indicates - * that the viewer stream index received value should be used. + * Calling lttng_ht_destroy in call_rcu worker thread so + * we don't hold the RCU read-side lock while calling + * it. */ - pthread_mutex_lock(&stream->viewer_stream_rotation_lock); - vstream->total_index_received = stream->total_index_received; - vstream->tracefile_count_last = stream->tracefile_count_current; - vstream->close_write_flag = 1; - pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); + lttng_ht_destroy(stream->indexes_ht); } + if (stream->tfa) { + tracefile_array_destroy(stream->tfa); + } + free(stream->path_name); + free(stream->channel_name); + free(stream); +} - /* Cleanup index of that stream. */ - relay_index_destroy_by_stream_id(stream->stream_handle); +static void stream_destroy_rcu(struct rcu_head *rcu_head) +{ + struct relay_stream *stream = + caa_container_of(rcu_head, struct relay_stream, rcu_node); - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, - stream->path_name); - assert(ctf_trace); - ctf_trace_put_ref(ctf_trace); + stream_destroy(stream); +} - stream->terminated_flag = 1; - ret = 0; +/* + * No need to take stream->lock since this is only called on the final + * stream_put which ensures that a single thread may act on the stream. + * + * At that point, the object is also protected by the reflock which + * guarantees that no other thread may share ownership of this stream. + */ +static void stream_release(struct urcu_ref *ref) +{ + struct relay_stream *stream = + caa_container_of(ref, struct relay_stream, ref); + struct relay_session *session; -end_unlock: - pthread_mutex_unlock(&stream->lock); - return ret; + session = stream->trace->session; + + DBG("Releasing stream id %" PRIu64, stream->stream_handle); + + pthread_mutex_lock(&session->recv_list_lock); + session->stream_count--; + if (stream->in_recv_list) { + cds_list_del_rcu(&stream->recv_node); + stream->in_recv_list = false; + } + pthread_mutex_unlock(&session->recv_list_lock); + + stream_unpublish(stream); + + if (stream->stream_fd) { + stream_fd_put(stream->stream_fd); + stream->stream_fd = NULL; + } + if (stream->index_fd) { + stream_fd_put(stream->index_fd); + stream->index_fd = NULL; + } + if (stream->trace) { + ctf_trace_put(stream->trace); + stream->trace = NULL; + } + + call_rcu(&stream->rcu_node, stream_destroy_rcu); } -void stream_delete(struct lttng_ht *ht, struct relay_stream *stream) +void stream_put(struct relay_stream *stream) { - int ret; - struct lttng_ht_iter iter; + DBG("stream put for stream id %" PRIu64, stream->stream_handle); + /* + * Ensure existence of stream->reflock for stream unlock. + */ + rcu_read_lock(); + /* + * Stream reflock ensures that concurrent test and update of + * stream ref is atomic. + */ + pthread_mutex_lock(&stream->reflock); + assert(stream->ref.refcount != 0); + /* + * Wait until we have processed all the stream packets before + * actually putting our last stream reference. + */ + DBG("stream put stream id %" PRIu64 " refcount %d", + stream->stream_handle, + (int) stream->ref.refcount); + urcu_ref_put(&stream->ref, stream_release); + pthread_mutex_unlock(&stream->reflock); + rcu_read_unlock(); +} + +void try_stream_close(struct relay_stream *stream) +{ + DBG("Trying to close stream %" PRIu64, stream->stream_handle); + pthread_mutex_lock(&stream->lock); + /* + * Can be called concurently by connection close and reception of last + * pending data. + */ + if (stream->closed) { + pthread_mutex_unlock(&stream->lock); + DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle); + return; + } - assert(ht); - assert(stream); + stream->close_requested = true; - iter.iter.node = &stream->node.node; - ret = lttng_ht_del(ht, &iter); - assert(!ret); + if (stream->last_net_seq_num == -1ULL) { + /* + * Handle connection close without explicit stream close + * command. + * + * We can be clever about indexes partially received in + * cases where we received the data socket part, but not + * the control socket part: since we're currently closing + * the stream on behalf of the control socket, we *know* + * there won't be any more control information for this + * socket. Therefore, we can destroy all indexes for + * which we have received only the file descriptor (from + * data socket). This takes care of consumerd crashes + * between sending the data and control information for + * a packet. Since those are sent in that order, we take + * care of consumerd crashes. + */ + relay_index_close_partial_fd(stream); + /* + * Use the highest net_seq_num we currently have pending + * As end of stream indicator. Leave last_net_seq_num + * at -1ULL if we cannot find any index. + */ + stream->last_net_seq_num = relay_index_find_last(stream); + /* Fall-through into the next check. */ + } - cds_list_del(&stream->trace_list); + if (stream->last_net_seq_num != -1ULL && + ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) { + /* + * Don't close since we still have data pending. This + * handles cases where an explicit close command has + * been received for this stream, and cases where the + * connection has been closed, and we are awaiting for + * index information from the data socket. It is + * therefore expected that all the index fd information + * we need has already been received on the control + * socket. Matching index information from data socket + * should be Expected Soon(TM). + * + * TODO: We should implement a timer to garbage collect + * streams after a timeout to be resilient against a + * consumerd implementation that would not match this + * expected behavior. + */ + pthread_mutex_unlock(&stream->lock); + DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle); + return; + } + /* + * We received all the indexes we can expect. + */ + stream_unpublish(stream); + stream->closed = true; + /* Relay indexes are only used by the "consumer/sessiond" end. */ + relay_index_close_all(stream); + pthread_mutex_unlock(&stream->lock); + DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle); + stream_put(stream); } -void stream_destroy(struct relay_stream *stream) +static void print_stream_indexes(struct relay_stream *stream) { - assert(stream); + struct lttng_ht_iter iter; + struct relay_index *index; - call_rcu(&stream->rcu_node, rcu_destroy_stream); + rcu_read_lock(); + cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, index, + index_n.node) { + DBG("index %p net_seq_num %" PRIu64 " refcount %ld" + " stream %" PRIu64 " trace %" PRIu64 + " session %" PRIu64, + index, + index->index_n.key, + stream->ref.refcount, + index->stream->stream_handle, + index->stream->trace->id, + index->stream->trace->session->id); + } + rcu_read_unlock(); +} + +void print_relay_streams(void) +{ + struct lttng_ht_iter iter; + struct relay_stream *stream; + + if (!relay_streams_ht) { + return; + } + + rcu_read_lock(); + cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, + node.node) { + if (!stream_get(stream)) { + continue; + } + DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64 + " session %" PRIu64, + stream, + stream->ref.refcount, + stream->stream_handle, + stream->trace->id, + stream->trace->session->id); + print_stream_indexes(stream); + stream_put(stream); + } + rcu_read_unlock(); }