* Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
* Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
* 2012 - David Goulet <dgoulet@efficios.com>
+ * 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
LTTNG_CONSUMER_LOST_PACKETS,
LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL,
LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE,
- LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE,
LTTNG_CONSUMER_ROTATE_CHANNEL,
LTTNG_CONSUMER_ROTATE_RENAME,
- LTTNG_CONSUMER_ROTATE_PENDING_RELAY,
+ LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL,
+ LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY,
LTTNG_CONSUMER_MKDIR,
};
bool streams_sent_to_relayd;
- /*
- * Account how many streams are waiting for their rotation to be
- * complete. When this number reaches 0, we inform the session
- * daemon that this channel has finished its rotation.
- */
- uint64_t nr_stream_rotate_pending;
-
/*
* The chunk id where we currently write the data. This value is sent
* to the relay when we add a stream and when a stream rotates. This
uint64_t last_discarded_events;
/* Copy of the sequence number of the last packet extracted. */
uint64_t last_sequence_number;
+ /*
+ * A stream is created with a trace_archive_id matching the session's
+ * current trace archive id at the time of the creation of the stream.
+ * It is incremented when the rotate_position is reached.
+ */
+ uint64_t trace_archive_id;
/*
* Index file object of the index file for this stream.
*/
/* Session id on both sides for the sockets. */
uint64_t relayd_session_id;
uint64_t sessiond_session_id;
+ struct lttng_consumer_local_data *ctx;
};
/*
* to the session daemon (write-only).
*/
int channel_monitor_pipe;
- /*
- * Pipe used to inform the session daemon that a stream has finished
- * its rotation (write-only).
- */
- int channel_rotate_pipe;
};
/*
/* Flag used to temporarily pause data consumption from testpoints. */
extern int data_consumption_paused;
+/* Return a human-readable consumer type string that is suitable for logging. */
+static inline
+const char *lttng_consumer_type_str(enum lttng_consumer_type type)
+{
+ switch (type) {
+ case LTTNG_CONSUMER_UNKNOWN:
+ return "unknown";
+ case LTTNG_CONSUMER_KERNEL:
+ return "kernel";
+ case LTTNG_CONSUMER32_UST:
+ return "32-bit user space";
+ case LTTNG_CONSUMER64_UST:
+ return "64-bit user space";
+ default:
+ abort();
+ }
+}
+
/*
* Init consumer data structures.
*/
int cpu,
int *alloc_ret,
enum consumer_channel_type type,
- unsigned int monitor);
+ unsigned int monitor,
+ uint64_t trace_archive_id);
struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
uint64_t session_id,
const char *pathname,
void consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
int consumer_create_index_file(struct lttng_consumer_stream *stream);
-int lttng_consumer_rotate_channel(uint64_t key, const char *path,
- uint64_t relayd_id, uint32_t metadata,
- uint64_t new_chunk_id, struct lttng_consumer_local_data *ctx);
+int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
+ uint64_t key, const char *path, uint64_t relayd_id,
+ uint32_t metadata, uint64_t new_chunk_id,
+ struct lttng_consumer_local_data *ctx);
int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream);
int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, bool *rotated);
-int lttng_consumer_rotate_ready_streams(uint64_t key,
- struct lttng_consumer_local_data *ctx);
+int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
+ uint64_t key, struct lttng_consumer_local_data *ctx);
int lttng_consumer_rotate_rename(const char *current_path, const char *new_path,
uid_t uid, gid_t gid, uint64_t relayd_id);
-int lttng_consumer_rotate_pending_relay( uint64_t session_id,
+int lttng_consumer_check_rotation_pending_local(uint64_t session_id,
+ uint64_t chunk_id);
+int lttng_consumer_check_rotation_pending_relay(uint64_t session_id,
uint64_t relayd_id, uint64_t chunk_id);
void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream);
int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid,
uint64_t relayd_id);
+void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd);
#endif /* LIB_CONSUMER_H */