Fix: free metadata cache after grace period in consumer
[lttng-tools.git] / src / common / consumer.h
index a7517a34083db737d9be2b2b51b61ddb3a60eb36..ac3b4903f6b8c3e945432b35a471100a8998aede 100644 (file)
@@ -41,7 +41,7 @@ enum lttng_consumer_command {
        /* pause, delete, active depending on fd state */
        LTTNG_CONSUMER_UPDATE_STREAM,
        /* inform the consumer to quit when all fd has hang up */
-       LTTNG_CONSUMER_STOP,
+       LTTNG_CONSUMER_STOP,    /* deprecated */
        LTTNG_CONSUMER_ADD_RELAYD_SOCKET,
        /* Inform the consumer to kill a specific relayd connection */
        LTTNG_CONSUMER_DESTROY_RELAYD,
@@ -206,6 +206,11 @@ struct lttng_consumer_channel {
 
        /* Timer value in usec for live streaming. */
        unsigned int live_timer_interval;
+
+       int *stream_fds;
+       int nr_stream_fds;
+       char root_shm_path[PATH_MAX];
+       char shm_path[PATH_MAX];
 };
 
 /*
@@ -237,6 +242,21 @@ struct lttng_consumer_stream {
        int shm_fd_is_copy;
        int data_read;
        int hangup_flush_done;
+
+       /*
+        * metadata_timer_lock protects flags waiting_on_metadata and
+        * missed_metadata_flush.
+        */
+       pthread_mutex_t metadata_timer_lock;
+       /*
+        * Flag set when awaiting metadata to be pushed. Used in the
+        * timer thread to skip waiting on the stream (and stream lock) to
+        * ensure we can proceed to flushing metadata in live mode.
+        */
+       bool waiting_on_metadata;
+       /* Raised when a timer misses a metadata flush. */
+       bool missed_metadata_flush;
+
        enum lttng_event_output output;
        /* Maximum subbuffer size. */
        unsigned long max_sb_size;
@@ -336,11 +356,19 @@ struct lttng_consumer_stream {
         */
        int index_fd;
 
+       /*
+        * Local pipe to extract data when using splice.
+        */
+       int splice_pipe[2];
+
        /*
         * Rendez-vous point between data and metadata stream in live mode.
         */
        pthread_cond_t metadata_rdv;
        pthread_mutex_t metadata_rdv_lock;
+
+       /* Indicate if the stream still has some data to be read. */
+       unsigned int has_data:1;
 };
 
 /*
@@ -448,11 +476,22 @@ struct lttng_consumer_local_data {
        /* socket to exchange commands with sessiond */
        char *consumer_command_sock_path;
        /* communication with splice */
-       int consumer_thread_pipe[2];
        int consumer_channel_pipe[2];
-       int consumer_splice_metadata_pipe[2];
        /* Data stream poll thread pipe. To transfer data stream to the thread */
        struct lttng_pipe *consumer_data_pipe;
+
+       /*
+        * Data thread use that pipe to catch wakeup from read subbuffer that
+        * detects that there is still data to be read for the stream encountered.
+        * Before doing so, the stream is flagged to indicate that there is still
+        * data to be read.
+        *
+        * Both pipes (read/write) are owned and used inside the data thread.
+        */
+       struct lttng_pipe *consumer_wakeup_pipe;
+       /* Indicate if the wakeup thread has been notified. */
+       unsigned int has_wakeup:1;
+
        /* to let the signal handler wake up the fd receiver thread */
        int consumer_should_quit[2];
        /* Metadata poll thread pipe. Transfer metadata stream to it */
@@ -580,7 +619,9 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t tracefile_count,
                uint64_t session_id_per_pid,
                unsigned int monitor,
-               unsigned int live_timer_interval);
+               unsigned int live_timer_interval,
+               const char *root_shm_path,
+               const char *shm_path);
 void consumer_del_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht);
 void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
@@ -622,6 +663,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
 int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream);
 int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
                unsigned long *pos);
+int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream);
+int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream);
 void *consumer_thread_metadata_poll(void *data);
 void *consumer_thread_data_poll(void *data);
 void *consumer_thread_sessiond_poll(void *data);
@@ -645,8 +688,9 @@ int consumer_send_status_channel(int sock,
 void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
                uint64_t key);
 void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
-unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
-               unsigned long produced_pos, uint64_t max_stream_size);
+unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
+               unsigned long produced_pos, uint64_t nb_packets_per_stream,
+               uint64_t max_sb_size);
 int consumer_add_data_stream(struct lttng_consumer_stream *stream);
 void consumer_del_stream_for_data(struct lttng_consumer_stream *stream);
 int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
This page took 0.024356 seconds and 4 git commands to generate.