X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.h;h=84ef2719d47e55eb7abc8626cdf37b3a11da525c;hb=994ab360b3264e19fdf590178601fa1f9f6489d0;hp=2003cbe43003f304c7bf60387416b9a5c8a1da76;hpb=567eb353c7f88e2fdaa106eb7e0a38dbb8717792;p=lttng-tools.git diff --git a/src/common/consumer.h b/src/common/consumer.h index 2003cbe43..84ef2719d 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -32,6 +32,7 @@ #include #include #include +#include /* Commands for consumer */ enum lttng_consumer_command { @@ -56,6 +57,7 @@ enum lttng_consumer_command { LTTNG_CONSUMER_FLUSH_CHANNEL, LTTNG_CONSUMER_SNAPSHOT_CHANNEL, LTTNG_CONSUMER_SNAPSHOT_METADATA, + LTTNG_CONSUMER_STREAMS_SENT, }; /* State of each fd in consumer */ @@ -155,11 +157,16 @@ struct lttng_consumer_channel { /* Metadata cache is metadata channel */ struct consumer_metadata_cache *metadata_cache; - /* For metadata periodical flush */ + /* For UST metadata periodical flush */ int switch_timer_enabled; timer_t switch_timer; int switch_timer_error; + /* For the live mode */ + int live_timer_enabled; + timer_t live_timer; + int live_timer_error; + /* On-disk circular buffer */ uint64_t tracefile_size; uint64_t tracefile_count; @@ -196,6 +203,9 @@ struct lttng_consumer_channel { * This is nested OUTSIDE consumer_relayd_sock_pair lock. */ pthread_mutex_t timer_lock; + + /* Timer value in usec for live streaming. */ + unsigned int live_timer_interval; }; /* @@ -221,6 +231,8 @@ struct lttng_consumer_stream { int out_fd; /* output file to write the data */ /* Write position in the output file descriptor */ off_t out_fd_offset; + /* Amount of bytes written to the output */ + uint64_t output_written; enum lttng_consumer_stream_state state; int shm_fd_is_copy; int data_read; @@ -319,6 +331,16 @@ struct lttng_consumer_stream { * to the channel. */ uint64_t ust_metadata_pushed; + /* + * FD of the index file for this stream. + */ + int index_fd; + + /* + * Rendez-vous point between data and metadata stream in live mode. + */ + pthread_cond_t metadata_rdv; + pthread_mutex_t metadata_rdv_lock; }; /* @@ -490,7 +512,7 @@ struct lttng_consumer_global_data { /* * Init consumer data structures. */ -void lttng_consumer_init(void); +int lttng_consumer_init(void); /* * Set the error socket for communication with a session daemon. @@ -557,7 +579,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t tracefile_size, uint64_t tracefile_count, uint64_t session_id_per_pid, - unsigned int monitor); + unsigned int monitor, + unsigned int live_timer_interval); void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht); void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, @@ -571,6 +594,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( uint64_t net_seq_idx); struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key); int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path); +int consumer_send_relayd_streams_sent(uint64_t net_seq_idx); void close_relayd_stream(struct lttng_consumer_stream *stream); struct lttng_consumer_channel *consumer_find_channel(uint64_t key); int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, @@ -588,14 +612,18 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx); ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding); + unsigned long padding, + struct ctf_packet_index *index); ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding); + unsigned long padding, + struct ctf_packet_index *index); int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream); int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos); +int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream); +int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream); void *consumer_thread_metadata_poll(void *data); void *consumer_thread_data_poll(void *data); void *consumer_thread_sessiond_poll(void *data); @@ -609,7 +637,7 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock, - uint64_t sessiond_id); + uint64_t sessiond_id, uint64_t relayd_session_id); void consumer_flag_relayd_for_destroy( struct consumer_relayd_sock_pair *relayd); int consumer_data_pending(uint64_t id); @@ -625,5 +653,6 @@ int consumer_add_data_stream(struct lttng_consumer_stream *stream); void consumer_del_stream_for_data(struct lttng_consumer_stream *stream); int consumer_add_metadata_stream(struct lttng_consumer_stream *stream); void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream); +int consumer_create_index_file(struct lttng_consumer_stream *stream); #endif /* LIB_CONSUMER_H */