X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fstream.c;h=2bc815813613054f58f39810b9c8df0dcade0117;hp=cac87635ed46c2044f72c44e0e99940c6af1af17;hb=cab6931e14405d5d966ced712028ede43b2d4cff;hpb=3d07a857948f868354589ff742b0a2f6277f558f diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index cac87635e..2bc815813 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -17,7 +17,6 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE #define _LGPL_SOURCE #include #include @@ -33,16 +32,7 @@ /* Should be called with RCU read-side lock held. */ bool stream_get(struct relay_stream *stream) { - bool has_ref = false; - - pthread_mutex_lock(&stream->reflock); - if (stream->ref.refcount != 0) { - has_ref = true; - urcu_ref_get(&stream->ref); - } - pthread_mutex_unlock(&stream->reflock); - - return has_ref; + return urcu_ref_get_unless_zero(&stream->ref); } /* @@ -87,7 +77,6 @@ struct relay_stream *stream_create(struct ctf_trace *trace, stream = zmalloc(sizeof(struct relay_stream)); if (stream == NULL) { PERROR("relay stream zmalloc"); - ret = -1; goto error_no_alloc; } @@ -101,7 +90,6 @@ struct relay_stream *stream_create(struct ctf_trace *trace, stream->channel_name = channel_name; lttng_ht_node_init_u64(&stream->node, stream->stream_handle); pthread_mutex_init(&stream->lock, NULL); - pthread_mutex_init(&stream->reflock, NULL); urcu_ref_init(&stream->ref); ctf_trace_get(trace); stream->trace = trace; @@ -149,7 +137,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace, DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name); } - if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) { + if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, LTTNG_NAME_MAX)) { stream->is_metadata = 1; } @@ -228,8 +216,7 @@ unlock: /* * Stream must be protected by holding the stream lock or by virtue of being - * called from stream_destroy, in which case it is guaranteed to be accessed - * from a single thread by the reflock. + * called from stream_destroy. */ static void stream_unpublish(struct relay_stream *stream) { @@ -253,6 +240,11 @@ static void stream_unpublish(struct relay_stream *stream) static void stream_destroy(struct relay_stream *stream) { if (stream->indexes_ht) { + /* + * Calling lttng_ht_destroy in call_rcu worker thread so + * we don't hold the RCU read-side lock while calling + * it. + */ lttng_ht_destroy(stream->indexes_ht); } if (stream->tfa) { @@ -274,9 +266,6 @@ static void stream_destroy_rcu(struct rcu_head *rcu_head) /* * No need to take stream->lock since this is only called on the final * stream_put which ensures that a single thread may act on the stream. - * - * At that point, the object is also protected by the reflock which - * guarantees that no other thread may share ownership of this stream. */ static void stream_release(struct urcu_ref *ref) { @@ -302,9 +291,9 @@ static void stream_release(struct urcu_ref *ref) stream_fd_put(stream->stream_fd); stream->stream_fd = NULL; } - if (stream->index_fd) { - stream_fd_put(stream->index_fd); - stream->index_fd = NULL; + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + stream->index_file = NULL; } if (stream->trace) { ctf_trace_put(stream->trace); @@ -317,15 +306,7 @@ static void stream_release(struct urcu_ref *ref) void stream_put(struct relay_stream *stream) { DBG("stream put for stream id %" PRIu64, stream->stream_handle); - /* - * Ensure existence of stream->reflock for stream unlock. - */ rcu_read_lock(); - /* - * Stream reflock ensures that concurrent test and update of - * stream ref is atomic. - */ - pthread_mutex_lock(&stream->reflock); assert(stream->ref.refcount != 0); /* * Wait until we have processed all the stream packets before @@ -335,13 +316,20 @@ void stream_put(struct relay_stream *stream) stream->stream_handle, (int) stream->ref.refcount); urcu_ref_put(&stream->ref, stream_release); - pthread_mutex_unlock(&stream->reflock); rcu_read_unlock(); } void try_stream_close(struct relay_stream *stream) { + bool session_aborted; + struct relay_session *session = stream->trace->session; + DBG("Trying to close stream %" PRIu64, stream->stream_handle); + + pthread_mutex_lock(&session->lock); + session_aborted = session->aborted; + pthread_mutex_unlock(&session->lock); + pthread_mutex_lock(&stream->lock); /* * Can be called concurently by connection close and reception of last @@ -383,7 +371,8 @@ void try_stream_close(struct relay_stream *stream) } if (stream->last_net_seq_num != -1ULL && - ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) { + ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0 + && !session_aborted) { /* * Don't close since we still have data pending. This * handles cases where an explicit close command has @@ -416,11 +405,36 @@ void try_stream_close(struct relay_stream *stream) stream_put(stream); } +static void print_stream_indexes(struct relay_stream *stream) +{ + struct lttng_ht_iter iter; + struct relay_index *index; + + rcu_read_lock(); + cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, index, + index_n.node) { + DBG("index %p net_seq_num %" PRIu64 " refcount %ld" + " stream %" PRIu64 " trace %" PRIu64 + " session %" PRIu64, + index, + index->index_n.key, + stream->ref.refcount, + index->stream->stream_handle, + index->stream->trace->id, + index->stream->trace->session->id); + } + rcu_read_unlock(); +} + void print_relay_streams(void) { struct lttng_ht_iter iter; struct relay_stream *stream; + if (!relay_streams_ht) { + return; + } + rcu_read_lock(); cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, node.node) { @@ -434,6 +448,7 @@ void print_relay_streams(void) stream->stream_handle, stream->trace->id, stream->trace->session->id); + print_stream_indexes(stream); stream_put(stream); } rcu_read_unlock();