X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=include%2Flttng%2Flttng-consumer.h;fp=include%2Flttng%2Flttng-consumer.h;h=7ca94cc1da5818a5c25c00d37298a97cc7ec8a2e;hp=0000000000000000000000000000000000000000;hb=3bd1e0819b577ffcb44acd7c2f8e02ff09654b7b;hpb=f84efadf55274918ca038a4e06e0a8af1a320654 diff --git a/include/lttng/lttng-consumer.h b/include/lttng/lttng-consumer.h new file mode 100644 index 000000000..7ca94cc1d --- /dev/null +++ b/include/lttng/lttng-consumer.h @@ -0,0 +1,297 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * Copyright (C) 2011 - Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; only version 2 + * of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#ifndef _LTTNG_CONSUMER_H +#define _LTTNG_CONSUMER_H + +#include +#include +#include +#include + +/* + * When the receiving thread dies, we need to have a way to make the polling + * thread exit eventually. If all FDs hang up (normal case when the + * ltt-sessiond stops), we can exit cleanly, but if there is a problem and for + * whatever reason some FDs remain open, the consumer should still exit + * eventually. + * + * If the timeout is reached, it means that during this period no events + * occurred on the FDs so we need to force an exit. This case should not happen + * but it is a safety to ensure we won't block the consumer indefinitely. + * + * The value of 2 seconds is an arbitrary choice. + */ +#define LTTNG_CONSUMER_POLL_TIMEOUT 2000 + +/* Commands for consumer */ +enum lttng_consumer_command { + LTTNG_CONSUMER_ADD_CHANNEL, + LTTNG_CONSUMER_ADD_STREAM, + /* pause, delete, active depending on fd state */ + LTTNG_CONSUMER_UPDATE_STREAM, + /* inform the consumer to quit when all fd has hang up */ + LTTNG_CONSUMER_STOP, +}; + +/* State of each fd in consumer */ +enum lttng_consumer_stream_state { + LTTNG_CONSUMER_ACTIVE_STREAM, + LTTNG_CONSUMER_PAUSE_STREAM, + LTTNG_CONSUMER_DELETE_STREAM, +}; + +struct lttng_consumer_channel_list { + struct cds_list_head head; +}; + +struct lttng_consumer_stream_list { + struct cds_list_head head; +}; + +enum lttng_consumer_type { + LTTNG_CONSUMER_UNKNOWN = 0, + LTTNG_CONSUMER_KERNEL, + LTTNG_CONSUMER_UST, +}; + +struct lttng_consumer_channel { + struct cds_list_head list; + int key; + uint64_t max_sb_size; /* the subbuffer size for this channel */ + int refcount; /* Number of streams referencing this channel */ + /* For UST */ + int shm_fd; + int wait_fd; + void *mmap_base; + size_t mmap_len; + struct shm_handle *handle; + int nr_streams; +}; + +/* Forward declaration for UST. */ +struct lib_ring_buffer; + +/* + * Internal representation of the streams, sessiond_key is used to identify + * uniquely a stream. + */ +struct lttng_consumer_stream { + struct cds_list_head list; + struct lttng_consumer_channel *chan; /* associated channel */ + /* + * key is the key used by the session daemon to refer to the + * object in the consumer daemon. + */ + int key; + int shm_fd; + int wait_fd; + int out_fd; /* output file to write the data */ + off_t out_fd_offset; /* write position in the output file descriptor */ + char path_name[PATH_MAX]; /* tracefile name */ + enum lttng_consumer_stream_state state; + size_t shm_len; + void *mmap_base; + size_t mmap_len; + enum lttng_event_output output; /* splice or mmap */ + /* For UST */ + struct lib_ring_buffer *buf; + int cpu; +}; + +/* + * UST consumer local data to the program. One or more instance per + * process. + */ +struct lttng_consumer_local_data { + /* function to call when data is available on a buffer */ + int (*on_buffer_ready)(struct lttng_consumer_stream *stream); + /* + * function to call when we receive a new channel, it receives a + * newly allocated channel, depending on the return code of this + * function, the new channel will be handled by the application + * or the library. + * + * Returns: + * > 0 (success, FD is kept by application) + * == 0 (success, FD is left to library) + * < 0 (error) + */ + int (*on_recv_channel)(struct lttng_consumer_channel *channel); + /* + * function to call when we receive a new stream, it receives a + * newly allocated stream, depending on the return code of this + * function, the new stream will be handled by the application + * or the library. + * + * Returns: + * > 0 (success, FD is kept by application) + * == 0 (success, FD is left to library) + * < 0 (error) + */ + int (*on_recv_stream)(struct lttng_consumer_stream *stream); + /* + * function to call when a stream is getting updated by the session + * daemon, this function receives the sessiond key and the new + * state, depending on the return code of this function the + * update of state for the stream is handled by the application + * or the library. + * + * Returns: + * > 0 (success, FD is kept by application) + * == 0 (success, FD is left to library) + * < 0 (error) + */ + int (*on_update_stream)(int sessiond_key, uint32_t state); + /* socket to communicate errors with sessiond */ + int consumer_error_socket; + /* socket to exchange commands with sessiond */ + char *consumer_command_sock_path; + /* communication with splice */ + int consumer_thread_pipe[2]; + /* pipe to wake the poll thread when necessary */ + int consumer_poll_pipe[2]; + /* to let the signal handler wake up the fd receiver thread */ + int consumer_should_quit[2]; +}; + +/* + * Library-level data. One instance per process. + */ +struct lttng_consumer_global_data { + /* + * consumer_data.lock protects consumer_data.fd_list, + * consumer_data.stream_count, and consumer_data.need_update. It + * ensures the count matches the number of items in the fd_list. + * It ensures the list updates *always* trigger an fd_array + * update (therefore need to make list update vs + * consumer_data.need_update flag update atomic, and also flag + * read, fd array and flag clear atomic). + */ + pthread_mutex_t lock; + /* + * Number of streams in the list below. Protected by + * consumer_data.lock. + */ + int stream_count; + /* + * Lists of streams and channels. Protected by consumer_data.lock. + */ + struct lttng_consumer_stream_list stream_list; + struct lttng_consumer_channel_list channel_list; + /* + * Flag specifying if the local array of FDs needs update in the + * poll function. Protected by consumer_data.lock. + */ + unsigned int need_update; + enum lttng_consumer_type type; +}; + +/* + * Set the error socket for communication with a session daemon. + */ +extern void lttng_consumer_set_error_sock( + struct lttng_consumer_local_data *ctx, int sock); + +/* + * Set the command socket path for communication with a session daemon. + */ +extern void lttng_consumer_set_command_sock_path( + struct lttng_consumer_local_data *ctx, char *sock); + +/* + * Send return code to session daemon. + * + * Returns the return code of sendmsg : the number of bytes transmitted or -1 + * on error. + */ +extern int lttng_consumer_send_error( + struct lttng_consumer_local_data *ctx, int cmd); + +/* + * Called from signal handler to ensure a clean exit. + */ +extern void lttng_consumer_should_exit( + struct lttng_consumer_local_data *ctx); + +/* + * Cleanup the daemon's socket on exit. + */ +extern void lttng_consumer_cleanup(void); + +/* + * Flush pending writes to trace output disk file. + */ +extern void lttng_consumer_sync_trace_file( + struct lttng_consumer_stream *stream, off_t orig_offset); + +/* + * Poll on the should_quit pipe and the command socket return -1 on error and + * should exit, 0 if data is available on the command socket + */ +extern int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll); + +extern int consumer_update_poll_array( + struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, + struct lttng_consumer_stream **local_consumer_streams); + +extern struct lttng_consumer_stream *consumer_allocate_stream( + int channel_key, int stream_key, + int shm_fd, int wait_fd, + enum lttng_consumer_stream_state state, + uint64_t mmap_len, + enum lttng_event_output output, + const char *path_name); +extern int consumer_add_stream(struct lttng_consumer_stream *stream); +extern void consumer_del_stream(struct lttng_consumer_stream *stream); +extern void consumer_change_stream_state(int stream_key, + enum lttng_consumer_stream_state state); +extern void consumer_del_channel(struct lttng_consumer_channel *channel); +extern struct lttng_consumer_channel *consumer_allocate_channel( + int channel_key, + int shm_fd, int wait_fd, + uint64_t mmap_len, + uint64_t max_sb_size); +int consumer_add_channel(struct lttng_consumer_channel *channel); + +extern struct lttng_consumer_local_data *lttng_consumer_create( + enum lttng_consumer_type type, + int (*buffer_ready)(struct lttng_consumer_stream *stream), + int (*recv_channel)(struct lttng_consumer_channel *channel), + int (*recv_stream)(struct lttng_consumer_stream *stream), + int (*update_stream)(int sessiond_key, uint32_t state)); +extern void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx); +extern int lttng_consumer_on_read_subbuffer_mmap( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len); +extern int lttng_consumer_on_read_subbuffer_splice( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, unsigned long len); +extern int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream); +extern int lttng_consumer_get_produced_snapshot( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + unsigned long *pos); +extern void *lttng_consumer_thread_poll_fds(void *data); +extern void *lttng_consumer_thread_receive_fds(void *data); +extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, + int sock, struct pollfd *consumer_sockpoll); + +#endif /* _LTTNG_CONSUMER_H */