X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=5574c5f0e1d545691f6837d2d4100a10c2a59e48;hb=4b29f1cec9f8e93d4dde0c982f30e3201f0f4e65;hp=b9b5818808203332f220d97f21a23b30dff982b6;hpb=1d1a276c180aae7eca393b5a1c961cdfc4bddc84;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index b9b581880..5574c5f0e 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -312,6 +312,25 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) goto end; } + /* Empty no monitor streams list. */ + if (!channel->monitor) { + struct lttng_consumer_stream *stream, *stmp; + + /* + * So, these streams are not visible to any data thread. This is why we + * close and free them because they were never added to any data + * structure apart from this one. + */ + cds_list_for_each_entry_safe(stream, stmp, + &channel->stream_no_monitor_list.head, no_monitor_node) { + cds_list_del(&stream->no_monitor_node); + /* Close everything in that stream. */ + consumer_stream_close(stream); + /* Free the ressource. */ + consumer_stream_free(stream); + } + } + rcu_read_lock(); iter.iter.node = &channel->node.node; ret = lttng_ht_del(consumer_data.channel_ht, &iter); @@ -509,8 +528,10 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, /* Init session id node with the stream session id */ lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id); - DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 " relayd_id %" PRIu64 ", session_id %" PRIu64, - stream->name, stream->key, channel_key, stream->net_seq_idx, stream->session_id); + DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 + " relayd_id %" PRIu64 ", session_id %" PRIu64, + stream->name, stream->key, channel_key, + stream->net_seq_idx, stream->session_id); rcu_read_unlock(); return stream; @@ -747,7 +768,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t relayd_id, enum lttng_event_output output, uint64_t tracefile_size, - uint64_t tracefile_count) + uint64_t tracefile_count, + unsigned int monitor) { struct lttng_consumer_channel *channel; @@ -766,6 +788,34 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->output = output; channel->tracefile_size = tracefile_size; channel->tracefile_count = tracefile_count; + channel->monitor = monitor; + + /* + * In monitor mode, the streams associated with the channel will be put in + * a special list ONLY owned by this channel. So, the refcount is set to 1 + * here meaning that the channel itself has streams that are referenced. + * + * On a channel deletion, once the channel is no longer visible, the + * refcount is decremented and checked for a zero value to delete it. With + * streams in no monitor mode, it will now be safe to destroy the channel. + */ + if (!channel->monitor) { + channel->refcount = 1; + } + + switch (output) { + case LTTNG_EVENT_SPLICE: + channel->output = CONSUMER_CHANNEL_SPLICE; + break; + case LTTNG_EVENT_MMAP: + channel->output = CONSUMER_CHANNEL_MMAP; + break; + default: + ERR("Allocate channel output unknown %d", output); + free(channel); + channel = NULL; + goto end; + } strncpy(channel->pathname, pathname, sizeof(channel->pathname)); channel->pathname[sizeof(channel->pathname) - 1] = '\0'; @@ -778,6 +828,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->wait_fd = -1; CDS_INIT_LIST_HEAD(&channel->streams.head); + CDS_INIT_LIST_HEAD(&channel->stream_no_monitor_list.head); DBG("Allocated channel (key %" PRIu64 ")", channel->key) @@ -1783,7 +1834,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, PERROR("munmap metadata stream"); } } - if (stream->wait_fd >= 0) { ret = close(stream->wait_fd); if (ret < 0) { @@ -3057,6 +3107,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, /* Assign new file descriptor */ relayd->control_sock.sock.fd = fd; + fd = -1; /* For error path */ /* Assign version values. */ relayd->control_sock.major = relayd_sock->major; relayd->control_sock.minor = relayd_sock->minor; @@ -3102,6 +3153,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, /* Assign new file descriptor */ relayd->data_sock.sock.fd = fd; + fd = -1; /* for eventual error paths */ /* Assign version values. */ relayd->data_sock.major = relayd_sock->major; relayd->data_sock.minor = relayd_sock->minor;