Rename consumer threads and spawn them in daemon
[lttng-tools.git] / src / common / consumer.c
index 242b05b3d6bb9c65939ea72a900a84fa74d382b7..cf1b7b96f95a6c09897d44b9f6d735013513d035 100644 (file)
@@ -1756,7 +1756,7 @@ error:
  * Thread polls on metadata file descriptor and write them on disk or on the
  * network.
  */
-void *lttng_consumer_thread_poll_metadata(void *data)
+void *consumer_thread_metadata_poll(void *data)
 {
        int ret, i, pollfd;
        uint32_t revents, nb_fd;
@@ -1888,12 +1888,15 @@ restart:
                                        lttng_ustconsumer_on_stream_hangup(stream);
 
                                        /* We just flushed the stream now read it. */
-                                       len = ctx->on_buffer_ready(stream, ctx);
-                                       /* It's ok to have an unavailable sub-buffer */
-                                       if (len < 0 && len != -EAGAIN) {
-                                               rcu_read_unlock();
-                                               goto end;
-                                       }
+                                       do {
+                                               len = ctx->on_buffer_ready(stream, ctx);
+                                               /*
+                                                * We don't check the return value here since if we get
+                                                * a negative len, it means an error occured thus we
+                                                * simply remove it from the poll set and free the
+                                                * stream.
+                                                */
+                                       } while (len > 0);
                                }
 
                                lttng_poll_del(&events, stream->wait_fd);
@@ -1909,7 +1912,7 @@ restart:
 
                                len = ctx->on_buffer_ready(stream, ctx);
                                /* It's ok to have an unavailable sub-buffer */
-                               if (len < 0 && len != -EAGAIN) {
+                               if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        rcu_read_unlock();
                                        goto end;
                                } else if (len > 0) {
@@ -1939,7 +1942,7 @@ end:
  * This thread polls the fds in the set to consume the data and write
  * it to tracefile if necessary.
  */
-void *lttng_consumer_thread_poll_fds(void *data)
+void *consumer_thread_data_poll(void *data)
 {
        int num_rdy, num_hup, high_prio, ret, i;
        struct pollfd *pollfd = NULL;
@@ -1949,19 +1952,9 @@ void *lttng_consumer_thread_poll_fds(void *data)
        int nb_fd = 0;
        struct lttng_consumer_local_data *ctx = data;
        ssize_t len;
-       pthread_t metadata_thread;
-       void *status;
 
        rcu_register_thread();
 
-       /* Start metadata polling thread */
-       ret = pthread_create(&metadata_thread, NULL,
-                       lttng_consumer_thread_poll_metadata, (void *) ctx);
-       if (ret < 0) {
-               PERROR("pthread_create metadata thread");
-               goto end;
-       }
-
        local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
 
        while (1) {
@@ -2059,7 +2052,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                                high_prio = 1;
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
-                               if (len < 0 && len != -EAGAIN) {
+                               if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        goto end;
                                } else if (len > 0) {
                                        local_stream[i]->data_read = 1;
@@ -2082,7 +2075,7 @@ void *lttng_consumer_thread_poll_fds(void *data)
                                DBG("Normal read on fd %d", pollfd[i].fd);
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
-                               if (len < 0 && len != -EAGAIN) {
+                               if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        goto end;
                                } else if (len > 0) {
                                        local_stream[i]->data_read = 1;
@@ -2145,19 +2138,13 @@ end:
 
        /*
         * Close the write side of the pipe so epoll_wait() in
-        * lttng_consumer_thread_poll_metadata can catch it. The thread is
-        * monitoring the read side of the pipe. If we close them both, epoll_wait
-        * strangely does not return and could create a endless wait period if the
-        * pipe is the only tracked fd in the poll set. The thread will take care
-        * of closing the read side.
+        * consumer_thread_metadata_poll can catch it. The thread is monitoring the
+        * read side of the pipe. If we close them both, epoll_wait strangely does
+        * not return and could create a endless wait period if the pipe is the
+        * only tracked fd in the poll set. The thread will take care of closing
+        * the read side.
         */
        close(ctx->consumer_metadata_pipe[1]);
-       if (ret) {
-               ret = pthread_join(metadata_thread, &status);
-               if (ret < 0) {
-                       PERROR("pthread_join metadata thread");
-               }
-       }
 
        rcu_unregister_thread();
        return NULL;
@@ -2167,7 +2154,7 @@ end:
  * This thread listens on the consumerd socket and receives the file
  * descriptors from the session daemon.
  */
-void *lttng_consumer_thread_receive_fds(void *data)
+void *consumer_thread_sessiond_poll(void *data)
 {
        int sock, client_socket, ret;
        /*
This page took 0.037212 seconds and 4 git commands to generate.