/*
* Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
* Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
ret = kernctl_snapshot(infd);
if (ret != 0) {
PERROR("Getting sub-buffer snapshot.");
- ret = -errno;
}
return ret;
}
+/*
+ * Sample consumed and produced positions for a specific fd.
+ *
+ * Returns 0 on success, < 0 on error.
+ */
+int lttng_kconsumer_sample_snapshot_positions(
+ struct lttng_consumer_stream *stream)
+{
+ assert(stream);
+
+ return kernctl_snapshot_sample_positions(stream->wait_fd);
+}
+
/*
* Get the produced position
*
ret = kernctl_snapshot_get_produced(infd, pos);
if (ret != 0) {
PERROR("kernctl_snapshot_get_produced");
- ret = -errno;
}
return ret;
ret = kernctl_snapshot_get_consumed(infd, pos);
if (ret != 0) {
PERROR("kernctl_snapshot_get_consumed");
- ret = -errno;
}
return ret;
struct lttng_consumer_local_data *ctx)
{
int ret;
- unsigned long consumed_pos, produced_pos;
struct lttng_consumer_channel *channel;
struct lttng_consumer_stream *stream;
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
/* Are we at a position _before_ the first available packet ? */
bool before_first_packet = true;
+ unsigned long consumed_pos, produced_pos;
health_code_update();
ERR("sending streams sent to relayd");
goto end_unlock;
}
+ channel->streams_sent_to_relayd = true;
}
- ret = kernctl_buffer_flush(stream->wait_fd);
+ ret = kernctl_buffer_flush_empty(stream->wait_fd);
if (ret < 0) {
- ERR("Failed to flush kernel stream");
- ret = -errno;
+ /*
+ * Doing a buffer flush which does not take into
+ * account empty packets. This is not perfect
+ * for stream intersection, but required as a
+ * fall-back when "flush_empty" is not
+ * implemented by lttng-modules.
+ */
+ ret = kernctl_buffer_flush(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto end_unlock;
+ }
goto end_unlock;
}
&stream->max_sb_size);
if (ret < 0) {
ERR("Getting kernel max_sb_size");
- ret = -errno;
goto end_unlock;
}
}
ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
if (ret < 0) {
- if (errno != EAGAIN) {
+ if (ret != -EAGAIN) {
PERROR("kernctl_get_subbuf snapshot");
- ret = -errno;
goto end_unlock;
}
DBG("Kernel consumer get subbuf failed. Skipping it.");
ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
if (ret < 0) {
ERR("Snapshot kernctl_get_subbuf_size");
- ret = -errno;
goto error_put_subbuf;
}
ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
if (ret < 0) {
ERR("Snapshot kernctl_get_padded_subbuf_size");
- ret = -errno;
goto error_put_subbuf;
}
ret = kernctl_put_subbuf(stream->wait_fd);
if (ret < 0) {
ERR("Snapshot kernctl_put_subbuf");
- ret = -errno;
goto end_unlock;
}
consumed_pos += stream->max_sb_size;
error_put_subbuf:
ret = kernctl_put_subbuf(stream->wait_fd);
if (ret < 0) {
- ret = -errno;
ERR("Snapshot kernctl_put_subbuf error path");
}
end_unlock:
} else {
ret = consumer_add_channel(new_channel, ctx);
}
- if (CONSUMER_CHANNEL_TYPE_DATA) {
+ if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret) {
+ int monitor_start_ret;
+
+ DBG("Consumer starting monitor timer");
consumer_timer_live_start(new_channel,
msg.u.channel.live_timer_interval);
+ monitor_start_ret = consumer_timer_monitor_start(
+ new_channel,
+ msg.u.channel.monitor_timer_interval);
+ if (monitor_start_ret < 0) {
+ ERR("Starting channel monitoring timer failed");
+ goto end_nosignal;
+ }
+
}
health_code_update();
consumer_stream_free(new_stream);
goto end_nosignal;
}
+
+ /*
+ * If adding an extra stream to an already
+ * existing channel (e.g. cpu hotplug), we need
+ * to send the "streams_sent" command to relayd.
+ */
+ if (channel->streams_sent_to_relayd) {
+ ret = consumer_send_relayd_streams_sent(
+ new_stream->net_seq_idx);
+ if (ret < 0) {
+ goto end_nosignal;
+ }
+ }
}
/* Get the right pipe where the stream will be sent. */
if (ret < 0) {
goto end_nosignal;
}
+ channel->streams_sent_to_relayd = true;
}
break;
}
break;
}
+ case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
+ {
+ int channel_monitor_pipe;
+
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ /* Successfully received the command's type. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ goto error_fatal;
+ }
+
+ ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe,
+ 1);
+ if (ret != sizeof(channel_monitor_pipe)) {
+ ERR("Failed to receive channel monitor pipe");
+ goto error_fatal;
+ }
+
+ DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
+ ret = consumer_timer_thread_set_channel_monitor_pipe(
+ channel_monitor_pipe);
+ if (!ret) {
+ int flags;
+
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ /* Set the pipe as non-blocking. */
+ ret = fcntl(channel_monitor_pipe, F_GETFL, 0);
+ if (ret == -1) {
+ PERROR("fcntl get flags of the channel monitoring pipe");
+ goto error_fatal;
+ }
+ flags = ret;
+
+ ret = fcntl(channel_monitor_pipe, F_SETFL,
+ flags | O_NONBLOCK);
+ if (ret == -1) {
+ PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
+ goto error_fatal;
+ }
+ DBG("Channel monitor pipe set as non-blocking");
+ } else {
+ ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
+ }
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ goto error_fatal;
+ }
+ break;
+ }
default:
goto end_nosignal;
}
ret = kernctl_get_instance_id(infd, &index->stream_instance_id);
if (ret < 0) {
- PERROR("kernctl_get_instance_id");
- goto error;
+ if (ret == -ENOTTY) {
+ /* Command not implemented by lttng-modules. */
+ index->stream_instance_id = -1ULL;
+ ret = 0;
+ } else {
+ PERROR("kernctl_get_instance_id");
+ goto error;
+ }
}
index->stream_instance_id = htobe64(index->stream_instance_id);
ret = kernctl_get_sequence_number(infd, &index->packet_seq_num);
if (ret < 0) {
- PERROR("kernctl_get_sequence_number");
- goto error;
+ if (ret == -ENOTTY) {
+ /* Command not implemented by lttng-modules. */
+ index->packet_seq_num = -1ULL;
+ ret = 0;
+ } else {
+ PERROR("kernctl_get_sequence_number");
+ goto error;
+ }
}
index->packet_seq_num = htobe64(index->packet_seq_num);
ret = kernctl_snapshot(metadata->wait_fd);
if (ret < 0) {
- if (errno != EAGAIN) {
+ if (ret != -EAGAIN) {
ERR("Sync metadata, taking kernel snapshot failed.");
goto end;
}
ret = kernctl_get_sequence_number(stream->wait_fd, &seq);
if (ret < 0) {
- PERROR("kernctl_get_sequence_number");
- goto end;
+ if (ret == -ENOTTY) {
+ /* Command not implemented by lttng-modules. */
+ seq = -1ULL;
+ ret = 0;
+ } else {
+ PERROR("kernctl_get_sequence_number");
+ goto end;
+ }
}
/*
}
if (discarded < stream->last_discarded_events) {
/*
- * Overflow has occured. We assume only one wrap-around
- * has occured.
+ * Overflow has occurred. We assume only one wrap-around
+ * has occurred.
*/
stream->chan->discarded_events += (1ULL << (CAA_BITS_PER_LONG - 1)) -
stream->last_discarded_events + discarded;
ret = kernctl_get_metadata_version(infd, &cur_version);
if (ret < 0) {
+ if (ret == -ENOTTY) {
+ /*
+ * LTTng-modules does not implement this
+ * command.
+ */
+ ret = 0;
+ goto end;
+ }
ERR("Failed to get the metadata version");
goto end;
}
*/
DBG("Reserving sub buffer failed (everything is normal, "
"it is due to concurrency)");
- ret = -errno;
+ ret = err;
goto end;
}
PERROR("Getting sub-buffer len failed.");
err = kernctl_put_subbuf(infd);
if (err != 0) {
- if (errno == EFAULT) {
+ if (err == -EFAULT) {
PERROR("Error in unreserving sub buffer\n");
- } else if (errno == EIO) {
+ } else if (err == -EIO) {
/* Should never happen with newer LTTng versions */
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
- ret = -errno;
+ ret = err;
goto end;
}
- ret = -errno;
+ ret = err;
goto end;
}
if (ret < 0) {
err = kernctl_put_subbuf(infd);
if (err != 0) {
- if (errno == EFAULT) {
+ if (err == -EFAULT) {
PERROR("Error in unreserving sub buffer\n");
- } else if (errno == EIO) {
+ } else if (err == -EIO) {
/* Should never happen with newer LTTng versions */
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
- ret = -errno;
+ ret = err;
goto end;
}
goto end;
}
ret = update_stream_stats(stream);
if (ret < 0) {
+ err = kernctl_put_subbuf(infd);
+ if (err != 0) {
+ if (err == -EFAULT) {
+ PERROR("Error in unreserving sub buffer\n");
+ } else if (err == -EIO) {
+ /* Should never happen with newer LTTng versions */
+ PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
+ }
+ ret = err;
+ goto end;
+ }
goto end;
}
} else {
write_index = 0;
ret = metadata_stream_check_version(infd, stream);
if (ret < 0) {
+ err = kernctl_put_subbuf(infd);
+ if (err != 0) {
+ if (err == -EFAULT) {
+ PERROR("Error in unreserving sub buffer\n");
+ } else if (err == -EIO) {
+ /* Should never happen with newer LTTng versions */
+ PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
+ }
+ ret = err;
+ goto end;
+ }
goto end;
}
}
PERROR("Getting sub-buffer len failed.");
err = kernctl_put_subbuf(infd);
if (err != 0) {
- if (errno == EFAULT) {
+ if (err == -EFAULT) {
PERROR("Error in unreserving sub buffer\n");
- } else if (errno == EIO) {
+ } else if (err == -EIO) {
/* Should never happen with newer LTTng versions */
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
- ret = -errno;
+ ret = err;
goto end;
}
- ret = -errno;
+ ret = err;
goto end;
}
err = kernctl_put_next_subbuf(infd);
if (err != 0) {
- if (errno == EFAULT) {
+ if (err == -EFAULT) {
PERROR("Error in unreserving sub buffer\n");
- } else if (errno == EIO) {
+ } else if (err == -EIO) {
/* Should never happen with newer LTTng versions */
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
- ret = -errno;
+ ret = err;
goto end;
}
stream->tracefile_size_current = 0;
if (!stream->metadata_flag) {
- ret = index_create_file(stream->chan->pathname,
+ struct lttng_index_file *index_file;
+
+ index_file = lttng_index_file_create(stream->chan->pathname,
stream->name, stream->uid, stream->gid,
stream->chan->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
+ stream->tracefile_count_current,
+ CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+ if (!index_file) {
goto error;
}
- stream->index_fd = ret;
+ assert(!stream->index_file);
+ stream->index_file = index_file;
}
}
ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
if (ret != 0) {
PERROR("kernctl_get_mmap_len");
- ret = -errno;
goto error_close_fd;
}
stream->mmap_len = (size_t) mmap_len;