/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
* David Goulet <dgoulet@efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
*/
#define _GNU_SOURCE
+#define _LGPL_SOURCE
#include <common/common.h>
+#include <common/utils.h>
+#include <common/defaults.h>
+#include <urcu/rculist.h>
+#include <sys/stat.h>
+#include "lttng-relayd.h"
#include "index.h"
#include "stream.h"
#include "viewer-stream.h"
-static void rcu_destroy_stream(struct rcu_head *head)
+/* Should be called with RCU read-side lock held. */
+bool stream_get(struct relay_stream *stream)
{
- struct relay_stream *stream =
- caa_container_of(head, struct relay_stream, rcu_node);
+ bool has_ref = false;
- free(stream->path_name);
- free(stream->channel_name);
- free(stream);
+ pthread_mutex_lock(&stream->reflock);
+ if (stream->ref.refcount != 0) {
+ has_ref = true;
+ urcu_ref_get(&stream->ref);
+ }
+ pthread_mutex_unlock(&stream->reflock);
+
+ return has_ref;
}
/*
- * Get stream from stream id from the given hash table. Return stream if found
- * else NULL.
- *
- * Need to be called with RCU read-side lock held.
+ * Get stream from stream id from the streams hash table. Return stream
+ * if found else NULL. A stream reference is taken when a stream is
+ * returned. stream_put() must be called on that stream.
*/
-struct relay_stream *stream_find_by_id(struct lttng_ht *ht,
- uint64_t stream_id)
+struct relay_stream *stream_get_by_id(uint64_t stream_id)
{
struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
struct relay_stream *stream = NULL;
- assert(ht);
-
- lttng_ht_lookup(ht, &stream_id, &iter);
+ rcu_read_lock();
+ lttng_ht_lookup(relay_streams_ht, &stream_id, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
- if (node == NULL) {
+ if (!node) {
DBG("Relay stream %" PRIu64 " not found", stream_id);
goto end;
}
stream = caa_container_of(node, struct relay_stream, node);
-
+ if (!stream_get(stream)) {
+ stream = NULL;
+ }
end:
+ rcu_read_unlock();
return stream;
}
/*
- * Close a given stream. If an assosiated viewer stream exists it is updated.
- *
- * RCU read side lock MUST be acquired.
- *
- * Return 0 if close was successful or 1 if already closed.
+ * We keep ownership of path_name and channel_name.
*/
-int stream_close(struct relay_session *session, struct relay_stream *stream)
+struct relay_stream *stream_create(struct ctf_trace *trace,
+ uint64_t stream_handle, char *path_name,
+ char *channel_name, uint64_t tracefile_size,
+ uint64_t tracefile_count)
{
- int delret, ret;
- struct relay_viewer_stream *vstream;
- struct ctf_trace *ctf_trace;
+ int ret;
+ struct relay_stream *stream = NULL;
+ struct relay_session *session = trace->session;
- assert(stream);
+ stream = zmalloc(sizeof(struct relay_stream));
+ if (stream == NULL) {
+ PERROR("relay stream zmalloc");
+ ret = -1;
+ goto error_no_alloc;
+ }
- pthread_mutex_lock(&stream->lock);
+ stream->stream_handle = stream_handle;
+ stream->prev_seq = -1ULL;
+ stream->ctf_stream_id = -1ULL;
+ stream->tracefile_size = tracefile_size;
+ stream->tracefile_count = tracefile_count;
+ stream->path_name = path_name;
+ stream->channel_name = channel_name;
+ lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
+ pthread_mutex_init(&stream->lock, NULL);
+ pthread_mutex_init(&stream->reflock, NULL);
+ urcu_ref_init(&stream->ref);
+ ctf_trace_get(trace);
+ stream->trace = trace;
- if (stream->terminated_flag) {
- /* This stream is already closed. Ignore. */
- ret = 1;
- goto end_unlock;
+ stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!stream->indexes_ht) {
+ ERR("Cannot created indexes_ht");
+ ret = -1;
+ goto end;
}
- DBG("Closing stream id %" PRIu64, stream->stream_handle);
+ ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG,
+ -1, -1);
+ if (ret < 0) {
+ ERR("relay creating output directory");
+ goto end;
+ }
- if (stream->fd >= 0) {
- delret = close(stream->fd);
- if (delret < 0) {
- PERROR("close stream");
+ /*
+ * No need to use run_as API here because whatever we receive,
+ * the relayd uses its own credentials for the stream files.
+ */
+ ret = utils_create_stream_file(stream->path_name, stream->channel_name,
+ stream->tracefile_size, 0, -1, -1, NULL);
+ if (ret < 0) {
+ ERR("Create output file");
+ goto end;
+ }
+ stream->stream_fd = stream_fd_create(ret);
+ if (!stream->stream_fd) {
+ if (close(ret)) {
+ PERROR("Error closing file %d", ret);
}
+ ret = -1;
+ goto end;
+ }
+ if (stream->tracefile_size) {
+ DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
+ } else {
+ DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
+ }
+
+ if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
+ stream->is_metadata = 1;
}
- if (stream->index_fd >= 0) {
- delret = close(stream->index_fd);
- if (delret < 0) {
- PERROR("close stream index_fd");
+ stream->in_recv_list = true;
+
+ /*
+ * Add the stream in the recv list of the session. Once the end stream
+ * message is received, all session streams are published.
+ */
+ pthread_mutex_lock(&session->recv_list_lock);
+ cds_list_add_rcu(&stream->recv_node, &session->recv_list);
+ session->stream_count++;
+ pthread_mutex_unlock(&session->recv_list_lock);
+
+ /*
+ * Both in the ctf_trace object and the global stream ht since the data
+ * side of the relayd does not have the concept of session.
+ */
+ lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
+
+ DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
+ stream->stream_handle);
+ ret = 0;
+
+end:
+ if (ret) {
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
}
+ stream_put(stream);
+ stream = NULL;
}
+ return stream;
- vstream = viewer_stream_find_by_id(stream->stream_handle);
- if (vstream) {
- /*
- * Set the last good value into the viewer stream. This is done
- * right before the stream gets deleted from the hash table. The
- * lookup failure on the live thread side of a stream indicates
- * that the viewer stream index received value should be used.
- */
- pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
- vstream->total_index_received = stream->total_index_received;
- vstream->tracefile_count_last = stream->tracefile_count_current;
- vstream->close_write_flag = 1;
- pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
+error_no_alloc:
+ /*
+ * path_name and channel_name need to be freed explicitly here
+ * because we cannot rely on stream_put().
+ */
+ free(path_name);
+ free(channel_name);
+ return NULL;
+}
+
+/*
+ * Called with the session lock held.
+ */
+void stream_publish(struct relay_stream *stream)
+{
+ struct relay_session *session;
+
+ pthread_mutex_lock(&stream->lock);
+ if (stream->published) {
+ goto unlock;
}
- /* Cleanup index of that stream. */
- relay_index_destroy_by_stream_id(stream->stream_handle);
+ session = stream->trace->session;
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
- stream->path_name);
- assert(ctf_trace);
- ctf_trace_put_ref(ctf_trace);
+ pthread_mutex_lock(&session->recv_list_lock);
+ if (stream->in_recv_list) {
+ cds_list_del_rcu(&stream->recv_node);
+ stream->in_recv_list = false;
+ }
+ pthread_mutex_unlock(&session->recv_list_lock);
- stream->close_flag = 1;
- stream->terminated_flag = 1;
- ret = 0;
+ pthread_mutex_lock(&stream->trace->stream_list_lock);
+ cds_list_add_rcu(&stream->stream_node, &stream->trace->stream_list);
+ pthread_mutex_unlock(&stream->trace->stream_list_lock);
-end_unlock:
+ stream->published = true;
+unlock:
pthread_mutex_unlock(&stream->lock);
- return ret;
}
-void stream_delete(struct lttng_ht *ht, struct relay_stream *stream)
+/*
+ * Only called from destroy. No stream lock needed, since there is a
+ * single user at this point. This is ensured by having the refcount
+ * reaching 0.
+ */
+static void stream_unpublish(struct relay_stream *stream)
+{
+ if (!stream->published) {
+ return;
+ }
+ pthread_mutex_lock(&stream->trace->stream_list_lock);
+ cds_list_del_rcu(&stream->stream_node);
+ pthread_mutex_unlock(&stream->trace->stream_list_lock);
+
+ stream->published = false;
+}
+
+static void stream_destroy(struct relay_stream *stream)
+{
+ if (stream->indexes_ht) {
+ lttng_ht_destroy(stream->indexes_ht);
+ }
+ free(stream->path_name);
+ free(stream->channel_name);
+ free(stream);
+}
+
+static void stream_destroy_rcu(struct rcu_head *rcu_head)
+{
+ struct relay_stream *stream =
+ caa_container_of(rcu_head, struct relay_stream, rcu_node);
+
+ stream_destroy(stream);
+}
+
+/*
+ * No need to take stream->lock since this is only called on the final
+ * stream_put which ensures that a single thread may act on the stream.
+ *
+ * At that point, the object is also protected by the reflock which
+ * guarantees that no other thread may share ownership of this stream.
+ */
+static void stream_release(struct urcu_ref *ref)
{
+ struct relay_stream *stream =
+ caa_container_of(ref, struct relay_stream, ref);
+ struct relay_session *session;
int ret;
struct lttng_ht_iter iter;
- assert(ht);
- assert(stream);
+ session = stream->trace->session;
+
+ DBG("Releasing stream id %" PRIu64, stream->stream_handle);
+
+ pthread_mutex_lock(&session->recv_list_lock);
+ session->stream_count--;
+ if (stream->in_recv_list) {
+ cds_list_del_rcu(&stream->recv_node);
+ stream->in_recv_list = false;
+ }
+ pthread_mutex_unlock(&session->recv_list_lock);
iter.iter.node = &stream->node.node;
- ret = lttng_ht_del(ht, &iter);
+ ret = lttng_ht_del(relay_streams_ht, &iter);
assert(!ret);
- cds_list_del(&stream->trace_list);
+ stream_unpublish(stream);
+
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+ if (stream->index_fd) {
+ stream_fd_put(stream->index_fd);
+ stream->index_fd = NULL;
+ }
+ if (stream->trace) {
+ ctf_trace_put(stream->trace);
+ stream->trace = NULL;
+ }
+
+ call_rcu(&stream->rcu_node, stream_destroy_rcu);
+}
+
+void stream_put(struct relay_stream *stream)
+{
+ DBG("stream put for stream id %" PRIu64, stream->stream_handle);
+ /*
+ * Ensure existence of stream->reflock for stream unlock.
+ */
+ rcu_read_lock();
+ /*
+ * Stream reflock ensures that concurrent test and update of
+ * stream ref is atomic.
+ */
+ pthread_mutex_lock(&stream->reflock);
+ assert(stream->ref.refcount != 0);
+ /*
+ * Wait until we have processed all the stream packets before
+ * actually putting our last stream reference.
+ */
+ DBG("stream put stream id %" PRIu64 " refcount %d",
+ stream->stream_handle,
+ (int) stream->ref.refcount);
+ urcu_ref_put(&stream->ref, stream_release);
+ pthread_mutex_unlock(&stream->reflock);
+ rcu_read_unlock();
+}
+
+void stream_close(struct relay_stream *stream)
+{
+ DBG("closing stream %" PRIu64, stream->stream_handle);
+ pthread_mutex_lock(&stream->lock);
+ stream->closed = true;
+ relay_index_close_all(stream);
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
}
-void stream_destroy(struct relay_stream *stream)
+void print_relay_streams(void)
{
- assert(stream);
+ struct lttng_ht_iter iter;
+ struct relay_stream *stream;
- call_rcu(&stream->rcu_node, rcu_destroy_stream);
+ rcu_read_lock();
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+ node.node) {
+ if (!stream_get(stream)) {
+ continue;
+ }
+ DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
+ " session %" PRIu64,
+ stream,
+ stream->ref.refcount,
+ stream->stream_handle,
+ stream->trace->id,
+ stream->trace->session->id);
+ stream_put(stream);
+ }
+ rcu_read_unlock();
}