consumerd: tag metadata channel as being part of a live session
[lttng-tools.git] / src / common / consumer / consumer.h
index d115a597aa876a77240303b1364229eb4d746b95..000040982d5dfb4bba6abaa2bb2a6a9e1f3d5b75 100644 (file)
@@ -1,20 +1,11 @@
 /*
- * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
- *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *               2012 - David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License, version 2 only,
- * as published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
  *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
 #ifndef LIB_CONSUMER_H
 
 #include <common/hashtable/hashtable.h>
 #include <common/compat/fcntl.h>
-#include <common/compat/uuid.h>
+#include <common/uuid.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/pipe.h>
 #include <common/index/ctf-index.h>
+#include <common/trace-chunk-registry.h>
+#include <common/credentials.h>
+#include <common/buffer-view.h>
 
 /* Commands for consumer */
 enum lttng_consumer_command {
@@ -62,13 +56,12 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_LOST_PACKETS,
        LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL,
        LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE,
-};
-
-/* State of each fd in consumer */
-enum lttng_consumer_stream_state {
-       LTTNG_CONSUMER_ACTIVE_STREAM,
-       LTTNG_CONSUMER_PAUSE_STREAM,
-       LTTNG_CONSUMER_DELETE_STREAM,
+       LTTNG_CONSUMER_ROTATE_CHANNEL,
+       LTTNG_CONSUMER_INIT,
+       LTTNG_CONSUMER_CREATE_TRACE_CHUNK,
+       LTTNG_CONSUMER_CLOSE_TRACE_CHUNK,
+       LTTNG_CONSUMER_TRACE_CHUNK_EXISTS,
+       LTTNG_CONSUMER_CLEAR_CHANNEL,
 };
 
 enum lttng_consumer_type {
@@ -104,26 +97,45 @@ struct stream_list {
 struct consumer_metadata_cache;
 
 struct lttng_consumer_channel {
+       /* Is the channel published in the channel hash tables? */
+       bool is_published;
+       /*
+        * Was the channel deleted (logically) and waiting to be reclaimed?
+        * If this flag is set, no modification that is not cleaned-up by the
+        * RCU reclamation callback should be made
+        */
+       bool is_deleted;
        /* HT node used for consumer_data.channel_ht */
        struct lttng_ht_node_u64 node;
+       /* HT node used for consumer_data.channels_by_session_id_ht */
+       struct lttng_ht_node_u64 channels_by_session_id_ht_node;
        /* Indexed key. Incremented value in the consumer. */
        uint64_t key;
        /* Number of streams referencing this channel */
        int refcount;
        /* Tracing session id on the session daemon side. */
        uint64_t session_id;
+       /* Current trace chunk of the session in which this channel exists. */
+       struct lttng_trace_chunk *trace_chunk;
        /*
         * Session id when requesting metadata to the session daemon for
         * a session with per-PID buffers.
         */
        uint64_t session_id_per_pid;
-       /* Channel trace file path name. */
+       /*
+        * In the case of local streams, this field contains the channel's
+        * output path; a path relative to the session's output path.
+        *   e.g. ust/uid/1000/64-bit
+        *
+        * In the case of remote streams, the contents of this field depends
+        * on the version of the relay daemon peer. For 2.11+ peers, the
+        * contents are the same as in the local case. However, for legacy
+        * peers, this contains a path of the form:
+        *   /hostname/session_path/ust/uid/1000/64-bit
+        */
        char pathname[PATH_MAX];
        /* Channel name. */
        char name[LTTNG_SYMBOL_NAME_LEN];
-       /* UID and GID of the session owning this channel. */
-       uid_t uid;
-       gid_t gid;
        /* Relayd id of the channel. -1ULL if it does not apply. */
        uint64_t relayd_id;
        /*
@@ -139,7 +151,7 @@ struct lttng_consumer_channel {
        /* For UST */
        uid_t ust_app_uid;      /* Application UID. */
        struct ustctl_consumer_channel *uchan;
-       unsigned char uuid[UUID_STR_LEN];
+       unsigned char uuid[LTTNG_UUID_STR_LEN];
        /*
         * Temporary stream list used to store the streams once created and waiting
         * to be sent to the session daemon by receiving the
@@ -171,6 +183,8 @@ struct lttng_consumer_channel {
        int live_timer_enabled;
        timer_t live_timer;
        int live_timer_error;
+       /* Channel is part of a live session ? */
+       bool is_live;
 
        /* For channel monitoring timer. */
        int monitor_timer_enabled;
@@ -220,6 +234,8 @@ struct lttng_consumer_channel {
        int nr_stream_fds;
        char root_shm_path[PATH_MAX];
        char shm_path[PATH_MAX];
+       /* Only set for UST channels. */
+       LTTNG_OPTIONAL(struct lttng_credentials) buffer_credentials;
        /* Total number of discarded events for that channel. */
        uint64_t discarded_events;
        /* Total number of missed packets due to overwriting (overwrite). */
@@ -241,6 +257,12 @@ struct lttng_consumer_stream {
        struct lttng_ht_node_u64 node_session_id;
        /* Pointer to associated channel. */
        struct lttng_consumer_channel *chan;
+       /*
+        * Current trace chunk. Holds a reference to the trace chunk.
+        * `chunk` can be NULL when a stream is not associated to a chunk, e.g.
+        * when it was created in the context of a no-output session.
+        */
+       struct lttng_trace_chunk *trace_chunk;
 
        /* Key by which the stream is indexed for 'node'. */
        uint64_t key;
@@ -253,7 +275,6 @@ struct lttng_consumer_stream {
        off_t out_fd_offset;
        /* Amount of bytes written to the output */
        uint64_t output_written;
-       enum lttng_consumer_stream_state state;
        int shm_fd_is_copy;
        int data_read;
        int hangup_flush_done;
@@ -277,6 +298,11 @@ struct lttng_consumer_stream {
         */
        bool quiescent;
 
+       /*
+        * True if the sequence number is not available (lttng-modules < 2.8).
+        */
+       bool sequence_number_unavailable;
+
        /*
         * metadata_timer_lock protects flags waiting_on_metadata and
         * missed_metadata_flush.
@@ -292,7 +318,7 @@ struct lttng_consumer_stream {
        bool missed_metadata_flush;
 
        enum lttng_event_output output;
-       /* Maximum subbuffer size. */
+       /* Maximum subbuffer size (in bytes). */
        unsigned long max_sb_size;
 
        /*
@@ -305,9 +331,6 @@ struct lttng_consumer_stream {
        /* For UST */
 
        int wait_fd;
-       /* UID/GID of the user owning the session to which stream belongs */
-       uid_t uid;
-       gid_t gid;
        /* Network sequence number. Indicating on which relayd socket it goes. */
        uint64_t net_seq_idx;
        /*
@@ -413,6 +436,30 @@ struct lttng_consumer_stream {
        pthread_cond_t metadata_rdv;
        pthread_mutex_t metadata_rdv_lock;
 
+       /*
+        * rotate_position represents the packet sequence number of the last
+        * packet which belongs to the current trace chunk prior to the rotation.
+        * When that position is reached, this tracefile can be closed and a
+        * new one is created in channel_read_only_attributes.path.
+        */
+       uint64_t rotate_position;
+
+       /*
+        * Read-only copies of channel values. We cannot safely access the
+        * channel from a stream, so we need to have a local copy of these
+        * fields in the stream object. These fields should be removed from
+        * the stream objects when we introduce refcounting.
+        */
+       struct {
+               uint64_t tracefile_size;
+       } channel_read_only_attributes;
+
+       /*
+        * Flag to inform the data or metadata thread that a stream is
+        * ready to be rotated.
+        */
+       bool rotate_ready;
+
        /* Indicate if the stream still has some data to be read. */
        unsigned int has_data:1;
        /*
@@ -429,7 +476,7 @@ struct consumer_relayd_sock_pair {
        /* Network sequence number. */
        uint64_t net_seq_idx;
        /* Number of stream associated with this relayd */
-       unsigned int refcount;
+       int refcount;
 
        /*
         * This flag indicates whether or not we should destroy this object. The
@@ -463,6 +510,7 @@ struct consumer_relayd_sock_pair {
        /* Session id on both sides for the sockets. */
        uint64_t relayd_session_id;
        uint64_t sessiond_session_id;
+       struct lttng_consumer_local_data *ctx;
 };
 
 /*
@@ -552,6 +600,7 @@ struct lttng_consumer_local_data {
         * to the session daemon (write-only).
         */
        int channel_monitor_pipe;
+       LTTNG_OPTIONAL(lttng_uuid) sessiond_uuid;
 };
 
 /*
@@ -576,6 +625,8 @@ struct lttng_consumer_global_data {
 
        /* Channel hash table protected by consumer_data.lock. */
        struct lttng_ht *channel_ht;
+       /* Channel hash table indexed by session id. */
+       struct lttng_ht *channels_by_session_id_ht;
        /*
         * Flag specifying if the local array of FDs needs update in the
         * poll function. Protected by consumer_data.lock.
@@ -602,6 +653,11 @@ struct lttng_consumer_global_data {
         * This HT uses the "node_channel_id" of the consumer stream.
         */
        struct lttng_ht *stream_per_chan_id_ht;
+
+       /*
+        * Trace chunk registry indexed by (session_id, chunk_id).
+        */
+       struct lttng_trace_chunk_registry *chunk_registry;
 };
 
 /*
@@ -619,6 +675,24 @@ extern int consumer_quit;
 /* Flag used to temporarily pause data consumption from testpoints. */
 extern int data_consumption_paused;
 
+/* Return a human-readable consumer type string that is suitable for logging. */
+static inline
+const char *lttng_consumer_type_str(enum lttng_consumer_type type)
+{
+       switch (type) {
+       case LTTNG_CONSUMER_UNKNOWN:
+               return "unknown";
+       case LTTNG_CONSUMER_KERNEL:
+               return "kernel";
+       case LTTNG_CONSUMER32_UST:
+               return "32-bit user space";
+       case LTTNG_CONSUMER64_UST:
+               return "64-bit user space";
+       default:
+               abort();
+       }
+}
+
 /*
  * Init consumer data structures.
  */
@@ -660,24 +734,31 @@ void lttng_consumer_cleanup(void);
  */
 int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll);
 
-struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+/*
+ * Copy the fields from the channel that need to be accessed (read-only)
+ * directly from the stream.
+ */
+void consumer_stream_update_channel_attributes(
+               struct lttng_consumer_stream *stream,
+               struct lttng_consumer_channel *channel);
+
+struct lttng_consumer_stream *consumer_allocate_stream(
+               struct lttng_consumer_channel *channel,
+               uint64_t channel_key,
                uint64_t stream_key,
-               enum lttng_consumer_stream_state state,
                const char *channel_name,
-               uid_t uid,
-               gid_t gid,
                uint64_t relayd_id,
                uint64_t session_id,
+               struct lttng_trace_chunk *trace_chunk,
                int cpu,
                int *alloc_ret,
                enum consumer_channel_type type,
                unsigned int monitor);
 struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t session_id,
+               const uint64_t *chunk_id,
                const char *pathname,
                const char *name,
-               uid_t uid,
-               gid_t gid,
                uint64_t relayd_id,
                enum lttng_event_output output,
                uint64_t tracefile_size,
@@ -685,6 +766,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t session_id_per_pid,
                unsigned int monitor,
                unsigned int live_timer_interval,
+               bool is_in_live_session,
                const char *root_shm_path,
                const char *shm_path);
 void consumer_del_stream(struct lttng_consumer_stream *stream,
@@ -715,7 +797,8 @@ struct lttng_consumer_local_data *lttng_consumer_create(
 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
 ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream, unsigned long len,
+               struct lttng_consumer_stream *stream,
+               const struct lttng_buffer_view *buffer,
                unsigned long padding,
                struct ctf_packet_index *index);
 ssize_t lttng_consumer_on_read_subbuffer_splice(
@@ -723,9 +806,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_stream *stream, unsigned long len,
                unsigned long padding,
                struct ctf_packet_index *index);
+int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream);
 int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream);
 int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
                unsigned long *pos);
+int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
+               unsigned long *pos);
 int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream);
 int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream);
 void *consumer_thread_metadata_poll(void *data);
@@ -754,10 +840,39 @@ void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
 unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
                unsigned long produced_pos, uint64_t nb_packets_per_stream,
                uint64_t max_sb_size);
-int consumer_add_data_stream(struct lttng_consumer_stream *stream);
+void consumer_add_data_stream(struct lttng_consumer_stream *stream);
 void consumer_del_stream_for_data(struct lttng_consumer_stream *stream);
-int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
+void consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
 void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
 int consumer_create_index_file(struct lttng_consumer_stream *stream);
+int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
+               uint64_t key, uint64_t relayd_id, uint32_t metadata,
+               struct lttng_consumer_local_data *ctx);
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream);
+int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream);
+int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
+               uint64_t key, struct lttng_consumer_local_data *ctx);
+void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream);
+enum lttcomm_return_code lttng_consumer_create_trace_chunk(
+               const uint64_t *relayd_id, uint64_t session_id,
+               uint64_t chunk_id,
+               time_t chunk_creation_timestamp,
+               const char *chunk_override_name,
+               const struct lttng_credentials *credentials,
+               struct lttng_directory_handle *chunk_directory_handle);
+enum lttcomm_return_code lttng_consumer_close_trace_chunk(
+               const uint64_t *relayd_id, uint64_t session_id,
+               uint64_t chunk_id, time_t chunk_close_timestamp,
+               const enum lttng_trace_chunk_command_type *close_command,
+               char *path);
+enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
+               const uint64_t *relayd_id, uint64_t session_id,
+               uint64_t chunk_id);
+void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd);
+enum lttcomm_return_code lttng_consumer_init_command(
+               struct lttng_consumer_local_data *ctx,
+               const lttng_uuid sessiond_uuid);
+int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel);
 
 #endif /* LIB_CONSUMER_H */
This page took 0.027987 seconds and 4 git commands to generate.