* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#define _GNU_SOURCE
#define _LGPL_SOURCE
#include <common/common.h>
#include <common/utils.h>
/* Should be called with RCU read-side lock held. */
bool stream_get(struct relay_stream *stream)
{
- bool has_ref = false;
-
- pthread_mutex_lock(&stream->reflock);
- if (stream->ref.refcount != 0) {
- has_ref = true;
- urcu_ref_get(&stream->ref);
- }
- pthread_mutex_unlock(&stream->reflock);
-
- return has_ref;
+ return urcu_ref_get_unless_zero(&stream->ref);
}
/*
stream = zmalloc(sizeof(struct relay_stream));
if (stream == NULL) {
PERROR("relay stream zmalloc");
- ret = -1;
goto error_no_alloc;
}
stream->stream_handle = stream_handle;
stream->prev_seq = -1ULL;
+ stream->last_net_seq_num = -1ULL;
stream->ctf_stream_id = -1ULL;
stream->tracefile_size = tracefile_size;
stream->tracefile_count = tracefile_count;
stream->channel_name = channel_name;
lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
pthread_mutex_init(&stream->lock, NULL);
- pthread_mutex_init(&stream->reflock, NULL);
urcu_ref_init(&stream->ref);
ctf_trace_get(trace);
stream->trace = trace;
DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
}
- if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
+ if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, LTTNG_NAME_MAX)) {
stream->is_metadata = 1;
}
/*
* Stream must be protected by holding the stream lock or by virtue of being
- * called from stream_destroy, in which case it is guaranteed to be accessed
- * from a single thread by the reflock.
+ * called from stream_destroy.
*/
static void stream_unpublish(struct relay_stream *stream)
{
static void stream_destroy(struct relay_stream *stream)
{
if (stream->indexes_ht) {
+ /*
+ * Calling lttng_ht_destroy in call_rcu worker thread so
+ * we don't hold the RCU read-side lock while calling
+ * it.
+ */
lttng_ht_destroy(stream->indexes_ht);
}
if (stream->tfa) {
/*
* No need to take stream->lock since this is only called on the final
* stream_put which ensures that a single thread may act on the stream.
- *
- * At that point, the object is also protected by the reflock which
- * guarantees that no other thread may share ownership of this stream.
*/
static void stream_release(struct urcu_ref *ref)
{
stream_fd_put(stream->stream_fd);
stream->stream_fd = NULL;
}
- if (stream->index_fd) {
- stream_fd_put(stream->index_fd);
- stream->index_fd = NULL;
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
}
if (stream->trace) {
ctf_trace_put(stream->trace);
void stream_put(struct relay_stream *stream)
{
DBG("stream put for stream id %" PRIu64, stream->stream_handle);
- /*
- * Ensure existence of stream->reflock for stream unlock.
- */
rcu_read_lock();
- /*
- * Stream reflock ensures that concurrent test and update of
- * stream ref is atomic.
- */
- pthread_mutex_lock(&stream->reflock);
assert(stream->ref.refcount != 0);
/*
* Wait until we have processed all the stream packets before
stream->stream_handle,
(int) stream->ref.refcount);
urcu_ref_put(&stream->ref, stream_release);
- pthread_mutex_unlock(&stream->reflock);
rcu_read_unlock();
}
-void stream_close(struct relay_stream *stream)
+void try_stream_close(struct relay_stream *stream)
{
- DBG("closing stream %" PRIu64, stream->stream_handle);
+ bool session_aborted;
+ struct relay_session *session = stream->trace->session;
+
+ DBG("Trying to close stream %" PRIu64, stream->stream_handle);
+
+ pthread_mutex_lock(&session->lock);
+ session_aborted = session->aborted;
+ pthread_mutex_unlock(&session->lock);
+
pthread_mutex_lock(&stream->lock);
+ /*
+ * Can be called concurently by connection close and reception of last
+ * pending data.
+ */
+ if (stream->closed) {
+ pthread_mutex_unlock(&stream->lock);
+ DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle);
+ return;
+ }
+
+ stream->close_requested = true;
+
+ if (stream->last_net_seq_num == -1ULL) {
+ /*
+ * Handle connection close without explicit stream close
+ * command.
+ *
+ * We can be clever about indexes partially received in
+ * cases where we received the data socket part, but not
+ * the control socket part: since we're currently closing
+ * the stream on behalf of the control socket, we *know*
+ * there won't be any more control information for this
+ * socket. Therefore, we can destroy all indexes for
+ * which we have received only the file descriptor (from
+ * data socket). This takes care of consumerd crashes
+ * between sending the data and control information for
+ * a packet. Since those are sent in that order, we take
+ * care of consumerd crashes.
+ */
+ relay_index_close_partial_fd(stream);
+ /*
+ * Use the highest net_seq_num we currently have pending
+ * As end of stream indicator. Leave last_net_seq_num
+ * at -1ULL if we cannot find any index.
+ */
+ stream->last_net_seq_num = relay_index_find_last(stream);
+ /* Fall-through into the next check. */
+ }
+
+ if (stream->last_net_seq_num != -1ULL &&
+ ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0
+ && !session_aborted) {
+ /*
+ * Don't close since we still have data pending. This
+ * handles cases where an explicit close command has
+ * been received for this stream, and cases where the
+ * connection has been closed, and we are awaiting for
+ * index information from the data socket. It is
+ * therefore expected that all the index fd information
+ * we need has already been received on the control
+ * socket. Matching index information from data socket
+ * should be Expected Soon(TM).
+ *
+ * TODO: We should implement a timer to garbage collect
+ * streams after a timeout to be resilient against a
+ * consumerd implementation that would not match this
+ * expected behavior.
+ */
+ pthread_mutex_unlock(&stream->lock);
+ DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
+ return;
+ }
+ /*
+ * We received all the indexes we can expect.
+ */
stream_unpublish(stream);
stream->closed = true;
+ /* Relay indexes are only used by the "consumer/sessiond" end. */
relay_index_close_all(stream);
pthread_mutex_unlock(&stream->lock);
+ DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
stream_put(stream);
}
+static void print_stream_indexes(struct relay_stream *stream)
+{
+ struct lttng_ht_iter iter;
+ struct relay_index *index;
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, index,
+ index_n.node) {
+ DBG("index %p net_seq_num %" PRIu64 " refcount %ld"
+ " stream %" PRIu64 " trace %" PRIu64
+ " session %" PRIu64,
+ index,
+ index->index_n.key,
+ stream->ref.refcount,
+ index->stream->stream_handle,
+ index->stream->trace->id,
+ index->stream->trace->session->id);
+ }
+ rcu_read_unlock();
+}
+
void print_relay_streams(void)
{
struct lttng_ht_iter iter;
struct relay_stream *stream;
+ if (!relay_streams_ht) {
+ return;
+ }
+
rcu_read_lock();
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
node.node) {
stream->stream_handle,
stream->trace->id,
stream->trace->session->id);
+ print_stream_indexes(stream);
stream_put(stream);
}
rcu_read_unlock();