X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer-stream.c;h=723ec829f80095ff17bd3e5996540d01d754072a;hp=24f1b8a42d14c580c013f3f38bac9b9576f3c89a;hb=d01178b6f6465443d7e6e1015aa7054e9d093e91;hpb=51230d709a394904ee9c449c26d645e737c4af94 diff --git a/src/common/consumer-stream.c b/src/common/consumer-stream.c index 24f1b8a42..723ec829f 100644 --- a/src/common/consumer-stream.c +++ b/src/common/consumer-stream.c @@ -19,6 +19,7 @@ #define _GNU_SOURCE #include +#include #include #include @@ -57,8 +58,10 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, assert(stream); assert(relayd); - uatomic_dec(&relayd->refcount); - assert(uatomic_read(&relayd->refcount) >= 0); + if (stream->sent_to_relayd) { + uatomic_dec(&relayd->refcount); + assert(uatomic_read(&relayd->refcount) >= 0); + } /* Closing streams requires to lock the control socket. */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); @@ -80,6 +83,8 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, uatomic_read(&relayd->destroy_flag)) { consumer_destroy_relayd(relayd); } + stream->net_seq_idx = (uint64_t) -1ULL; + stream->sent_to_relayd = 0; } /* @@ -110,11 +115,11 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) if (ret) { PERROR("close"); } + stream->wait_fd = -1; } break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - lttng_ustconsumer_del_stream(stream); break; default: ERR("Unknown consumer_data type"); @@ -127,6 +132,7 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) if (ret) { PERROR("close"); } + stream->out_fd = -1; } /* Check and cleanup relayd if needed. */ @@ -151,6 +157,8 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream, struct lttng_ht_iter iter; assert(stream); + /* Should NEVER be called not in monitor mode. */ + assert(stream->chan->monitor); rcu_read_lock(); @@ -193,29 +201,51 @@ void consumer_stream_free(struct lttng_consumer_stream *stream) } /* - * Destroy a stream completely. This will delete, close and free the stream. - * Once return, the stream is NO longer usable. Its channel may get destroyed - * if conditions are met. - * - * This MUST be called WITHOUT the consumer data and stream lock acquired. + * Destroy the stream's buffers of the tracer. */ -void consumer_stream_destroy(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream) { - struct lttng_consumer_channel *free_chan = NULL; - assert(stream); - DBG("Consumer stream destroy - wait_fd: %d", stream->wait_fd); + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + lttng_ustconsumer_del_stream(stream); + break; + default: + ERR("Unknown consumer_data type"); + assert(0); + } +} - pthread_mutex_lock(&consumer_data.lock); - pthread_mutex_lock(&stream->lock); +/* + * Destroy and close a already created stream. + */ +static void destroy_close_stream(struct lttng_consumer_stream *stream) +{ + assert(stream); - /* Remove every reference of the stream in the consumer. */ - consumer_stream_delete(stream, ht); + DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key); + /* Destroy tracer buffers of the stream. */ + consumer_stream_destroy_buffers(stream); /* Close down everything including the relayd if one. */ consumer_stream_close(stream); +} + +/* + * Decrement the stream's channel refcount and if down to 0, return the channel + * pointer so it can be destroyed by the caller or NULL if not. + */ +static struct lttng_consumer_channel *unref_channel( + struct lttng_consumer_stream *stream) +{ + struct lttng_consumer_channel *free_chan = NULL; + + assert(stream); + assert(stream->chan); /* Update refcount of channel and see if we need to destroy it. */ if (!uatomic_sub_return(&stream->chan->refcount, 1) @@ -223,14 +253,60 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, free_chan = stream->chan; } - /* Indicates that the consumer data state MUST be updated after this. */ - consumer_data.need_update = 1; + return free_chan; +} + +/* + * Destroy a stream completely. This will delete, close and free the stream. + * Once return, the stream is NO longer usable. Its channel may get destroyed + * if conditions are met for a monitored stream. + * + * This MUST be called WITHOUT the consumer data and stream lock acquired if + * the stream is in _monitor_ mode else it does not matter. + */ +void consumer_stream_destroy(struct lttng_consumer_stream *stream, + struct lttng_ht *ht) +{ + assert(stream); + + /* Stream is in monitor mode. */ + if (stream->monitor) { + struct lttng_consumer_channel *free_chan = NULL; - pthread_mutex_unlock(&stream->lock); - pthread_mutex_unlock(&consumer_data.lock); + /* + * This means that the stream was successfully removed from the streams + * list of the channel and sent to the right thread managing this + * stream thus being globally visible. + */ + if (stream->globally_visible) { + pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->lock); + /* Remove every reference of the stream in the consumer. */ + consumer_stream_delete(stream, ht); + + destroy_close_stream(stream); + + /* Update channel's refcount of the stream. */ + free_chan = unref_channel(stream); + + /* Indicates that the consumer data state MUST be updated after this. */ + consumer_data.need_update = 1; + + pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&consumer_data.lock); + } else { + /* + * If the stream is not visible globally, this needs to be done + * outside of the consumer data lock section. + */ + free_chan = unref_channel(stream); + } - if (free_chan) { - consumer_del_channel(free_chan); + if (free_chan) { + consumer_del_channel(free_chan); + } + } else { + destroy_close_stream(stream); } /* Free stream within a RCU call. */