#ifndef LIB_CONSUMER_H
#define LIB_CONSUMER_H
+#include <common/buffer-view.hpp>
+#include <common/compat/fcntl.hpp>
+#include <common/credentials.hpp>
+#include <common/dynamic-array.hpp>
+#include <common/hashtable/hashtable.hpp>
+#include <common/index/ctf-index.hpp>
+#include <common/pipe.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/trace-chunk-registry.hpp>
+#include <common/uuid.hpp>
+
+#include <lttng/lttng.h>
+
#include <limits.h>
#include <poll.h>
#include <stdint.h>
#include <unistd.h>
#include <urcu/list.h>
-#include <lttng/lttng.h>
-
-#include <common/hashtable/hashtable.hpp>
-#include <common/compat/fcntl.hpp>
-#include <common/uuid.hpp>
-#include <common/sessiond-comm/sessiond-comm.hpp>
-#include <common/pipe.hpp>
-#include <common/index/ctf-index.hpp>
-#include <common/trace-chunk-registry.hpp>
-#include <common/credentials.hpp>
-#include <common/buffer-view.hpp>
-#include <common/dynamic-array.hpp>
-
struct lttng_consumer_local_data;
/* Commands for consumer */
/* pause, delete, active depending on fd state */
LTTNG_CONSUMER_UPDATE_STREAM,
/* inform the consumer to quit when all fd has hang up */
- LTTNG_CONSUMER_STOP, /* deprecated */
+ LTTNG_CONSUMER_STOP, /* deprecated */
LTTNG_CONSUMER_ADD_RELAYD_SOCKET,
/* Inform the consumer to kill a specific relayd connection */
LTTNG_CONSUMER_DESTROY_RELAYD,
};
enum consumer_channel_output {
- CONSUMER_CHANNEL_MMAP = 0,
- CONSUMER_CHANNEL_SPLICE = 1,
+ CONSUMER_CHANNEL_MMAP = 0,
+ CONSUMER_CHANNEL_SPLICE = 1,
};
enum consumer_channel_type {
- CONSUMER_CHANNEL_TYPE_METADATA = 0,
- CONSUMER_CHANNEL_TYPE_DATA = 1,
+ CONSUMER_CHANNEL_TYPE_METADATA = 0,
+ CONSUMER_CHANNEL_TYPE_DATA = 1,
};
enum sync_metadata_status {
enum consumer_channel_type type;
/* For UST */
- uid_t ust_app_uid; /* Application UID. */
+ uid_t ust_app_uid; /* Application UID. */
struct lttng_ust_ctl_consumer_channel *uchan;
unsigned char uuid[LTTNG_UUID_STR_LEN];
/*
* Returns the number of bytes read, or negative error value.
*/
ssize_t (*on_buffer_ready)(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx,
- bool locked_by_caller);
+ struct lttng_consumer_local_data *ctx,
+ bool locked_by_caller);
/*
* function to call when we receive a new channel, it receives a
* newly allocated channel, depending on the return code of this
* This is nested OUTSIDE the stream lock.
* This is nested OUTSIDE the consumer_relayd_sock_pair lock.
*/
- pthread_mutex_t lock {};
+ pthread_mutex_t lock{};
/*
* Number of streams in the data stream hash table declared outside.
LTTNG_EXPORT extern int data_consumption_paused;
/* Return a human-readable consumer type string that is suitable for logging. */
-static inline
-const char *lttng_consumer_type_str(enum lttng_consumer_type type)
+static inline const char *lttng_consumer_type_str(enum lttng_consumer_type type)
{
switch (type) {
case LTTNG_CONSUMER_UNKNOWN:
/*
* Set the error socket for communication with a session daemon.
*/
-void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
- int sock);
+void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, int sock);
/*
* Set the command socket path for communication with a session daemon.
*/
-void lttng_consumer_set_command_sock_path(
- struct lttng_consumer_local_data *ctx, char *sock);
+void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, char *sock);
/*
* Send return code to session daemon.
* Copy the fields from the channel that need to be accessed (read-only)
* directly from the stream.
*/
-void consumer_stream_update_channel_attributes(
- struct lttng_consumer_stream *stream,
- struct lttng_consumer_channel *channel);
-
-struct lttng_consumer_stream *consumer_allocate_stream(
- struct lttng_consumer_channel *channel,
- uint64_t channel_key,
- uint64_t stream_key,
- const char *channel_name,
- uint64_t relayd_id,
- uint64_t session_id,
- struct lttng_trace_chunk *trace_chunk,
- int cpu,
- int *alloc_ret,
- enum consumer_channel_type type,
- unsigned int monitor);
+void consumer_stream_update_channel_attributes(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_channel *channel);
+
+struct lttng_consumer_stream *consumer_allocate_stream(struct lttng_consumer_channel *channel,
+ uint64_t channel_key,
+ uint64_t stream_key,
+ const char *channel_name,
+ uint64_t relayd_id,
+ uint64_t session_id,
+ struct lttng_trace_chunk *trace_chunk,
+ int cpu,
+ int *alloc_ret,
+ enum consumer_channel_type type,
+ unsigned int monitor);
struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
- uint64_t session_id,
- const uint64_t *chunk_id,
- const char *pathname,
- const char *name,
- uint64_t relayd_id,
- enum lttng_event_output output,
- uint64_t tracefile_size,
- uint64_t tracefile_count,
- uint64_t session_id_per_pid,
- unsigned int monitor,
- unsigned int live_timer_interval,
- bool is_in_live_session,
- const char *root_shm_path,
- const char *shm_path);
-void consumer_del_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht);
-void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht);
+ uint64_t session_id,
+ const uint64_t *chunk_id,
+ const char *pathname,
+ const char *name,
+ uint64_t relayd_id,
+ enum lttng_event_output output,
+ uint64_t tracefile_size,
+ uint64_t tracefile_count,
+ uint64_t session_id_per_pid,
+ unsigned int monitor,
+ unsigned int live_timer_interval,
+ bool is_in_live_session,
+ const char *root_shm_path,
+ const char *shm_path);
+void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht);
+void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht);
int consumer_add_channel(struct lttng_consumer_channel *channel,
- struct lttng_consumer_local_data *ctx);
+ struct lttng_consumer_local_data *ctx);
void consumer_del_channel(struct lttng_consumer_channel *channel);
/* lttng-relayd consumer command */
int consumer_send_relayd_streams_sent(uint64_t net_seq_idx);
void close_relayd_stream(struct lttng_consumer_stream *stream);
struct lttng_consumer_channel *consumer_find_channel(uint64_t key);
-int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
- size_t data_size);
+int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, size_t data_size);
void consumer_steal_stream_key(int key, struct lttng_ht *ht);
-struct lttng_consumer_local_data *lttng_consumer_create(
- enum lttng_consumer_type type,
- ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx,
- bool locked_by_caller),
- int (*recv_channel)(struct lttng_consumer_channel *channel),
- int (*recv_stream)(struct lttng_consumer_stream *stream),
- int (*update_stream)(uint64_t sessiond_key, uint32_t state));
+struct lttng_consumer_local_data *
+lttng_consumer_create(enum lttng_consumer_type type,
+ ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx,
+ bool locked_by_caller),
+ int (*recv_channel)(struct lttng_consumer_channel *channel),
+ int (*recv_stream)(struct lttng_consumer_stream *stream),
+ int (*update_stream)(uint64_t sessiond_key, uint32_t state));
void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
-ssize_t lttng_consumer_on_read_subbuffer_mmap(
- struct lttng_consumer_stream *stream,
- const struct lttng_buffer_view *buffer,
- unsigned long padding);
-ssize_t lttng_consumer_on_read_subbuffer_splice(
- struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len,
- unsigned long padding);
+ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stream,
+ const struct lttng_buffer_view *buffer,
+ unsigned long padding);
+ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream,
+ unsigned long len,
+ unsigned long padding);
int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream);
int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream);
-int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
- unsigned long *pos);
-int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
- unsigned long *pos);
+int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos);
+int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos);
int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream);
int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream);
void *consumer_thread_metadata_poll(void *data);
void *consumer_thread_sessiond_poll(void *data);
void *consumer_thread_channel_poll(void *data);
int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
- int sock, struct pollfd *consumer_sockpoll);
+ int sock,
+ struct pollfd *consumer_sockpoll);
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx,
- bool locked_by_caller);
+ struct lttng_consumer_local_data *ctx,
+ bool locked_by_caller);
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
void consumer_add_relayd_socket(uint64_t net_seq_idx,
- int sock_type,
- struct lttng_consumer_local_data *ctx,
- int sock,
- struct pollfd *consumer_sockpoll,
- uint64_t sessiond_id,
- uint64_t relayd_session_id,
- uint32_t relayd_version_major,
- uint32_t relayd_version_minor,
- enum lttcomm_sock_proto relayd_socket_protocol);
-void consumer_flag_relayd_for_destroy(
- struct consumer_relayd_sock_pair *relayd);
+ int sock_type,
+ struct lttng_consumer_local_data *ctx,
+ int sock,
+ struct pollfd *consumer_sockpoll,
+ uint64_t sessiond_id,
+ uint64_t relayd_session_id,
+ uint32_t relayd_version_major,
+ uint32_t relayd_version_minor,
+ enum lttcomm_sock_proto relayd_socket_protocol);
+void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd);
int consumer_data_pending(uint64_t id);
int consumer_send_status_msg(int sock, int ret_code);
-int consumer_send_status_channel(int sock,
- struct lttng_consumer_channel *channel);
-void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
- uint64_t key);
+int consumer_send_status_channel(int sock, struct lttng_consumer_channel *channel);
+void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key);
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
- unsigned long produced_pos, uint64_t nb_packets_per_stream,
- uint64_t max_sb_size);
+ unsigned long produced_pos,
+ uint64_t nb_packets_per_stream,
+ uint64_t max_sb_size);
void consumer_add_data_stream(struct lttng_consumer_stream *stream);
void consumer_del_stream_for_data(struct lttng_consumer_stream *stream);
void consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
int consumer_create_index_file(struct lttng_consumer_stream *stream);
int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
- uint64_t key, uint64_t relayd_id);
+ uint64_t key,
+ uint64_t relayd_id);
int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream);
int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream);
-int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
- uint64_t key);
+int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key);
void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream);
-enum lttcomm_return_code lttng_consumer_create_trace_chunk(
- const uint64_t *relayd_id, uint64_t session_id,
- uint64_t chunk_id,
- time_t chunk_creation_timestamp,
- const char *chunk_override_name,
- const struct lttng_credentials *credentials,
- struct lttng_directory_handle *chunk_directory_handle);
-enum lttcomm_return_code lttng_consumer_close_trace_chunk(
- const uint64_t *relayd_id, uint64_t session_id,
- uint64_t chunk_id, time_t chunk_close_timestamp,
- const enum lttng_trace_chunk_command_type *close_command,
- char *path);
-enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
- const uint64_t *relayd_id, uint64_t session_id,
- uint64_t chunk_id);
+enum lttcomm_return_code
+lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
+ uint64_t session_id,
+ uint64_t chunk_id,
+ time_t chunk_creation_timestamp,
+ const char *chunk_override_name,
+ const struct lttng_credentials *credentials,
+ struct lttng_directory_handle *chunk_directory_handle);
+enum lttcomm_return_code
+lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
+ uint64_t session_id,
+ uint64_t chunk_id,
+ time_t chunk_close_timestamp,
+ const enum lttng_trace_chunk_command_type *close_command,
+ char *path);
+enum lttcomm_return_code lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id,
+ uint64_t session_id,
+ uint64_t chunk_id);
void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd);
-enum lttcomm_return_code lttng_consumer_init_command(
- struct lttng_consumer_local_data *ctx,
- const lttng_uuid& sessiond_uuid);
+enum lttcomm_return_code lttng_consumer_init_command(struct lttng_consumer_local_data *ctx,
+ const lttng_uuid& sessiond_uuid);
int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel);
-enum lttcomm_return_code lttng_consumer_open_channel_packets(
- struct lttng_consumer_channel *channel);
+enum lttcomm_return_code
+lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel);
int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel);
void lttng_consumer_sigbus_handle(void *addr);
void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel);