X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.h;h=92f9e20957f09351834a366a38a1a708498c8dad;hp=830514659772b0afa6bb5de1ad7be678e033dcfb;hb=ffe600149a7608221985751e1bf293234bf2545c;hpb=ccf7af6c78ba7a206baa9d0b9578468a1af734e1 diff --git a/src/common/consumer.h b/src/common/consumer.h index 830514659..92f9e2095 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -23,11 +23,13 @@ #include #include #include +#include #include #include #include +#include #include /* Commands for consumer */ @@ -43,6 +45,10 @@ enum lttng_consumer_command { LTTNG_CONSUMER_DESTROY_RELAYD, /* Return to the sessiond if there is data pending for a session */ LTTNG_CONSUMER_DATA_PENDING, + /* Consumer creates a channel and returns it to sessiond. */ + LTTNG_CONSUMER_ASK_CHANNEL_CREATION, + LTTNG_CONSUMER_GET_CHANNEL, + LTTNG_CONSUMER_DESTROY_CHANNEL, }; /* State of each fd in consumer */ @@ -64,30 +70,60 @@ enum consumer_endpoint_status { CONSUMER_ENDPOINT_INACTIVE, }; +enum consumer_channel_output { + CONSUMER_CHANNEL_MMAP = 0, + CONSUMER_CHANNEL_SPLICE = 1, +}; + +enum consumer_channel_type { + CONSUMER_CHANNEL_TYPE_METADATA = 0, + CONSUMER_CHANNEL_TYPE_DATA = 1, +}; + +struct stream_list { + struct cds_list_head head; + unsigned int count; +}; + struct lttng_consumer_channel { + /* HT node used for consumer_data.channel_ht */ struct lttng_ht_node_ulong node; + /* Indexed key. Incremented value in the consumer. */ int key; - uint64_t max_sb_size; /* the subbuffer size for this channel */ - int refcount; /* Number of streams referencing this channel */ + /* Number of streams referencing this channel */ + int refcount; + /* Tracing session id on the session daemon side. */ + uint64_t session_id; + /* Channel trace file path name. */ + char pathname[PATH_MAX]; + /* Channel name. */ + char name[LTTNG_SYMBOL_NAME_LEN]; + /* UID and GID of the channel. */ + uid_t uid; + gid_t gid; + /* Relayd id of the channel. -1 if it does not apply. */ + int relayd_id; /* - * The number of streams to receive initially. Used to guarantee that we do - * not destroy a channel before receiving all its associated streams. + * Number of streams NOT initialized yet. This is used in order to not + * delete this channel if streams are getting initialized. */ - unsigned int nb_init_streams; + unsigned int nb_init_stream_left; + /* Output type (mmap or splice). */ + enum consumer_channel_output output; + /* Channel type for stream */ + enum consumer_channel_type type; /* For UST */ - int shm_fd; - int wait_fd; - void *mmap_base; - size_t mmap_len; - struct lttng_ust_shm_handle *handle; - int wait_fd_is_copy; - int cpucount; + struct ustctl_consumer_channel *uchan; + unsigned char uuid[UUID_STR_LEN]; + /* + * Temporary stream list used to store the streams once created and waiting + * to be sent to the session daemon by receiving the + * LTTNG_CONSUMER_GET_CHANNEL. + */ + struct stream_list streams; }; -/* Forward declaration for UST. */ -struct lttng_ust_lib_ring_buffer; - /* * Internal representation of the streams, sessiond_key is used to identify * uniquely a stream. @@ -97,29 +133,36 @@ struct lttng_consumer_stream { struct lttng_ht_node_ulong node; /* HT node used in consumer_data.stream_list_ht */ struct lttng_ht_node_ulong node_session_id; - struct lttng_consumer_channel *chan; /* associated channel */ + /* Pointer to associated channel. */ + struct lttng_consumer_channel *chan; + + /* Key by which the stream is indexed for 'node'. */ + int key; /* - * key is the key used by the session daemon to refer to the - * object in the consumer daemon. + * File descriptor of the data output file. This can be either a file or a + * socket fd for relayd streaming. */ - int key; - int shm_fd; - int wait_fd; int out_fd; /* output file to write the data */ - off_t out_fd_offset; /* write position in the output file descriptor */ - char path_name[PATH_MAX]; /* tracefile name */ + /* Write position in the output file descriptor */ + off_t out_fd_offset; enum lttng_consumer_stream_state state; - size_t shm_len; - void *mmap_base; - size_t mmap_len; - enum lttng_event_output output; /* splice or mmap */ int shm_fd_is_copy; - int wait_fd_is_copy; - /* For UST */ - struct lttng_ust_lib_ring_buffer *buf; - int cpu; int data_read; int hangup_flush_done; + enum lttng_event_output output; + /* Maximum subbuffer size. */ + unsigned long max_sb_size; + + /* + * Still used by the kernel for MMAP output. For UST, the ustctl getter is + * used for the mmap base and offset. + */ + void *mmap_base; + unsigned long mmap_len; + + /* For UST */ + + int wait_fd; /* UID/GID of the user owning the session to which stream belongs */ uid_t uid; gid_t gid; @@ -159,6 +202,11 @@ struct lttng_consumer_stream { * consumer data appropriate pipe. */ enum consumer_endpoint_status endpoint_status; + /* Stream name. Format is: _ */ + char name[LTTNG_SYMBOL_NAME_LEN]; + /* Internal state of libustctl. */ + struct ustctl_consumer_stream *ustream; + struct cds_list_head send_node; }; /* @@ -315,18 +363,18 @@ struct lttng_consumer_global_data { /* * Init consumer data structures. */ -extern void lttng_consumer_init(void); +void lttng_consumer_init(void); /* * Set the error socket for communication with a session daemon. */ -extern void lttng_consumer_set_error_sock( - struct lttng_consumer_local_data *ctx, int sock); +void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, + int sock); /* * Set the command socket path for communication with a session daemon. */ -extern void lttng_consumer_set_command_sock_path( +void lttng_consumer_set_command_sock_path( struct lttng_consumer_local_data *ctx, char *sock); /* @@ -335,92 +383,88 @@ extern void lttng_consumer_set_command_sock_path( * Returns the return code of sendmsg : the number of bytes transmitted or -1 * on error. */ -extern int lttng_consumer_send_error( - struct lttng_consumer_local_data *ctx, int cmd); +int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd); /* * Called from signal handler to ensure a clean exit. */ -extern void lttng_consumer_should_exit( - struct lttng_consumer_local_data *ctx); +void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx); /* * Cleanup the daemon's socket on exit. */ -extern void lttng_consumer_cleanup(void); +void lttng_consumer_cleanup(void); /* * Flush pending writes to trace output disk file. */ -extern void lttng_consumer_sync_trace_file( - struct lttng_consumer_stream *stream, off_t orig_offset); +void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, + off_t orig_offset); /* * Poll on the should_quit pipe and the command socket return -1 on error and * should exit, 0 if data is available on the command socket */ -extern int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll); +int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll); -extern struct lttng_consumer_stream *consumer_allocate_stream( - int channel_key, int stream_key, - int shm_fd, int wait_fd, +struct lttng_consumer_stream *consumer_allocate_stream(int channel_key, + int stream_key, enum lttng_consumer_stream_state state, - uint64_t mmap_len, - enum lttng_event_output output, - const char *path_name, + const char *channel_name, uid_t uid, gid_t gid, - int net_index, - int metadata_flag, + int relayd_id, uint64_t session_id, - int *alloc_ret); -extern void consumer_del_stream(struct lttng_consumer_stream *stream, + int cpu, + int *alloc_ret, + enum consumer_channel_type type); +struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key, + uint64_t session_id, + const char *pathname, + const char *name, + uid_t uid, + gid_t gid, + int relayd_id, + enum lttng_event_output output); +void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht); -extern void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, +void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht); -extern void consumer_del_channel(struct lttng_consumer_channel *channel); -extern struct lttng_consumer_channel *consumer_allocate_channel( - int channel_key, - int shm_fd, int wait_fd, - uint64_t mmap_len, - uint64_t max_sb_size, - unsigned int nb_init_streams); int consumer_add_channel(struct lttng_consumer_channel *channel); +void consumer_del_channel(struct lttng_consumer_channel *channel); /* lttng-relayd consumer command */ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( int net_seq_idx); struct consumer_relayd_sock_pair *consumer_find_relayd(int key); +struct lttng_consumer_channel *consumer_find_channel(unsigned long key); int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, size_t data_size); void consumer_steal_stream_key(int key, struct lttng_ht *ht); -extern struct lttng_consumer_local_data *lttng_consumer_create( +struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx), int (*recv_channel)(struct lttng_consumer_channel *channel), int (*recv_stream)(struct lttng_consumer_stream *stream), int (*update_stream)(int sessiond_key, uint32_t state)); -extern void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx); -extern ssize_t lttng_consumer_on_read_subbuffer_mmap( +void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx); +ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, unsigned long padding); -extern ssize_t lttng_consumer_on_read_subbuffer_splice( +ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, unsigned long padding); -extern int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream); -extern int lttng_consumer_get_produced_snapshot( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, +int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream); +int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos); -extern void *consumer_thread_metadata_poll(void *data); -extern void *consumer_thread_data_poll(void *data); -extern void *consumer_thread_sessiond_poll(void *data); -extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, +void *consumer_thread_metadata_poll(void *data); +void *consumer_thread_data_poll(void *data); +void *consumer_thread_sessiond_poll(void *data); +int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll); ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, @@ -434,5 +478,7 @@ void consumer_flag_relayd_for_destroy( struct consumer_relayd_sock_pair *relayd); int consumer_data_pending(uint64_t id); int consumer_send_status_msg(int sock, int ret_code); +int consumer_send_status_channel(int sock, + struct lttng_consumer_channel *channel); #endif /* LIB_CONSUMER_H */