Add new thread in consumer for metadata handling
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 855d07141b6ebe70424ed6ee649babb494e984a4..5e2f7692b1e6e7c35ffbc574fa45f974c4e9f117 100644 (file)
@@ -92,6 +92,11 @@ int lttng_ustconsumer_get_produced_snapshot(
        return ret;
 }
 
+/*
+ * Receive command from session daemon and process it.
+ *
+ * Return 1 on success else a negative value or 0.
+ */
 int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll)
 {
@@ -223,14 +228,29 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
-               if (ctx->on_recv_stream != NULL) {
-                       ret = ctx->on_recv_stream(new_stream);
-                       if (ret == 0) {
-                               consumer_add_stream(new_stream);
-                       } else if (ret < 0) {
-                               goto end_nosignal;
+               /* Send stream to the metadata thread */
+               if (new_stream->metadata_flag) {
+                       if (ctx->on_recv_stream) {
+                               ret = ctx->on_recv_stream(new_stream);
+                               if (ret < 0) {
+                                       goto end_nosignal;
+                               }
+                       }
+
+                       do {
+                               ret = write(ctx->consumer_metadata_pipe[1], new_stream,
+                                               sizeof(struct lttng_consumer_stream));
+                       } while (ret < 0 && errno == EINTR);
+                       if (ret < 0) {
+                               PERROR("write metadata pipe");
                        }
                } else {
+                       if (ctx->on_recv_stream) {
+                               ret = ctx->on_recv_stream(new_stream);
+                               if (ret < 0) {
+                                       goto end_nosignal;
+                               }
+                       }
                        consumer_add_stream(new_stream);
                }
 
@@ -305,9 +325,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                ret = write(ctx->consumer_poll_pipe[1], "", 1);
        } while (ret < 0 && errno == EINTR);
 end_nosignal:
-       /* XXX: At some point we might want to return something else than zero */
        rcu_read_unlock();
-       return 0;
+
+       /*
+        * Return 1 to indicate success since the 0 value can be a socket
+        * shutdown during the recv() or send() call.
+        */
+       return 1;
 }
 
 int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan)
This page took 0.024009 seconds and 4 git commands to generate.