/* Should be called with RCU read-side lock held. */
bool stream_get(struct relay_stream *stream)
{
- bool has_ref = false;
-
- pthread_mutex_lock(&stream->reflock);
- if (stream->ref.refcount != 0) {
- has_ref = true;
- urcu_ref_get(&stream->ref);
- }
- pthread_mutex_unlock(&stream->reflock);
-
- return has_ref;
+ return urcu_ref_get_unless_zero(&stream->ref);
}
/*
stream = zmalloc(sizeof(struct relay_stream));
if (stream == NULL) {
PERROR("relay stream zmalloc");
- ret = -1;
goto error_no_alloc;
}
stream->tracefile_count = tracefile_count;
stream->path_name = path_name;
stream->channel_name = channel_name;
+ stream->rotate_at_seq_num = -1ULL;
lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
pthread_mutex_init(&stream->lock, NULL);
- pthread_mutex_init(&stream->reflock, NULL);
urcu_ref_init(&stream->ref);
ctf_trace_get(trace);
stream->trace = trace;
/*
* Stream must be protected by holding the stream lock or by virtue of being
- * called from stream_destroy, in which case it is guaranteed to be accessed
- * from a single thread by the reflock.
+ * called from stream_destroy.
*/
static void stream_unpublish(struct relay_stream *stream)
{
/*
* No need to take stream->lock since this is only called on the final
* stream_put which ensures that a single thread may act on the stream.
- *
- * At that point, the object is also protected by the reflock which
- * guarantees that no other thread may share ownership of this stream.
*/
static void stream_release(struct urcu_ref *ref)
{
void stream_put(struct relay_stream *stream)
{
DBG("stream put for stream id %" PRIu64, stream->stream_handle);
- /*
- * Ensure existence of stream->reflock for stream unlock.
- */
rcu_read_lock();
- /*
- * Stream reflock ensures that concurrent test and update of
- * stream ref is atomic.
- */
- pthread_mutex_lock(&stream->reflock);
assert(stream->ref.refcount != 0);
/*
* Wait until we have processed all the stream packets before
stream->stream_handle,
(int) stream->ref.refcount);
urcu_ref_put(&stream->ref, stream_release);
- pthread_mutex_unlock(&stream->reflock);
rcu_read_unlock();
}
* a packet. Since those are sent in that order, we take
* care of consumerd crashes.
*/
+ DBG("relay_index_close_partial_fd");
relay_index_close_partial_fd(stream);
/*
* Use the highest net_seq_num we currently have pending
* at -1ULL if we cannot find any index.
*/
stream->last_net_seq_num = relay_index_find_last(stream);
+ DBG("Updating stream->last_net_seq_num to %" PRIu64, stream->last_net_seq_num);
/* Fall-through into the next check. */
}