X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer-stream.c;h=027d751508cf69b76411beae1626d3c0fd847aac;hp=b5ab6c6bee2dc38ef615ff84f87129354abf86a2;hb=10a5031171c7bca5b4498c871b119e5a88b6a3fb;hpb=618a6a28c0956fc6829c165a39ffec97f239096c diff --git a/src/common/consumer-stream.c b/src/common/consumer-stream.c index b5ab6c6be..027d75150 100644 --- a/src/common/consumer-stream.c +++ b/src/common/consumer-stream.c @@ -19,6 +19,7 @@ #define _GNU_SOURCE #include +#include #include #include @@ -80,6 +81,7 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, uatomic_read(&relayd->destroy_flag)) { consumer_destroy_relayd(relayd); } + stream->net_seq_idx = (uint64_t) -1ULL; } /* @@ -110,11 +112,11 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) if (ret) { PERROR("close"); } + stream->wait_fd = -1; } break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - lttng_ustconsumer_del_stream(stream); break; default: ERR("Unknown consumer_data type"); @@ -127,6 +129,7 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) if (ret) { PERROR("close"); } + stream->out_fd = -1; } /* Check and cleanup relayd if needed. */ @@ -151,6 +154,8 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream, struct lttng_ht_iter iter; assert(stream); + /* Should NEVER be called not in monitor mode. */ + assert(stream->chan->monitor); rcu_read_lock(); @@ -177,15 +182,9 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream, rcu_read_unlock(); - /* - * For a *non* monitored stream, we MUST NOT decrement or else the data - * thread will use the wrong value or stream for its local stream set. - */ - if (stream->chan->monitor) { - /* Decrement the stream count of the global consumer data. */ - assert(consumer_data.stream_count > 0); - consumer_data.stream_count--; - } + /* Decrement the stream count of the global consumer data. */ + assert(consumer_data.stream_count > 0); + consumer_data.stream_count--; } /* @@ -199,44 +198,110 @@ void consumer_stream_free(struct lttng_consumer_stream *stream) } /* - * Destroy a stream completely. This will delete, close and free the stream. - * Once return, the stream is NO longer usable. Its channel may get destroyed - * if conditions are met. - * - * This MUST be called WITHOUT the consumer data and stream lock acquired. + * Destroy the stream's buffers of the tracer. */ -void consumer_stream_destroy(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream) { - struct lttng_consumer_channel *free_chan = NULL; + assert(stream); + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + lttng_ustconsumer_del_stream(stream); + break; + default: + ERR("Unknown consumer_data type"); + assert(0); + } +} +/* + * Destroy a stream in no monitor mode. + * + * We need a separate function because this can be called inside a destroy + * channel path which have the consumer data lock acquired. Also, in no monitor + * mode, the channel refcount is NOT incremented per stream since the ownership + * of those streams are INSIDE the channel making the lazy destroy channel not + * possible for a non monitor stream. + * + * Furthermore, there is no need to delete the stream from the global hash + * table so we avoid useless calls. + */ +static void destroy_no_monitor(struct lttng_consumer_stream *stream) +{ assert(stream); - DBG("Consumer stream destroy - wait_fd: %d", stream->wait_fd); + DBG("Consumer stream destroy unmonitored key: %" PRIu64, stream->key); + + /* Destroy tracer buffers of the stream. */ + consumer_stream_destroy_buffers(stream); + /* Close down everything including the relayd if one. */ + consumer_stream_close(stream); +} - pthread_mutex_lock(&consumer_data.lock); - pthread_mutex_lock(&stream->lock); +/* + * Destroy a stream in monitor mode. + */ +static void destroy_monitor(struct lttng_consumer_stream *stream, + struct lttng_ht *ht) +{ + assert(stream); + + DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key); /* Remove every reference of the stream in the consumer. */ consumer_stream_delete(stream, ht); - + /* Destroy tracer buffers of the stream. */ + consumer_stream_destroy_buffers(stream); /* Close down everything including the relayd if one. */ consumer_stream_close(stream); +} - /* Update refcount of channel and see if we need to destroy it. */ - if (!uatomic_sub_return(&stream->chan->refcount, 1) - && !uatomic_read(&stream->chan->nb_init_stream_left)) { - free_chan = stream->chan; - } +/* + * Destroy a stream completely. This will delete, close and free the stream. + * Once return, the stream is NO longer usable. Its channel may get destroyed + * if conditions are met for a monitored stream. + * + * This MUST be called WITHOUT the consumer data and stream lock acquired if + * the stream is in _monitor_ mode else it does not matter. + */ +void consumer_stream_destroy(struct lttng_consumer_stream *stream, + struct lttng_ht *ht) +{ + assert(stream); + + /* Stream is in monitor mode. */ + if (stream->chan->monitor) { + struct lttng_consumer_channel *free_chan = NULL; - /* Indicates that the consumer data state MUST be updated after this. */ - consumer_data.need_update = 1; + pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->lock); - pthread_mutex_unlock(&stream->lock); - pthread_mutex_unlock(&consumer_data.lock); + destroy_monitor(stream, ht); - if (free_chan) { - consumer_del_channel(free_chan); + /* Update refcount of channel and see if we need to destroy it. */ + if (!uatomic_sub_return(&stream->chan->refcount, 1) + && !uatomic_read(&stream->chan->nb_init_stream_left)) { + free_chan = stream->chan; + } + + /* Indicates that the consumer data state MUST be updated after this. */ + consumer_data.need_update = 1; + + pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&consumer_data.lock); + + if (free_chan) { + consumer_del_channel(free_chan); + } + } else { + /* + * No monitor mode the stream's ownership is in its channel thus we + * don't have to handle the channel refcount nor the lazy deletion. + */ + destroy_no_monitor(stream); } /* Free stream within a RCU call. */