do {
health_code_update();
- ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
+ ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx, NULL);
if (ret_read < 0) {
if (ret_read != -EAGAIN) {
ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
goto end_nosignal;
}
- DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64,
- new_stream->name, fd, new_stream->relayd_stream_id);
+ DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
+ new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
break;
}
case LTTNG_CONSUMER_STREAMS_SENT:
* Consume data on a file descriptor and write it on a trace file.
*/
ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx)
+ struct lttng_consumer_local_data *ctx, bool *rotated)
{
unsigned long len, subbuf_size, padding;
- int err, write_index = 1;
+ int err, write_index = 1, rotation_ret;
ssize_t ret = 0;
int infd = stream->wait_fd;
struct ctf_packet_index index;
DBG("In read_subbuffer (infd : %d)", infd);
+ /*
+ * If the stream was flagged to be ready for rotation before we extract the
+ * next packet, rotate it now.
+ */
+ if (stream->rotate_ready) {
+ DBG("Rotate stream before extracting data");
+ rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated);
+ if (rotation_ret < 0) {
+ ERR("Stream rotation error");
+ ret = -1;
+ goto error;
+ }
+ }
+
/* Get the next subbuffer */
err = kernctl_get_next_subbuf(infd);
if (err != 0) {
DBG("Reserving sub buffer failed (everything is normal, "
"it is due to concurrency)");
ret = err;
- goto end;
+ goto error;
}
/* Get the full subbuffer size including padding */
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
ret = err;
- goto end;
+ goto error;
}
if (!stream->metadata_flag) {
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
- goto end;
+ goto error;
}
ret = update_stream_stats(stream);
if (ret < 0) {
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
- goto end;
+ goto error;
}
} else {
write_index = 0;
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
- goto end;
+ goto error;
}
}
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
ret = err;
- goto end;
+ goto error;
}
/* Make sure the tracer is not gone mad on us! */
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
/* Write index if needed. */
if (!write_index) {
- goto end;
+ goto rotate;
}
if (stream->chan->live_timer_interval && !stream->metadata_flag) {
pthread_mutex_unlock(&stream->metadata_timer_lock);
}
if (err < 0) {
- goto end;
+ goto error;
}
}
err = consumer_stream_write_index(stream, &index);
if (err < 0) {
- goto end;
+ goto error;
}
-end:
+rotate:
+ /*
+ * After extracting the packet, we check if the stream is now ready to be
+ * rotated and perform the action immediately.
+ */
+ rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
+ if (rotation_ret == 1) {
+ rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated);
+ if (rotation_ret < 0) {
+ ERR("Stream rotation error");
+ ret = -1;
+ goto error;
+ }
+ } else if (rotation_ret < 0) {
+ ERR("Checking if stream is ready to rotate");
+ ret = -1;
+ goto error;
+ }
+
+error:
return ret;
}