Fix: add consumer wake up pipe to avoid race
[lttng-tools.git] / src / common / consumer.h
index c206970bfdc778467bcdb0aa08c872d47b739ec4..4ac823c017d03754ed545a9a1288df707b84778f 100644 (file)
@@ -41,7 +41,7 @@ enum lttng_consumer_command {
        /* pause, delete, active depending on fd state */
        LTTNG_CONSUMER_UPDATE_STREAM,
        /* inform the consumer to quit when all fd has hang up */
-       LTTNG_CONSUMER_STOP,
+       LTTNG_CONSUMER_STOP,    /* deprecated */
        LTTNG_CONSUMER_ADD_RELAYD_SOCKET,
        /* Inform the consumer to kill a specific relayd connection */
        LTTNG_CONSUMER_DESTROY_RELAYD,
@@ -57,6 +57,7 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_FLUSH_CHANNEL,
        LTTNG_CONSUMER_SNAPSHOT_CHANNEL,
        LTTNG_CONSUMER_SNAPSHOT_METADATA,
+       LTTNG_CONSUMER_STREAMS_SENT,
 };
 
 /* State of each fd in consumer */
@@ -340,6 +341,9 @@ struct lttng_consumer_stream {
         */
        pthread_cond_t metadata_rdv;
        pthread_mutex_t metadata_rdv_lock;
+
+       /* Indicate if the stream still has some data to be read. */
+       unsigned int has_data:1;
 };
 
 /*
@@ -452,6 +456,19 @@ struct lttng_consumer_local_data {
        int consumer_splice_metadata_pipe[2];
        /* Data stream poll thread pipe. To transfer data stream to the thread */
        struct lttng_pipe *consumer_data_pipe;
+
+       /*
+        * Data thread use that pipe to catch wakeup from read subbuffer that
+        * detects that there is still data to be read for the stream encountered.
+        * Before doing so, the stream is flagged to indicate that there is still
+        * data to be read.
+        *
+        * Both pipes (read/write) are owned and used inside the data thread.
+        */
+       struct lttng_pipe *consumer_wakeup_pipe;
+       /* Indicate if the wakeup thread has been notified. */
+       unsigned int has_wakeup:1;
+
        /* to let the signal handler wake up the fd receiver thread */
        int consumer_should_quit[2];
        /* Metadata poll thread pipe. Transfer metadata stream to it */
@@ -511,7 +528,7 @@ struct lttng_consumer_global_data {
 /*
  * Init consumer data structures.
  */
-void lttng_consumer_init(void);
+int lttng_consumer_init(void);
 
 /*
  * Set the error socket for communication with a session daemon.
@@ -593,6 +610,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
                uint64_t net_seq_idx);
 struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key);
 int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path);
+int consumer_send_relayd_streams_sent(uint64_t net_seq_idx);
 void close_relayd_stream(struct lttng_consumer_stream *stream);
 struct lttng_consumer_channel *consumer_find_channel(uint64_t key);
 int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
@@ -620,6 +638,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
 int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream);
 int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
                unsigned long *pos);
+int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream);
+int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream);
 void *consumer_thread_metadata_poll(void *data);
 void *consumer_thread_data_poll(void *data);
 void *consumer_thread_sessiond_poll(void *data);
This page took 0.023467 seconds and 4 git commands to generate.