/*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 EfficiOS Inc.
* Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
* Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
* Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
#include <limits.h>
#include <poll.h>
+#include <stdint.h>
#include <unistd.h>
#include <urcu/list.h>
#include <common/buffer-view.h>
#include <common/dynamic-array.h>
-#ifdef __cplusplus
-extern "C" {
-#endif
-
struct lttng_consumer_local_data;
/* Commands for consumer */
*/
typedef void (*unlock_cb)(struct lttng_consumer_stream *);
+/*
+ * Assert that the stream and channel lock and any other stream type specific
+ * lock that need to be acquired during the processing of a read_subbuffer
+ * operation is acquired.
+ */
+typedef void (*assert_locked_cb)(struct lttng_consumer_stream *);
+
/*
* Invoked when a subbuffer's metadata version does not match the last
* known metadata version.
send_live_beacon_cb send_live_beacon;
on_sleep_cb on_sleep;
unlock_cb unlock;
+ assert_locked_cb assert_locked;
} read_subbuffer_ops;
struct metadata_bucket *metadata_bucket;
};
* This is nested OUTSIDE the stream lock.
* This is nested OUTSIDE the consumer_relayd_sock_pair lock.
*/
- pthread_mutex_t lock;
+ pthread_mutex_t lock {};
/*
* Number of streams in the data stream hash table declared outside.
* Protected by consumer_data.lock.
*/
- int stream_count;
+ int stream_count = 0;
/* Channel hash table protected by consumer_data.lock. */
- struct lttng_ht *channel_ht;
+ struct lttng_ht *channel_ht = nullptr;
/* Channel hash table indexed by session id. */
- struct lttng_ht *channels_by_session_id_ht;
+ struct lttng_ht *channels_by_session_id_ht = nullptr;
/*
* Flag specifying if the local array of FDs needs update in the
* poll function. Protected by consumer_data.lock.
*/
- unsigned int need_update;
- enum lttng_consumer_type type;
+ unsigned int need_update = 1;
+ enum lttng_consumer_type type = LTTNG_CONSUMER_UNKNOWN;
/*
* Relayd socket(s) hashtable indexed by network sequence number. Each
* stream has an index which associate the right relayd socket to use.
*/
- struct lttng_ht *relayd_ht;
+ struct lttng_ht *relayd_ht = nullptr;
/*
* This hash table contains all streams (metadata and data) indexed by
*
* This HT uses the "node_session_id" of the consumer stream.
*/
- struct lttng_ht *stream_list_ht;
+ struct lttng_ht *stream_list_ht = nullptr;
/*
* This HT uses the "node_channel_id" of the consumer stream.
*/
- struct lttng_ht *stream_per_chan_id_ht;
+ struct lttng_ht *stream_per_chan_id_ht = nullptr;
/*
* Trace chunk registry indexed by (session_id, chunk_id).
*/
- struct lttng_trace_chunk_registry *chunk_registry;
+ struct lttng_trace_chunk_registry *chunk_registry = nullptr;
};
/*
*/
extern int consumer_quit;
-/* Flag used to temporarily pause data consumption from testpoints. */
-extern int data_consumption_paused;
+/*
+ * Flag used to temporarily pause data consumption from testpoints.
+ *
+ * This variable is dlsym-ed from a test, so needs to be exported.
+ */
+LTTNG_EXPORT extern int data_consumption_paused;
/* Return a human-readable consumer type string that is suitable for logging. */
static inline
struct lttng_consumer_local_data *ctx,
bool locked_by_caller);
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
-void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
- struct lttng_consumer_local_data *ctx, int sock,
- struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock,
- uint64_t sessiond_id, uint64_t relayd_session_id);
+void consumer_add_relayd_socket(uint64_t net_seq_idx,
+ int sock_type,
+ struct lttng_consumer_local_data *ctx,
+ int sock,
+ struct pollfd *consumer_sockpoll,
+ uint64_t sessiond_id,
+ uint64_t relayd_session_id,
+ uint32_t relayd_version_major,
+ uint32_t relayd_version_minor,
+ enum lttcomm_sock_proto relayd_socket_protocol);
void consumer_flag_relayd_for_destroy(
struct consumer_relayd_sock_pair *relayd);
int consumer_data_pending(uint64_t id);
int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel);
void lttng_consumer_sigbus_handle(void *addr);
-#ifdef __cplusplus
-}
-#endif
-
#endif /* LIB_CONSUMER_H */