Cleanup: Remove unused label
[lttng-tools.git] / src / common / consumer.c
index 999e400059fbe763264698a75c67b3fb80d6c717..b0b926bb01a1f65c58c9d25da47bc980b4125e83 100644 (file)
@@ -18,6 +18,7 @@
  */
 
 #define _GNU_SOURCE
+#define _LGPL_SOURCE
 #include <assert.h>
 #include <poll.h>
 #include <pthread.h>
@@ -1302,12 +1303,6 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_quit_pipe;
        }
 
-       ret = pipe(ctx->consumer_thread_pipe);
-       if (ret < 0) {
-               PERROR("Error creating thread pipe");
-               goto error_thread_pipe;
-       }
-
        ret = pipe(ctx->consumer_channel_pipe);
        if (ret < 0) {
                PERROR("Error creating channel pipe");
@@ -1319,20 +1314,11 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_metadata_pipe;
        }
 
-       ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
-       if (ret < 0) {
-               goto error_splice_pipe;
-       }
-
        return ctx;
 
-error_splice_pipe:
-       lttng_pipe_destroy(ctx->consumer_metadata_pipe);
 error_metadata_pipe:
        utils_close_pipe(ctx->consumer_channel_pipe);
 error_channel_pipe:
-       utils_close_pipe(ctx->consumer_thread_pipe);
-error_thread_pipe:
        utils_close_pipe(ctx->consumer_should_quit);
 error_quit_pipe:
        lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
@@ -1419,13 +1405,11 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        if (ret) {
                PERROR("close");
        }
-       utils_close_pipe(ctx->consumer_thread_pipe);
        utils_close_pipe(ctx->consumer_channel_pipe);
        lttng_pipe_destroy(ctx->consumer_data_pipe);
        lttng_pipe_destroy(ctx->consumer_metadata_pipe);
        lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
        utils_close_pipe(ctx->consumer_should_quit);
-       utils_close_pipe(ctx->consumer_splice_metadata_pipe);
 
        unlink(ctx->consumer_command_sock_path);
        free(ctx);
@@ -1446,7 +1430,7 @@ static int write_relayd_metadata_id(int fd,
        ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
        if (ret < sizeof(hdr)) {
                /*
-                * This error means that the fd's end is closed so ignore the perror
+                * This error means that the fd's end is closed so ignore the PERROR
                 * not to clubber the error output since this can happen in a normal
                 * code path.
                 */
@@ -1718,17 +1702,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        goto end;
                }
        }
-
-       /*
-        * Choose right pipe for splice. Metadata and trace data are handled by
-        * different threads hence the use of two pipes in order not to race or
-        * corrupt the written data.
-        */
-       if (stream->metadata_flag) {
-               splice_pipe = ctx->consumer_splice_metadata_pipe;
-       } else {
-               splice_pipe = ctx->consumer_thread_pipe;
-       }
+       splice_pipe = stream->splice_pipe;
 
        /* Write metadata stream id before payload */
        if (relayd) {
@@ -1834,7 +1808,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                /* Splice data out */
                ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
                                ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
-               DBG("Consumer splice pipe to file, ret %zd", ret_splice);
+               DBG("Consumer splice pipe to file (out_fd: %d), ret %zd",
+                               outfd, ret_splice);
                if (ret_splice < 0) {
                        ret = errno;
                        written = -ret;
@@ -2206,18 +2181,13 @@ void *consumer_thread_metadata_poll(void *data)
        DBG("Metadata main loop started");
 
        while (1) {
-               health_code_update();
-
-               /* Only the metadata pipe is set */
-               if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
-                       err = 0;        /* All is OK */
-                       goto end;
-               }
-
 restart:
-               DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               health_code_update();
                health_poll_entry();
+               DBG("Metadata poll wait");
                ret = lttng_poll_wait(&events, -1);
+               DBG("Metadata poll return from wait with %d fd(s)",
+                               LTTNG_POLL_GETNB(&events));
                health_poll_exit();
                DBG("Metadata event catched in thread");
                if (ret < 0) {
@@ -2225,7 +2195,10 @@ restart:
                                ERR("Poll EINTR catched");
                                goto restart;
                        }
-                       goto error;
+                       if (LTTNG_POLL_GETNB(&events) == 0) {
+                               err = 0;        /* All is OK */
+                       }
+                       goto end;
                }
 
                nb_fd = ret;
@@ -2237,6 +2210,11 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
+                       if (!revents) {
+                               /* No activity for this FD (poll implementation). */
+                               continue;
+                       }
+
                        if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
                                if (revents & (LPOLLERR | LPOLLHUP )) {
                                        DBG("Metadata thread pipe hung up");
@@ -2353,7 +2331,6 @@ restart:
 
        /* All is OK */
        err = 0;
-error:
 end:
        DBG("Metadata poll thread exiting");
 
@@ -2776,18 +2753,13 @@ void *consumer_thread_channel_poll(void *data)
        DBG("Channel main loop started");
 
        while (1) {
-               health_code_update();
-
-               /* Only the channel pipe is set */
-               if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
-                       err = 0;        /* All is OK */
-                       goto end;
-               }
-
 restart:
-               DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               health_code_update();
+               DBG("Channel poll wait");
                health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
+               DBG("Channel poll return from wait with %d fd(s)",
+                               LTTNG_POLL_GETNB(&events));
                health_poll_exit();
                DBG("Channel event catched in thread");
                if (ret < 0) {
@@ -2795,6 +2767,9 @@ restart:
                                ERR("Poll EINTR catched");
                                goto restart;
                        }
+                       if (LTTNG_POLL_GETNB(&events) == 0) {
+                               err = 0;        /* All is OK */
+                       }
                        goto end;
                }
 
@@ -2807,10 +2782,11 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Just don't waste time if no returned events for the fd */
                        if (!revents) {
+                               /* No activity for this FD (poll implementation). */
                                continue;
                        }
+
                        if (pollfd == ctx->consumer_channel_pipe[0]) {
                                if (revents & (LPOLLERR | LPOLLHUP)) {
                                        DBG("Channel thread pipe hung up");
@@ -3573,15 +3549,6 @@ int consumer_data_pending(uint64_t id)
                 */
                ret = cds_lfht_is_node_deleted(&stream->node.node);
                if (!ret) {
-                       /*
-                        * An empty output file is not valid. We need at least one packet
-                        * generated per stream, even if it contains no event, so it
-                        * contains at least one packet header.
-                        */
-                       if (stream->output_written == 0) {
-                               pthread_mutex_unlock(&stream->lock);
-                               goto data_pending;
-                       }
                        /* Check the stream if there is data in the buffers. */
                        ret = data_pending(stream);
                        if (ret == 1) {
This page took 0.026173 seconds and 4 git commands to generate.