+ nb_fd = ret;
+
+ /* From here, the event is a channel wait fd */
+ for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
+ revents = LTTNG_POLL_GETEV(&events, i);
+ pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ /* Just don't waste time if no returned events for the fd */
+ if (!revents) {
+ continue;
+ }
+ if (pollfd == ctx->consumer_channel_pipe[0]) {
+ if (revents & (LPOLLERR | LPOLLHUP)) {
+ DBG("Channel thread pipe hung up");
+ /*
+ * Remove the pipe from the poll set and continue the loop
+ * since their might be data to consume.
+ */
+ lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
+ continue;
+ } else if (revents & LPOLLIN) {
+ enum consumer_channel_action action;
+ uint64_t key;
+
+ ret = read_channel_pipe(ctx, &chan, &key, &action);
+ if (ret <= 0) {
+ ERR("Error reading channel pipe");
+ continue;
+ }
+
+ switch (action) {
+ case CONSUMER_CHANNEL_ADD:
+ DBG("Adding channel %d to poll set",
+ chan->wait_fd);
+
+ lttng_ht_node_init_u64(&chan->wait_fd_node,
+ chan->wait_fd);
+ rcu_read_lock();
+ lttng_ht_add_unique_u64(channel_ht,
+ &chan->wait_fd_node);
+ rcu_read_unlock();
+ /* Add channel to the global poll events list */
+ lttng_poll_add(&events, chan->wait_fd,
+ LPOLLIN | LPOLLPRI);
+ break;
+ case CONSUMER_CHANNEL_DEL:
+ {
+ /*
+ * This command should never be called if the channel
+ * has streams monitored by either the data or metadata
+ * thread. The consumer only notify this thread with a
+ * channel del. command if it receives a destroy
+ * channel command from the session daemon that send it
+ * if a command prior to the GET_CHANNEL failed.
+ */
+
+ rcu_read_lock();
+ chan = consumer_find_channel(key);
+ if (!chan) {
+ rcu_read_unlock();
+ ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
+ break;
+ }
+ lttng_poll_del(&events, chan->wait_fd);
+ iter.iter.node = &chan->wait_fd_node.node;
+ ret = lttng_ht_del(channel_ht, &iter);
+ assert(ret == 0);
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ health_code_update();
+ /* Destroy streams that might have been left in the stream list. */
+ clean_channel_stream_list(chan);
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ }
+
+ /*
+ * Release our own refcount. Force channel deletion even if
+ * streams were not initialized.
+ */
+ if (!uatomic_sub_return(&chan->refcount, 1)) {
+ consumer_del_channel(chan);
+ }
+ rcu_read_unlock();
+ goto restart;
+ }
+ case CONSUMER_CHANNEL_QUIT:
+ /*
+ * Remove the pipe from the poll set and continue the loop
+ * since their might be data to consume.
+ */
+ lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
+ continue;
+ default:
+ ERR("Unknown action");
+ break;
+ }
+ }
+
+ /* Handle other stream */
+ continue;
+ }
+
+ rcu_read_lock();
+ {
+ uint64_t tmp_id = (uint64_t) pollfd;
+
+ lttng_ht_lookup(channel_ht, &tmp_id, &iter);
+ }
+ node = lttng_ht_iter_get_node_u64(&iter);
+ assert(node);
+
+ chan = caa_container_of(node, struct lttng_consumer_channel,
+ wait_fd_node);
+
+ /* Check for error event */
+ if (revents & (LPOLLERR | LPOLLHUP)) {
+ DBG("Channel fd %d is hup|err.", pollfd);
+
+ lttng_poll_del(&events, chan->wait_fd);
+ ret = lttng_ht_del(channel_ht, &iter);
+ assert(ret == 0);
+
+ /*
+ * This will close the wait fd for each stream associated to
+ * this channel AND monitored by the data/metadata thread thus
+ * will be clean by the right thread.
+ */
+ consumer_close_channel_streams(chan);
+
+ /* Release our own refcount */
+ if (!uatomic_sub_return(&chan->refcount, 1)
+ && !uatomic_read(&chan->nb_init_stream_left)) {
+ consumer_del_channel(chan);
+ }
+ }
+
+ /* Release RCU lock for the channel looked up */
+ rcu_read_unlock();
+ }
+ }
+
+ /* All is OK */
+ err = 0;
+end:
+ lttng_poll_clean(&events);
+end_poll:
+ destroy_channel_ht(channel_ht);
+end_ht:
+error_testpoint:
+ DBG("Channel poll thread exiting");
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_consumerd);