*/
#define _GNU_SOURCE
+#define _LGPL_SOURCE
#include <assert.h>
#include <poll.h>
#include <pthread.h>
#include <common/common.h>
#include <common/utils.h>
#include <common/compat/poll.h>
+#include <common/compat/endian.h>
#include <common/index/index.h>
#include <common/kernel-ctl/kernel-ctl.h>
#include <common/sessiond-comm/relayd.h>
#include "consumer.h"
#include "consumer-stream.h"
#include "consumer-testpoint.h"
+#include "align.h"
struct lttng_consumer_global_data consumer_data = {
.stream_count = 0,
uint64_t tracefile_count,
uint64_t session_id_per_pid,
unsigned int monitor,
- unsigned int live_timer_interval)
+ unsigned int live_timer_interval,
+ const char *root_shm_path,
+ const char *shm_path)
{
struct lttng_consumer_channel *channel;
strncpy(channel->name, name, sizeof(channel->name));
channel->name[sizeof(channel->name) - 1] = '\0';
+ if (root_shm_path) {
+ strncpy(channel->root_shm_path, root_shm_path, sizeof(channel->root_shm_path));
+ channel->root_shm_path[sizeof(channel->root_shm_path) - 1] = '\0';
+ }
+ if (shm_path) {
+ strncpy(channel->shm_path, shm_path, sizeof(channel->shm_path));
+ channel->shm_path[sizeof(channel->shm_path) - 1] = '\0';
+ }
+
lttng_ht_node_init_u64(&channel->node, channel->key);
channel->wait_fd = -1;
*/
(*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
(*pollfd)[i].events = POLLIN | POLLPRI;
+
+ (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe);
+ (*pollfd)[i + 1].events = POLLIN | POLLPRI;
return i;
}
/*
- * Poll on the should_quit pipe and the command socket return -1 on error and
- * should exit, 0 if data is available on the command socket
+ * Poll on the should_quit pipe and the command socket; return -1 on
+ * error, 1 if we should exit, 0 if data is available on the command socket.
*/
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
goto restart;
}
PERROR("Poll error");
- goto exit;
+ return -1;
}
if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
DBG("consumer_should_quit wake up");
- goto exit;
+ return 1;
}
return 0;
-
-exit:
- return -1;
}
/*
goto error_poll_pipe;
}
+ ctx->consumer_wakeup_pipe = lttng_pipe_open(0);
+ if (!ctx->consumer_wakeup_pipe) {
+ goto error_wakeup_pipe;
+ }
+
ret = pipe(ctx->consumer_should_quit);
if (ret < 0) {
PERROR("Error creating recv pipe");
goto error_quit_pipe;
}
- ret = pipe(ctx->consumer_thread_pipe);
- if (ret < 0) {
- PERROR("Error creating thread pipe");
- goto error_thread_pipe;
- }
-
ret = pipe(ctx->consumer_channel_pipe);
if (ret < 0) {
PERROR("Error creating channel pipe");
goto error_metadata_pipe;
}
- ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe);
- if (ret < 0) {
- goto error_splice_pipe;
- }
-
return ctx;
-error_splice_pipe:
- lttng_pipe_destroy(ctx->consumer_metadata_pipe);
error_metadata_pipe:
utils_close_pipe(ctx->consumer_channel_pipe);
error_channel_pipe:
- utils_close_pipe(ctx->consumer_thread_pipe);
-error_thread_pipe:
utils_close_pipe(ctx->consumer_should_quit);
error_quit_pipe:
+ lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
+error_wakeup_pipe:
lttng_pipe_destroy(ctx->consumer_data_pipe);
error_poll_pipe:
free(ctx);
DBG("Consumer destroying it. Closing everything.");
+ if (!ctx) {
+ return;
+ }
+
destroy_data_stream_ht(data_ht);
destroy_metadata_stream_ht(metadata_ht);
if (ret) {
PERROR("close");
}
- utils_close_pipe(ctx->consumer_thread_pipe);
utils_close_pipe(ctx->consumer_channel_pipe);
lttng_pipe_destroy(ctx->consumer_data_pipe);
lttng_pipe_destroy(ctx->consumer_metadata_pipe);
+ lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
utils_close_pipe(ctx->consumer_should_quit);
- utils_close_pipe(ctx->consumer_splice_metadata_pipe);
unlink(ctx->consumer_command_sock_path);
free(ctx);
ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
if (ret < sizeof(hdr)) {
/*
- * This error means that the fd's end is closed so ignore the perror
+ * This error means that the fd's end is closed so ignore the PERROR
* not to clubber the error output since this can happen in a normal
* code path.
*/
goto end;
}
}
-
- /*
- * Choose right pipe for splice. Metadata and trace data are handled by
- * different threads hence the use of two pipes in order not to race or
- * corrupt the written data.
- */
- if (stream->metadata_flag) {
- splice_pipe = ctx->consumer_splice_metadata_pipe;
- } else {
- splice_pipe = ctx->consumer_thread_pipe;
- }
+ splice_pipe = stream->splice_pipe;
/* Write metadata stream id before payload */
if (relayd) {
/* Splice data out */
ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
- DBG("Consumer splice pipe to file, ret %zd", ret_splice);
+ DBG("Consumer splice pipe to file (out_fd: %d), ret %zd",
+ outfd, ret_splice);
if (ret_splice < 0) {
ret = errno;
written = -ret;
DBG("Metadata main loop started");
while (1) {
- health_code_update();
-
- /* Only the metadata pipe is set */
- if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
- err = 0; /* All is OK */
- goto end;
- }
-
restart:
- DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+ health_code_update();
health_poll_entry();
+ DBG("Metadata poll wait");
ret = lttng_poll_wait(&events, -1);
+ DBG("Metadata poll return from wait with %d fd(s)",
+ LTTNG_POLL_GETNB(&events));
health_poll_exit();
DBG("Metadata event catched in thread");
if (ret < 0) {
ERR("Poll EINTR catched");
goto restart;
}
- goto error;
+ if (LTTNG_POLL_GETNB(&events) == 0) {
+ err = 0; /* All is OK */
+ }
+ goto end;
}
nb_fd = ret;
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
+ if (!revents) {
+ /* No activity for this FD (poll implementation). */
+ continue;
+ }
+
if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
if (revents & (LPOLLERR | LPOLLHUP )) {
DBG("Metadata thread pipe hung up");
/* All is OK */
err = 0;
-error:
end:
DBG("Metadata poll thread exiting");
free(local_stream);
local_stream = NULL;
- /* allocate for all fds + 1 for the consumer_data_pipe */
- pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
+ /*
+ * Allocate for all fds +1 for the consumer_data_pipe and +1 for
+ * wake up pipe.
+ */
+ pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
- /* allocate for all fds + 1 for the consumer_data_pipe */
- local_stream = zmalloc((consumer_data.stream_count + 1) *
+ local_stream = zmalloc((consumer_data.stream_count + 2) *
sizeof(struct lttng_consumer_stream *));
if (local_stream == NULL) {
PERROR("local_stream malloc");
}
/* poll on the array of fds */
restart:
- DBG("polling on %d fd", nb_fd + 1);
+ DBG("polling on %d fd", nb_fd + 2);
health_poll_entry();
- num_rdy = poll(pollfd, nb_fd + 1, -1);
+ num_rdy = poll(pollfd, nb_fd + 2, -1);
health_poll_exit();
DBG("poll num_rdy : %d", num_rdy);
if (num_rdy == -1) {
continue;
}
+ /* Handle wakeup pipe. */
+ if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) {
+ char dummy;
+ ssize_t pipe_readlen;
+
+ pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy,
+ sizeof(dummy));
+ if (pipe_readlen < 0) {
+ PERROR("Consumer data wakeup pipe");
+ }
+ /* We've been awakened to handle stream(s). */
+ ctx->has_wakeup = 0;
+ }
+
/* Take care of high priority channels first. */
for (i = 0; i < nb_fd; i++) {
health_code_update();
continue;
}
if ((pollfd[i].revents & POLLIN) ||
- local_stream[i]->hangup_flush_done) {
+ local_stream[i]->hangup_flush_done ||
+ local_stream[i]->has_data) {
DBG("Normal read on fd %d", pollfd[i].fd);
len = ctx->on_buffer_ready(local_stream[i], ctx);
/* it's ok to have an unavailable sub-buffer */
DBG("Channel main loop started");
while (1) {
- health_code_update();
-
- /* Only the channel pipe is set */
- if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
- err = 0; /* All is OK */
- goto end;
- }
-
restart:
- DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+ health_code_update();
+ DBG("Channel poll wait");
health_poll_entry();
ret = lttng_poll_wait(&events, -1);
+ DBG("Channel poll return from wait with %d fd(s)",
+ LTTNG_POLL_GETNB(&events));
health_poll_exit();
DBG("Channel event catched in thread");
if (ret < 0) {
ERR("Poll EINTR catched");
goto restart;
}
+ if (LTTNG_POLL_GETNB(&events) == 0) {
+ err = 0; /* All is OK */
+ }
goto end;
}
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
- /* Just don't waste time if no returned events for the fd */
if (!revents) {
+ /* No activity for this FD (poll implementation). */
continue;
}
+
if (pollfd == ctx->consumer_channel_pipe[0]) {
if (revents & (LPOLLERR | LPOLLHUP)) {
DBG("Channel thread pipe hung up");
assert(ctx);
assert(sockpoll);
- if (lttng_consumer_poll_socket(sockpoll) < 0) {
- ret = -1;
+ ret = lttng_consumer_poll_socket(sockpoll);
+ if (ret) {
goto error;
}
DBG("Metadata connection on client_socket");
consumer_sockpoll[1].fd = client_socket;
consumer_sockpoll[1].events = POLLIN | POLLPRI;
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ if (ret) {
+ if (ret > 0) {
+ /* should exit */
+ err = 0;
+ }
goto end;
}
DBG("Connection on client_socket");
* command unix socket.
*/
ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
- if (ret < 0) {
+ if (ret) {
+ if (ret > 0) {
+ /* should exit */
+ err = 0;
+ }
goto end;
}
health_poll_entry();
ret = lttng_consumer_poll_socket(consumer_sockpoll);
health_poll_exit();
- if (ret < 0) {
+ if (ret) {
+ if (ret > 0) {
+ /* should exit */
+ err = 0;
+ }
goto end;
}
DBG("Incoming command on sock");
ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
- if (ret == -ENOENT) {
- DBG("Received STOP command");
- goto end;
- }
if (ret <= 0) {
/*
* This could simply be a session daemon quitting. Don't output
}
/* Poll on consumer socket. */
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ if (ret) {
+ /* Needing to exit in the middle of a command: error. */
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
ret = -EINTR;
goto error_nosignal;
*/
ret = cds_lfht_is_node_deleted(&stream->node.node);
if (!ret) {
- /*
- * An empty output file is not valid. We need at least one packet
- * generated per stream, even if it contains no event, so it
- * contains at least one packet header.
- */
- if (stream->output_written == 0) {
- pthread_mutex_unlock(&stream->lock);
- goto data_pending;
- }
/* Check the stream if there is data in the buffers. */
ret = data_pending(stream);
if (ret == 1) {
return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
}
-/*
- * Using a maximum stream size with the produced and consumed position of a
- * stream, computes the new consumed position to be as close as possible to the
- * maximum possible stream size.
- *
- * If maximum stream size is lower than the possible buffer size (produced -
- * consumed), the consumed_pos given is returned untouched else the new value
- * is returned.
- */
+/*
+ * Compute the position from which to start consuming so that at most
+ * nb_packets_per_stream packets (of max_sb_size bytes each) behind the
+ * sub-buffer-aligned produced position are read.  A nb_packets_per_stream
+ * of 0 means no limit: consume from consumed_pos (grab everything).  The
+ * returned position is never before consumed_pos.
+ */
+unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
+		unsigned long produced_pos, uint64_t nb_packets_per_stream,
+		uint64_t max_sb_size)
{
-	if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) {
-		/* Offset from the produced position to get the latest buffers. */
-		return produced_pos - max_stream_size;
-	}
+	unsigned long start_pos;
-	return consumed_pos;
+	if (!nb_packets_per_stream) {
+		return consumed_pos; /* Grab everything */
+	}
+	/*
+	 * Align the produced position down to a sub-buffer boundary, then
+	 * step back by the requested number of packets.  NOTE(review):
+	 * assumes offset_align_floor() (align.h) yields produced_pos modulo
+	 * max_sb_size — confirm against the align.h implementation.
+	 */
+	start_pos = produced_pos - offset_align_floor(produced_pos, max_sb_size);
+	start_pos -= max_sb_size * nb_packets_per_stream;
+	/* Unsigned subtraction cast to signed: detects start_pos < consumed_pos. */
+	if ((long) (start_pos - consumed_pos) < 0) {
+		return consumed_pos; /* Grab everything */
+	}
+	return start_pos;
}