#include <common/common.h>
#include <common/utils.h>
#include <common/compat/poll.h>
+#include <common/index/index.h>
#include <common/kernel-ctl/kernel-ctl.h>
#include <common/sessiond-comm/relayd.h>
#include <common/sessiond-comm/sessiond-comm.h>
stream->key = stream_key;
stream->out_fd = -1;
stream->out_fd_offset = 0;
+ stream->output_written = 0;
stream->state = state;
stream->uid = uid;
stream->gid = gid;
stream->session_id = session_id;
stream->monitor = monitor;
stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
+ stream->index_fd = -1;
pthread_mutex_init(&stream->lock, NULL);
/* If channel is the metadata, flag this stream as metadata. */
uint64_t tracefile_size,
uint64_t tracefile_count,
uint64_t session_id_per_pid,
- unsigned int monitor)
+ unsigned int monitor,
+ unsigned int live_timer_interval)
{
struct lttng_consumer_channel *channel;
channel->tracefile_size = tracefile_size;
channel->tracefile_count = tracefile_count;
channel->monitor = monitor;
+ channel->live_timer_interval = live_timer_interval;
pthread_mutex_init(&channel->lock, NULL);
pthread_mutex_init(&channel->timer_lock, NULL);
ssize_t lttng_consumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
- unsigned long padding)
+ unsigned long padding,
+ struct lttng_packet_index *index)
{
unsigned long mmap_offset;
void *mmap_base;
ret = utils_rotate_stream_file(stream->chan->pathname,
stream->name, stream->chan->tracefile_size,
stream->chan->tracefile_count, stream->uid, stream->gid,
- stream->out_fd, &(stream->tracefile_count_current));
+ stream->out_fd, &(stream->tracefile_count_current),
+ &stream->out_fd);
if (ret < 0) {
ERR("Rotating output file");
goto end;
}
- outfd = stream->out_fd = ret;
+ outfd = stream->out_fd;
+
+ if (stream->index_fd >= 0) {
+ ret = index_create_file(stream->chan->pathname,
+ stream->name, stream->uid, stream->gid,
+ stream->chan->tracefile_size,
+ stream->tracefile_count_current);
+ if (ret < 0) {
+ goto end;
+ }
+ stream->index_fd = ret;
+ }
+
/* Reset current size because we just perform a rotation. */
stream->tracefile_size_current = 0;
stream->out_fd_offset = 0;
orig_offset = 0;
}
stream->tracefile_size_current += len;
+ if (index) {
+ index->offset = htobe64(stream->out_fd_offset);
+ }
}
while (len > 0) {
SYNC_FILE_RANGE_WRITE);
stream->out_fd_offset += ret;
}
+ stream->output_written += ret;
written += ret;
}
lttng_consumer_sync_trace_file(stream, orig_offset);
ssize_t lttng_consumer_on_read_subbuffer_splice(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
- unsigned long padding)
+ unsigned long padding,
+ struct lttng_packet_index *index)
{
ssize_t ret = 0, written = 0, ret_splice = 0;
loff_t offset = 0;
ret = utils_rotate_stream_file(stream->chan->pathname,
stream->name, stream->chan->tracefile_size,
stream->chan->tracefile_count, stream->uid, stream->gid,
- stream->out_fd, &(stream->tracefile_count_current));
+ stream->out_fd, &(stream->tracefile_count_current),
+ &stream->out_fd);
if (ret < 0) {
ERR("Rotating output file");
goto end;
}
- outfd = stream->out_fd = ret;
+ outfd = stream->out_fd;
+
+ if (stream->index_fd >= 0) {
+ ret = index_create_file(stream->chan->pathname,
+ stream->name, stream->uid, stream->gid,
+ stream->chan->tracefile_size,
+ stream->tracefile_count_current);
+ if (ret < 0) {
+ goto end;
+ }
+ stream->index_fd = ret;
+ }
+
/* Reset current size because we just perform a rotation. */
stream->tracefile_size_current = 0;
stream->out_fd_offset = 0;
orig_offset = 0;
}
stream->tracefile_size_current += len;
+ index->offset = htobe64(stream->out_fd_offset);
}
while (len > 0) {
SYNC_FILE_RANGE_WRITE);
stream->out_fd_offset += ret_splice;
}
+ stream->output_written += ret_splice;
written += ret_splice;
}
lttng_consumer_sync_trace_file(stream, orig_offset);
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
- /* Just don't waste time if no returned events for the fd */
- if (!revents) {
- continue;
- }
-
if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
if (revents & (LPOLLERR | LPOLLHUP )) {
DBG("Metadata thread pipe hung up");
*/
ret = cds_lfht_is_node_deleted(&stream->node.node);
if (!ret) {
+ /*
+ * An empty output file is not valid. We need at least one packet
+ * generated per stream, even if it contains no event, so it
+ * contains at least one packet header.
+ */
+ if (stream->output_written == 0) {
+ pthread_mutex_unlock(&stream->lock);
+ goto data_pending;
+ }
/* Check the stream if there is data in the buffers. */
ret = data_pending(stream);
if (ret == 1) {