* contiguousness and order.
*/
uint64_t contig_metadata_written;
+
+ /* for UST */
+ int wait_fd;
+ /* Node within channel thread ht */
+ struct lttng_ht_node_u64 wait_fd_node;
};
/*
struct lttng_consumer_stream {
/* HT node used by the data_ht and metadata_ht */
struct lttng_ht_node_u64 node;
+ /* stream indexed per channel key node */
+ struct lttng_ht_node_u64 node_channel_id;
/* HT node used in consumer_data.stream_list_ht */
struct lttng_ht_node_u64 node_session_id;
/* Pointer to associated channel. */
char *consumer_command_sock_path;
/* communication with splice */
int consumer_thread_pipe[2];
+ int consumer_channel_pipe[2];
int consumer_splice_metadata_pipe[2];
/* Data stream poll thread pipe. To transfer data stream to the thread */
int consumer_data_pipe[2];
* This HT uses the "node_session_id" of the consumer stream.
*/
struct lttng_ht *stream_list_ht;
+
+ /*
+ * This HT uses the "node_channel_id" of the consumer stream.
+ */
+ struct lttng_ht *stream_per_chan_id_ht;
};
/*
struct lttng_ht *ht);
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
-int consumer_add_channel(struct lttng_consumer_channel *channel);
+int consumer_add_channel(struct lttng_consumer_channel *channel,
+ struct lttng_consumer_local_data *ctx);
void consumer_del_channel(struct lttng_consumer_channel *channel);
/* lttng-relayd consumer command */
void *consumer_thread_metadata_poll(void *data);
void *consumer_thread_data_poll(void *data);
void *consumer_thread_sessiond_poll(void *data);
+void *consumer_thread_channel_poll(void *data);
int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll);