int infd = stream->wait_fd;
ret = kernctl_snapshot(infd);
- if (ret != 0) {
+ /*
+ * -EAGAIN is not an error, it just means that there is no data to
+ * be read.
+ */
+ if (ret != 0 && ret != -EAGAIN) {
PERROR("Getting sub-buffer snapshot.");
}
do {
health_code_update();
- ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
+ ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx, NULL);
if (ret_read < 0) {
if (ret_read != -EAGAIN) {
ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
msg.u.stream.cpu,
&alloc_ret,
channel->type,
- channel->monitor);
+ channel->monitor,
+ msg.u.stream.trace_archive_id);
if (new_stream == NULL) {
switch (alloc_ret) {
case -ENOMEM:
new_stream->chan = channel;
new_stream->wait_fd = fd;
+ consumer_stream_update_channel_attributes(new_stream,
+ channel);
switch (channel->output) {
case CONSUMER_CHANNEL_SPLICE:
new_stream->output = LTTNG_EVENT_SPLICE;
goto end_nosignal;
}
- DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64,
- new_stream->name, fd, new_stream->relayd_stream_id);
+ DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
+ new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
break;
}
case LTTNG_CONSUMER_STREAMS_SENT:
}
break;
}
+ case LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE:
+ {
+ int channel_rotate_pipe;
+ int flags;
+
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ /* Successfully received the command's type. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ goto error_fatal;
+ }
+
+ ret = lttcomm_recv_fds_unix_sock(sock, &channel_rotate_pipe, 1);
+ if (ret != (ssize_t) sizeof(channel_rotate_pipe)) {
+ ERR("Failed to receive channel rotate pipe");
+ goto error_fatal;
+ }
+
+ DBG("Received channel rotate pipe (%d)", channel_rotate_pipe);
+ ctx->channel_rotate_pipe = channel_rotate_pipe;
+ /* Set the pipe as non-blocking. */
+ ret = fcntl(channel_rotate_pipe, F_GETFL, 0);
+ if (ret == -1) {
+ PERROR("fcntl get flags of the channel rotate pipe");
+ goto error_fatal;
+ }
+ flags = ret;
+
+ ret = fcntl(channel_rotate_pipe, F_SETFL, flags | O_NONBLOCK);
+ if (ret == -1) {
+ PERROR("fcntl set O_NONBLOCK flag of the channel rotate pipe");
+ goto error_fatal;
+ }
+ DBG("Channel rotate pipe set as non-blocking");
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ goto error_fatal;
+ }
+ break;
+ }
+ case LTTNG_CONSUMER_ROTATE_CHANNEL:
+ {
+ DBG("Consumer rotate channel %" PRIu64, msg.u.rotate_channel.key);
+
+ /*
+ * Sample the rotate position of all the streams in this channel.
+ */
+ ret = lttng_consumer_rotate_channel(msg.u.rotate_channel.key,
+ msg.u.rotate_channel.pathname,
+ msg.u.rotate_channel.relayd_id,
+ msg.u.rotate_channel.metadata,
+ msg.u.rotate_channel.new_chunk_id,
+ ctx);
+ if (ret < 0) {
+ ERR("Rotate channel failed");
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
+ /* Rotate the streams that are ready right now. */
+ ret = lttng_consumer_rotate_ready_streams(
+ msg.u.rotate_channel.key, ctx);
+ if (ret < 0) {
+ ERR("Rotate ready streams failed");
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ break;
+ }
+ case LTTNG_CONSUMER_ROTATE_RENAME:
+ {
+ DBG("Consumer rename session %" PRIu64 " after rotation, old path = \"%s\", new path = \"%s\"",
+ msg.u.rotate_rename.session_id,
+ msg.u.rotate_rename.old_path,
+ msg.u.rotate_rename.new_path);
+ ret = lttng_consumer_rotate_rename(msg.u.rotate_rename.old_path,
+ msg.u.rotate_rename.new_path,
+ msg.u.rotate_rename.uid,
+ msg.u.rotate_rename.gid,
+ msg.u.rotate_rename.relayd_id);
+ if (ret < 0) {
+ ERR("Rotate rename failed");
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+ break;
+ }
+ case LTTNG_CONSUMER_ROTATE_PENDING_RELAY:
+ {
+ uint32_t pending_reply;
+
+ DBG("Consumer rotate pending on relay for session %" PRIu64,
+ msg.u.rotate_pending_relay.session_id);
+ ret = lttng_consumer_rotate_pending_relay(
+ msg.u.rotate_pending_relay.session_id,
+ msg.u.rotate_pending_relay.relayd_id,
+ msg.u.rotate_pending_relay.chunk_id);
+ if (ret < 0) {
+ ERR("Rotate pending relay failed");
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ } else {
+ pending_reply = !!ret;
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &pending_reply,
+ sizeof(pending_reply));
+ if (ret < 0) {
+ PERROR("send data pending ret code");
+ goto error_fatal;
+ }
+ break;
+ }
case LTTNG_CONSUMER_MKDIR:
{
DBG("Consumer mkdir %s in session %" PRIu64,
* Consume data on a file descriptor and write it on a trace file.
*/
ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx)
+ struct lttng_consumer_local_data *ctx, bool *rotated)
{
unsigned long len, subbuf_size, padding;
- int err, write_index = 1;
+ int err, write_index = 1, rotation_ret;
ssize_t ret = 0;
int infd = stream->wait_fd;
struct ctf_packet_index index;
DBG("In read_subbuffer (infd : %d)", infd);
+ /*
+ * If the stream was flagged to be ready for rotation before we extract the
+ * next packet, rotate it now.
+ */
+ if (stream->rotate_ready) {
+ DBG("Rotate stream before extracting data");
+ rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated);
+ if (rotation_ret < 0) {
+ ERR("Stream rotation error");
+ ret = -1;
+ goto error;
+ }
+ }
+
/* Get the next subbuffer */
err = kernctl_get_next_subbuf(infd);
if (err != 0) {
DBG("Reserving sub buffer failed (everything is normal, "
"it is due to concurrency)");
ret = err;
- goto end;
+ goto error;
}
/* Get the full subbuffer size including padding */
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
ret = err;
- goto end;
+ goto error;
}
if (!stream->metadata_flag) {
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
- goto end;
+ goto error;
}
ret = update_stream_stats(stream);
if (ret < 0) {
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
- goto end;
+ goto error;
}
} else {
write_index = 0;
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
- goto end;
+ goto error;
}
}
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
ret = err;
- goto end;
+ goto error;
}
/* Make sure the tracer is not gone mad on us! */
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
/* Write index if needed. */
if (!write_index) {
- goto end;
+ goto rotate;
}
if (stream->chan->live_timer_interval && !stream->metadata_flag) {
pthread_mutex_unlock(&stream->metadata_timer_lock);
}
if (err < 0) {
- goto end;
+ goto error;
}
}
err = consumer_stream_write_index(stream, &index);
if (err < 0) {
- goto end;
+ goto error;
}
-end:
+rotate:
+ /*
+ * After extracting the packet, we check if the stream is now ready to be
+ * rotated and perform the action immediately.
+ */
+ rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
+ if (rotation_ret == 1) {
+ rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated);
+ if (rotation_ret < 0) {
+ ERR("Stream rotation error");
+ ret = -1;
+ goto error;
+ }
+ } else if (rotation_ret < 0) {
+ ERR("Checking if stream is ready to rotate");
+ ret = -1;
+ goto error;
+ }
+
+error:
return ret;
}