/*
* Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
* Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
extern struct lttng_consumer_global_data consumer_data;
extern int consumer_poll_timeout;
-extern volatile int consumer_quit;
/*
* Take a snapshot for a specific fd
return ret;
}
+/*
+ * Sample consumed and produced positions for a specific fd.
+ *
+ * Returns 0 on success, < 0 on error.
+ */
+int lttng_kconsumer_sample_snapshot_positions(
+ struct lttng_consumer_stream *stream)
+{
+ assert(stream);
+
+ return kernctl_snapshot_sample_positions(stream->wait_fd);
+}
+
/*
* Get the produced position
*
ERR("sending streams sent to relayd");
goto end_unlock;
}
+ channel->streams_sent_to_relayd = true;
}
- ret = kernctl_buffer_flush(stream->wait_fd);
+ ret = kernctl_buffer_flush_empty(stream->wait_fd);
if (ret < 0) {
- ERR("Failed to flush kernel stream");
+ /*
+ * Doing a buffer flush which does not take into
+ * account empty packets. This is not perfect
+ * for stream intersection, but required as a
+ * fall-back when "flush_empty" is not
+ * implemented by lttng-modules.
+ */
+ ret = kernctl_buffer_flush(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto end_unlock;
+ }
goto end_unlock;
}
} else {
ret = consumer_add_channel(new_channel, ctx);
}
- if (CONSUMER_CHANNEL_TYPE_DATA) {
+ if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret) {
+ int monitor_start_ret;
+
+ DBG("Consumer starting monitor timer");
consumer_timer_live_start(new_channel,
msg.u.channel.live_timer_interval);
+ monitor_start_ret = consumer_timer_monitor_start(
+ new_channel,
+ msg.u.channel.monitor_timer_interval);
+ if (monitor_start_ret < 0) {
+ ERR("Starting channel monitoring timer failed");
+ goto end_nosignal;
+ }
+
}
health_code_update();
consumer_stream_free(new_stream);
goto end_nosignal;
}
+
+ /*
+ * If adding an extra stream to an already
+ * existing channel (e.g. cpu hotplug), we need
+ * to send the "streams_sent" command to relayd.
+ */
+ if (channel->streams_sent_to_relayd) {
+ ret = consumer_send_relayd_streams_sent(
+ new_stream->net_seq_idx);
+ if (ret < 0) {
+ goto end_nosignal;
+ }
+ }
}
/* Get the right pipe where the stream will be sent. */
if (ret < 0) {
goto end_nosignal;
}
+ channel->streams_sent_to_relayd = true;
}
break;
}
break;
}
+ case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
+ {
+ int channel_monitor_pipe;
+
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ /* Successfully received the command's type. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ goto error_fatal;
+ }
+
+ ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe,
+ 1);
+ if (ret != sizeof(channel_monitor_pipe)) {
+ ERR("Failed to receive channel monitor pipe");
+ goto error_fatal;
+ }
+
+ DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
+ ret = consumer_timer_thread_set_channel_monitor_pipe(
+ channel_monitor_pipe);
+ if (!ret) {
+ int flags;
+
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ /* Set the pipe as non-blocking. */
+ ret = fcntl(channel_monitor_pipe, F_GETFL, 0);
+ if (ret == -1) {
+ PERROR("fcntl get flags of the channel monitoring pipe");
+ goto error_fatal;
+ }
+ flags = ret;
+
+ ret = fcntl(channel_monitor_pipe, F_SETFL,
+ flags | O_NONBLOCK);
+ if (ret == -1) {
+ PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
+ goto error_fatal;
+ }
+ DBG("Channel monitor pipe set as non-blocking");
+ } else {
+ ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
+ }
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ goto error_fatal;
+ }
+ break;
+ }
default:
goto end_nosignal;
}
}
ret = update_stream_stats(stream);
if (ret < 0) {
+ err = kernctl_put_subbuf(infd);
+ if (err != 0) {
+ if (err == -EFAULT) {
+ PERROR("Error in unreserving sub buffer\n");
+ } else if (err == -EIO) {
+ /* Should never happen with newer LTTng versions */
+ PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
+ }
+ ret = err;
+ goto end;
+ }
goto end;
}
} else {
write_index = 0;
ret = metadata_stream_check_version(infd, stream);
if (ret < 0) {
+ err = kernctl_put_subbuf(infd);
+ if (err != 0) {
+ if (err == -EFAULT) {
+ PERROR("Error in unreserving sub buffer\n");
+ } else if (err == -EIO) {
+ /* Should never happen with newer LTTng versions */
+ PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
+ }
+ ret = err;
+ goto end;
+ }
goto end;
}
}
stream->tracefile_size_current = 0;
if (!stream->metadata_flag) {
- ret = index_create_file(stream->chan->pathname,
+ struct lttng_index_file *index_file;
+
+ index_file = lttng_index_file_create(stream->chan->pathname,
stream->name, stream->uid, stream->gid,
stream->chan->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
+ stream->tracefile_count_current,
+ CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+ if (!index_file) {
goto error;
}
- stream->index_fd = ret;
+ assert(!stream->index_file);
+ stream->index_file = index_file;
}
}