+ ret = lttng_poll_add(&events, ctx->consumer_channel_pipe[0], LPOLLIN);
+ if (ret < 0) {
+ goto end;
+ }
+
+ /* Main loop */
+ DBG("Channel main loop started");
+
+ while (1) {
+ /* Only the channel pipe is set */
+ if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+ goto end;
+ }
+
+restart:
+ DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+ ret = lttng_poll_wait(&events, -1);
+ DBG("Channel event catched in thread");
+ if (ret < 0) {
+ if (errno == EINTR) {
+ ERR("Poll EINTR catched");
+ goto restart;
+ }
+ goto end;
+ }
+
+ nb_fd = ret;
+
+ /* From here, the event is a channel wait fd */
+ for (i = 0; i < nb_fd; i++) {
+ revents = LTTNG_POLL_GETEV(&events, i);
+ pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ /* Just don't waste time if no returned events for the fd */
+ if (!revents) {
+ continue;
+ }
+ if (pollfd == ctx->consumer_channel_pipe[0]) {
+ if (revents & (LPOLLERR | LPOLLHUP)) {
+ DBG("Channel thread pipe hung up");
+ /*
+ * Remove the pipe from the poll set and continue the loop
+ * since their might be data to consume.
+ */
+ lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
+ continue;
+ } else if (revents & LPOLLIN) {
+ enum consumer_channel_action action;
+ uint64_t key;
+
+ ret = read_channel_pipe(ctx, &chan, &key, &action);
+ if (ret <= 0) {
+ ERR("Error reading channel pipe");
+ continue;
+ }
+
+ switch (action) {
+ case CONSUMER_CHANNEL_ADD:
+ DBG("Adding channel %d to poll set",
+ chan->wait_fd);
+
+ lttng_ht_node_init_u64(&chan->wait_fd_node,
+ chan->wait_fd);
+ rcu_read_lock();
+ lttng_ht_add_unique_u64(channel_ht,
+ &chan->wait_fd_node);
+ rcu_read_unlock();
+ /* Add channel to the global poll events list */
+ lttng_poll_add(&events, chan->wait_fd,
+ LPOLLIN | LPOLLPRI);
+ break;
+ case CONSUMER_CHANNEL_DEL:
+ {
+ struct lttng_consumer_stream *stream, *stmp;
+
+ rcu_read_lock();
+ chan = consumer_find_channel(key);
+ if (!chan) {
+ rcu_read_unlock();
+ ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
+ break;
+ }
+ lttng_poll_del(&events, chan->wait_fd);
+ iter.iter.node = &chan->wait_fd_node.node;
+ ret = lttng_ht_del(channel_ht, &iter);
+ assert(ret == 0);
+ consumer_close_channel_streams(chan);
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /* Delete streams that might have been left in the stream list. */
+ cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
+ send_node) {
+ cds_list_del(&stream->send_node);
+ lttng_ustconsumer_del_stream(stream);
+ uatomic_sub(&stream->chan->refcount, 1);
+ assert(&chan->refcount);
+ free(stream);
+ }
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ }
+
+ /*
+ * Release our own refcount. Force channel deletion even if
+ * streams were not initialized.
+ */
+ if (!uatomic_sub_return(&chan->refcount, 1)) {
+ consumer_del_channel(chan);
+ }
+ rcu_read_unlock();
+ goto restart;
+ }
+ case CONSUMER_CHANNEL_QUIT:
+ /*
+ * Remove the pipe from the poll set and continue the loop
+ * since their might be data to consume.
+ */
+ lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
+ continue;
+ default:
+ ERR("Unknown action");
+ break;
+ }
+ }
+
+ /* Handle other stream */
+ continue;
+ }
+
+ rcu_read_lock();
+ {
+ uint64_t tmp_id = (uint64_t) pollfd;
+
+ lttng_ht_lookup(channel_ht, &tmp_id, &iter);
+ }
+ node = lttng_ht_iter_get_node_u64(&iter);
+ assert(node);
+
+ chan = caa_container_of(node, struct lttng_consumer_channel,
+ wait_fd_node);
+
+ /* Check for error event */
+ if (revents & (LPOLLERR | LPOLLHUP)) {
+ DBG("Channel fd %d is hup|err.", pollfd);