#include <common/common.h>
#include <common/utils.h>
#include <common/compat/poll.h>
+#include <common/compat/endian.h>
#include <common/index/index.h>
#include <common/kernel-ctl/kernel-ctl.h>
#include <common/sessiond-comm/relayd.h>
*/
(*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
(*pollfd)[i].events = POLLIN | POLLPRI;
+
+ (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
+ (*pollfd)[i + 1].events = POLLIN | POLLPRI;
return i;
}
goto error_poll_pipe;
}
+ ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
+ if (!ctx->consumer_wakeup_pipe) {
+ goto error_wakeup_pipe;
+ }
+
ret = pipe(ctx->consumer_should_quit);
if (ret < 0) {
PERROR("Error creating recv pipe");
error_thread_pipe:
utils_close_pipe(ctx->consumer_should_quit);
error_quit_pipe:
+ lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
+error_wakeup_pipe:
lttng_pipe_destroy(ctx->consumer_data_pipe);
error_poll_pipe:
free(ctx);
DBG("Consumer destroying it. Closing everything.");
+ if (!ctx) {
+ return;
+ }
+
destroy_data_stream_ht(data_ht);
destroy_metadata_stream_ht(metadata_ht);
utils_close_pipe(ctx->consumer_channel_pipe);
lttng_pipe_destroy(ctx->consumer_data_pipe);
lttng_pipe_destroy(ctx->consumer_metadata_pipe);
+ lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
utils_close_pipe(ctx->consumer_should_quit);
utils_close_pipe(ctx->consumer_splice_metadata_pipe);
free(local_stream);
local_stream = NULL;
- /* allocate for all fds + 1 for the consumer_data_pipe */
- pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
+ /*
+ * Allocate for all fds +1 for the consumer_data_pipe and +1 for
+ * wake up pipe.
+ */
+ pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
- /* allocate for all fds + 1 for the consumer_data_pipe */
- local_stream = zmalloc((consumer_data.stream_count + 1) *
+ local_stream = zmalloc((consumer_data.stream_count + 2) *
sizeof(struct lttng_consumer_stream *));
if (local_stream == NULL) {
PERROR("local_stream malloc");
}
/* poll on the array of fds */
restart:
- DBG("polling on %d fd", nb_fd + 1);
+ DBG("polling on %d fd", nb_fd + 2);
health_poll_entry();
- num_rdy = poll(pollfd, nb_fd + 1, -1);
+ num_rdy = poll(pollfd, nb_fd + 2, -1);
health_poll_exit();
DBG("poll num_rdy : %d", num_rdy);
if (num_rdy == -1) {
continue;
}
+ /* Handle wakeup pipe. */
+ if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
+ char dummy;
+ ssize_t pipe_readlen;
+
+ pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy,
+ sizeof(dummy));
+ if (pipe_readlen < 0) {
+ PERROR("Consumer data wakeup pipe");
+ }
+ /* We've been awakened to handle stream(s). */
+ ctx->has_wakeup = 0;
+ }
+
/* Take care of high priority channels first. */
for (i = 0; i < nb_fd; i++) {
health_code_update();
continue;
}
if ((pollfd[i].revents & POLLIN) ||
- local_stream[i]->hangup_flush_done) {
+ local_stream[i]->hangup_flush_done ||
+ local_stream[i]->has_data) {
DBG("Normal read on fd %d", pollfd[i].fd);
len = ctx->on_buffer_ready(local_stream[i], ctx);
/* it's ok to have an unavailable sub-buffer */