X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=include%2Flttng%2Flttng-consumer.h;h=f5ad3e6f2ad416d5e3d6064444d1064ff1caa519;hp=7ca94cc1da5818a5c25c00d37298a97cc7ec8a2e;hb=6065ceec9574bf18eb79ae707f627322f2713d18;hpb=3bd1e0819b577ffcb44acd7c2f8e02ff09654b7b diff --git a/include/lttng/lttng-consumer.h b/include/lttng/lttng-consumer.h index 7ca94cc1d..f5ad3e6f2 100644 --- a/include/lttng/lttng-consumer.h +++ b/include/lttng/lttng-consumer.h @@ -22,14 +22,16 @@ #include #include -#include +#include + #include +#include /* * When the receiving thread dies, we need to have a way to make the polling * thread exit eventually. If all FDs hang up (normal case when the - * ltt-sessiond stops), we can exit cleanly, but if there is a problem and for - * whatever reason some FDs remain open, the consumer should still exit + * lttng-sessiond stops), we can exit cleanly, but if there is a problem and + * for whatever reason some FDs remain open, the consumer should still exit * eventually. * * If the timeout is reached, it means that during this period no events @@ -57,22 +59,15 @@ enum lttng_consumer_stream_state { LTTNG_CONSUMER_DELETE_STREAM, }; -struct lttng_consumer_channel_list { - struct cds_list_head head; -}; - -struct lttng_consumer_stream_list { - struct cds_list_head head; -}; - enum lttng_consumer_type { LTTNG_CONSUMER_UNKNOWN = 0, LTTNG_CONSUMER_KERNEL, - LTTNG_CONSUMER_UST, + LTTNG_CONSUMER64_UST, + LTTNG_CONSUMER32_UST, }; struct lttng_consumer_channel { - struct cds_list_head list; + struct lttng_ht_node_ulong node; int key; uint64_t max_sb_size; /* the subbuffer size for this channel */ int refcount; /* Number of streams referencing this channel */ @@ -81,19 +76,22 @@ struct lttng_consumer_channel { int wait_fd; void *mmap_base; size_t mmap_len; - struct shm_handle *handle; + struct lttng_ust_shm_handle *handle; int nr_streams; + int shm_fd_is_copy; + int wait_fd_is_copy; + int cpucount; }; /* Forward declaration for UST. */ -struct lib_ring_buffer; +struct lttng_ust_lib_ring_buffer; /* * Internal representation of the streams, sessiond_key is used to identify * uniquely a stream. */ struct lttng_consumer_stream { - struct cds_list_head list; + struct lttng_ht_node_ulong node; struct lttng_consumer_channel *chan; /* associated channel */ /* * key is the key used by the session daemon to refer to the @@ -110,9 +108,15 @@ struct lttng_consumer_stream { void *mmap_base; size_t mmap_len; enum lttng_event_output output; /* splice or mmap */ + int shm_fd_is_copy; + int wait_fd_is_copy; /* For UST */ - struct lib_ring_buffer *buf; + struct lttng_ust_lib_ring_buffer *buf; int cpu; + int hangup_flush_done; + /* UID/GID of the user owning the session to which stream belongs */ + uid_t uid; + gid_t gid; }; /* @@ -121,7 +125,8 @@ struct lttng_consumer_stream { */ struct lttng_consumer_local_data { /* function to call when data is available on a buffer */ - int (*on_buffer_ready)(struct lttng_consumer_stream *stream); + int (*on_buffer_ready)(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx); /* * function to call when we receive a new channel, it receives a * newly allocated channel, depending on the return code of this @@ -175,26 +180,26 @@ struct lttng_consumer_local_data { * Library-level data. One instance per process. */ struct lttng_consumer_global_data { + /* - * consumer_data.lock protects consumer_data.fd_list, - * consumer_data.stream_count, and consumer_data.need_update. It - * ensures the count matches the number of items in the fd_list. - * It ensures the list updates *always* trigger an fd_array - * update (therefore need to make list update vs - * consumer_data.need_update flag update atomic, and also flag - * read, fd array and flag clear atomic). + * At this time, this lock is used to ensure coherence between the count + * and number of element in the hash table. It's also a protection for + * concurrent read/write between threads. + * + * XXX: We need to see if this lock is still needed with the lockless RCU + * hash tables. */ pthread_mutex_t lock; + /* - * Number of streams in the list below. Protected by - * consumer_data.lock. + * Number of streams in the hash table. Protected by consumer_data.lock. */ int stream_count; /* - * Lists of streams and channels. Protected by consumer_data.lock. + * Hash tables of streams and channels. Protected by consumer_data.lock. */ - struct lttng_consumer_stream_list stream_list; - struct lttng_consumer_channel_list channel_list; + struct lttng_ht *stream_ht; + struct lttng_ht *channel_ht; /* * Flag specifying if the local array of FDs needs update in the * poll function. Protected by consumer_data.lock. @@ -203,6 +208,11 @@ struct lttng_consumer_global_data { enum lttng_consumer_type type; }; +/* + * Init consumer data structures. + */ +extern void lttng_consumer_init(void); + /* * Set the error socket for communication with a session daemon. */ @@ -257,7 +267,9 @@ extern struct lttng_consumer_stream *consumer_allocate_stream( enum lttng_consumer_stream_state state, uint64_t mmap_len, enum lttng_event_output output, - const char *path_name); + const char *path_name, + uid_t uid, + gid_t gid); extern int consumer_add_stream(struct lttng_consumer_stream *stream); extern void consumer_del_stream(struct lttng_consumer_stream *stream); extern void consumer_change_stream_state(int stream_key, @@ -272,7 +284,8 @@ int consumer_add_channel(struct lttng_consumer_channel *channel); extern struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, - int (*buffer_ready)(struct lttng_consumer_stream *stream), + int (*buffer_ready)(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx), int (*recv_channel)(struct lttng_consumer_channel *channel), int (*recv_stream)(struct lttng_consumer_stream *stream), int (*update_stream)(int sessiond_key, uint32_t state)); @@ -294,4 +307,8 @@ extern void *lttng_consumer_thread_receive_fds(void *data); extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll); +int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx); +int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); + #endif /* _LTTNG_CONSUMER_H */