#include <common/utils.h>
#include <common/consumer-stream.h>
#include <common/index/index.h>
+#include <common/consumer-timer.h>
#include "kernel-consumer.h"
/* Session daemon status message are handled in the following call. */
ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
- &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
+ &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
+ msg.u.relayd_sock.relayd_session_id);
goto end_nosignal;
}
case LTTNG_CONSUMER_ADD_CHANNEL:
} else {
ret = consumer_add_channel(new_channel, ctx);
}
+ if (CONSUMER_CHANNEL_TYPE_DATA) {
+ consumer_timer_live_start(new_channel,
+ msg.u.channel.live_timer_interval);
+ }
/* If we received an error in add_channel, we need to report it. */
if (ret < 0) {
error:
return ret;
}
+/*
+ * Sync metadata meaning request them to the session daemon and snapshot to the
+ * metadata thread can consumer them.
+ *
+ * Metadata stream lock MUST be acquired.
+ *
+ * Return 0 if new metadatda is available, EAGAIN if the metadata stream
+ * is empty or a negative value on error.
+ */
+int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata)
+{
+ int ret;
+
+ assert(metadata);
+
+ ret = kernctl_buffer_flush(metadata->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto end;
+ }
+
+ ret = kernctl_snapshot(metadata->wait_fd);
+ if (ret < 0) {
+ if (errno != EAGAIN) {
+ ERR("Sync metadata, taking kernel snapshot failed.");
+ goto end;
+ }
+ DBG("Sync metadata, no new kernel metadata");
+ /* No new metadata, exit. */
+ ret = ENODATA;
+ goto end;
+ }
+
+end:
+ return ret;
+}
/*
* Consume data on a file descriptor and write it on a trace file.
struct lttng_consumer_local_data *ctx)
{
unsigned long len, subbuf_size, padding;
- int err, write_index = 0;
+ int err, write_index = 1;
ssize_t ret = 0;
int infd = stream->wait_fd;
struct lttng_packet_index index;
DBG("In read_subbuffer (infd : %d)", infd);
- /* Indicate that for this stream we have to write the index. */
- if (stream->index_fd >= 0) {
- write_index = 1;
- }
-
/* Get the next subbuffer */
err = kernctl_get_next_subbuf(infd);
if (err != 0) {
goto end;
}
- if (!stream->metadata_flag && write_index) {
+ if (!stream->metadata_flag) {
ret = get_index_values(&index, infd);
if (ret < 0) {
goto end;
}
+ } else {
+ write_index = 0;
}
switch (stream->chan->output) {
}
/* Write index if needed. */
- if (write_index) {
- err = index_write(stream->index_fd, &index, sizeof(index));
+ if (!write_index) {
+ goto end;
+ }
+
+ if (stream->chan->live_timer_interval && !stream->metadata_flag) {
+ /*
+ * In live, block until all the metadata is sent.
+ */
+ err = consumer_stream_sync_metadata(ctx, stream->session_id);
if (err < 0) {
- ret = -1;
goto end;
}
}
+ err = consumer_stream_write_index(stream, &index);
+ if (err < 0) {
+ goto end;
+ }
+
end:
return ret;
}