X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.h;h=71ae39903a8477c341aff3c322acf6602f049604;hb=5bbf8b1b7cb4cc72bd96917de7dac906d2628161;hp=dc5fc9968be02fbf45b223ef20a8371fdc4e0005;hpb=10a8a2237343699e3923d87e24dbf2d7fe225377;p=lttng-tools.git diff --git a/src/common/consumer.h b/src/common/consumer.h index dc5fc9968..71ae39903 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -26,7 +26,8 @@ #include -#include "src/common/hashtable/hashtable.h" +#include +#include /* * When the receiving thread dies, we need to have a way to make the polling @@ -113,6 +114,7 @@ struct lttng_consumer_stream { /* For UST */ struct lttng_ust_lib_ring_buffer *buf; int cpu; + int data_read; int hangup_flush_done; /* UID/GID of the user owning the session to which stream belongs */ uid_t uid; @@ -124,8 +126,11 @@ struct lttng_consumer_stream { * process. */ struct lttng_consumer_local_data { - /* function to call when data is available on a buffer */ - int (*on_buffer_ready)(struct lttng_consumer_stream *stream, + /* + * Function to call when data is available on a buffer. + * Returns the number of bytes read, or negative error value. + */ + ssize_t (*on_buffer_ready)(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx); /* * function to call when we receive a new channel, it receives a @@ -284,16 +289,16 @@ int consumer_add_channel(struct lttng_consumer_channel *channel); extern struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, - int (*buffer_ready)(struct lttng_consumer_stream *stream, + ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx), int (*recv_channel)(struct lttng_consumer_channel *channel), int (*recv_stream)(struct lttng_consumer_stream *stream), int (*update_stream)(int sessiond_key, uint32_t state)); extern void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx); -extern int lttng_consumer_on_read_subbuffer_mmap( +extern ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len); -extern int lttng_consumer_on_read_subbuffer_splice( +extern ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len); extern int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx, @@ -307,7 +312,7 @@ extern void *lttng_consumer_thread_receive_fds(void *data); extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll); -int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, +ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx); int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);