*/
struct lttng_consumer_local_data *lttng_consumer_create(
enum lttng_consumer_type type,
- int (*buffer_ready)(struct lttng_consumer_stream *stream),
+ int (*buffer_ready)(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx),
int (*recv_channel)(struct lttng_consumer_channel *channel),
int (*recv_stream)(struct lttng_consumer_stream *stream),
int (*update_stream)(int stream_key, uint32_t state))
goto end;
}
- /* No FDs and consumer_quit, kconsumer_cleanup the thread */
+ /* No FDs and consumer_quit, consumer_cleanup the thread */
if (nb_fd == 0 && consumer_quit == 1) {
goto end;
}
* array. We want to prioritize array update over
* low-priority reads.
*/
- if (pollfd[nb_fd].revents == POLLIN) {
+ if (pollfd[nb_fd].revents & POLLIN) {
DBG("consumer_poll_pipe wake up");
tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
if (tmp2 < 0) {
- perror("read kconsumer poll");
+ perror("read consumer poll");
}
continue;
}
/* Take care of high priority channels first. */
for (i = 0; i < nb_fd; i++) {
- switch(pollfd[i].revents) {
- case POLLERR:
+ if (pollfd[i].revents & POLLPRI) {
+ DBG("Urgent read on fd %d", pollfd[i].fd);
+ high_prio = 1;
+ ret = ctx->on_buffer_ready(local_stream[i], ctx);
+ /* it's ok to have an unavailable sub-buffer */
+ if (ret == EAGAIN) {
+ ret = 0;
+ }
+ } else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
consumer_del_stream(local_stream[i]);
num_hup++;
- break;
- case POLLHUP:
- DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
+ } else if (pollfd[i].revents & POLLNVAL) {
+ ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
consumer_del_stream(local_stream[i]);
num_hup++;
- break;
- case POLLNVAL:
- ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
+ } else if ((pollfd[i].revents & POLLHUP &&
+ !(pollfd[i].revents & POLLIN))) {
+ DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
consumer_del_stream(local_stream[i]);
num_hup++;
- break;
- case POLLPRI:
- DBG("Urgent read on fd %d", pollfd[i].fd);
- high_prio = 1;
- ret = ctx->on_buffer_ready(local_stream[i]);
- /* it's ok to have an unavailable sub-buffer */
- if (ret == EAGAIN) {
- ret = 0;
- }
- break;
}
}
/* Take care of low priority channels. */
if (high_prio == 0) {
for (i = 0; i < nb_fd; i++) {
- if (pollfd[i].revents == POLLIN) {
+ if (pollfd[i].revents & POLLIN) {
DBG("Normal read on fd %d", pollfd[i].fd);
- ret = ctx->on_buffer_ready(local_stream[i]);
+ ret = ctx->on_buffer_ready(local_stream[i], ctx);
/* it's ok to have an unavailable subbuffer */
if (ret == EAGAIN) {
ret = 0;
}
return NULL;
}
+
+int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_read_subbuffer(stream, ctx);
+ case LTTNG_CONSUMER_UST:
+ return lttng_ustconsumer_read_subbuffer(stream, ctx);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
+
+int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_on_recv_stream(stream);
+ case LTTNG_CONSUMER_UST:
+ return lttng_ustconsumer_on_recv_stream(stream);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}