#include <common/compat/fcntl.h>
#include <common/compat/uuid.h>
#include <common/sessiond-comm/sessiond-comm.h>
+#include <common/pipe.h>
/* Commands for consumer */
enum lttng_consumer_command {
LTTNG_CONSUMER_PUSH_METADATA,
LTTNG_CONSUMER_CLOSE_METADATA,
LTTNG_CONSUMER_SETUP_METADATA,
+ LTTNG_CONSUMER_FLUSH_CHANNEL,
+ LTTNG_CONSUMER_SNAPSHOT_CHANNEL,
+ LTTNG_CONSUMER_SNAPSHOT_METADATA,
};
/* State of each fd in consumer */
CONSUMER_CHANNEL_TYPE_DATA = 1,
};
+extern struct lttng_consumer_global_data consumer_data;
+
struct stream_list {
struct cds_list_head head;
unsigned int count;
};
+/* Stub. */
+struct consumer_metadata_cache;
+
struct lttng_consumer_channel {
/* HT node used for consumer_data.channel_ht */
struct lttng_ht_node_u64 node;
int refcount;
/* Tracing session id on the session daemon side. */
uint64_t session_id;
+ /*
+ * Session id when requesting metadata to the session daemon for
+ * a session with per-PID buffers.
+ */
+ uint64_t session_id_per_pid;
/* Channel trace file path name. */
char pathname[PATH_MAX];
/* Channel name. */
/* UID and GID of the channel. */
uid_t uid;
gid_t gid;
- /* Relayd id of the channel. -1 if it does not apply. */
- int64_t relayd_id;
+ /* Relayd id of the channel. -1ULL if it does not apply. */
+ uint64_t relayd_id;
/*
* Number of streams NOT initialized yet. This is used in order to not
* delete this channel if streams are getting initialized.
* LTTNG_CONSUMER_GET_CHANNEL.
*/
struct stream_list streams;
+
/*
* Set if the channel is metadata. We keep a reference to the stream
* because we have to flush data once pushed by the session daemon. For a
* regular channel, this is always set to NULL.
*/
struct lttng_consumer_stream *metadata_stream;
- /*
- * Metadata written so far. Helps keeping track of
- * contiguousness and order.
- */
- uint64_t contig_metadata_written;
/* for UST */
int wait_fd;
/* Node within channel thread ht */
struct lttng_ht_node_u64 wait_fd_node;
+
+ /* Metadata cache is metadata channel */
+ struct consumer_metadata_cache *metadata_cache;
+ /* For metadata periodical flush */
+ int switch_timer_enabled;
+ timer_t switch_timer;
+ int switch_timer_error;
+
+ /* On-disk circular buffer */
+ uint64_t tracefile_size;
+ uint64_t tracefile_count;
+ /*
+ * Monitor or not the streams of this channel meaning this indicates if the
+ * streams should be sent to the data/metadata thread or added to the no
+ * monitor list of the channel.
+ */
+ unsigned int monitor;
+
+ /*
+ * Channel lock.
+ *
+ * This is nested INSIDE the consumer data lock.
+ * This is nested OUTSIDE the metadata cache lock.
+ * This is nested OUTSIDE stream lock.
+ * This is nested OUTSIDE consumer_relayd_sock_pair lock.
+ */
+ pthread_mutex_t lock;
};
/*
gid_t gid;
/* Network sequence number. Indicating on which relayd socket it goes. */
uint64_t net_seq_idx;
+ /*
+ * Indicate if this stream was successfully sent to a relayd. This is set
+ * after the refcount of the relayd is incremented and is checked when the
+ * stream is closed before decrementing the refcount in order to avoid an
+ * unbalanced state.
+ */
+ unsigned int sent_to_relayd;
+
/* Identify if the stream is the metadata */
unsigned int metadata_flag;
/* Used when the stream is set for network streaming */
* Lock to use the stream FDs since they are used between threads.
*
* This is nested INSIDE the consumer_data lock.
+ * This is nested INSIDE the metadata cache lock.
+ * This is nested INSIDE the channel lock.
* This is nested OUTSIDE consumer_relayd_sock_pair lock.
*/
pthread_mutex_t lock;
/* Internal state of libustctl. */
struct ustctl_consumer_stream *ustream;
struct cds_list_head send_node;
+ /* On-disk circular buffer */
+ uint64_t tracefile_size_current;
+ uint64_t tracefile_count_current;
+ /*
+ * Monitor or not the streams of this channel meaning this indicates if the
+ * streams should be sent to the data/metadata thread or added to the no
+ * monitor list of the channel.
+ */
+ unsigned int monitor;
+ /*
+ * Indicate if the stream is globally visible meaning that it has been
+ * added to the multiple hash tables. If *not* set, NO lock should be
+ * acquired in the destroy path.
+ */
+ unsigned int globally_visible;
+ /*
+ * Pipe to wake up the metadata poll thread when the UST metadata
+ * cache is updated.
+ */
+ int ust_metadata_poll_pipe[2];
+ /*
+ * How much metadata was read from the metadata cache and sent
+ * to the channel.
+ */
+ uint64_t ust_metadata_pushed;
};
/*
*/
struct consumer_relayd_sock_pair {
/* Network sequence number. */
- int64_t net_seq_idx;
+ uint64_t net_seq_idx;
/* Number of stream associated with this relayd */
unsigned int refcount;
pthread_mutex_t ctrl_sock_mutex;
/* Control socket. Command and metadata are passed over it */
- struct lttcomm_sock control_sock;
+ struct lttcomm_relayd_sock control_sock;
/*
* We don't need a mutex at this point since we only splice or write single
* large chunk of data with a header appended at the begining. Moreover,
* this socket is for now only used in a single thread.
*/
- struct lttcomm_sock data_sock;
+ struct lttcomm_relayd_sock data_sock;
struct lttng_ht_node_u64 node;
/* Session id on both sides for the sockets. */
* == 0 (success, FD is left to library)
* < 0 (error)
*/
- int (*on_update_stream)(int sessiond_key, uint32_t state);
+ int (*on_update_stream)(uint64_t sessiond_key, uint32_t state);
+ enum lttng_consumer_type type;
/* socket to communicate errors with sessiond */
int consumer_error_socket;
+ /* socket to ask metadata to sessiond */
+ int consumer_metadata_socket;
/* socket to exchange commands with sessiond */
char *consumer_command_sock_path;
/* communication with splice */
int consumer_channel_pipe[2];
int consumer_splice_metadata_pipe[2];
/* Data stream poll thread pipe. To transfer data stream to the thread */
- int consumer_data_pipe[2];
+ struct lttng_pipe *consumer_data_pipe;
/* to let the signal handler wake up the fd receiver thread */
int consumer_should_quit[2];
/* Metadata poll thread pipe. Transfer metadata stream to it */
- int consumer_metadata_pipe[2];
+ struct lttng_pipe *consumer_metadata_pipe;
};
/*
const char *channel_name,
uid_t uid,
gid_t gid,
- int relayd_id,
+ uint64_t relayd_id,
uint64_t session_id,
int cpu,
int *alloc_ret,
- enum consumer_channel_type type);
+ enum consumer_channel_type type,
+ unsigned int monitor);
struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
uint64_t session_id,
const char *pathname,
const char *name,
uid_t uid,
gid_t gid,
- int relayd_id,
- enum lttng_event_output output);
+ uint64_t relayd_id,
+ enum lttng_event_output output,
+ uint64_t tracefile_size,
+ uint64_t tracefile_count,
+ uint64_t session_id_per_pid,
+ unsigned int monitor);
void consumer_del_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
/* lttng-relayd consumer command */
struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
- int net_seq_idx);
+ uint64_t net_seq_idx);
struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key);
+int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path);
+void close_relayd_stream(struct lttng_consumer_stream *stream);
struct lttng_consumer_channel *consumer_find_channel(uint64_t key);
int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
size_t data_size);
struct lttng_consumer_local_data *ctx),
int (*recv_channel)(struct lttng_consumer_channel *channel),
int (*recv_stream)(struct lttng_consumer_stream *stream),
- int (*update_stream)(int sessiond_key, uint32_t state));
+ int (*update_stream)(uint64_t sessiond_key, uint32_t state));
void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
ssize_t lttng_consumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx);
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
-int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
+int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
- struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
- unsigned int sessiond_id);
+ struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock,
+ uint64_t sessiond_id);
void consumer_flag_relayd_for_destroy(
struct consumer_relayd_sock_pair *relayd);
int consumer_data_pending(uint64_t id);
int consumer_send_status_msg(int sock, int ret_code);
int consumer_send_status_channel(int sock,
struct lttng_consumer_channel *channel);
+void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
+ uint64_t key);
+void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
+unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
+ unsigned long produced_pos, uint64_t max_stream_size);
#endif /* LIB_CONSUMER_H */