Move stream file rotation functions to utils
[lttng-tools.git] / src / common / consumer.h
index 92f9e20957f09351834a366a38a1a708498c8dad..19a590e801e9739ff2aca138c5bf9cfe62417e30 100644 (file)
@@ -49,6 +49,10 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_ASK_CHANNEL_CREATION,
        LTTNG_CONSUMER_GET_CHANNEL,
        LTTNG_CONSUMER_DESTROY_CHANNEL,
+       LTTNG_CONSUMER_PUSH_METADATA,
+       LTTNG_CONSUMER_CLOSE_METADATA,
+       LTTNG_CONSUMER_SETUP_METADATA,
+       LTTNG_CONSUMER_FLUSH_CHANNEL,
 };
 
 /* State of each fd in consumer */
@@ -77,7 +81,7 @@ enum consumer_channel_output {
 
 enum consumer_channel_type {
        CONSUMER_CHANNEL_TYPE_METADATA  = 0,
-       CONSUMER_CHANNEL_TYPE_DATA              = 1,
+       CONSUMER_CHANNEL_TYPE_DATA      = 1,
 };
 
 struct stream_list {
@@ -85,11 +89,14 @@ struct stream_list {
        unsigned int count;
 };
 
+/* Stub. */
+struct consumer_metadata_cache;
+
 struct lttng_consumer_channel {
        /* HT node used for consumer_data.channel_ht */
-       struct lttng_ht_node_ulong node;
+       struct lttng_ht_node_u64 node;
        /* Indexed key. Incremented value in the consumer. */
-       int key;
+       uint64_t key;
        /* Number of streams referencing this channel */
        int refcount;
        /* Tracing session id on the session daemon side. */
@@ -102,7 +109,7 @@ struct lttng_consumer_channel {
        uid_t uid;
        gid_t gid;
        /* Relayd id of the channel. -1 if it does not apply. */
-       int relayd_id;
+       int64_t relayd_id;
        /*
         * Number of streams NOT initialized yet. This is used in order to not
         * delete this channel if streams are getting initialized.
@@ -122,6 +129,26 @@ struct lttng_consumer_channel {
         * LTTNG_CONSUMER_GET_CHANNEL.
         */
        struct stream_list streams;
+       /*
+        * Set if the channel is metadata. We keep a reference to the stream
+        * because we have to flush data once pushed by the session daemon. For a
+        * regular channel, this is always set to NULL.
+        */
+       struct lttng_consumer_stream *metadata_stream;
+
+       /* for UST */
+       int wait_fd;
+       /* Node within channel thread ht */
+       struct lttng_ht_node_u64 wait_fd_node;
+
+       /* Metadata cache is metadata channel */
+       struct consumer_metadata_cache *metadata_cache;
+       /* For metadata periodical flush */
+       int switch_timer_enabled;
+       timer_t switch_timer;
+       /* On-disk circular buffer */
+       uint64_t tracefile_size;
+       uint64_t tracefile_count;
 };
 
 /*
@@ -130,14 +157,16 @@ struct lttng_consumer_channel {
  */
 struct lttng_consumer_stream {
        /* HT node used by the data_ht and metadata_ht */
-       struct lttng_ht_node_ulong node;
+       struct lttng_ht_node_u64 node;
+       /* stream indexed per channel key node */
+       struct lttng_ht_node_u64 node_channel_id;
        /* HT node used in consumer_data.stream_list_ht */
-       struct lttng_ht_node_ulong node_session_id;
+       struct lttng_ht_node_u64 node_session_id;
        /* Pointer to associated channel. */
        struct lttng_consumer_channel *chan;
 
        /* Key by which the stream is indexed for 'node'. */
-       int key;
+       uint64_t key;
        /*
         * File descriptor of the data output file. This can be either a file or a
         * socket fd for relayd streaming.
@@ -167,7 +196,7 @@ struct lttng_consumer_stream {
        uid_t uid;
        gid_t gid;
        /* Network sequence number. Indicating on which relayd socket it goes. */
-       int net_seq_idx;
+       uint64_t net_seq_idx;
        /* Identify if the stream is the metadata */
        unsigned int metadata_flag;
        /* Used when the stream is set for network streaming */
@@ -207,6 +236,9 @@ struct lttng_consumer_stream {
        /* Internal state of libustctl. */
        struct ustctl_consumer_stream *ustream;
        struct cds_list_head send_node;
+       /* On-disk circular buffer */
+       uint64_t tracefile_size_current;
+       uint64_t tracefile_count_current;
 };
 
 /*
@@ -214,7 +246,7 @@ struct lttng_consumer_stream {
  */
 struct consumer_relayd_sock_pair {
        /* Network sequence number. */
-       int net_seq_idx;
+       int64_t net_seq_idx;
        /* Number of stream associated with this relayd */
        unsigned int refcount;
 
@@ -237,15 +269,15 @@ struct consumer_relayd_sock_pair {
        pthread_mutex_t ctrl_sock_mutex;
 
        /* Control socket. Command and metadata are passed over it */
-       struct lttcomm_sock control_sock;
+       struct lttcomm_relayd_sock control_sock;
 
        /*
         * We don't need a mutex at this point since we only splice or write single
         * large chunk of data with a header appended at the begining. Moreover,
         * this socket is for now only used in a single thread.
         */
-       struct lttcomm_sock data_sock;
-       struct lttng_ht_node_ulong node;
+       struct lttcomm_relayd_sock data_sock;
+       struct lttng_ht_node_u64 node;
 
        /* Session id on both sides for the sockets. */
        uint64_t relayd_session_id;
@@ -300,12 +332,16 @@ struct lttng_consumer_local_data {
         *    < 0 (error)
         */
        int (*on_update_stream)(int sessiond_key, uint32_t state);
+       enum lttng_consumer_type type;
        /* socket to communicate errors with sessiond */
        int consumer_error_socket;
+       /* socket to ask metadata to sessiond */
+       int consumer_metadata_socket;
        /* socket to exchange commands with sessiond */
        char *consumer_command_sock_path;
        /* communication with splice */
        int consumer_thread_pipe[2];
+       int consumer_channel_pipe[2];
        int consumer_splice_metadata_pipe[2];
        /* Data stream poll thread pipe. To transfer data stream to the thread */
        int consumer_data_pipe[2];
@@ -358,6 +394,11 @@ struct lttng_consumer_global_data {
         * This HT uses the "node_session_id" of the consumer stream.
         */
        struct lttng_ht *stream_list_ht;
+
+       /*
+        * This HT uses the "node_channel_id" of the consumer stream.
+        */
+       struct lttng_ht *stream_per_chan_id_ht;
 };
 
 /*
@@ -407,8 +448,8 @@ void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
  */
 int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll);
 
-struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
-               int stream_key,
+struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+               uint64_t stream_key,
                enum lttng_consumer_stream_state state,
                const char *channel_name,
                uid_t uid,
@@ -418,26 +459,29 @@ struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
                int cpu,
                int *alloc_ret,
                enum consumer_channel_type type);
-struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key,
+struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t session_id,
                const char *pathname,
                const char *name,
                uid_t uid,
                gid_t gid,
                int relayd_id,
-               enum lttng_event_output output);
+               enum lttng_event_output output,
+               uint64_t tracefile_size,
+               uint64_t tracefile_count);
 void consumer_del_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht);
 void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht);
-int consumer_add_channel(struct lttng_consumer_channel *channel);
+int consumer_add_channel(struct lttng_consumer_channel *channel,
+               struct lttng_consumer_local_data *ctx);
 void consumer_del_channel(struct lttng_consumer_channel *channel);
 
 /* lttng-relayd consumer command */
 struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
                int net_seq_idx);
-struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
-struct lttng_consumer_channel *consumer_find_channel(unsigned long key);
+struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key);
+struct lttng_consumer_channel *consumer_find_channel(uint64_t key);
 int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
                size_t data_size);
 void consumer_steal_stream_key(int key, struct lttng_ht *ht);
@@ -464,6 +508,7 @@ int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
 void *consumer_thread_metadata_poll(void *data);
 void *consumer_thread_data_poll(void *data);
 void *consumer_thread_sessiond_poll(void *data);
+void *consumer_thread_channel_poll(void *data);
 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll);
 
@@ -472,7 +517,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
 int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
-               struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
+               struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock,
                unsigned int sessiond_id);
 void consumer_flag_relayd_for_destroy(
                struct consumer_relayd_sock_pair *relayd);
This page took 0.026258 seconds and 4 git commands to generate.