struct lttng_consumer_local_data *ctx),
int (*recv_channel)(struct lttng_consumer_channel *channel),
int (*recv_stream)(struct lttng_consumer_stream *stream),
- int (*update_stream)(int stream_key, uint32_t state))
+ int (*update_stream)(uint64_t stream_key, uint32_t state))
{
int ret;
struct lttng_consumer_local_data *ctx;
int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll,
- struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id)
+ struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id)
{
int fd = -1, ret = -1, relayd_created = 0;
enum lttng_error_code ret_code = LTTNG_OK;
ret_code = LTTCOMM_CONSUMERD_ENOMEM;
goto error;
} else {
- relayd->sessiond_session_id = (uint64_t) sessiond_id;
+ relayd->sessiond_session_id = sessiond_id;
relayd_created = 1;
}
return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
}
+
+/*
+ * Using a maximum stream size with the produced and consumed position of a
+ * stream, computes the new consumed position to be as close as possible to the
+ * maximum possible stream size.
+ *
+ * If maximum stream size is lower than the possible buffer size (produced -
+ * consumed), the consumed_pos given is returned untouched else the new value
+ * is returned.
+ */
+unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
+ unsigned long produced_pos, uint64_t max_stream_size)
+{
+ if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) {
+ /* Offset from the produced position to get the latest buffers. */
+ return produced_pos - max_stream_size;
+ }
+
+ return consumed_pos;
+}