X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.h;h=4770671c003a35f40d67ce4d85f199587323e93f;hp=9e77ff999601f1ed3efddd67e5dcf378eb5c2760;hb=e2d1190b9ea09c54e5d7373643d62e2034bc1531;hpb=d73bf3d793ee0b0c5b56cb47cb50c27d1789d3bd diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 9e77ff999..4770671c0 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -1,20 +1,11 @@ /* - * Copyright (C) 2011 - Julien Desfossez - * Mathieu Desnoyers - * 2012 - David Goulet + * Copyright (C) 2011 Julien Desfossez + * Copyright (C) 2011 Mathieu Desnoyers + * Copyright (C) 2012 David Goulet + * Copyright (C) 2018 Jérémie Galarneau * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License, version 2 only, - * as published by the Free Software Foundation. + * SPDX-License-Identifier: GPL-2.0-only * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #ifndef LIB_CONSUMER_H @@ -29,10 +20,15 @@ #include #include -#include +#include #include #include #include +#include +#include +#include + +struct lttng_consumer_local_data; /* Commands for consumer */ enum lttng_consumer_command { @@ -62,16 +58,12 @@ enum lttng_consumer_command { LTTNG_CONSUMER_LOST_PACKETS, LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL, LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, - LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE, - LTTNG_CONSUMER_ROTATE_RENAME, - LTTNG_CONSUMER_MKDIR, -}; - -/* State of each fd in consumer */ -enum lttng_consumer_stream_state { - LTTNG_CONSUMER_ACTIVE_STREAM, - LTTNG_CONSUMER_PAUSE_STREAM, - LTTNG_CONSUMER_DELETE_STREAM, + LTTNG_CONSUMER_ROTATE_CHANNEL, + LTTNG_CONSUMER_INIT, + LTTNG_CONSUMER_CREATE_TRACE_CHUNK, + LTTNG_CONSUMER_CLOSE_TRACE_CHUNK, + LTTNG_CONSUMER_TRACE_CHUNK_EXISTS, + LTTNG_CONSUMER_CLEAR_CHANNEL, }; enum lttng_consumer_type { @@ -107,26 +99,45 @@ struct stream_list { struct consumer_metadata_cache; struct lttng_consumer_channel { + /* Is the channel published in the channel hash tables? */ + bool is_published; + /* + * Was the channel deleted (logically) and waiting to be reclaimed? + * If this flag is set, no modification that is not cleaned-up by the + * RCU reclamation callback should be made + */ + bool is_deleted; /* HT node used for consumer_data.channel_ht */ struct lttng_ht_node_u64 node; + /* HT node used for consumer_data.channels_by_session_id_ht */ + struct lttng_ht_node_u64 channels_by_session_id_ht_node; /* Indexed key. Incremented value in the consumer. */ uint64_t key; /* Number of streams referencing this channel */ int refcount; /* Tracing session id on the session daemon side. */ uint64_t session_id; + /* Current trace chunk of the session in which this channel exists. */ + struct lttng_trace_chunk *trace_chunk; /* * Session id when requesting metadata to the session daemon for * a session with per-PID buffers. */ uint64_t session_id_per_pid; - /* Channel trace file path name. */ + /* + * In the case of local streams, this field contains the channel's + * output path; a path relative to the session's output path. + * e.g. ust/uid/1000/64-bit + * + * In the case of remote streams, the contents of this field depends + * on the version of the relay daemon peer. For 2.11+ peers, the + * contents are the same as in the local case. However, for legacy + * peers, this contains a path of the form: + * /hostname/session_path/ust/uid/1000/64-bit + */ char pathname[PATH_MAX]; /* Channel name. */ char name[LTTNG_SYMBOL_NAME_LEN]; - /* UID and GID of the session owning this channel. */ - uid_t uid; - gid_t gid; /* Relayd id of the channel. -1ULL if it does not apply. */ uint64_t relayd_id; /* @@ -142,7 +153,7 @@ struct lttng_consumer_channel { /* For UST */ uid_t ust_app_uid; /* Application UID. */ struct ustctl_consumer_channel *uchan; - unsigned char uuid[UUID_STR_LEN]; + unsigned char uuid[LTTNG_UUID_STR_LEN]; /* * Temporary stream list used to store the streams once created and waiting * to be sent to the session daemon by receiving the @@ -174,6 +185,8 @@ struct lttng_consumer_channel { int live_timer_enabled; timer_t live_timer; int live_timer_error; + /* Channel is part of a live session ? */ + bool is_live; /* For channel monitoring timer. */ int monitor_timer_enabled; @@ -223,28 +236,160 @@ struct lttng_consumer_channel { int nr_stream_fds; char root_shm_path[PATH_MAX]; char shm_path[PATH_MAX]; + /* Only set for UST channels. */ + LTTNG_OPTIONAL(struct lttng_credentials) buffer_credentials; /* Total number of discarded events for that channel. */ uint64_t discarded_events; /* Total number of missed packets due to overwriting (overwrite). */ uint64_t lost_packets; bool streams_sent_to_relayd; +}; - /* - * Account how many streams are waiting for their rotation to be - * complete. When this number reaches 0, we inform the session - * daemon that this channel has finished its rotation. - */ - uint64_t nr_stream_rotate_pending; - - /* - * The chunk id where we currently write the data. This value is sent - * to the relay when we add a stream and when a stream rotates. This - * allows to keep track of where each stream on the relay is writing. - */ - uint64_t current_chunk_id; +struct stream_subbuffer { + union { + /* + * CONSUMER_CHANNEL_SPLICE + * No ownership assumed. + */ + int fd; + /* CONSUMER_CHANNEL_MMAP */ + struct lttng_buffer_view buffer; + } buffer; + union { + /* + * Common members are fine to access through either + * union entries (as per C11, Common Initial Sequence). + */ + struct { + unsigned long subbuf_size; + unsigned long padded_subbuf_size; + uint64_t version; + /* + * Left unset when unsupported. + * + * Indicates that this is the last sub-buffer of + * a series of sub-buffer that makes-up a coherent + * (parseable) unit of metadata. + */ + LTTNG_OPTIONAL(bool) coherent; + } metadata; + struct { + unsigned long subbuf_size; + unsigned long padded_subbuf_size; + uint64_t packet_size; + uint64_t content_size; + uint64_t timestamp_begin; + uint64_t timestamp_end; + uint64_t events_discarded; + /* Left unset when unsupported. */ + LTTNG_OPTIONAL(uint64_t) sequence_number; + uint64_t stream_id; + /* Left unset when unsupported. */ + LTTNG_OPTIONAL(uint64_t) stream_instance_id; + } data; + } info; }; +/* + * Perform any operation required to acknowledge + * the wake-up of a consumer stream (e.g. consume a byte on a wake-up pipe). + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*on_wake_up_cb)(struct lttng_consumer_stream *); + +/* + * Perform any operation required before a consumer stream is put + * to sleep before awaiting a data availability notification. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*on_sleep_cb)(struct lttng_consumer_stream *, + struct lttng_consumer_local_data *); + +/* + * Acquire the subbuffer at the current 'consumed' position. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*get_next_subbuffer_cb)(struct lttng_consumer_stream *, + struct stream_subbuffer *); + +/* + * Populate the stream_subbuffer's info member. The info to populate + * depends on the type (metadata/data) of the stream. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*extract_subbuffer_info_cb)( + struct lttng_consumer_stream *, struct stream_subbuffer *); + +/* + * Invoked after a subbuffer's info has been filled. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*pre_consume_subbuffer_cb)(struct lttng_consumer_stream *, + const struct stream_subbuffer *); + +/* + * Consume subbuffer contents. + * + * Stream and channel locks are acquired during this call. + */ +typedef ssize_t (*consume_subbuffer_cb)(struct lttng_consumer_local_data *, + struct lttng_consumer_stream *, + const struct stream_subbuffer *); + +/* + * Release the current subbuffer and advance the 'consumed' position by + * one subbuffer. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*put_next_subbuffer_cb)(struct lttng_consumer_stream *, + struct stream_subbuffer *); + +/* + * Invoked after consuming a subbuffer. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*post_consume_cb)(struct lttng_consumer_stream *, + const struct stream_subbuffer *, + struct lttng_consumer_local_data *); + +/* + * Send a live beacon if no data is available. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*send_live_beacon_cb)(struct lttng_consumer_stream *); + +/* + * Lock the stream and channel locks and any other stream-type specific + * lock that need to be acquired during the processing of an + * availability notification. + */ +typedef void (*lock_cb)(struct lttng_consumer_stream *); + +/* + * Unlock the stream and channel locks and any other stream-type specific + * lock before sleeping until the next availability notification. + * + * Stream and channel locks are acquired during this call. + */ +typedef void (*unlock_cb)(struct lttng_consumer_stream *); + +/* + * Invoked when a subbuffer's metadata version does not match the last + * known metadata version. + * + * Stream and channel locks are acquired during this call. + */ +typedef void (*reset_metadata_cb)(struct lttng_consumer_stream *); + /* * Internal representation of the streams, sessiond_key is used to identify * uniquely a stream. @@ -258,6 +403,12 @@ struct lttng_consumer_stream { struct lttng_ht_node_u64 node_session_id; /* Pointer to associated channel. */ struct lttng_consumer_channel *chan; + /* + * Current trace chunk. Holds a reference to the trace chunk. + * `chunk` can be NULL when a stream is not associated to a chunk, e.g. + * when it was created in the context of a no-output session. + */ + struct lttng_trace_chunk *trace_chunk; /* Key by which the stream is indexed for 'node'. */ uint64_t key; @@ -270,7 +421,6 @@ struct lttng_consumer_stream { off_t out_fd_offset; /* Amount of bytes written to the output */ uint64_t output_written; - enum lttng_consumer_stream_state state; int shm_fd_is_copy; int data_read; int hangup_flush_done; @@ -294,6 +444,11 @@ struct lttng_consumer_stream { */ bool quiescent; + /* + * True if the sequence number is not available (lttng-modules < 2.8). + */ + bool sequence_number_unavailable; + /* * metadata_timer_lock protects flags waiting_on_metadata and * missed_metadata_flush. @@ -309,7 +464,7 @@ struct lttng_consumer_stream { bool missed_metadata_flush; enum lttng_event_output output; - /* Maximum subbuffer size. */ + /* Maximum subbuffer size (in bytes). */ unsigned long max_sb_size; /* @@ -322,9 +477,6 @@ struct lttng_consumer_stream { /* For UST */ int wait_fd; - /* UID/GID of the user owning the session to which stream belongs */ - uid_t uid; - gid_t gid; /* Network sequence number. Indicating on which relayd socket it goes. */ uint64_t net_seq_idx; /* @@ -430,6 +582,14 @@ struct lttng_consumer_stream { pthread_cond_t metadata_rdv; pthread_mutex_t metadata_rdv_lock; + /* + * rotate_position represents the packet sequence number of the last + * packet which belongs to the current trace chunk prior to the rotation. + * When that position is reached, this tracefile can be closed and a + * new one is created in channel_read_only_attributes.path. + */ + uint64_t rotate_position; + /* * Read-only copies of channel values. We cannot safely access the * channel from a stream, so we need to have a local copy of these @@ -437,10 +597,15 @@ struct lttng_consumer_stream { * the stream objects when we introduce refcounting. */ struct { - char path[LTTNG_PATH_MAX]; uint64_t tracefile_size; } channel_read_only_attributes; + /* + * Flag to inform the data or metadata thread that a stream is + * ready to be rotated. + */ + bool rotate_ready; + /* Indicate if the stream still has some data to be read. */ unsigned int has_data:1; /* @@ -448,6 +613,25 @@ struct lttng_consumer_stream { * file before writing in it (regeneration). */ unsigned int reset_metadata_flag:1; + struct { + /* + * Invoked in the order of declaration. + * See callback type definitions. + */ + lock_cb lock; + on_wake_up_cb on_wake_up; + get_next_subbuffer_cb get_next_subbuffer; + extract_subbuffer_info_cb extract_subbuffer_info; + pre_consume_subbuffer_cb pre_consume_subbuffer; + reset_metadata_cb reset_metadata; + consume_subbuffer_cb consume_subbuffer; + put_next_subbuffer_cb put_next_subbuffer; + post_consume_cb post_consume; + send_live_beacon_cb send_live_beacon; + on_sleep_cb on_sleep; + unlock_cb unlock; + } read_subbuffer_ops; + struct metadata_bucket *metadata_bucket; }; /* @@ -457,7 +641,7 @@ struct consumer_relayd_sock_pair { /* Network sequence number. */ uint64_t net_seq_idx; /* Number of stream associated with this relayd */ - unsigned int refcount; + int refcount; /* * This flag indicates whether or not we should destroy this object. The @@ -491,6 +675,7 @@ struct consumer_relayd_sock_pair { /* Session id on both sides for the sockets. */ uint64_t relayd_session_id; uint64_t sessiond_session_id; + struct lttng_consumer_local_data *ctx; }; /* @@ -503,7 +688,8 @@ struct lttng_consumer_local_data { * Returns the number of bytes read, or negative error value. */ ssize_t (*on_buffer_ready)(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx); + struct lttng_consumer_local_data *ctx, + bool locked_by_caller); /* * function to call when we receive a new channel, it receives a * newly allocated channel, depending on the return code of this @@ -580,11 +766,7 @@ struct lttng_consumer_local_data { * to the session daemon (write-only). */ int channel_monitor_pipe; - /* - * Pipe used to inform the session daemon that a stream has finished - * its rotation (write-only). - */ - int channel_rotate_pipe; + LTTNG_OPTIONAL(lttng_uuid) sessiond_uuid; }; /* @@ -609,6 +791,8 @@ struct lttng_consumer_global_data { /* Channel hash table protected by consumer_data.lock. */ struct lttng_ht *channel_ht; + /* Channel hash table indexed by session id. */ + struct lttng_ht *channels_by_session_id_ht; /* * Flag specifying if the local array of FDs needs update in the * poll function. Protected by consumer_data.lock. @@ -635,6 +819,11 @@ struct lttng_consumer_global_data { * This HT uses the "node_channel_id" of the consumer stream. */ struct lttng_ht *stream_per_chan_id_ht; + + /* + * Trace chunk registry indexed by (session_id, chunk_id). + */ + struct lttng_trace_chunk_registry *chunk_registry; }; /* @@ -652,6 +841,24 @@ extern int consumer_quit; /* Flag used to temporarily pause data consumption from testpoints. */ extern int data_consumption_paused; +/* Return a human-readable consumer type string that is suitable for logging. */ +static inline +const char *lttng_consumer_type_str(enum lttng_consumer_type type) +{ + switch (type) { + case LTTNG_CONSUMER_UNKNOWN: + return "unknown"; + case LTTNG_CONSUMER_KERNEL: + return "kernel"; + case LTTNG_CONSUMER32_UST: + return "32-bit user space"; + case LTTNG_CONSUMER64_UST: + return "64-bit user space"; + default: + abort(); + } +} + /* * Init consumer data structures. */ @@ -701,24 +908,23 @@ void consumer_stream_update_channel_attributes( struct lttng_consumer_stream *stream, struct lttng_consumer_channel *channel); -struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, +struct lttng_consumer_stream *consumer_allocate_stream( + struct lttng_consumer_channel *channel, + uint64_t channel_key, uint64_t stream_key, - enum lttng_consumer_stream_state state, const char *channel_name, - uid_t uid, - gid_t gid, uint64_t relayd_id, uint64_t session_id, + struct lttng_trace_chunk *trace_chunk, int cpu, int *alloc_ret, enum consumer_channel_type type, unsigned int monitor); struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t session_id, + const uint64_t *chunk_id, const char *pathname, const char *name, - uid_t uid, - gid_t gid, uint64_t relayd_id, enum lttng_event_output output, uint64_t tracefile_size, @@ -726,6 +932,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t session_id_per_pid, unsigned int monitor, unsigned int live_timer_interval, + bool is_in_live_session, const char *root_shm_path, const char *shm_path); void consumer_del_stream(struct lttng_consumer_stream *stream, @@ -749,21 +956,20 @@ void consumer_steal_stream_key(int key, struct lttng_ht *ht); struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx), + struct lttng_consumer_local_data *ctx, + bool locked_by_caller), int (*recv_channel)(struct lttng_consumer_channel *channel), int (*recv_stream)(struct lttng_consumer_stream *stream), int (*update_stream)(uint64_t sessiond_key, uint32_t state)); void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx); ssize_t lttng_consumer_on_read_subbuffer_mmap( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding, - struct ctf_packet_index *index); + struct lttng_consumer_stream *stream, + const struct lttng_buffer_view *buffer, + unsigned long padding); ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding, - struct ctf_packet_index *index); + unsigned long padding); int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream); int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream); int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, @@ -780,7 +986,8 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll); ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx); + struct lttng_consumer_local_data *ctx, + bool locked_by_caller); int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, @@ -803,12 +1010,34 @@ void consumer_del_stream_for_data(struct lttng_consumer_stream *stream); void consumer_add_metadata_stream(struct lttng_consumer_stream *stream); void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream); int consumer_create_index_file(struct lttng_consumer_stream *stream); +int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, + uint64_t key, uint64_t relayd_id, uint32_t metadata, + struct lttng_consumer_local_data *ctx); +int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream); int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream); -int lttng_consumer_rotate_rename(const char *current_path, const char *new_path, - uid_t uid, gid_t gid, uint64_t relayd_id); +int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, + uint64_t key, struct lttng_consumer_local_data *ctx); void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream); -int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid, - uint64_t relayd_id); +enum lttcomm_return_code lttng_consumer_create_trace_chunk( + const uint64_t *relayd_id, uint64_t session_id, + uint64_t chunk_id, + time_t chunk_creation_timestamp, + const char *chunk_override_name, + const struct lttng_credentials *credentials, + struct lttng_directory_handle *chunk_directory_handle); +enum lttcomm_return_code lttng_consumer_close_trace_chunk( + const uint64_t *relayd_id, uint64_t session_id, + uint64_t chunk_id, time_t chunk_close_timestamp, + const enum lttng_trace_chunk_command_type *close_command, + char *path); +enum lttcomm_return_code lttng_consumer_trace_chunk_exists( + const uint64_t *relayd_id, uint64_t session_id, + uint64_t chunk_id); +void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd); +enum lttcomm_return_code lttng_consumer_init_command( + struct lttng_consumer_local_data *ctx, + const lttng_uuid sessiond_uuid); +int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel); #endif /* LIB_CONSUMER_H */