Consumer daemon data available command support
[lttng-tools.git] / src / common / consumer.h
index 4b225e43c4ced7a0a38527402690a825e27ed9cc..df002f81a606c62f75db59bf23d752ec29743e62 100644 (file)
@@ -56,6 +56,8 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_ADD_RELAYD_SOCKET,
        /* Inform the consumer to kill a specific relayd connection */
        LTTNG_CONSUMER_DESTROY_RELAYD,
+       /* Return to the sessiond if there is data pending for a session */
+       LTTNG_CONSUMER_DATA_AVAILABLE,
 };
 
 /* State of each fd in consumer */
@@ -101,8 +103,10 @@ struct lttng_ust_lib_ring_buffer;
  * uniquely a stream.
  */
 struct lttng_consumer_stream {
+       /* HT node used by the data_ht and metadata_ht */
        struct lttng_ht_node_ulong node;
-       struct lttng_ht_node_ulong waitfd_node;
+       /* HT node used in consumer_data.stream_list_ht */
+       struct lttng_ht_node_ulong node_session_id;
        struct lttng_consumer_channel *chan;    /* associated channel */
        /*
         * key is the key used by the session daemon to refer to the
@@ -137,6 +141,10 @@ struct lttng_consumer_stream {
        uint64_t relayd_stream_id;
        /* Next sequence number to use for trace packet */
        uint64_t next_net_seq_num;
+       /* Lock to use the stream FDs since they are used between threads. */
+       pthread_mutex_t lock;
+       /* Tracing session id */
+       uint64_t session_id;
 };
 
 /*
@@ -230,8 +238,8 @@ struct lttng_consumer_local_data {
        /* communication with splice */
        int consumer_thread_pipe[2];
        int consumer_splice_metadata_pipe[2];
-       /* pipe to wake the poll thread when necessary */
-       int consumer_poll_pipe[2];
+       /* Data stream poll thread pipe. To transfer data stream to the thread */
+       int consumer_data_pipe[2];
        /* to let the signal handler wake up the fd receiver thread */
        int consumer_should_quit[2];
        /* Metadata poll thread pipe. Transfer metadata stream to it */
@@ -253,13 +261,12 @@ struct lttng_consumer_global_data {
        pthread_mutex_t lock;
 
        /*
-        * Number of streams in the hash table. Protected by consumer_data.lock.
+        * Number of streams in the data stream hash table declared outside.
+        * Protected by consumer_data.lock.
         */
        int stream_count;
-       /*
-        * Hash tables of streams and channels. Protected by consumer_data.lock.
-        */
-       struct lttng_ht *stream_ht;
+
+       /* Channel hash table protected by consumer_data.lock. */
        struct lttng_ht *channel_ht;
        /*
         * Flag specifying if the local array of FDs needs update in the
@@ -273,8 +280,21 @@ struct lttng_consumer_global_data {
         * stream has an index which associate the right relayd socket to use.
         */
        struct lttng_ht *relayd_ht;
+
+       /*
+        * This hash table contains all streams (metadata and data) indexed by
+        * session id. In other words, the ht is indexed by session id and each
+        * bucket contains the list of associated streams.
+        *
+        * This HT uses the "node_session_id" of the consumer stream.
+        */
+       struct lttng_ht *stream_list_ht;
 };
 
+/* Defined in consumer.c and coupled with explanations */
+extern struct lttng_ht *metadata_ht;
+extern struct lttng_ht *data_ht;
+
 /*
  * Init consumer data structures.
  */
@@ -324,10 +344,6 @@ extern void lttng_consumer_sync_trace_file(
  */
 extern int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll);
 
-extern int consumer_update_poll_array(
-               struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
-               struct lttng_consumer_stream **local_consumer_streams);
-
 extern struct lttng_consumer_stream *consumer_allocate_stream(
                int channel_key, int stream_key,
                int shm_fd, int wait_fd,
@@ -339,14 +355,12 @@ extern struct lttng_consumer_stream *consumer_allocate_stream(
                gid_t gid,
                int net_index,
                int metadata_flag,
+               uint64_t session_id,
                int *alloc_ret);
-extern int consumer_add_stream(struct lttng_consumer_stream *stream);
 extern void consumer_del_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht);
 extern void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht);
-extern void consumer_change_stream_state(int stream_key,
-               enum lttng_consumer_stream_state state);
 extern void consumer_del_channel(struct lttng_consumer_channel *channel);
 extern struct lttng_consumer_channel *consumer_allocate_channel(
                int channel_key,
@@ -362,6 +376,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
 struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
 int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
                size_t data_size);
+void consumer_steal_stream_key(int key, struct lttng_ht *ht);
 
 extern struct lttng_consumer_local_data *lttng_consumer_create(
                enum lttng_consumer_type type,
@@ -399,5 +414,6 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock);
 void consumer_flag_relayd_for_destroy(
                struct consumer_relayd_sock_pair *relayd);
+int consumer_data_available(uint64_t id);
 
 #endif /* LIB_CONSUMER_H */
This page took 0.024301 seconds and 4 git commands to generate.