#include <unistd.h>
#include <sys/stat.h>
+#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/kernel-ctl/kernel-ctl.h>
#include <common/sessiond-comm/sessiond-comm.h>
#include <common/relayd/relayd.h>
#include <common/utils.h>
#include <common/consumer-stream.h>
+#include <common/index/index.h>
+#include <common/consumer-timer.h>
#include "kernel-consumer.h"
}
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+ health_code_update();
+
/*
* Lock stream because we are about to change its state.
*/
ret = utils_create_stream_file(path, stream->name,
stream->chan->tracefile_size,
stream->tracefile_count_current,
- stream->uid, stream->gid);
+ stream->uid, stream->gid, NULL);
if (ret < 0) {
ERR("utils_create_stream_file");
goto end_unlock;
ssize_t read_len;
unsigned long len, padded_len;
+ health_code_update();
+
DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
}
read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
- padded_len - len);
+ padded_len - len, NULL);
/*
* We write the padded len in local tracefiles but the data len
* when using a relay. Display the error but continue processing
ret = utils_create_stream_file(path, metadata_stream->name,
metadata_stream->chan->tracefile_size,
metadata_stream->tracefile_count_current,
- metadata_stream->uid, metadata_stream->gid);
+ metadata_stream->uid, metadata_stream->gid, NULL);
if (ret < 0) {
goto error;
}
}
do {
+ health_code_update();
+
ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
if (ret_read < 0) {
if (ret_read != -EAGAIN) {
enum lttng_error_code ret_code = LTTNG_OK;
struct lttcomm_consumer_msg msg;
+ health_code_update();
+
ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
if (ret != sizeof(msg)) {
if (ret > 0) {
}
return ret;
}
+
+ health_code_update();
+
if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
/*
* Notify the session daemon that the command is completed.
return -ENOENT;
}
+ health_code_update();
+
/* relayd needs RCU read-side protection */
rcu_read_lock();
/* Session daemon status message are handled in the following call. */
ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
- &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
+ &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
+ msg.u.relayd_sock.relayd_session_id);
goto end_nosignal;
}
case LTTNG_CONSUMER_ADD_CHANNEL:
struct lttng_consumer_channel *new_channel;
int ret_recv;
+ health_code_update();
+
/* First send a status message before receiving the fds. */
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto error_fatal;
}
+
+ health_code_update();
+
DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
msg.u.channel.session_id, msg.u.channel.pathname,
msg.u.channel.relayd_id, msg.u.channel.output,
msg.u.channel.tracefile_size,
msg.u.channel.tracefile_count, 0,
- msg.u.channel.monitor);
+ msg.u.channel.monitor,
+ msg.u.channel.live_timer_interval);
if (new_channel == NULL) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
goto end_nosignal;
goto end_nosignal;
};
+ health_code_update();
+
if (ctx->on_recv_channel != NULL) {
ret_recv = ctx->on_recv_channel(new_channel);
if (ret_recv == 0) {
} else {
ret = consumer_add_channel(new_channel, ctx);
}
+ if (CONSUMER_CHANNEL_TYPE_DATA) {
+ consumer_timer_live_start(new_channel,
+ msg.u.channel.live_timer_interval);
+ }
+
+ health_code_update();
/* If we received an error in add_channel, we need to report it. */
if (ret < 0) {
ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
}
+ health_code_update();
+
/* First send a status message before receiving the fds. */
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto error_fatal;
}
+
+ health_code_update();
+
if (ret_code != LTTNG_OK) {
/* Channel was not found. */
goto end_nosignal;
}
/* Blocking call */
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ health_poll_entry();
+ ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ health_poll_exit();
+ if (ret < 0) {
rcu_read_unlock();
return -EINTR;
}
+ health_code_update();
+
/* Get stream file descriptor from socket */
ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
if (ret != sizeof(fd)) {
return ret;
}
+ health_code_update();
+
/*
* Send status code to session daemon only if the recv works. If the
* above recv() failed, the session daemon is notified through the
goto end_nosignal;
}
+ health_code_update();
+
new_stream = consumer_allocate_stream(channel->key,
fd,
LTTNG_CONSUMER_ACTIVE_STREAM,
*/
new_stream->hangup_flush_done = 0;
+ health_code_update();
+
if (ctx->on_recv_stream) {
ret = ctx->on_recv_stream(new_stream);
if (ret < 0) {
}
}
+ health_code_update();
+
if (new_stream->metadata_flag) {
channel->metadata_stream = new_stream;
}
/* Vitible to other threads */
new_stream->globally_visible = 1;
+ health_code_update();
+
ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
if (ret < 0) {
ERR("Consumer write %s stream to pipe %d",
consumer_flag_relayd_for_destroy(relayd);
}
+ health_code_update();
+
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
ret = consumer_data_pending(id);
+ health_code_update();
+
/* Send back returned value to session daemon */
ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
if (ret < 0) {
}
}
+ health_code_update();
+
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
}
+ health_code_update();
+
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
+ health_code_update();
+
/*
* This command should ONLY be issued for channel with streams set in
* no monitor mode.
* Return 1 to indicate success since the 0 value can be a socket
* shutdown during the recv() or send() call.
*/
+ health_code_update();
return 1;
error_fatal:
return -1;
}
+/*
+ * Populate index values of a kernel stream. Values are set in big endian order.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static int get_index_values(struct lttng_packet_index *index, int infd)
+{
+ int ret;
+
+ ret = kernctl_get_timestamp_begin(infd, &index->timestamp_begin);
+ if (ret < 0) {
+ PERROR("kernctl_get_timestamp_begin");
+ goto error;
+ }
+ index->timestamp_begin = htobe64(index->timestamp_begin);
+
+ ret = kernctl_get_timestamp_end(infd, &index->timestamp_end);
+ if (ret < 0) {
+ PERROR("kernctl_get_timestamp_end");
+ goto error;
+ }
+ index->timestamp_end = htobe64(index->timestamp_end);
+
+ ret = kernctl_get_events_discarded(infd, &index->events_discarded);
+ if (ret < 0) {
+ PERROR("kernctl_get_events_discarded");
+ goto error;
+ }
+ index->events_discarded = htobe64(index->events_discarded);
+
+ ret = kernctl_get_content_size(infd, &index->content_size);
+ if (ret < 0) {
+ PERROR("kernctl_get_content_size");
+ goto error;
+ }
+ index->content_size = htobe64(index->content_size);
+
+ ret = kernctl_get_packet_size(infd, &index->packet_size);
+ if (ret < 0) {
+ PERROR("kernctl_get_packet_size");
+ goto error;
+ }
+ index->packet_size = htobe64(index->packet_size);
+
+ ret = kernctl_get_stream_id(infd, &index->stream_id);
+ if (ret < 0) {
+ PERROR("kernctl_get_stream_id");
+ goto error;
+ }
+ index->stream_id = htobe64(index->stream_id);
+
+error:
+ return ret;
+}
+/*
+ * Sync metadata meaning request them to the session daemon and snapshot to the
+ * metadata thread can consumer them.
+ *
+ * Metadata stream lock MUST be acquired.
+ *
+ * Return 0 if new metadatda is available, EAGAIN if the metadata stream
+ * is empty or a negative value on error.
+ */
+int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata)
+{
+ int ret;
+
+ assert(metadata);
+
+ ret = kernctl_buffer_flush(metadata->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto end;
+ }
+
+ ret = kernctl_snapshot(metadata->wait_fd);
+ if (ret < 0) {
+ if (errno != EAGAIN) {
+ ERR("Sync metadata, taking kernel snapshot failed.");
+ goto end;
+ }
+ DBG("Sync metadata, no new kernel metadata");
+ /* No new metadata, exit. */
+ ret = ENODATA;
+ goto end;
+ }
+
+end:
+ return ret;
+}
+
/*
* Consume data on a file descriptor and write it on a trace file.
*/
struct lttng_consumer_local_data *ctx)
{
unsigned long len, subbuf_size, padding;
- int err;
+ int err, write_index = 1;
ssize_t ret = 0;
int infd = stream->wait_fd;
+ struct lttng_packet_index index;
DBG("In read_subbuffer (infd : %d)", infd);
+
/* Get the next subbuffer */
err = kernctl_get_next_subbuf(infd);
if (err != 0) {
goto end;
}
+ if (!stream->metadata_flag) {
+ ret = get_index_values(&index, infd);
+ if (ret < 0) {
+ goto end;
+ }
+ } else {
+ write_index = 0;
+ }
+
switch (stream->chan->output) {
case CONSUMER_CHANNEL_SPLICE:
/*
/* splice the subbuffer to the tracefile */
ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size,
- padding);
+ padding, &index);
/*
* XXX: Splice does not support network streaming so the return value
* is simply checked against subbuf_size and not like the mmap() op.
*/
ERR("Error splicing to tracefile (ret: %zd != len: %lu)",
ret, subbuf_size);
+ write_index = 0;
}
break;
case CONSUMER_CHANNEL_MMAP:
/* write the subbuffer to the tracefile */
ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size,
- padding);
+ padding, &index);
/*
* The mmap operation should write subbuf_size amount of data when
* network streaming or the full padding (len) size when we are _not_
ERR("Error writing to tracefile "
"(ret: %zd != len: %lu != subbuf_size: %lu)",
ret, len, subbuf_size);
+ write_index = 0;
}
break;
default:
goto end;
}
+ /* Write index if needed. */
+ if (!write_index) {
+ goto end;
+ }
+
+ if (stream->chan->live_timer_interval && !stream->metadata_flag) {
+ /*
+ * In live, block until all the metadata is sent.
+ */
+ err = consumer_stream_sync_metadata(ctx, stream->session_id);
+ if (err < 0) {
+ goto end;
+ }
+ }
+
+ err = consumer_stream_write_index(stream, &index);
+ if (err < 0) {
+ goto end;
+ }
+
end:
return ret;
}
if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
ret = utils_create_stream_file(stream->chan->pathname, stream->name,
stream->chan->tracefile_size, stream->tracefile_count_current,
- stream->uid, stream->gid);
+ stream->uid, stream->gid, NULL);
if (ret < 0) {
goto error;
}
stream->out_fd = ret;
stream->tracefile_size_current = 0;
+
+ if (!stream->metadata_flag) {
+ ret = index_create_file(stream->chan->pathname,
+ stream->name, stream->uid, stream->gid,
+ stream->chan->tracefile_size,
+ stream->tracefile_count_current);
+ if (ret < 0) {
+ goto error;
+ }
+ stream->index_fd = ret;
+ }
}
if (stream->output == LTTNG_EVENT_MMAP) {