X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=c909548907d78c435392c4fa3c428ac0bab51d67;hb=212d67a218a0e805950f85bd95143a996e957322;hp=9d5a36970551fd19a5cb2288ca15026f1bf8dc70;hpb=6d574024f868e661ae688ecbc47a110a1311c57e;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 9d5a36970..c90954890 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -150,6 +150,31 @@ error: return (int) ret; } +/* + * Cleanup the stream list of a channel. Those streams are not yet globally + * visible + */ +static void clean_channel_stream_list(struct lttng_consumer_channel *channel) +{ + struct lttng_consumer_stream *stream, *stmp; + + assert(channel); + + /* Delete streams that might have been left in the stream list. */ + cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, + send_node) { + cds_list_del(&stream->send_node); + /* + * Once a stream is added to this list, the buffers were created so we + * have a guarantee that this call will succeed. Setting the monitor + * mode to 0 so we don't lock nor try to delete the stream from the + * global hash table. + */ + stream->monitor = 0; + consumer_stream_destroy(stream, NULL); + } +} + /* * Find a stream. The consumer_data.lock must be locked during this * call. @@ -292,23 +317,14 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) { int ret; struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream, *stmp; DBG("Consumer delete channel key %" PRIu64, channel->key); pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&channel->lock); - /* Delete streams that might have been left in the stream list. */ - cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, - send_node) { - cds_list_del(&stream->send_node); - /* - * Once a stream is added to this list, the buffers were created so - * we have a guarantee that this call will succeed. - */ - consumer_stream_destroy(stream, NULL); - } + /* Destroy streams that might have been left in the stream list. */ + clean_channel_stream_list(channel); if (channel->live_timer_enabled == 1) { consumer_timer_live_stop(channel); @@ -2652,12 +2668,17 @@ void consumer_close_channel_streams(struct lttng_consumer_channel *channel) break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - /* - * Note: a mutex is taken internally within - * liblttng-ust-ctl to protect timer wakeup_fd - * use from concurrent close. - */ - lttng_ustconsumer_close_stream_wakeup(stream); + if (stream->metadata_flag) { + /* Safe and protected by the stream lock. */ + lttng_ustconsumer_close_metadata(stream->chan); + } else { + /* + * Note: a mutex is taken internally within + * liblttng-ust-ctl to protect timer wakeup_fd + * use from concurrent close. + */ + lttng_ustconsumer_close_stream_wakeup(stream); + } break; default: ERR("Unknown consumer_data type"); @@ -2812,7 +2833,14 @@ restart: break; case CONSUMER_CHANNEL_DEL: { - struct lttng_consumer_stream *stream, *stmp; + /* + * This command should never be called if the channel + * has streams monitored by either the data or metadata + * thread. The consumer only notify this thread with a + * channel del. command if it receives a destroy + * channel command from the session daemon that send it + * if a command prior to the GET_CHANNEL failed. + */ rcu_read_lock(); chan = consumer_find_channel(key); @@ -2825,24 +2853,15 @@ restart: iter.iter.node = &chan->wait_fd_node.node; ret = lttng_ht_del(channel_ht, &iter); assert(ret == 0); - consumer_close_channel_streams(chan); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - /* Delete streams that might have been left in the stream list. */ - cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head, - send_node) { - health_code_update(); - - cds_list_del(&stream->send_node); - lttng_ustconsumer_del_stream(stream); - uatomic_sub(&stream->chan->refcount, 1); - assert(&chan->refcount); - free(stream); - } + health_code_update(); + /* Destroy streams that might have been left in the stream list. */ + clean_channel_stream_list(chan); break; default: ERR("Unknown consumer_data type"); @@ -2895,6 +2914,12 @@ restart: lttng_poll_del(&events, chan->wait_fd); ret = lttng_ht_del(channel_ht, &iter); assert(ret == 0); + + /* + * This will close the wait fd for each stream associated to + * this channel AND monitored by the data/metadata thread thus + * will be clean by the right thread. + */ consumer_close_channel_streams(chan); /* Release our own refcount */