#include <common/compat/fcntl.h>
#include <common/compat/uuid.h>
#include <common/sessiond-comm/sessiond-comm.h>
+#include <common/pipe.h>
+#include <common/index/ctf-index.h>
/* Commands for consumer */
enum lttng_consumer_command {
/* pause, delete, active depending on fd state */
LTTNG_CONSUMER_UPDATE_STREAM,
/* inform the consumer to quit when all fd has hang up */
- LTTNG_CONSUMER_STOP,
+ LTTNG_CONSUMER_STOP, /* deprecated */
LTTNG_CONSUMER_ADD_RELAYD_SOCKET,
/* Inform the consumer to kill a specific relayd connection */
LTTNG_CONSUMER_DESTROY_RELAYD,
LTTNG_CONSUMER_ASK_CHANNEL_CREATION,
LTTNG_CONSUMER_GET_CHANNEL,
LTTNG_CONSUMER_DESTROY_CHANNEL,
+ LTTNG_CONSUMER_PUSH_METADATA,
+ LTTNG_CONSUMER_CLOSE_METADATA,
+ LTTNG_CONSUMER_SETUP_METADATA,
+ LTTNG_CONSUMER_FLUSH_CHANNEL,
+ LTTNG_CONSUMER_SNAPSHOT_CHANNEL,
+ LTTNG_CONSUMER_SNAPSHOT_METADATA,
+ LTTNG_CONSUMER_STREAMS_SENT,
};
/* State of each fd in consumer */
enum consumer_channel_type {
CONSUMER_CHANNEL_TYPE_METADATA = 0,
- CONSUMER_CHANNEL_TYPE_DATA = 1,
+ CONSUMER_CHANNEL_TYPE_DATA = 1,
};
+extern struct lttng_consumer_global_data consumer_data;
+
struct stream_list {
struct cds_list_head head;
unsigned int count;
};
+/* Stub. */
+struct consumer_metadata_cache;
+
struct lttng_consumer_channel {
/* HT node used for consumer_data.channel_ht */
- struct lttng_ht_node_ulong node;
+ struct lttng_ht_node_u64 node;
/* Indexed key. Incremented value in the consumer. */
- int key;
+ uint64_t key;
/* Number of streams referencing this channel */
int refcount;
/* Tracing session id on the session daemon side. */
uint64_t session_id;
+ /*
+ * Session id when requesting metadata to the session daemon for
+ * a session with per-PID buffers.
+ */
+ uint64_t session_id_per_pid;
/* Channel trace file path name. */
char pathname[PATH_MAX];
/* Channel name. */
char name[LTTNG_SYMBOL_NAME_LEN];
- /* UID and GID of the channel. */
+ /* UID and GID of the session owning this channel. */
uid_t uid;
gid_t gid;
- /* Relayd id of the channel. -1 if it does not apply. */
- int relayd_id;
+ /* Relayd id of the channel. -1ULL if it does not apply. */
+ uint64_t relayd_id;
/*
* Number of streams NOT initialized yet. This is used in order to not
* delete this channel if streams are getting initialized.
enum consumer_channel_type type;
/* For UST */
+ uid_t ust_app_uid; /* Application UID. */
struct ustctl_consumer_channel *uchan;
unsigned char uuid[UUID_STR_LEN];
/*
* LTTNG_CONSUMER_GET_CHANNEL.
*/
struct stream_list streams;
+
+ /*
+ * Set if the channel is metadata. We keep a reference to the stream
+ * because we have to flush data once pushed by the session daemon. For a
+ * regular channel, this is always set to NULL.
+ */
+ struct lttng_consumer_stream *metadata_stream;
+
+ /* for UST */
+ int wait_fd;
+ /* Node within channel thread ht */
+ struct lttng_ht_node_u64 wait_fd_node;
+
+ /* Metadata cache is metadata channel */
+ struct consumer_metadata_cache *metadata_cache;
+ /* For UST metadata periodical flush */
+ int switch_timer_enabled;
+ timer_t switch_timer;
+ int switch_timer_error;
+
+ /* For the live mode */
+ int live_timer_enabled;
+ timer_t live_timer;
+ int live_timer_error;
+
+ /* On-disk circular buffer */
+ uint64_t tracefile_size;
+ uint64_t tracefile_count;
+ /*
+ * Monitor or not the streams of this channel meaning this indicates if the
+ * streams should be sent to the data/metadata thread or added to the no
+ * monitor list of the channel.
+ */
+ unsigned int monitor;
+
+ /*
+ * Channel lock.
+ *
+ * This lock protects against concurrent update of channel.
+ *
+ * This is nested INSIDE the consumer data lock.
+ * This is nested OUTSIDE the channel timer lock.
+ * This is nested OUTSIDE the metadata cache lock.
+ * This is nested OUTSIDE stream lock.
+ * This is nested OUTSIDE consumer_relayd_sock_pair lock.
+ */
+ pthread_mutex_t lock;
+
+ /*
+ * Channel teardown lock.
+ *
+ * This lock protect against teardown of channel. It is _never_
+ * taken by the timer handler.
+ *
+ * This is nested INSIDE the consumer data lock.
+ * This is nested INSIDE the channel lock.
+ * This is nested OUTSIDE the metadata cache lock.
+ * This is nested OUTSIDE stream lock.
+ * This is nested OUTSIDE consumer_relayd_sock_pair lock.
+ */
+ pthread_mutex_t timer_lock;
+
+ /* Timer value in usec for live streaming. */
+ unsigned int live_timer_interval;
+
+ int *stream_fds;
+ int nr_stream_fds;
+ char root_shm_path[PATH_MAX];
+ char shm_path[PATH_MAX];
};
/*
*/
struct lttng_consumer_stream {
/* HT node used by the data_ht and metadata_ht */
- struct lttng_ht_node_ulong node;
+ struct lttng_ht_node_u64 node;
+ /* stream indexed per channel key node */
+ struct lttng_ht_node_u64 node_channel_id;
/* HT node used in consumer_data.stream_list_ht */
- struct lttng_ht_node_ulong node_session_id;
+ struct lttng_ht_node_u64 node_session_id;
/* Pointer to associated channel. */
struct lttng_consumer_channel *chan;
/* Key by which the stream is indexed for 'node'. */
- int key;
+ uint64_t key;
/*
* File descriptor of the data output file. This can be either a file or a
* socket fd for relayd streaming.
int out_fd; /* output file to write the data */
/* Write position in the output file descriptor */
off_t out_fd_offset;
+ /* Amount of bytes written to the output */
+ uint64_t output_written;
enum lttng_consumer_stream_state state;
int shm_fd_is_copy;
int data_read;
int hangup_flush_done;
+
+ /*
+ * metadata_timer_lock protects flags waiting_on_metadata and
+ * missed_metadata_flush.
+ */
+ pthread_mutex_t metadata_timer_lock;
+ /*
+ * Flag set when awaiting metadata to be pushed. Used in the
+ * timer thread to skip waiting on the stream (and stream lock) to
+ * ensure we can proceed to flushing metadata in live mode.
+ */
+ bool waiting_on_metadata;
+ /* Raised when a timer misses a metadata flush. */
+ bool missed_metadata_flush;
+
enum lttng_event_output output;
/* Maximum subbuffer size. */
unsigned long max_sb_size;
uid_t uid;
gid_t gid;
/* Network sequence number. Indicating on which relayd socket it goes. */
- int net_seq_idx;
+ uint64_t net_seq_idx;
+ /*
+ * Indicate if this stream was successfully sent to a relayd. This is set
+ * after the refcount of the relayd is incremented and is checked when the
+ * stream is closed before decrementing the refcount in order to avoid an
+ * unbalanced state.
+ */
+ unsigned int sent_to_relayd;
+
/* Identify if the stream is the metadata */
unsigned int metadata_flag;
/* Used when the stream is set for network streaming */
* Lock to use the stream FDs since they are used between threads.
*
* This is nested INSIDE the consumer_data lock.
+ * This is nested INSIDE the metadata cache lock.
+ * This is nested INSIDE the channel lock.
+ * This is nested INSIDE the channel timer lock.
* This is nested OUTSIDE consumer_relayd_sock_pair lock.
*/
pthread_mutex_t lock;
/* Internal state of libustctl. */
struct ustctl_consumer_stream *ustream;
struct cds_list_head send_node;
+ /* On-disk circular buffer */
+ uint64_t tracefile_size_current;
+ uint64_t tracefile_count_current;
+ /*
+ * Monitor or not the streams of this channel meaning this indicates if the
+ * streams should be sent to the data/metadata thread or added to the no
+ * monitor list of the channel.
+ */
+ unsigned int monitor;
+ /*
+ * Indicate if the stream is globally visible meaning that it has been
+ * added to the multiple hash tables. If *not* set, NO lock should be
+ * acquired in the destroy path.
+ */
+ unsigned int globally_visible;
+ /*
+ * Pipe to wake up the metadata poll thread when the UST metadata
+ * cache is updated.
+ */
+ int ust_metadata_poll_pipe[2];
+ /*
+ * How much metadata was read from the metadata cache and sent
+ * to the channel.
+ */
+ uint64_t ust_metadata_pushed;
+ /*
+ * FD of the index file for this stream.
+ */
+ int index_fd;
+
+ /*
+ * Local pipe to extract data when using splice.
+ */
+ int splice_pipe[2];
+
+ /*
+ * Rendez-vous point between data and metadata stream in live mode.
+ */
+ pthread_cond_t metadata_rdv;
+ pthread_mutex_t metadata_rdv_lock;
+
+ /* Indicate if the stream still has some data to be read. */
+ unsigned int has_data:1;
};
/*
*/
struct consumer_relayd_sock_pair {
/* Network sequence number. */
- int net_seq_idx;
+ uint64_t net_seq_idx;
/* Number of stream associated with this relayd */
unsigned int refcount;
pthread_mutex_t ctrl_sock_mutex;
/* Control socket. Command and metadata are passed over it */
- struct lttcomm_sock control_sock;
+ struct lttcomm_relayd_sock control_sock;
/*
* We don't need a mutex at this point since we only splice or write single
* large chunk of data with a header appended at the begining. Moreover,
* this socket is for now only used in a single thread.
*/
- struct lttcomm_sock data_sock;
- struct lttng_ht_node_ulong node;
+ struct lttcomm_relayd_sock data_sock;
+ struct lttng_ht_node_u64 node;
/* Session id on both sides for the sockets. */
uint64_t relayd_session_id;
* == 0 (success, FD is left to library)
* < 0 (error)
*/
- int (*on_update_stream)(int sessiond_key, uint32_t state);
+ int (*on_update_stream)(uint64_t sessiond_key, uint32_t state);
+ enum lttng_consumer_type type;
/* socket to communicate errors with sessiond */
int consumer_error_socket;
+ /* socket to ask metadata to sessiond. */
+ int consumer_metadata_socket;
+ /*
+ * Protect consumer_metadata_socket.
+ *
+ * This is nested OUTSIDE the metadata cache lock.
+ */
+ pthread_mutex_t metadata_socket_lock;
/* socket to exchange commands with sessiond */
char *consumer_command_sock_path;
/* communication with splice */
- int consumer_thread_pipe[2];
- int consumer_splice_metadata_pipe[2];
+ int consumer_channel_pipe[2];
/* Data stream poll thread pipe. To transfer data stream to the thread */
- int consumer_data_pipe[2];
+ struct lttng_pipe *consumer_data_pipe;
+
+ /*
+ * Data thread use that pipe to catch wakeup from read subbuffer that
+ * detects that there is still data to be read for the stream encountered.
+ * Before doing so, the stream is flagged to indicate that there is still
+ * data to be read.
+ *
+ * Both pipes (read/write) are owned and used inside the data thread.
+ */
+ struct lttng_pipe *consumer_wakeup_pipe;
+ /* Indicate if the wakeup thread has been notified. */
+ unsigned int has_wakeup:1;
+
/* to let the signal handler wake up the fd receiver thread */
int consumer_should_quit[2];
/* Metadata poll thread pipe. Transfer metadata stream to it */
- int consumer_metadata_pipe[2];
+ struct lttng_pipe *consumer_metadata_pipe;
};
/*
* This HT uses the "node_session_id" of the consumer stream.
*/
struct lttng_ht *stream_list_ht;
+
+ /*
+ * This HT uses the "node_channel_id" of the consumer stream.
+ */
+ struct lttng_ht *stream_per_chan_id_ht;
};
/*
* Init consumer data structures.
*/
-void lttng_consumer_init(void);
+int lttng_consumer_init(void);
/*
* Set the error socket for communication with a session daemon.
*/
int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll);
-struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
- int stream_key,
+struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+ uint64_t stream_key,
enum lttng_consumer_stream_state state,
const char *channel_name,
uid_t uid,
gid_t gid,
- int relayd_id,
+ uint64_t relayd_id,
uint64_t session_id,
int cpu,
int *alloc_ret,
- enum consumer_channel_type type);
-struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key,
+ enum consumer_channel_type type,
+ unsigned int monitor);
+struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
uint64_t session_id,
const char *pathname,
const char *name,
uid_t uid,
gid_t gid,
- int relayd_id,
- enum lttng_event_output output);
+ uint64_t relayd_id,
+ enum lttng_event_output output,
+ uint64_t tracefile_size,
+ uint64_t tracefile_count,
+ uint64_t session_id_per_pid,
+ unsigned int monitor,
+ unsigned int live_timer_interval,
+ const char *root_shm_path,
+ const char *shm_path);
void consumer_del_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
-int consumer_add_channel(struct lttng_consumer_channel *channel);
+int consumer_add_channel(struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx);
void consumer_del_channel(struct lttng_consumer_channel *channel);
/* lttng-relayd consumer command */
-struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
- int net_seq_idx);
-struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
-struct lttng_consumer_channel *consumer_find_channel(unsigned long key);
+struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key);
+int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path);
+int consumer_send_relayd_streams_sent(uint64_t net_seq_idx);
+void close_relayd_stream(struct lttng_consumer_stream *stream);
+struct lttng_consumer_channel *consumer_find_channel(uint64_t key);
int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
size_t data_size);
void consumer_steal_stream_key(int key, struct lttng_ht *ht);
struct lttng_consumer_local_data *ctx),
int (*recv_channel)(struct lttng_consumer_channel *channel),
int (*recv_stream)(struct lttng_consumer_stream *stream),
- int (*update_stream)(int sessiond_key, uint32_t state));
+ int (*update_stream)(uint64_t sessiond_key, uint32_t state));
void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
ssize_t lttng_consumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
- unsigned long padding);
+ unsigned long padding,
+ struct ctf_packet_index *index);
ssize_t lttng_consumer_on_read_subbuffer_splice(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
- unsigned long padding);
+ unsigned long padding,
+ struct ctf_packet_index *index);
int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream);
int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
unsigned long *pos);
+int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream);
+int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream);
void *consumer_thread_metadata_poll(void *data);
void *consumer_thread_data_poll(void *data);
void *consumer_thread_sessiond_poll(void *data);
+void *consumer_thread_channel_poll(void *data);
int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll);
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx);
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
-int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
+int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
- struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
- unsigned int sessiond_id);
+ struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock,
+ uint64_t sessiond_id, uint64_t relayd_session_id);
void consumer_flag_relayd_for_destroy(
struct consumer_relayd_sock_pair *relayd);
int consumer_data_pending(uint64_t id);
int consumer_send_status_msg(int sock, int ret_code);
int consumer_send_status_channel(int sock,
struct lttng_consumer_channel *channel);
+void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
+ uint64_t key);
+void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
+unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
+ unsigned long produced_pos, uint64_t nb_packets_per_stream,
+ uint64_t max_sb_size);
+int consumer_add_data_stream(struct lttng_consumer_stream *stream);
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream);
+int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
+int consumer_create_index_file(struct lttng_consumer_stream *stream);
#endif /* LIB_CONSUMER_H */