X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=116441171e5aa73d198c2bbd90175da0d9fb03fa;hb=f2a444f17e07f805109c01ab4c7f53cc98b1adf3;hp=78b3f0799737139fc9067a601a39452e384b3e0e;hpb=73811eccc9599ebf62e5f5bee49039cecc25c3eb;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 78b3f0799..116441171 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -287,6 +287,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) { int ret; struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream, *stmp; DBG("Consumer delete channel key %" PRIu64, channel->key); @@ -297,6 +298,13 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: + /* Delete streams that might have been left in the stream list. */ + cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, + send_node) { + cds_list_del(&stream->send_node); + lttng_ustconsumer_del_stream(stream); + free(stream); + } lttng_ustconsumer_del_channel(channel); break; default: @@ -890,6 +898,8 @@ end: /* * Add a channel to the global list protected by a mutex. + * + * On success 0 is returned else a negative value. */ int consumer_add_channel(struct lttng_consumer_channel *channel, struct lttng_consumer_local_data *ctx) @@ -907,7 +917,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, /* Channel already exist. Ignore the insertion */ ERR("Consumer add channel key %" PRIu64 " already exists!", channel->key); - ret = LTTNG_ERR_KERN_CHAN_EXIST; + ret = -EEXIST; goto end; } @@ -2019,9 +2029,6 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, uatomic_inc(&relayd->refcount); } - /* Update channel refcount once added without error(s). */ - uatomic_inc(&stream->chan->refcount); - /* * When nb_init_stream_left reaches 0, we don't need to trigger any action * in terms of destroying the associated channel, because the action that @@ -2352,7 +2359,7 @@ void *consumer_thread_data_poll(void *data) /* allocate for all fds + 1 for the consumer_data_pipe */ local_stream = zmalloc((consumer_data.stream_count + 1) * - sizeof(struct lttng_consumer_stream)); + sizeof(struct lttng_consumer_stream *)); if (local_stream == NULL) { PERROR("local_stream malloc"); pthread_mutex_unlock(&consumer_data.lock); @@ -2719,24 +2726,51 @@ restart: lttng_ht_node_init_u64(&chan->wait_fd_node, chan->wait_fd); + rcu_read_lock(); lttng_ht_add_unique_u64(channel_ht, &chan->wait_fd_node); + rcu_read_unlock(); /* Add channel to the global poll events list */ lttng_poll_add(&events, chan->wait_fd, LPOLLIN | LPOLLPRI); break; case CONSUMER_CHANNEL_DEL: { + struct lttng_consumer_stream *stream, *stmp; + + rcu_read_lock(); chan = consumer_find_channel(key); if (!chan) { + rcu_read_unlock(); ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key); break; } lttng_poll_del(&events, chan->wait_fd); + iter.iter.node = &chan->wait_fd_node.node; ret = lttng_ht_del(channel_ht, &iter); assert(ret == 0); consumer_close_channel_streams(chan); + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* Delete streams that might have been left in the stream list. */ + cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head, + send_node) { + cds_list_del(&stream->send_node); + lttng_ustconsumer_del_stream(stream); + uatomic_sub(&stream->chan->refcount, 1); + assert(&chan->refcount); + free(stream); + } + break; + default: + ERR("Unknown consumer_data type"); + assert(0); + } + /* * Release our own refcount. Force channel deletion even if * streams were not initialized. @@ -2744,6 +2778,7 @@ restart: if (!uatomic_sub_return(&chan->refcount, 1)) { consumer_del_channel(chan); } + rcu_read_unlock(); goto restart; } case CONSUMER_CHANNEL_QUIT: @@ -2782,6 +2817,7 @@ restart: lttng_poll_del(&events, chan->wait_fd); ret = lttng_ht_del(channel_ht, &iter); assert(ret == 0); + assert(cds_list_empty(&chan->streams.head)); consumer_close_channel_streams(chan); /* Release our own refcount */ @@ -2870,12 +2906,6 @@ void *consumer_thread_sessiond_poll(void *data) goto end; } - ret = fcntl(client_socket, F_SETFL, O_NONBLOCK); - if (ret < 0) { - PERROR("fcntl O_NONBLOCK"); - goto end; - } - /* prepare the FDs to poll : to client socket and the should_quit pipe */ consumer_sockpoll[0].fd = ctx->consumer_should_quit[0]; consumer_sockpoll[0].events = POLLIN | POLLPRI; @@ -2893,11 +2923,6 @@ void *consumer_thread_sessiond_poll(void *data) WARN("On accept"); goto end; } - ret = fcntl(sock, F_SETFL, O_NONBLOCK); - if (ret < 0) { - PERROR("fcntl O_NONBLOCK"); - goto end; - } /* * Setup metadata socket which is the second socket connection on the @@ -2976,7 +3001,7 @@ end: } } if (client_socket >= 0) { - ret = close(sock); + ret = close(client_socket); if (ret < 0) { PERROR("close client_socket sessiond poll"); }