X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.hpp;h=851b3e2b8137eaef4d5904b19deb81a3f9921bdf;hb=HEAD;hp=dd8eb40d44798c8466c7f3d3eaa794ee6d60c169;hpb=c715ddc950bf653d9456d92c6ead2e3cbd3c54ae;p=lttng-tools.git diff --git a/src/common/consumer/consumer.hpp b/src/common/consumer/consumer.hpp index dd8eb40d4..851b3e2b8 100644 --- a/src/common/consumer/consumer.hpp +++ b/src/common/consumer/consumer.hpp @@ -11,25 +11,25 @@ #ifndef LIB_CONSUMER_H #define LIB_CONSUMER_H +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + #include #include #include #include #include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - struct lttng_consumer_local_data; /* Commands for consumer */ @@ -39,7 +39,7 @@ enum lttng_consumer_command { /* pause, delete, active depending on fd state */ LTTNG_CONSUMER_UPDATE_STREAM, /* inform the consumer to quit when all fd has hang up */ - LTTNG_CONSUMER_STOP, /* deprecated */ + LTTNG_CONSUMER_STOP, /* deprecated */ LTTNG_CONSUMER_ADD_RELAYD_SOCKET, /* Inform the consumer to kill a specific relayd connection */ LTTNG_CONSUMER_DESTROY_RELAYD, @@ -82,13 +82,13 @@ enum consumer_endpoint_status { }; enum consumer_channel_output { - CONSUMER_CHANNEL_MMAP = 0, - CONSUMER_CHANNEL_SPLICE = 1, + CONSUMER_CHANNEL_MMAP = 0, + CONSUMER_CHANNEL_SPLICE = 1, }; enum consumer_channel_type { - CONSUMER_CHANNEL_TYPE_METADATA = 0, - CONSUMER_CHANNEL_TYPE_DATA = 1, + CONSUMER_CHANNEL_TYPE_METADATA = 0, + CONSUMER_CHANNEL_TYPE_DATA = 1, }; enum sync_metadata_status { @@ -109,30 +109,30 @@ struct consumer_metadata_cache; struct lttng_consumer_channel { /* Is the channel published in the channel hash tables? */ - bool is_published; + bool is_published = false; /* * Was the channel deleted (logically) and waiting to be reclaimed? * If this flag is set, no modification that is not cleaned-up by the * RCU reclamation callback should be made */ - bool is_deleted; + bool is_deleted = false; /* HT node used for consumer_data.channel_ht */ - struct lttng_ht_node_u64 node; + struct lttng_ht_node_u64 node = {}; /* HT node used for consumer_data.channels_by_session_id_ht */ - struct lttng_ht_node_u64 channels_by_session_id_ht_node; + struct lttng_ht_node_u64 channels_by_session_id_ht_node = {}; /* Indexed key. Incremented value in the consumer. */ - uint64_t key; + uint64_t key = 0; /* Number of streams referencing this channel */ - int refcount; + int refcount = 0; /* Tracing session id on the session daemon side. */ - uint64_t session_id; + uint64_t session_id = 0; /* Current trace chunk of the session in which this channel exists. */ - struct lttng_trace_chunk *trace_chunk; + struct lttng_trace_chunk *trace_chunk = nullptr; /* * Session id when requesting metadata to the session daemon for * a session with per-PID buffers. */ - uint64_t session_id_per_pid; + uint64_t session_id_per_pid = 0; /* * In the case of local streams, this field contains the channel's * output path; a path relative to the session's output path. @@ -144,72 +144,77 @@ struct lttng_consumer_channel { * peers, this contains a path of the form: * /hostname/session_path/ust/uid/1000/64-bit */ - char pathname[PATH_MAX]; + char pathname[PATH_MAX] = {}; /* Channel name. */ - char name[LTTNG_SYMBOL_NAME_LEN]; + char name[LTTNG_SYMBOL_NAME_LEN] = {}; /* Relayd id of the channel. -1ULL if it does not apply. */ - uint64_t relayd_id; + uint64_t relayd_id = 0; /* * Number of streams NOT initialized yet. This is used in order to not * delete this channel if streams are getting initialized. */ - unsigned int nb_init_stream_left; + unsigned int nb_init_stream_left = 0; /* Output type (mmap or splice). */ - enum consumer_channel_output output; + enum consumer_channel_output output = CONSUMER_CHANNEL_MMAP; /* Channel type for stream */ - enum consumer_channel_type type; + enum consumer_channel_type type = CONSUMER_CHANNEL_TYPE_METADATA; /* For UST */ - uid_t ust_app_uid; /* Application UID. */ - struct lttng_ust_ctl_consumer_channel *uchan; - unsigned char uuid[LTTNG_UUID_STR_LEN]; + uid_t ust_app_uid = 65534; /* Application UID. */ + struct lttng_ust_ctl_consumer_channel *uchan = nullptr; + unsigned char uuid[LTTNG_UUID_STR_LEN] = {}; /* * Temporary stream list used to store the streams once created and waiting * to be sent to the session daemon by receiving the * LTTNG_CONSUMER_GET_CHANNEL. */ - struct stream_list streams; + struct stream_list streams = {}; /* * Set if the channel is metadata. We keep a reference to the stream * because we have to flush data once pushed by the session daemon. For a * regular channel, this is always set to NULL. */ - struct lttng_consumer_stream *metadata_stream; + struct lttng_consumer_stream *metadata_stream = nullptr; /* for UST */ - int wait_fd; + int wait_fd = -1; /* Node within channel thread ht */ - struct lttng_ht_node_u64 wait_fd_node; + struct lttng_ht_node_u64 wait_fd_node = {}; /* Metadata cache is metadata channel */ - struct consumer_metadata_cache *metadata_cache; + struct consumer_metadata_cache *metadata_cache = nullptr; + + /* + * Wait queue awaiting updates to metadata stream's flushed position. + */ + lttng::synchro::wait_queue metadata_pushed_wait_queue; /* For UST metadata periodical flush */ - int switch_timer_enabled; - timer_t switch_timer; - int switch_timer_error; + int switch_timer_enabled = 0; + timer_t switch_timer = {}; + int switch_timer_error = 0; /* For the live mode */ - int live_timer_enabled; - timer_t live_timer; - int live_timer_error; + int live_timer_enabled = 0; + timer_t live_timer = {}; + int live_timer_error = 0; /* Channel is part of a live session ? */ - bool is_live; + bool is_live = false; /* For channel monitoring timer. */ - int monitor_timer_enabled; - timer_t monitor_timer; + int monitor_timer_enabled = 0; + timer_t monitor_timer = {}; /* On-disk circular buffer */ - uint64_t tracefile_size; - uint64_t tracefile_count; + uint64_t tracefile_size = 0; + uint64_t tracefile_count = 0; /* * Monitor or not the streams of this channel meaning this indicates if the * streams should be sent to the data/metadata thread or added to the no * monitor list of the channel. */ - unsigned int monitor; + unsigned int monitor = 0; /* * Channel lock. @@ -222,7 +227,7 @@ struct lttng_consumer_channel { * This is nested OUTSIDE stream lock. * This is nested OUTSIDE consumer_relayd_sock_pair lock. */ - pthread_mutex_t lock; + pthread_mutex_t lock = {}; /* * Channel teardown lock. @@ -236,23 +241,24 @@ struct lttng_consumer_channel { * This is nested OUTSIDE stream lock. * This is nested OUTSIDE consumer_relayd_sock_pair lock. */ - pthread_mutex_t timer_lock; + pthread_mutex_t timer_lock = {}; /* Timer value in usec for live streaming. */ - unsigned int live_timer_interval; + unsigned int live_timer_interval = 0; - int *stream_fds; - int nr_stream_fds; - char root_shm_path[PATH_MAX]; - char shm_path[PATH_MAX]; + int *stream_fds = nullptr; + int nr_stream_fds = 0; + char root_shm_path[PATH_MAX] = {}; + char shm_path[PATH_MAX] = {}; /* Only set for UST channels. */ - LTTNG_OPTIONAL(struct lttng_credentials) buffer_credentials; + LTTNG_OPTIONAL(struct lttng_credentials) buffer_credentials = {}; /* Total number of discarded events for that channel. */ - uint64_t discarded_events; + uint64_t discarded_events = 0; /* Total number of missed packets due to overwriting (overwrite). */ - uint64_t lost_packets; + uint64_t lost_packets = 0; - bool streams_sent_to_relayd; + bool streams_sent_to_relayd = false; + uint64_t last_consumed_size_sample_sent = false; }; struct stream_subbuffer { @@ -312,7 +318,7 @@ enum get_next_subbuffer_status { * * Stream and channel locks are acquired during this call. */ -typedef int (*on_wake_up_cb)(struct lttng_consumer_stream *); +using on_wake_up_cb = int (*)(struct lttng_consumer_stream *); /* * Perform any operation required before a consumer stream is put @@ -320,16 +326,15 @@ typedef int (*on_wake_up_cb)(struct lttng_consumer_stream *); * * Stream and channel locks are acquired during this call. */ -typedef int (*on_sleep_cb)(struct lttng_consumer_stream *, - struct lttng_consumer_local_data *); +using on_sleep_cb = int (*)(struct lttng_consumer_stream *, struct lttng_consumer_local_data *); /* * Acquire the subbuffer at the current 'consumed' position. * * Stream and channel locks are acquired during this call. */ -typedef enum get_next_subbuffer_status (*get_next_subbuffer_cb)( - struct lttng_consumer_stream *, struct stream_subbuffer *); +using get_next_subbuffer_cb = enum get_next_subbuffer_status (*)(struct lttng_consumer_stream *, + struct stream_subbuffer *); /* * Populate the stream_subbuffer's info member. The info to populate @@ -337,25 +342,25 @@ typedef enum get_next_subbuffer_status (*get_next_subbuffer_cb)( * * Stream and channel locks are acquired during this call. */ -typedef int (*extract_subbuffer_info_cb)( - struct lttng_consumer_stream *, struct stream_subbuffer *); +using extract_subbuffer_info_cb = int (*)(struct lttng_consumer_stream *, + struct stream_subbuffer *); /* * Invoked after a subbuffer's info has been filled. * * Stream and channel locks are acquired during this call. */ -typedef int (*pre_consume_subbuffer_cb)(struct lttng_consumer_stream *, - const struct stream_subbuffer *); +using pre_consume_subbuffer_cb = int (*)(struct lttng_consumer_stream *, + const struct stream_subbuffer *); /* * Consume subbuffer contents. * * Stream and channel locks are acquired during this call. */ -typedef ssize_t (*consume_subbuffer_cb)(struct lttng_consumer_local_data *, - struct lttng_consumer_stream *, - const struct stream_subbuffer *); +using consume_subbuffer_cb = ssize_t (*)(struct lttng_consumer_local_data *, + struct lttng_consumer_stream *, + const struct stream_subbuffer *); /* * Release the current subbuffer and advance the 'consumed' position by @@ -363,31 +368,30 @@ typedef ssize_t (*consume_subbuffer_cb)(struct lttng_consumer_local_data *, * * Stream and channel locks are acquired during this call. */ -typedef int (*put_next_subbuffer_cb)(struct lttng_consumer_stream *, - struct stream_subbuffer *); +using put_next_subbuffer_cb = int (*)(struct lttng_consumer_stream *, struct stream_subbuffer *); /* * Invoked after consuming a subbuffer. * * Stream and channel locks are acquired during this call. */ -typedef int (*post_consume_cb)(struct lttng_consumer_stream *, - const struct stream_subbuffer *, - struct lttng_consumer_local_data *); +using post_consume_cb = int (*)(struct lttng_consumer_stream *, + const struct stream_subbuffer *, + struct lttng_consumer_local_data *); /* * Send a live beacon if no data is available. * * Stream and channel locks are acquired during this call. */ -typedef int (*send_live_beacon_cb)(struct lttng_consumer_stream *); +using send_live_beacon_cb = int (*)(struct lttng_consumer_stream *); /* * Lock the stream and channel locks and any other stream-type specific * lock that need to be acquired during the processing of an * availability notification. */ -typedef void (*lock_cb)(struct lttng_consumer_stream *); +using lock_cb = void (*)(struct lttng_consumer_stream *); /* * Unlock the stream and channel locks and any other stream-type specific @@ -395,14 +399,14 @@ typedef void (*lock_cb)(struct lttng_consumer_stream *); * * Stream and channel locks are acquired during this call. */ -typedef void (*unlock_cb)(struct lttng_consumer_stream *); +using unlock_cb = void (*)(struct lttng_consumer_stream *); /* * Assert that the stream and channel lock and any other stream type specific * lock that need to be acquired during the processing of a read_subbuffer * operation is acquired. */ -typedef void (*assert_locked_cb)(struct lttng_consumer_stream *); +using assert_locked_cb = void (*)(struct lttng_consumer_stream *); /* * Invoked when a subbuffer's metadata version does not match the last @@ -410,7 +414,7 @@ typedef void (*assert_locked_cb)(struct lttng_consumer_stream *); * * Stream and channel locks are acquired during this call. */ -typedef void (*reset_metadata_cb)(struct lttng_consumer_stream *); +using reset_metadata_cb = void (*)(struct lttng_consumer_stream *); /* * Internal representation of the streams, sessiond_key is used to identify @@ -722,8 +726,8 @@ struct lttng_consumer_local_data { * Returns the number of bytes read, or negative error value. */ ssize_t (*on_buffer_ready)(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx, - bool locked_by_caller); + struct lttng_consumer_local_data *ctx, + bool locked_by_caller); /* * function to call when we receive a new channel, it receives a * newly allocated channel, depending on the return code of this @@ -815,7 +819,7 @@ struct lttng_consumer_global_data { * This is nested OUTSIDE the stream lock. * This is nested OUTSIDE the consumer_relayd_sock_pair lock. */ - pthread_mutex_t lock {}; + pthread_mutex_t lock{}; /* * Number of streams in the data stream hash table declared outside. @@ -880,8 +884,7 @@ extern int consumer_quit; LTTNG_EXPORT extern int data_consumption_paused; /* Return a human-readable consumer type string that is suitable for logging. */ -static inline -const char *lttng_consumer_type_str(enum lttng_consumer_type type) +static inline const char *lttng_consumer_type_str(enum lttng_consumer_type type) { switch (type) { case LTTNG_CONSUMER_UNKNOWN: @@ -900,19 +903,17 @@ const char *lttng_consumer_type_str(enum lttng_consumer_type type) /* * Init consumer data structures. */ -int lttng_consumer_init(void); +int lttng_consumer_init(); /* * Set the error socket for communication with a session daemon. */ -void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, - int sock); +void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, int sock); /* * Set the command socket path for communication with a session daemon. */ -void lttng_consumer_set_command_sock_path( - struct lttng_consumer_local_data *ctx, char *sock); +void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, char *sock); /* * Send return code to session daemon. @@ -920,7 +921,8 @@ void lttng_consumer_set_command_sock_path( * Returns the return code of sendmsg : the number of bytes transmitted or -1 * on error. */ -int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd); +int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, + enum lttcomm_return_code error_code); /* * Called from signal handler to ensure a clean exit. @@ -930,7 +932,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx); /* * Cleanup the daemon's socket on exit. */ -void lttng_consumer_cleanup(void); +void lttng_consumer_cleanup(); /* * Poll on the should_quit pipe and the command socket return -1 on error and @@ -942,43 +944,39 @@ int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll); * Copy the fields from the channel that need to be accessed (read-only) * directly from the stream. */ -void consumer_stream_update_channel_attributes( - struct lttng_consumer_stream *stream, - struct lttng_consumer_channel *channel); - -struct lttng_consumer_stream *consumer_allocate_stream( - struct lttng_consumer_channel *channel, - uint64_t channel_key, - uint64_t stream_key, - const char *channel_name, - uint64_t relayd_id, - uint64_t session_id, - struct lttng_trace_chunk *trace_chunk, - int cpu, - int *alloc_ret, - enum consumer_channel_type type, - unsigned int monitor); +void consumer_stream_update_channel_attributes(struct lttng_consumer_stream *stream, + struct lttng_consumer_channel *channel); + +struct lttng_consumer_stream *consumer_allocate_stream(struct lttng_consumer_channel *channel, + uint64_t channel_key, + uint64_t stream_key, + const char *channel_name, + uint64_t relayd_id, + uint64_t session_id, + struct lttng_trace_chunk *trace_chunk, + int cpu, + int *alloc_ret, + enum consumer_channel_type type, + unsigned int monitor); struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, - uint64_t session_id, - const uint64_t *chunk_id, - const char *pathname, - const char *name, - uint64_t relayd_id, - enum lttng_event_output output, - uint64_t tracefile_size, - uint64_t tracefile_count, - uint64_t session_id_per_pid, - unsigned int monitor, - unsigned int live_timer_interval, - bool is_in_live_session, - const char *root_shm_path, - const char *shm_path); -void consumer_del_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht); -void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht); + uint64_t session_id, + const uint64_t *chunk_id, + const char *pathname, + const char *name, + uint64_t relayd_id, + enum lttng_event_output output, + uint64_t tracefile_size, + uint64_t tracefile_count, + uint64_t session_id_per_pid, + unsigned int monitor, + unsigned int live_timer_interval, + bool is_in_live_session, + const char *root_shm_path, + const char *shm_path); +void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht); +void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht); int consumer_add_channel(struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx); + struct lttng_consumer_local_data *ctx); void consumer_del_channel(struct lttng_consumer_channel *channel); /* lttng-relayd consumer command */ @@ -987,33 +985,29 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path int consumer_send_relayd_streams_sent(uint64_t net_seq_idx); void close_relayd_stream(struct lttng_consumer_stream *stream); struct lttng_consumer_channel *consumer_find_channel(uint64_t key); -int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, - size_t data_size); +int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, size_t data_size); void consumer_steal_stream_key(int key, struct lttng_ht *ht); -struct lttng_consumer_local_data *lttng_consumer_create( - enum lttng_consumer_type type, - ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx, - bool locked_by_caller), - int (*recv_channel)(struct lttng_consumer_channel *channel), - int (*recv_stream)(struct lttng_consumer_stream *stream), - int (*update_stream)(uint64_t sessiond_key, uint32_t state)); +struct lttng_consumer_local_data * +lttng_consumer_create(enum lttng_consumer_type type, + ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx, + bool locked_by_caller), + int (*recv_channel)(struct lttng_consumer_channel *channel), + int (*recv_stream)(struct lttng_consumer_stream *stream), + int (*update_stream)(uint64_t sessiond_key, uint32_t state)); void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx); -ssize_t lttng_consumer_on_read_subbuffer_mmap( - struct lttng_consumer_stream *stream, - const struct lttng_buffer_view *buffer, - unsigned long padding); -ssize_t lttng_consumer_on_read_subbuffer_splice( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding); +ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stream, + const struct lttng_buffer_view *buffer, + unsigned long padding); +ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + unsigned long len, + unsigned long padding); int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream); int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream); -int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, - unsigned long *pos); -int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, - unsigned long *pos); +int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos); +int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos); int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream); int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream); void *consumer_thread_metadata_poll(void *data); @@ -1021,69 +1015,71 @@ void *consumer_thread_data_poll(void *data); void *consumer_thread_sessiond_poll(void *data); void *consumer_thread_channel_poll(void *data); int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, - int sock, struct pollfd *consumer_sockpoll); + int sock, + struct pollfd *consumer_sockpoll); ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx, - bool locked_by_caller); + struct lttng_consumer_local_data *ctx, + bool locked_by_caller); int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); void consumer_add_relayd_socket(uint64_t net_seq_idx, - int sock_type, - struct lttng_consumer_local_data *ctx, - int sock, - struct pollfd *consumer_sockpoll, - uint64_t sessiond_id, - uint64_t relayd_session_id, - uint32_t relayd_version_major, - uint32_t relayd_version_minor, - enum lttcomm_sock_proto relayd_socket_protocol); -void consumer_flag_relayd_for_destroy( - struct consumer_relayd_sock_pair *relayd); + int sock_type, + struct lttng_consumer_local_data *ctx, + int sock, + struct pollfd *consumer_sockpoll, + uint64_t sessiond_id, + uint64_t relayd_session_id, + uint32_t relayd_version_major, + uint32_t relayd_version_minor, + enum lttcomm_sock_proto relayd_socket_protocol); +void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd); int consumer_data_pending(uint64_t id); int consumer_send_status_msg(int sock, int ret_code); -int consumer_send_status_channel(int sock, - struct lttng_consumer_channel *channel); -void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, - uint64_t key); +int consumer_send_status_channel(int sock, struct lttng_consumer_channel *channel); +void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key); void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd); unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, - unsigned long produced_pos, uint64_t nb_packets_per_stream, - uint64_t max_sb_size); + unsigned long produced_pos, + uint64_t nb_packets_per_stream, + uint64_t max_sb_size); void consumer_add_data_stream(struct lttng_consumer_stream *stream); void consumer_del_stream_for_data(struct lttng_consumer_stream *stream); void consumer_add_metadata_stream(struct lttng_consumer_stream *stream); void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream); int consumer_create_index_file(struct lttng_consumer_stream *stream); int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, - uint64_t key, uint64_t relayd_id); + uint64_t key, + uint64_t relayd_id); int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream); int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream); -int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, - uint64_t key); +int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key); void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream); -enum lttcomm_return_code lttng_consumer_create_trace_chunk( - const uint64_t *relayd_id, uint64_t session_id, - uint64_t chunk_id, - time_t chunk_creation_timestamp, - const char *chunk_override_name, - const struct lttng_credentials *credentials, - struct lttng_directory_handle *chunk_directory_handle); -enum lttcomm_return_code lttng_consumer_close_trace_chunk( - const uint64_t *relayd_id, uint64_t session_id, - uint64_t chunk_id, time_t chunk_close_timestamp, - const enum lttng_trace_chunk_command_type *close_command, - char *path); -enum lttcomm_return_code lttng_consumer_trace_chunk_exists( - const uint64_t *relayd_id, uint64_t session_id, - uint64_t chunk_id); +enum lttcomm_return_code +lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, + uint64_t session_id, + uint64_t chunk_id, + time_t chunk_creation_timestamp, + const char *chunk_override_name, + const struct lttng_credentials *credentials, + struct lttng_directory_handle *chunk_directory_handle); +enum lttcomm_return_code +lttng_consumer_close_trace_chunk(const uint64_t *relayd_id, + uint64_t session_id, + uint64_t chunk_id, + time_t chunk_close_timestamp, + const enum lttng_trace_chunk_command_type *close_command, + char *path); +enum lttcomm_return_code lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, + uint64_t session_id, + uint64_t chunk_id); void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd); -enum lttcomm_return_code lttng_consumer_init_command( - struct lttng_consumer_local_data *ctx, - const lttng_uuid& sessiond_uuid); +enum lttcomm_return_code lttng_consumer_init_command(struct lttng_consumer_local_data *ctx, + const lttng_uuid& sessiond_uuid); int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel); -enum lttcomm_return_code lttng_consumer_open_channel_packets( - struct lttng_consumer_channel *channel); +enum lttcomm_return_code +lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel); int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel); void lttng_consumer_sigbus_handle(void *addr); +void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel); #endif /* LIB_CONSUMER_H */