Fix: LPOLLHUP and LPOLLERR when there is still data in pipe/socket
[lttng-tools.git] / src / common / consumer.c
index c34f47c33f725252f2e598603653e9b1129fcafc..c8628e8767ba1c6ee263f0f0d44f2b4fc6160748 100644 (file)
@@ -2240,26 +2240,22 @@ restart:
                        }
 
                        if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
-                               if (revents & (LPOLLERR | LPOLLHUP )) {
-                                       DBG("Metadata thread pipe hung up");
-                                       /*
-                                        * Remove the pipe from the poll set and continue the loop
-                                        * since their might be data to consume.
-                                        */
-                                       lttng_poll_del(&events,
-                                                       lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
-                                       lttng_pipe_read_close(ctx->consumer_metadata_pipe);
-                                       continue;
-                               } else if (revents & LPOLLIN) {
+                               if (revents & LPOLLIN) {
                                        ssize_t pipe_len;
 
                                        pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
                                                        &stream, sizeof(stream));
                                        if (pipe_len < sizeof(stream)) {
-                                               PERROR("read metadata stream");
+                                               if (pipe_len < 0) {
+                                                       PERROR("read metadata stream");
+                                               }
                                                /*
-                                                * Continue here to handle the rest of the streams.
+                                                * Remove the pipe from the poll set and continue the loop
+                                                * since their might be data to consume.
                                                 */
+                                               lttng_poll_del(&events,
+                                                               lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
+                                               lttng_pipe_read_close(ctx->consumer_metadata_pipe);
                                                continue;
                                        }
 
@@ -2276,6 +2272,19 @@ restart:
                                        /* Add metadata stream to the global poll events list */
                                        lttng_poll_add(&events, stream->wait_fd,
                                                        LPOLLIN | LPOLLPRI | LPOLLHUP);
+                               } else if (revents & (LPOLLERR | LPOLLHUP)) {
+                                       DBG("Metadata thread pipe hung up");
+                                       /*
+                                        * Remove the pipe from the poll set and continue the loop
+                                        * since their might be data to consume.
+                                        */
+                                       lttng_poll_del(&events,
+                                                       lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
+                                       lttng_pipe_read_close(ctx->consumer_metadata_pipe);
+                                       continue;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto end;
                                }
 
                                /* Handle other stream */
@@ -2294,8 +2303,30 @@ restart:
                        stream = caa_container_of(node, struct lttng_consumer_stream,
                                        node);
 
-                       /* Check for error event */
-                       if (revents & (LPOLLERR | LPOLLHUP)) {
+                       if (revents & (LPOLLIN | LPOLLPRI)) {
+                               /* Get the data out of the metadata file descriptor */
+                               DBG("Metadata available on fd %d", pollfd);
+                               assert(stream->wait_fd == pollfd);
+
+                               do {
+                                       health_code_update();
+
+                                       len = ctx->on_buffer_ready(stream, ctx);
+                                       /*
+                                        * We don't check the return value here since if we get
+                                        * a negative len, it means an error occured thus we
+                                        * simply remove it from the poll set and free the
+                                        * stream.
+                                        */
+                               } while (len > 0);
+
+                               /* It's ok to have an unavailable sub-buffer */
+                               if (len < 0 && len != -EAGAIN && len != -ENODATA) {
+                                       /* Clean up stream from consumer and free it. */
+                                       lttng_poll_del(&events, stream->wait_fd);
+                                       consumer_del_metadata_stream(stream, metadata_ht);
+                               }
+                       } else if (revents & (LPOLLERR | LPOLLHUP)) {
                                DBG("Metadata fd %d is hup|err.", pollfd);
                                if (!stream->hangup_flush_done
                                                && (consumer_data.type == LTTNG_CONSUMER32_UST
@@ -2323,31 +2354,11 @@ restart:
                                 * and securely free the stream.
                                 */
                                consumer_del_metadata_stream(stream, metadata_ht);
-                       } else if (revents & (LPOLLIN | LPOLLPRI)) {
-                               /* Get the data out of the metadata file descriptor */
-                               DBG("Metadata available on fd %d", pollfd);
-                               assert(stream->wait_fd == pollfd);
-
-                               do {
-                                       health_code_update();
-
-                                       len = ctx->on_buffer_ready(stream, ctx);
-                                       /*
-                                        * We don't check the return value here since if we get
-                                        * a negative len, it means an error occured thus we
-                                        * simply remove it from the poll set and free the
-                                        * stream.
-                                        */
-                               } while (len > 0);
-
-                               /* It's ok to have an unavailable sub-buffer */
-                               if (len < 0 && len != -EAGAIN && len != -ENODATA) {
-                                       /* Clean up stream from consumer and free it. */
-                                       lttng_poll_del(&events, stream->wait_fd);
-                                       consumer_del_metadata_stream(stream, metadata_ht);
-                               }
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               rcu_read_unlock;
+                               goto end;
                        }
-
                        /* Release RCU lock for the stream looked up */
                        rcu_read_unlock();
                }
@@ -2812,21 +2823,16 @@ restart:
                        }
 
                        if (pollfd == ctx->consumer_channel_pipe[0]) {
-                               if (revents & (LPOLLERR | LPOLLHUP)) {
-                                       DBG("Channel thread pipe hung up");
-                                       /*
-                                        * Remove the pipe from the poll set and continue the loop
-                                        * since their might be data to consume.
-                                        */
-                                       lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
-                                       continue;
-                               } else if (revents & LPOLLIN) {
+                               if (revents & LPOLLIN) {
                                        enum consumer_channel_action action;
                                        uint64_t key;
 
                                        ret = read_channel_pipe(ctx, &chan, &key, &action);
                                        if (ret <= 0) {
-                                               ERR("Error reading channel pipe");
+                                               if (ret < 0) {
+                                                       ERR("Error reading channel pipe");
+                                               }
+                                               lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
                                                continue;
                                        }
 
@@ -2843,7 +2849,7 @@ restart:
                                                rcu_read_unlock();
                                                /* Add channel to the global poll events list */
                                                lttng_poll_add(&events, chan->wait_fd,
-                                                               LPOLLIN | LPOLLPRI);
+                                                               LPOLLERR | LPOLLHUP);
                                                break;
                                        case CONSUMER_CHANNEL_DEL:
                                        {
@@ -2903,6 +2909,17 @@ restart:
                                                ERR("Unknown action");
                                                break;
                                        }
+                               } else if (revents & (LPOLLERR | LPOLLHUP)) {
+                                       DBG("Channel thread pipe hung up");
+                                       /*
+                                        * Remove the pipe from the poll set and continue the loop
+                                        * since their might be data to consume.
+                                        */
+                                       lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
+                                       continue;
+                               } else {
+                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                                       goto end;
                                }
 
                                /* Handle other stream */
@@ -2941,6 +2958,10 @@ restart:
                                                && !uatomic_read(&chan->nb_init_stream_left)) {
                                        consumer_del_channel(chan);
                                }
+                       } else {
+                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+                               rcu_read_unlock();
+                               goto end;
                        }
 
                        /* Release RCU lock for the channel looked up */
This page took 0.025791 seconds and 4 git commands to generate.