X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;fp=src%2Fcommon%2Fconsumer.c;h=c8628e8767ba1c6ee263f0f0d44f2b4fc6160748;hp=c34f47c33f725252f2e598603653e9b1129fcafc;hb=03e431550191df8609f921c7b4054c57ee4644d8;hpb=fb27f84c28e9d8ec7c8ce5ac1b541b0671a4569e diff --git a/src/common/consumer.c b/src/common/consumer.c index c34f47c33..c8628e876 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -2240,26 +2240,22 @@ restart: } if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) { - if (revents & (LPOLLERR | LPOLLHUP )) { - DBG("Metadata thread pipe hung up"); - /* - * Remove the pipe from the poll set and continue the loop - * since their might be data to consume. - */ - lttng_poll_del(&events, - lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)); - lttng_pipe_read_close(ctx->consumer_metadata_pipe); - continue; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { ssize_t pipe_len; pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe, &stream, sizeof(stream)); if (pipe_len < sizeof(stream)) { - PERROR("read metadata stream"); + if (pipe_len < 0) { + PERROR("read metadata stream"); + } /* - * Continue here to handle the rest of the streams. + * Remove the pipe from the poll set and continue the loop + * since their might be data to consume. */ + lttng_poll_del(&events, + lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)); + lttng_pipe_read_close(ctx->consumer_metadata_pipe); continue; } @@ -2276,6 +2272,19 @@ restart: /* Add metadata stream to the global poll events list */ lttng_poll_add(&events, stream->wait_fd, LPOLLIN | LPOLLPRI | LPOLLHUP); + } else if (revents & (LPOLLERR | LPOLLHUP)) { + DBG("Metadata thread pipe hung up"); + /* + * Remove the pipe from the poll set and continue the loop + * since their might be data to consume. + */ + lttng_poll_del(&events, + lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)); + lttng_pipe_read_close(ctx->consumer_metadata_pipe); + continue; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto end; } /* Handle other stream */ @@ -2294,8 +2303,30 @@ restart: stream = caa_container_of(node, struct lttng_consumer_stream, node); - /* Check for error event */ - if (revents & (LPOLLERR | LPOLLHUP)) { + if (revents & (LPOLLIN | LPOLLPRI)) { + /* Get the data out of the metadata file descriptor */ + DBG("Metadata available on fd %d", pollfd); + assert(stream->wait_fd == pollfd); + + do { + health_code_update(); + + len = ctx->on_buffer_ready(stream, ctx); + /* + * We don't check the return value here since if we get + * a negative len, it means an error occured thus we + * simply remove it from the poll set and free the + * stream. + */ + } while (len > 0); + + /* It's ok to have an unavailable sub-buffer */ + if (len < 0 && len != -EAGAIN && len != -ENODATA) { + /* Clean up stream from consumer and free it. */ + lttng_poll_del(&events, stream->wait_fd); + consumer_del_metadata_stream(stream, metadata_ht); + } + } else if (revents & (LPOLLERR | LPOLLHUP)) { DBG("Metadata fd %d is hup|err.", pollfd); if (!stream->hangup_flush_done && (consumer_data.type == LTTNG_CONSUMER32_UST @@ -2323,31 +2354,11 @@ restart: * and securely free the stream. */ consumer_del_metadata_stream(stream, metadata_ht); - } else if (revents & (LPOLLIN | LPOLLPRI)) { - /* Get the data out of the metadata file descriptor */ - DBG("Metadata available on fd %d", pollfd); - assert(stream->wait_fd == pollfd); - - do { - health_code_update(); - - len = ctx->on_buffer_ready(stream, ctx); - /* - * We don't check the return value here since if we get - * a negative len, it means an error occured thus we - * simply remove it from the poll set and free the - * stream. - */ - } while (len > 0); - - /* It's ok to have an unavailable sub-buffer */ - if (len < 0 && len != -EAGAIN && len != -ENODATA) { - /* Clean up stream from consumer and free it. */ - lttng_poll_del(&events, stream->wait_fd); - consumer_del_metadata_stream(stream, metadata_ht); - } + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + rcu_read_unlock; + goto end; } - /* Release RCU lock for the stream looked up */ rcu_read_unlock(); } @@ -2812,21 +2823,16 @@ restart: } if (pollfd == ctx->consumer_channel_pipe[0]) { - if (revents & (LPOLLERR | LPOLLHUP)) { - DBG("Channel thread pipe hung up"); - /* - * Remove the pipe from the poll set and continue the loop - * since their might be data to consume. - */ - lttng_poll_del(&events, ctx->consumer_channel_pipe[0]); - continue; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { enum consumer_channel_action action; uint64_t key; ret = read_channel_pipe(ctx, &chan, &key, &action); if (ret <= 0) { - ERR("Error reading channel pipe"); + if (ret < 0) { + ERR("Error reading channel pipe"); + } + lttng_poll_del(&events, ctx->consumer_channel_pipe[0]); continue; } @@ -2843,7 +2849,7 @@ restart: rcu_read_unlock(); /* Add channel to the global poll events list */ lttng_poll_add(&events, chan->wait_fd, - LPOLLIN | LPOLLPRI); + LPOLLERR | LPOLLHUP); break; case CONSUMER_CHANNEL_DEL: { @@ -2903,6 +2909,17 @@ restart: ERR("Unknown action"); break; } + } else if (revents & (LPOLLERR | LPOLLHUP)) { + DBG("Channel thread pipe hung up"); + /* + * Remove the pipe from the poll set and continue the loop + * since their might be data to consume. + */ + lttng_poll_del(&events, ctx->consumer_channel_pipe[0]); + continue; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto end; } /* Handle other stream */ @@ -2941,6 +2958,10 @@ restart: && !uatomic_read(&chan->nb_init_stream_left)) { consumer_del_channel(chan); } + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + rcu_read_unlock(); + goto end; } /* Release RCU lock for the channel looked up */