LTTNG_CONSUMER_ASK_CHANNEL_CREATION,
LTTNG_CONSUMER_GET_CHANNEL,
LTTNG_CONSUMER_DESTROY_CHANNEL,
+ LTTNG_CONSUMER_PUSH_METADATA,
+ LTTNG_CONSUMER_CLOSE_METADATA,
+ LTTNG_CONSUMER_SETUP_METADATA,
+ LTTNG_CONSUMER_FLUSH_CHANNEL,
};
/* State of each fd in consumer */
enum consumer_channel_type {
CONSUMER_CHANNEL_TYPE_METADATA = 0,
- CONSUMER_CHANNEL_TYPE_DATA = 1,
+ CONSUMER_CHANNEL_TYPE_DATA = 1,
};
struct stream_list {
unsigned int count;
};
+/* Stub. */
+struct consumer_metadata_cache;
+
struct lttng_consumer_channel {
/* HT node used for consumer_data.channel_ht */
- struct lttng_ht_node_ulong node;
+ struct lttng_ht_node_u64 node;
/* Indexed key. Incremented value in the consumer. */
- int key;
+ uint64_t key;
/* Number of streams referencing this channel */
int refcount;
/* Tracing session id on the session daemon side. */
uid_t uid;
gid_t gid;
/* Relayd id of the channel. -1 if it does not apply. */
- int relayd_id;
+ int64_t relayd_id;
/*
* Number of streams NOT initialized yet. This is used in order to not
* delete this channel if streams are getting initialized.
* LTTNG_CONSUMER_GET_CHANNEL.
*/
struct stream_list streams;
+ /*
+ * Set if the channel is metadata. We keep a reference to the stream
+ * because we have to flush data once pushed by the session daemon. For a
+ * regular channel, this is always set to NULL.
+ */
+ struct lttng_consumer_stream *metadata_stream;
+
+ /* for UST */
+ int wait_fd;
+ /* Node within channel thread ht */
+ struct lttng_ht_node_u64 wait_fd_node;
+
+ /* Metadata cache is metadata channel */
+ struct consumer_metadata_cache *metadata_cache;
+ /* For metadata periodical flush */
+ int switch_timer_enabled;
+ timer_t switch_timer;
};
/*
*/
struct lttng_consumer_stream {
/* HT node used by the data_ht and metadata_ht */
- struct lttng_ht_node_ulong node;
+ struct lttng_ht_node_u64 node;
+ /* stream indexed per channel key node */
+ struct lttng_ht_node_u64 node_channel_id;
/* HT node used in consumer_data.stream_list_ht */
- struct lttng_ht_node_ulong node_session_id;
+ struct lttng_ht_node_u64 node_session_id;
/* Pointer to associated channel. */
struct lttng_consumer_channel *chan;
/* Key by which the stream is indexed for 'node'. */
- int key;
+ uint64_t key;
/*
* File descriptor of the data output file. This can be either a file or a
* socket fd for relayd streaming.
uid_t uid;
gid_t gid;
/* Network sequence number. Indicating on which relayd socket it goes. */
- int net_seq_idx;
+ uint64_t net_seq_idx;
/* Identify if the stream is the metadata */
unsigned int metadata_flag;
/* Used when the stream is set for network streaming */
*/
struct consumer_relayd_sock_pair {
/* Network sequence number. */
- int net_seq_idx;
+ int64_t net_seq_idx;
/* Number of stream associated with this relayd */
unsigned int refcount;
* this socket is for now only used in a single thread.
*/
struct lttcomm_sock data_sock;
- struct lttng_ht_node_ulong node;
+ struct lttng_ht_node_u64 node;
/* Session id on both sides for the sockets. */
uint64_t relayd_session_id;
* < 0 (error)
*/
int (*on_update_stream)(int sessiond_key, uint32_t state);
+ enum lttng_consumer_type type;
/* socket to communicate errors with sessiond */
int consumer_error_socket;
+ /* socket to ask metadata to sessiond */
+ int consumer_metadata_socket;
/* socket to exchange commands with sessiond */
char *consumer_command_sock_path;
/* communication with splice */
int consumer_thread_pipe[2];
+ int consumer_channel_pipe[2];
int consumer_splice_metadata_pipe[2];
/* Data stream poll thread pipe. To transfer data stream to the thread */
int consumer_data_pipe[2];
* This HT uses the "node_session_id" of the consumer stream.
*/
struct lttng_ht *stream_list_ht;
+
+ /*
+ * This HT uses the "node_channel_id" of the consumer stream.
+ */
+ struct lttng_ht *stream_per_chan_id_ht;
};
/*
*/
int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll);
-struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
- int stream_key,
+struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+ uint64_t stream_key,
enum lttng_consumer_stream_state state,
const char *channel_name,
uid_t uid,
int cpu,
int *alloc_ret,
enum consumer_channel_type type);
-struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key,
+struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
uint64_t session_id,
const char *pathname,
const char *name,
struct lttng_ht *ht);
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
-int consumer_add_channel(struct lttng_consumer_channel *channel);
+int consumer_add_channel(struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx);
void consumer_del_channel(struct lttng_consumer_channel *channel);
/* lttng-relayd consumer command */
struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
int net_seq_idx);
-struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
-struct lttng_consumer_channel *consumer_find_channel(unsigned long key);
+struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key);
+struct lttng_consumer_channel *consumer_find_channel(uint64_t key);
int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
size_t data_size);
void consumer_steal_stream_key(int key, struct lttng_ht *ht);
void *consumer_thread_metadata_poll(void *data);
void *consumer_thread_data_poll(void *data);
void *consumer_thread_sessiond_poll(void *data);
+void *consumer_thread_channel_poll(void *data);
int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll);