LTTNG_CONSUMER_PUSH_METADATA,
LTTNG_CONSUMER_CLOSE_METADATA,
LTTNG_CONSUMER_SETUP_METADATA,
+ LTTNG_CONSUMER_FLUSH_CHANNEL,
};
/* State of each fd in consumer */
unsigned int count;
};
+/* Stub. */
+struct consumer_metadata_cache;
+
struct lttng_consumer_channel {
/* HT node used for consumer_data.channel_ht */
struct lttng_ht_node_u64 node;
* regular channel, this is always set to NULL.
*/
struct lttng_consumer_stream *metadata_stream;
- /*
- * Metadata written so far. Helps keeping track of
- * contiguousness and order.
- */
- uint64_t contig_metadata_written;
+
+ /* for UST */
+ int wait_fd;
+ /* Node within channel thread ht */
+ struct lttng_ht_node_u64 wait_fd_node;
+
+ /* Metadata cache is metadata channel */
+ struct consumer_metadata_cache *metadata_cache;
+ /* For metadata periodical flush */
+ int switch_timer_enabled;
+ timer_t switch_timer;
+ int switch_timer_error;
+
+ /* On-disk circular buffer */
+ uint64_t tracefile_size;
+ uint64_t tracefile_count;
};
/*
struct lttng_consumer_stream {
/* HT node used by the data_ht and metadata_ht */
struct lttng_ht_node_u64 node;
+ /* stream indexed per channel key node */
+ struct lttng_ht_node_u64 node_channel_id;
/* HT node used in consumer_data.stream_list_ht */
struct lttng_ht_node_u64 node_session_id;
/* Pointer to associated channel. */
/* Internal state of libustctl. */
struct ustctl_consumer_stream *ustream;
struct cds_list_head send_node;
+ /* On-disk circular buffer */
+ uint64_t tracefile_size_current;
+ uint64_t tracefile_count_current;
};
/*
pthread_mutex_t ctrl_sock_mutex;
/* Control socket. Command and metadata are passed over it */
- struct lttcomm_sock control_sock;
+ struct lttcomm_relayd_sock control_sock;
/*
* We don't need a mutex at this point since we only splice or write single
* large chunk of data with a header appended at the begining. Moreover,
* this socket is for now only used in a single thread.
*/
- struct lttcomm_sock data_sock;
+ struct lttcomm_relayd_sock data_sock;
struct lttng_ht_node_u64 node;
/* Session id on both sides for the sockets. */
* < 0 (error)
*/
int (*on_update_stream)(int sessiond_key, uint32_t state);
+ enum lttng_consumer_type type;
/* socket to communicate errors with sessiond */
int consumer_error_socket;
+ /* socket to ask metadata to sessiond */
+ int consumer_metadata_socket;
/* socket to exchange commands with sessiond */
char *consumer_command_sock_path;
/* communication with splice */
int consumer_thread_pipe[2];
+ int consumer_channel_pipe[2];
int consumer_splice_metadata_pipe[2];
/* Data stream poll thread pipe. To transfer data stream to the thread */
int consumer_data_pipe[2];
* This HT uses the "node_session_id" of the consumer stream.
*/
struct lttng_ht *stream_list_ht;
+
+ /*
+ * This HT uses the "node_channel_id" of the consumer stream.
+ */
+ struct lttng_ht *stream_per_chan_id_ht;
};
/*
uid_t uid,
gid_t gid,
int relayd_id,
- enum lttng_event_output output);
+ enum lttng_event_output output,
+ uint64_t tracefile_size,
+ uint64_t tracefile_count);
void consumer_del_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
-int consumer_add_channel(struct lttng_consumer_channel *channel);
+int consumer_add_channel(struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx);
void consumer_del_channel(struct lttng_consumer_channel *channel);
/* lttng-relayd consumer command */
void *consumer_thread_metadata_poll(void *data);
void *consumer_thread_data_poll(void *data);
void *consumer_thread_sessiond_poll(void *data);
+void *consumer_thread_channel_poll(void *data);
int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll);
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
- struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
+ struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock,
unsigned int sessiond_id);
void consumer_flag_relayd_for_destroy(
struct consumer_relayd_sock_pair *relayd);
int consumer_send_status_msg(int sock, int ret_code);
int consumer_send_status_channel(int sock,
struct lttng_consumer_channel *channel);
+void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
+ uint64_t key);
#endif /* LIB_CONSUMER_H */