#include <common/compat/uuid.h>
#include <common/sessiond-comm/sessiond-comm.h>
#include <common/pipe.h>
+#include <common/index/ctf-index.h>
/* Commands for consumer */
enum lttng_consumer_command {
/* pause, delete, active depending on fd state */
LTTNG_CONSUMER_UPDATE_STREAM,
/* inform the consumer to quit when all fd has hang up */
- LTTNG_CONSUMER_STOP,
+ LTTNG_CONSUMER_STOP, /* deprecated */
LTTNG_CONSUMER_ADD_RELAYD_SOCKET,
/* Inform the consumer to kill a specific relayd connection */
LTTNG_CONSUMER_DESTROY_RELAYD,
LTTNG_CONSUMER_FLUSH_CHANNEL,
LTTNG_CONSUMER_SNAPSHOT_CHANNEL,
LTTNG_CONSUMER_SNAPSHOT_METADATA,
+ LTTNG_CONSUMER_STREAMS_SENT,
};
/* State of each fd in consumer */
char pathname[PATH_MAX];
/* Channel name. */
char name[LTTNG_SYMBOL_NAME_LEN];
- /* UID and GID of the channel. */
+ /* UID and GID of the session owning this channel. */
uid_t uid;
gid_t gid;
- /* Relayd id of the channel. -1 if it does not apply. */
- int64_t relayd_id;
+ /* Relayd id of the channel. -1ULL if it does not apply. */
+ uint64_t relayd_id;
/*
* Number of streams NOT initialized yet. This is used in order to not
* delete this channel if streams are getting initialized.
enum consumer_channel_type type;
/* For UST */
+ uid_t ust_app_uid; /* Application UID. */
struct ustctl_consumer_channel *uchan;
unsigned char uuid[UUID_STR_LEN];
/*
*/
struct stream_list streams;
- /*
- * List of streams in no monitor mode for this channel. Used ONLY for
- * snapshots recording.
- */
- struct stream_list stream_no_monitor_list;
-
/*
* Set if the channel is metadata. We keep a reference to the stream
* because we have to flush data once pushed by the session daemon. For a
/* Metadata cache is metadata channel */
struct consumer_metadata_cache *metadata_cache;
- /* For metadata periodical flush */
+ /* For UST metadata periodical flush */
int switch_timer_enabled;
timer_t switch_timer;
int switch_timer_error;
+ /* For the live mode */
+ int live_timer_enabled;
+ timer_t live_timer;
+ int live_timer_error;
+
/* On-disk circular buffer */
uint64_t tracefile_size;
uint64_t tracefile_count;
* monitor list of the channel.
*/
unsigned int monitor;
+
+ /*
+ * Channel lock.
+ *
+ * This lock protects against concurrent update of channel.
+ *
+ * This is nested INSIDE the consumer data lock.
+ * This is nested OUTSIDE the channel timer lock.
+ * This is nested OUTSIDE the metadata cache lock.
+ * This is nested OUTSIDE stream lock.
+ * This is nested OUTSIDE consumer_relayd_sock_pair lock.
+ */
+ pthread_mutex_t lock;
+
+ /*
+ * Channel teardown lock.
+ *
+ * This lock protect against teardown of channel. It is _never_
+ * taken by the timer handler.
+ *
+ * This is nested INSIDE the consumer data lock.
+ * This is nested INSIDE the channel lock.
+ * This is nested OUTSIDE the metadata cache lock.
+ * This is nested OUTSIDE stream lock.
+ * This is nested OUTSIDE consumer_relayd_sock_pair lock.
+ */
+ pthread_mutex_t timer_lock;
+
+ /* Timer value in usec for live streaming. */
+ unsigned int live_timer_interval;
};
/*
int out_fd; /* output file to write the data */
/* Write position in the output file descriptor */
off_t out_fd_offset;
+ /* Amount of bytes written to the output */
+ uint64_t output_written;
enum lttng_consumer_stream_state state;
int shm_fd_is_copy;
int data_read;
gid_t gid;
/* Network sequence number. Indicating on which relayd socket it goes. */
uint64_t net_seq_idx;
+ /*
+ * Indicate if this stream was successfully sent to a relayd. This is set
+ * after the refcount of the relayd is incremented and is checked when the
+ * stream is closed before decrementing the refcount in order to avoid an
+ * unbalanced state.
+ */
+ unsigned int sent_to_relayd;
+
/* Identify if the stream is the metadata */
unsigned int metadata_flag;
/* Used when the stream is set for network streaming */
*
* This is nested INSIDE the consumer_data lock.
* This is nested INSIDE the metadata cache lock.
+ * This is nested INSIDE the channel lock.
+ * This is nested INSIDE the channel timer lock.
* This is nested OUTSIDE consumer_relayd_sock_pair lock.
*/
pthread_mutex_t lock;
/* On-disk circular buffer */
uint64_t tracefile_size_current;
uint64_t tracefile_count_current;
+ /*
+ * Monitor or not the streams of this channel meaning this indicates if the
+ * streams should be sent to the data/metadata thread or added to the no
+ * monitor list of the channel.
+ */
+ unsigned int monitor;
+ /*
+ * Indicate if the stream is globally visible meaning that it has been
+ * added to the multiple hash tables. If *not* set, NO lock should be
+ * acquired in the destroy path.
+ */
+ unsigned int globally_visible;
+ /*
+ * Pipe to wake up the metadata poll thread when the UST metadata
+ * cache is updated.
+ */
+ int ust_metadata_poll_pipe[2];
+ /*
+ * How much metadata was read from the metadata cache and sent
+ * to the channel.
+ */
+ uint64_t ust_metadata_pushed;
+ /*
+ * FD of the index file for this stream.
+ */
+ int index_fd;
+
+ /*
+ * Local pipe to extract data when using splice.
+ */
+ int splice_pipe[2];
+
+ /*
+ * Rendez-vous point between data and metadata stream in live mode.
+ */
+ pthread_cond_t metadata_rdv;
+ pthread_mutex_t metadata_rdv_lock;
- /* Node for the no monitor stream list in a channel. */
- struct cds_list_head no_monitor_node;
+ /* Indicate if the stream still has some data to be read. */
+ unsigned int has_data:1;
};
/*
*/
struct consumer_relayd_sock_pair {
/* Network sequence number. */
- int64_t net_seq_idx;
+ uint64_t net_seq_idx;
/* Number of stream associated with this relayd */
unsigned int refcount;
* == 0 (success, FD is left to library)
* < 0 (error)
*/
- int (*on_update_stream)(int sessiond_key, uint32_t state);
+ int (*on_update_stream)(uint64_t sessiond_key, uint32_t state);
enum lttng_consumer_type type;
/* socket to communicate errors with sessiond */
int consumer_error_socket;
- /* socket to ask metadata to sessiond */
+ /* socket to ask metadata to sessiond. */
int consumer_metadata_socket;
+ /*
+ * Protect consumer_metadata_socket.
+ *
+ * This is nested OUTSIDE the metadata cache lock.
+ */
+ pthread_mutex_t metadata_socket_lock;
/* socket to exchange commands with sessiond */
char *consumer_command_sock_path;
/* communication with splice */
- int consumer_thread_pipe[2];
int consumer_channel_pipe[2];
- int consumer_splice_metadata_pipe[2];
/* Data stream poll thread pipe. To transfer data stream to the thread */
struct lttng_pipe *consumer_data_pipe;
+
+ /*
+ * Data thread use that pipe to catch wakeup from read subbuffer that
+ * detects that there is still data to be read for the stream encountered.
+ * Before doing so, the stream is flagged to indicate that there is still
+ * data to be read.
+ *
+ * Both pipes (read/write) are owned and used inside the data thread.
+ */
+ struct lttng_pipe *consumer_wakeup_pipe;
+ /* Indicate if the wakeup thread has been notified. */
+ unsigned int has_wakeup:1;
+
/* to let the signal handler wake up the fd receiver thread */
int consumer_should_quit[2];
/* Metadata poll thread pipe. Transfer metadata stream to it */
/*
* Init consumer data structures.
*/
-void lttng_consumer_init(void);
+int lttng_consumer_init(void);
/*
* Set the error socket for communication with a session daemon.
uint64_t session_id,
int cpu,
int *alloc_ret,
- enum consumer_channel_type type);
+ enum consumer_channel_type type,
+ unsigned int monitor);
struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
uint64_t session_id,
const char *pathname,
uint64_t tracefile_size,
uint64_t tracefile_count,
uint64_t session_id_per_pid,
- unsigned int monitor);
+ unsigned int monitor,
+ unsigned int live_timer_interval);
void consumer_del_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
/* lttng-relayd consumer command */
struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
- int net_seq_idx);
+ uint64_t net_seq_idx);
struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key);
+int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path);
+int consumer_send_relayd_streams_sent(uint64_t net_seq_idx);
+void close_relayd_stream(struct lttng_consumer_stream *stream);
struct lttng_consumer_channel *consumer_find_channel(uint64_t key);
int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
size_t data_size);
struct lttng_consumer_local_data *ctx),
int (*recv_channel)(struct lttng_consumer_channel *channel),
int (*recv_stream)(struct lttng_consumer_stream *stream),
- int (*update_stream)(int sessiond_key, uint32_t state));
+ int (*update_stream)(uint64_t sessiond_key, uint32_t state));
void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
ssize_t lttng_consumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
- unsigned long padding);
+ unsigned long padding,
+ struct ctf_packet_index *index);
ssize_t lttng_consumer_on_read_subbuffer_splice(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
- unsigned long padding);
+ unsigned long padding,
+ struct ctf_packet_index *index);
int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream);
int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
unsigned long *pos);
+int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream);
+int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream);
void *consumer_thread_metadata_poll(void *data);
void *consumer_thread_data_poll(void *data);
void *consumer_thread_sessiond_poll(void *data);
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx);
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
-int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
+int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock,
- unsigned int sessiond_id);
+ uint64_t sessiond_id, uint64_t relayd_session_id);
void consumer_flag_relayd_for_destroy(
struct consumer_relayd_sock_pair *relayd);
int consumer_data_pending(uint64_t id);
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
uint64_t key);
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
+unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
+ unsigned long produced_pos, uint64_t max_stream_size);
+int consumer_add_data_stream(struct lttng_consumer_stream *stream);
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream);
+int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
+int consumer_create_index_file(struct lttng_consumer_stream *stream);
#endif /* LIB_CONSUMER_H */