X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.h;h=ac3b4903f6b8c3e945432b35a471100a8998aede;hp=aef7f560e4dcae78e62343eae1492de6ea85434b;hb=b83e03c49920557f292d3861f42d0109e6fa03ea;hpb=94d4914075c61cd1ee2ec00d8b61eacff105fc47 diff --git a/src/common/consumer.h b/src/common/consumer.h index aef7f560e..ac3b4903f 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -32,7 +32,7 @@ #include #include #include -#include +#include /* Commands for consumer */ enum lttng_consumer_command { @@ -41,7 +41,7 @@ enum lttng_consumer_command { /* pause, delete, active depending on fd state */ LTTNG_CONSUMER_UPDATE_STREAM, /* inform the consumer to quit when all fd has hang up */ - LTTNG_CONSUMER_STOP, + LTTNG_CONSUMER_STOP, /* deprecated */ LTTNG_CONSUMER_ADD_RELAYD_SOCKET, /* Inform the consumer to kill a specific relayd connection */ LTTNG_CONSUMER_DESTROY_RELAYD, @@ -57,6 +57,7 @@ enum lttng_consumer_command { LTTNG_CONSUMER_FLUSH_CHANNEL, LTTNG_CONSUMER_SNAPSHOT_CHANNEL, LTTNG_CONSUMER_SNAPSHOT_METADATA, + LTTNG_CONSUMER_STREAMS_SENT, }; /* State of each fd in consumer */ @@ -205,6 +206,11 @@ struct lttng_consumer_channel { /* Timer value in usec for live streaming. */ unsigned int live_timer_interval; + + int *stream_fds; + int nr_stream_fds; + char root_shm_path[PATH_MAX]; + char shm_path[PATH_MAX]; }; /* @@ -236,6 +242,21 @@ struct lttng_consumer_stream { int shm_fd_is_copy; int data_read; int hangup_flush_done; + + /* + * metadata_timer_lock protects flags waiting_on_metadata and + * missed_metadata_flush. + */ + pthread_mutex_t metadata_timer_lock; + /* + * Flag set when awaiting metadata to be pushed. Used in the + * timer thread to skip waiting on the stream (and stream lock) to + * ensure we can proceed to flushing metadata in live mode. + */ + bool waiting_on_metadata; + /* Raised when a timer misses a metadata flush. */ + bool missed_metadata_flush; + enum lttng_event_output output; /* Maximum subbuffer size. */ unsigned long max_sb_size; @@ -335,11 +356,19 @@ struct lttng_consumer_stream { */ int index_fd; + /* + * Local pipe to extract data when using splice. + */ + int splice_pipe[2]; + /* * Rendez-vous point between data and metadata stream in live mode. */ pthread_cond_t metadata_rdv; pthread_mutex_t metadata_rdv_lock; + + /* Indicate if the stream still has some data to be read. */ + unsigned int has_data:1; }; /* @@ -447,11 +476,22 @@ struct lttng_consumer_local_data { /* socket to exchange commands with sessiond */ char *consumer_command_sock_path; /* communication with splice */ - int consumer_thread_pipe[2]; int consumer_channel_pipe[2]; - int consumer_splice_metadata_pipe[2]; /* Data stream poll thread pipe. To transfer data stream to the thread */ struct lttng_pipe *consumer_data_pipe; + + /* + * Data thread use that pipe to catch wakeup from read subbuffer that + * detects that there is still data to be read for the stream encountered. + * Before doing so, the stream is flagged to indicate that there is still + * data to be read. + * + * Both pipes (read/write) are owned and used inside the data thread. + */ + struct lttng_pipe *consumer_wakeup_pipe; + /* Indicate if the wakeup thread has been notified. */ + unsigned int has_wakeup:1; + /* to let the signal handler wake up the fd receiver thread */ int consumer_should_quit[2]; /* Metadata poll thread pipe. Transfer metadata stream to it */ @@ -511,7 +551,7 @@ struct lttng_consumer_global_data { /* * Init consumer data structures. */ -void lttng_consumer_init(void); +int lttng_consumer_init(void); /* * Set the error socket for communication with a session daemon. @@ -579,7 +619,9 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t tracefile_count, uint64_t session_id_per_pid, unsigned int monitor, - unsigned int live_timer_interval); + unsigned int live_timer_interval, + const char *root_shm_path, + const char *shm_path); void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht); void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, @@ -593,6 +635,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( uint64_t net_seq_idx); struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key); int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path); +int consumer_send_relayd_streams_sent(uint64_t net_seq_idx); void close_relayd_stream(struct lttng_consumer_stream *stream); struct lttng_consumer_channel *consumer_find_channel(uint64_t key); int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, @@ -611,15 +654,17 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, unsigned long padding, - struct lttng_packet_index *index); + struct ctf_packet_index *index); ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, unsigned long padding, - struct lttng_packet_index *index); + struct ctf_packet_index *index); int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream); int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos); +int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream); +int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream); void *consumer_thread_metadata_poll(void *data); void *consumer_thread_data_poll(void *data); void *consumer_thread_sessiond_poll(void *data); @@ -643,8 +688,9 @@ int consumer_send_status_channel(int sock, void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key); void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd); -unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos, - unsigned long produced_pos, uint64_t max_stream_size); +unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, + unsigned long produced_pos, uint64_t nb_packets_per_stream, + uint64_t max_sb_size); int consumer_add_data_stream(struct lttng_consumer_stream *stream); void consumer_del_stream_for_data(struct lttng_consumer_stream *stream); int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);