consumerd: tag metadata channel as being part of a live session
[lttng-tools.git] / src / common / consumer / consumer.h
index 3af7db9136f3b011eef851397a2747ac86878ce2..000040982d5dfb4bba6abaa2bb2a6a9e1f3d5b75 100644 (file)
@@ -1,20 +1,11 @@
 /*
- * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
- *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *               2012 - David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License, version 2 only,
- * as published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
  *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
 #ifndef LIB_CONSUMER_H
 
 #include <common/hashtable/hashtable.h>
 #include <common/compat/fcntl.h>
-#include <common/compat/uuid.h>
+#include <common/uuid.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/pipe.h>
 #include <common/index/ctf-index.h>
+#include <common/trace-chunk-registry.h>
+#include <common/credentials.h>
+#include <common/buffer-view.h>
 
 /* Commands for consumer */
 enum lttng_consumer_command {
@@ -60,13 +54,14 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_STREAMS_SENT,
        LTTNG_CONSUMER_DISCARDED_EVENTS,
        LTTNG_CONSUMER_LOST_PACKETS,
-};
-
-/* State of each fd in consumer */
-enum lttng_consumer_stream_state {
-       LTTNG_CONSUMER_ACTIVE_STREAM,
-       LTTNG_CONSUMER_PAUSE_STREAM,
-       LTTNG_CONSUMER_DELETE_STREAM,
+       LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL,
+       LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE,
+       LTTNG_CONSUMER_ROTATE_CHANNEL,
+       LTTNG_CONSUMER_INIT,
+       LTTNG_CONSUMER_CREATE_TRACE_CHUNK,
+       LTTNG_CONSUMER_CLOSE_TRACE_CHUNK,
+       LTTNG_CONSUMER_TRACE_CHUNK_EXISTS,
+       LTTNG_CONSUMER_CLEAR_CHANNEL,
 };
 
 enum lttng_consumer_type {
@@ -102,26 +97,45 @@ struct stream_list {
 struct consumer_metadata_cache;
 
 struct lttng_consumer_channel {
+       /* Is the channel published in the channel hash tables? */
+       bool is_published;
+       /*
+        * Was the channel deleted (logically) and waiting to be reclaimed?
+        * If this flag is set, no modification that is not cleaned-up by the
+        * RCU reclamation callback should be made
+        */
+       bool is_deleted;
        /* HT node used for consumer_data.channel_ht */
        struct lttng_ht_node_u64 node;
+       /* HT node used for consumer_data.channels_by_session_id_ht */
+       struct lttng_ht_node_u64 channels_by_session_id_ht_node;
        /* Indexed key. Incremented value in the consumer. */
        uint64_t key;
        /* Number of streams referencing this channel */
        int refcount;
        /* Tracing session id on the session daemon side. */
        uint64_t session_id;
+       /* Current trace chunk of the session in which this channel exists. */
+       struct lttng_trace_chunk *trace_chunk;
        /*
         * Session id when requesting metadata to the session daemon for
         * a session with per-PID buffers.
         */
        uint64_t session_id_per_pid;
-       /* Channel trace file path name. */
+       /*
+        * In the case of local streams, this field contains the channel's
+        * output path; a path relative to the session's output path.
+        *   e.g. ust/uid/1000/64-bit
+        *
+        * In the case of remote streams, the contents of this field depends
+        * on the version of the relay daemon peer. For 2.11+ peers, the
+        * contents are the same as in the local case. However, for legacy
+        * peers, this contains a path of the form:
+        *   /hostname/session_path/ust/uid/1000/64-bit
+        */
        char pathname[PATH_MAX];
        /* Channel name. */
        char name[LTTNG_SYMBOL_NAME_LEN];
-       /* UID and GID of the session owning this channel. */
-       uid_t uid;
-       gid_t gid;
        /* Relayd id of the channel. -1ULL if it does not apply. */
        uint64_t relayd_id;
        /*
@@ -137,7 +151,7 @@ struct lttng_consumer_channel {
        /* For UST */
        uid_t ust_app_uid;      /* Application UID. */
        struct ustctl_consumer_channel *uchan;
-       unsigned char uuid[UUID_STR_LEN];
+       unsigned char uuid[LTTNG_UUID_STR_LEN];
        /*
         * Temporary stream list used to store the streams once created and waiting
         * to be sent to the session daemon by receiving the
@@ -159,6 +173,7 @@ struct lttng_consumer_channel {
 
        /* Metadata cache is metadata channel */
        struct consumer_metadata_cache *metadata_cache;
+
        /* For UST metadata periodical flush */
        int switch_timer_enabled;
        timer_t switch_timer;
@@ -168,6 +183,12 @@ struct lttng_consumer_channel {
        int live_timer_enabled;
        timer_t live_timer;
        int live_timer_error;
+       /* Channel is part of a live session ? */
+       bool is_live;
+
+       /* For channel monitoring timer. */
+       int monitor_timer_enabled;
+       timer_t monitor_timer;
 
        /* On-disk circular buffer */
        uint64_t tracefile_size;
@@ -213,10 +234,14 @@ struct lttng_consumer_channel {
        int nr_stream_fds;
        char root_shm_path[PATH_MAX];
        char shm_path[PATH_MAX];
+       /* Only set for UST channels. */
+       LTTNG_OPTIONAL(struct lttng_credentials) buffer_credentials;
        /* Total number of discarded events for that channel. */
        uint64_t discarded_events;
        /* Total number of missed packets due to overwriting (overwrite). */
        uint64_t lost_packets;
+
+       bool streams_sent_to_relayd;
 };
 
 /*
@@ -232,6 +257,12 @@ struct lttng_consumer_stream {
        struct lttng_ht_node_u64 node_session_id;
        /* Pointer to associated channel. */
        struct lttng_consumer_channel *chan;
+       /*
+        * Current trace chunk. Holds a reference to the trace chunk.
+        * `chunk` can be NULL when a stream is not associated to a chunk, e.g.
+        * when it was created in the context of a no-output session.
+        */
+       struct lttng_trace_chunk *trace_chunk;
 
        /* Key by which the stream is indexed for 'node'. */
        uint64_t key;
@@ -244,11 +275,34 @@ struct lttng_consumer_stream {
        off_t out_fd_offset;
        /* Amount of bytes written to the output */
        uint64_t output_written;
-       enum lttng_consumer_stream_state state;
        int shm_fd_is_copy;
        int data_read;
        int hangup_flush_done;
 
+       /*
+        * Whether the stream is in a "complete" state (e.g. it does not have a
+        * partially written sub-buffer.
+        *
+        * Initialized to "false" on stream creation (first packet is empty).
+        *
+        * The various transitions of the quiescent state are:
+        *     - On "start" tracing: set to false, since the stream is not
+        *       "complete".
+        *     - On "stop" tracing: if !quiescent -> flush FINAL (update
+        *       timestamp_end), and set to true; the stream has entered a
+        *       complete/quiescent state.
+        *     - On "destroy" or stream/application hang-up: if !quiescent ->
+        *       flush FINAL, and set to true.
+        *
+        * NOTE: Update and read are protected by the stream lock.
+        */
+       bool quiescent;
+
+       /*
+        * True if the sequence number is not available (lttng-modules < 2.8).
+        */
+       bool sequence_number_unavailable;
+
        /*
         * metadata_timer_lock protects flags waiting_on_metadata and
         * missed_metadata_flush.
@@ -264,7 +318,7 @@ struct lttng_consumer_stream {
        bool missed_metadata_flush;
 
        enum lttng_event_output output;
-       /* Maximum subbuffer size. */
+       /* Maximum subbuffer size (in bytes). */
        unsigned long max_sb_size;
 
        /*
@@ -277,9 +331,6 @@ struct lttng_consumer_stream {
        /* For UST */
 
        int wait_fd;
-       /* UID/GID of the user owning the session to which stream belongs */
-       uid_t uid;
-       gid_t gid;
        /* Network sequence number. Indicating on which relayd socket it goes. */
        uint64_t net_seq_idx;
        /*
@@ -292,6 +343,11 @@ struct lttng_consumer_stream {
 
        /* Identify if the stream is the metadata */
        unsigned int metadata_flag;
+       /*
+        * Last known metadata version, reset the metadata file in case
+        * of change.
+        */
+       uint64_t metadata_version;
        /* Used when the stream is set for network streaming */
        uint64_t relayd_stream_id;
        /*
@@ -312,9 +368,9 @@ struct lttng_consumer_stream {
         * Lock to use the stream FDs since they are used between threads.
         *
         * This is nested INSIDE the consumer_data lock.
-        * This is nested INSIDE the metadata cache lock.
         * This is nested INSIDE the channel lock.
         * This is nested INSIDE the channel timer lock.
+        * This is nested OUTSIDE the metadata cache lock.
         * This is nested OUTSIDE consumer_relayd_sock_pair lock.
         */
        pthread_mutex_t lock;
@@ -365,9 +421,9 @@ struct lttng_consumer_stream {
        /* Copy of the sequence number of the last packet extracted. */
        uint64_t last_sequence_number;
        /*
-        * FD of the index file for this stream.
+        * Index file object of the index file for this stream.
         */
-       int index_fd;
+       struct lttng_index_file *index_file;
 
        /*
         * Local pipe to extract data when using splice.
@@ -380,8 +436,37 @@ struct lttng_consumer_stream {
        pthread_cond_t metadata_rdv;
        pthread_mutex_t metadata_rdv_lock;
 
+       /*
+        * rotate_position represents the packet sequence number of the last
+        * packet which belongs to the current trace chunk prior to the rotation.
+        * When that position is reached, this tracefile can be closed and a
+        * new one is created in channel_read_only_attributes.path.
+        */
+       uint64_t rotate_position;
+
+       /*
+        * Read-only copies of channel values. We cannot safely access the
+        * channel from a stream, so we need to have a local copy of these
+        * fields in the stream object. These fields should be removed from
+        * the stream objects when we introduce refcounting.
+        */
+       struct {
+               uint64_t tracefile_size;
+       } channel_read_only_attributes;
+
+       /*
+        * Flag to inform the data or metadata thread that a stream is
+        * ready to be rotated.
+        */
+       bool rotate_ready;
+
        /* Indicate if the stream still has some data to be read. */
        unsigned int has_data:1;
+       /*
+        * Inform the consumer or relay to reset the metadata
+        * file before writing in it (regeneration).
+        */
+       unsigned int reset_metadata_flag:1;
 };
 
 /*
@@ -391,7 +476,7 @@ struct consumer_relayd_sock_pair {
        /* Network sequence number. */
        uint64_t net_seq_idx;
        /* Number of stream associated with this relayd */
-       unsigned int refcount;
+       int refcount;
 
        /*
         * This flag indicates whether or not we should destroy this object. The
@@ -425,6 +510,7 @@ struct consumer_relayd_sock_pair {
        /* Session id on both sides for the sockets. */
        uint64_t relayd_session_id;
        uint64_t sessiond_session_id;
+       struct lttng_consumer_local_data *ctx;
 };
 
 /*
@@ -509,6 +595,12 @@ struct lttng_consumer_local_data {
        int consumer_should_quit[2];
        /* Metadata poll thread pipe. Transfer metadata stream to it */
        struct lttng_pipe *consumer_metadata_pipe;
+       /*
+        * Pipe used by the channel monitoring timers to provide state samples
+        * to the session daemon (write-only).
+        */
+       int channel_monitor_pipe;
+       LTTNG_OPTIONAL(lttng_uuid) sessiond_uuid;
 };
 
 /*
@@ -533,6 +625,8 @@ struct lttng_consumer_global_data {
 
        /* Channel hash table protected by consumer_data.lock. */
        struct lttng_ht *channel_ht;
+       /* Channel hash table indexed by session id. */
+       struct lttng_ht *channels_by_session_id_ht;
        /*
         * Flag specifying if the local array of FDs needs update in the
         * poll function. Protected by consumer_data.lock.
@@ -559,8 +653,46 @@ struct lttng_consumer_global_data {
         * This HT uses the "node_channel_id" of the consumer stream.
         */
        struct lttng_ht *stream_per_chan_id_ht;
+
+       /*
+        * Trace chunk registry indexed by (session_id, chunk_id).
+        */
+       struct lttng_trace_chunk_registry *chunk_registry;
 };
 
+/*
+ * Set to nonzero when the consumer is exiting. Updated by signal
+ * handler and thread exit, read by threads.
+ */
+extern int consumer_quit;
+
+/*
+ * Set to nonzero when the consumer is exiting. Updated by signal
+ * handler and thread exit, read by threads.
+ */
+extern int consumer_quit;
+
+/* Flag used to temporarily pause data consumption from testpoints. */
+extern int data_consumption_paused;
+
+/* Return a human-readable consumer type string that is suitable for logging. */
+static inline
+const char *lttng_consumer_type_str(enum lttng_consumer_type type)
+{
+       switch (type) {
+       case LTTNG_CONSUMER_UNKNOWN:
+               return "unknown";
+       case LTTNG_CONSUMER_KERNEL:
+               return "kernel";
+       case LTTNG_CONSUMER32_UST:
+               return "32-bit user space";
+       case LTTNG_CONSUMER64_UST:
+               return "64-bit user space";
+       default:
+               abort();
+       }
+}
+
 /*
  * Init consumer data structures.
  */
@@ -596,36 +728,37 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx);
  */
 void lttng_consumer_cleanup(void);
 
-/*
- * Flush pending writes to trace output disk file.
- */
-void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
-               off_t orig_offset);
-
 /*
  * Poll on the should_quit pipe and the command socket return -1 on error and
  * should exit, 0 if data is available on the command socket
  */
 int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll);
 
-struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+/*
+ * Copy the fields from the channel that need to be accessed (read-only)
+ * directly from the stream.
+ */
+void consumer_stream_update_channel_attributes(
+               struct lttng_consumer_stream *stream,
+               struct lttng_consumer_channel *channel);
+
+struct lttng_consumer_stream *consumer_allocate_stream(
+               struct lttng_consumer_channel *channel,
+               uint64_t channel_key,
                uint64_t stream_key,
-               enum lttng_consumer_stream_state state,
                const char *channel_name,
-               uid_t uid,
-               gid_t gid,
                uint64_t relayd_id,
                uint64_t session_id,
+               struct lttng_trace_chunk *trace_chunk,
                int cpu,
                int *alloc_ret,
                enum consumer_channel_type type,
                unsigned int monitor);
 struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t session_id,
+               const uint64_t *chunk_id,
                const char *pathname,
                const char *name,
-               uid_t uid,
-               gid_t gid,
                uint64_t relayd_id,
                enum lttng_event_output output,
                uint64_t tracefile_size,
@@ -633,6 +766,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t session_id_per_pid,
                unsigned int monitor,
                unsigned int live_timer_interval,
+               bool is_in_live_session,
                const char *root_shm_path,
                const char *shm_path);
 void consumer_del_stream(struct lttng_consumer_stream *stream,
@@ -663,7 +797,8 @@ struct lttng_consumer_local_data *lttng_consumer_create(
 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
 ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream, unsigned long len,
+               struct lttng_consumer_stream *stream,
+               const struct lttng_buffer_view *buffer,
                unsigned long padding,
                struct ctf_packet_index *index);
 ssize_t lttng_consumer_on_read_subbuffer_splice(
@@ -671,9 +806,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_stream *stream, unsigned long len,
                unsigned long padding,
                struct ctf_packet_index *index);
+int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream);
 int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream);
 int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
                unsigned long *pos);
+int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
+               unsigned long *pos);
 int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream);
 int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream);
 void *consumer_thread_metadata_poll(void *data);
@@ -686,7 +824,7 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx);
 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
-int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
+void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
                struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock,
                uint64_t sessiond_id, uint64_t relayd_session_id);
@@ -702,10 +840,39 @@ void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
 unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
                unsigned long produced_pos, uint64_t nb_packets_per_stream,
                uint64_t max_sb_size);
-int consumer_add_data_stream(struct lttng_consumer_stream *stream);
+void consumer_add_data_stream(struct lttng_consumer_stream *stream);
 void consumer_del_stream_for_data(struct lttng_consumer_stream *stream);
-int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
+void consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
 void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
 int consumer_create_index_file(struct lttng_consumer_stream *stream);
+int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
+               uint64_t key, uint64_t relayd_id, uint32_t metadata,
+               struct lttng_consumer_local_data *ctx);
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream);
+int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream);
+int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
+               uint64_t key, struct lttng_consumer_local_data *ctx);
+void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream);
+enum lttcomm_return_code lttng_consumer_create_trace_chunk(
+               const uint64_t *relayd_id, uint64_t session_id,
+               uint64_t chunk_id,
+               time_t chunk_creation_timestamp,
+               const char *chunk_override_name,
+               const struct lttng_credentials *credentials,
+               struct lttng_directory_handle *chunk_directory_handle);
+enum lttcomm_return_code lttng_consumer_close_trace_chunk(
+               const uint64_t *relayd_id, uint64_t session_id,
+               uint64_t chunk_id, time_t chunk_close_timestamp,
+               const enum lttng_trace_chunk_command_type *close_command,
+               char *path);
+enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
+               const uint64_t *relayd_id, uint64_t session_id,
+               uint64_t chunk_id);
+void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd);
+enum lttcomm_return_code lttng_consumer_init_command(
+               struct lttng_consumer_local_data *ctx,
+               const lttng_uuid sessiond_uuid);
+int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel);
 
 #endif /* LIB_CONSUMER_H */
This page took 0.029613 seconds and 4 git commands to generate.