+static
+int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuf)
+{
+ int ret;
+
+ ret = extract_common_subbuffer_info(stream, subbuf);
+ if (ret) {
+ goto end;
+ }
+
+ ret = kernctl_get_metadata_version(
+ stream->wait_fd, &subbuf->info.metadata.version);
+ if (ret) {
+ goto end;
+ }
+
+end:
+ return ret;
+}
+
+static
+int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuf)
+{
+ int ret;
+
+ ret = extract_common_subbuffer_info(stream, subbuf);
+ if (ret) {
+ goto end;
+ }
+
+ ret = kernctl_get_packet_size(
+ stream->wait_fd, &subbuf->info.data.packet_size);
+ if (ret < 0) {
+ PERROR("Failed to get sub-buffer packet size");
+ goto end;
+ }
+
+ ret = kernctl_get_content_size(
+ stream->wait_fd, &subbuf->info.data.content_size);
+ if (ret < 0) {
+ PERROR("Failed to get sub-buffer content size");
+ goto end;
+ }
+
+ ret = kernctl_get_timestamp_begin(
+ stream->wait_fd, &subbuf->info.data.timestamp_begin);
+ if (ret < 0) {
+ PERROR("Failed to get sub-buffer begin timestamp");
+ goto end;
+ }
+
+ ret = kernctl_get_timestamp_end(
+ stream->wait_fd, &subbuf->info.data.timestamp_end);
+ if (ret < 0) {
+ PERROR("Failed to get sub-buffer end timestamp");
+ goto end;
+ }
+
+ ret = kernctl_get_events_discarded(
+ stream->wait_fd, &subbuf->info.data.events_discarded);
+ if (ret) {
+ PERROR("Failed to get sub-buffer events discarded count");
+ goto end;
+ }
+
+ ret = kernctl_get_sequence_number(stream->wait_fd,
+ &subbuf->info.data.sequence_number.value);
+ if (ret) {
+ /* May not be supported by older LTTng-modules. */
+ if (ret != -ENOTTY) {
+ PERROR("Failed to get sub-buffer sequence number");
+ goto end;
+ }
+ } else {
+ subbuf->info.data.sequence_number.is_set = true;
+ }
+
+ ret = kernctl_get_stream_id(
+ stream->wait_fd, &subbuf->info.data.stream_id);
+ if (ret < 0) {
+ PERROR("Failed to get stream id");
+ goto end;
+ }
+
+ ret = kernctl_get_instance_id(stream->wait_fd,
+ &subbuf->info.data.stream_instance_id.value);
+ if (ret) {
+ /* May not be supported by older LTTng-modules. */
+ if (ret != -ENOTTY) {
+ PERROR("Failed to get stream instance id");
+ goto end;
+ }
+ } else {
+ subbuf->info.data.stream_instance_id.is_set = true;
+ }
+end:
+ return ret;
+}
+
+static
+int get_subbuffer_common(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuffer)
+{
+ int ret;
+
+ ret = kernctl_get_next_subbuf(stream->wait_fd);
+ if (ret) {
+ goto end;
+ }
+
+ ret = stream->read_subbuffer_ops.extract_subbuffer_info(
+ stream, subbuffer);
+end:
+ return ret;
+}
+
+static
+int get_next_subbuffer_splice(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuffer)
+{
+ int ret;
+
+ ret = get_subbuffer_common(stream, subbuffer);
+ if (ret) {
+ goto end;
+ }
+
+ subbuffer->buffer.fd = stream->wait_fd;
+end:
+ return ret;
+}
+
+static
+int get_next_subbuffer_mmap(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuffer)
+{
+ int ret;
+ const char *addr;
+
+ ret = get_subbuffer_common(stream, subbuffer);
+ if (ret) {
+ goto end;
+ }
+
+ ret = get_current_subbuf_addr(stream, &addr);
+ if (ret) {
+ goto end;
+ }
+
+ subbuffer->buffer.buffer = lttng_buffer_view_init(
+ addr, 0, subbuffer->info.data.padded_subbuf_size);
+end:
+ return ret;
+}
+
+static
+int get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuffer)
+{
+ int ret;
+ const char *addr;
+ bool coherent;
+
+ ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
+ &coherent);
+ if (ret) {
+ goto end;
+ }
+
+ ret = stream->read_subbuffer_ops.extract_subbuffer_info(
+ stream, subbuffer);
+ if (ret) {
+ goto end;
+ }
+
+ LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
+
+ ret = get_current_subbuf_addr(stream, &addr);
+ if (ret) {
+ goto end;
+ }
+
+ subbuffer->buffer.buffer = lttng_buffer_view_init(
+ addr, 0, subbuffer->info.data.padded_subbuf_size);
+ DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
+ subbuffer->info.metadata.padded_subbuf_size,
+ coherent ? "true" : "false");
+end:
+ return ret;
+}
+
+static
+int put_next_subbuffer(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuffer)
+{
+ const int ret = kernctl_put_next_subbuf(stream->wait_fd);
+
+ if (ret) {
+ if (ret == -EFAULT) {
+ PERROR("Error in unreserving sub buffer");
+ } else if (ret == -EIO) {
+ /* Should never happen with newer LTTng versions */
+ PERROR("Reader has been pushed by the writer, last sub-buffer corrupted");
+ }
+ }
+
+ return ret;
+}
+
+static
+bool is_get_next_check_metadata_available(int tracer_fd)
+{
+ return kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL) !=
+ -ENOTTY;
+}
+
+static
+int lttng_kconsumer_set_stream_ops(
+ struct lttng_consumer_stream *stream)
+{
+ int ret = 0;
+
+ if (stream->metadata_flag && stream->chan->is_live) {
+ DBG("Attempting to enable metadata bucketization for live consumers");
+ if (is_get_next_check_metadata_available(stream->wait_fd)) {
+ DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
+ stream->read_subbuffer_ops.get_next_subbuffer =
+ get_next_subbuffer_metadata_check;
+ ret = consumer_stream_enable_metadata_bucketization(
+ stream);
+ if (ret) {
+ goto end;
+ }
+ } else {
+ /*
+ * The kernel tracer version is too old to indicate
+ * when the metadata stream has reached a "coherent"
+ * (parseable) point.
+ *
+ * This means that a live viewer may see an incoherent
+ * sequence of metadata and fail to parse it.
+ */
+ WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
+ metadata_bucket_destroy(stream->metadata_bucket);
+ stream->metadata_bucket = NULL;
+ }
+ }
+
+ if (!stream->read_subbuffer_ops.get_next_subbuffer) {
+ if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
+ stream->read_subbuffer_ops.get_next_subbuffer =
+ get_next_subbuffer_mmap;
+ } else {
+ stream->read_subbuffer_ops.get_next_subbuffer =
+ get_next_subbuffer_splice;
+ }
+ }
+
+ if (stream->metadata_flag) {
+ stream->read_subbuffer_ops.extract_subbuffer_info =
+ extract_metadata_subbuffer_info;
+ } else {
+ stream->read_subbuffer_ops.extract_subbuffer_info =
+ extract_data_subbuffer_info;
+ if (stream->chan->is_live) {
+ stream->read_subbuffer_ops.send_live_beacon =
+ consumer_flush_kernel_index;
+ }
+ }
+
+ stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
+end:
+ return ret;
+}
+