#include <inttypes.h>
#include <unistd.h>
#include <sys/stat.h>
+#include <stdint.h>
#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/optional.h>
#include <common/buffer-view.h>
#include <common/consumer/consumer.h>
-#include <stdint.h>
+#include <common/consumer/metadata-bucket.h>
#include "kernel-consumer.h"
subbuf_view = lttng_buffer_view_init(
subbuf_addr, 0, padded_len);
- read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
+ read_len = lttng_consumer_on_read_subbuffer_mmap(
stream, &subbuf_view,
padded_len - len);
/*
case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
{
const struct lttng_credentials credentials = {
- .uid = msg.u.create_trace_chunk.credentials.value.uid,
- .gid = msg.u.create_trace_chunk.credentials.value.gid,
+ .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
+ .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
};
const bool is_local_trace =
!msg.u.create_trace_chunk.relayd_id.is_set;
msg.u.trace_chunk_exists.chunk_id);
goto end_msg_sessiond;
}
+ case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
+ {
+ const uint64_t key = msg.u.open_channel_packets.key;
+ struct lttng_consumer_channel *channel =
+ consumer_find_channel(key);
+
+ if (channel) {
+ pthread_mutex_lock(&channel->lock);
+ ret_code = lttng_consumer_open_channel_packets(channel);
+ pthread_mutex_unlock(&channel->lock);
+ } else {
+ WARN("Channel %" PRIu64 " not found", key);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ health_code_update();
+ goto end_msg_sessiond;
+ }
default:
goto end_nosignal;
}
* metadata thread can consumer them.
*
* Metadata stream lock MUST be acquired.
- *
- * Return 0 if new metadatda is available, EAGAIN if the metadata stream
- * is empty or a negative value on error.
*/
-int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata)
+enum sync_metadata_status lttng_kconsumer_sync_metadata(
+ struct lttng_consumer_stream *metadata)
{
int ret;
+ enum sync_metadata_status status;
assert(metadata);
ret = kernctl_buffer_flush(metadata->wait_fd);
if (ret < 0) {
ERR("Failed to flush kernel stream");
+ status = SYNC_METADATA_STATUS_ERROR;
goto end;
}
ret = kernctl_snapshot(metadata->wait_fd);
if (ret < 0) {
- if (ret != -EAGAIN) {
+ if (errno == EAGAIN) {
+ /* No new metadata, exit. */
+ DBG("Sync metadata, no new kernel metadata");
+ status = SYNC_METADATA_STATUS_NO_DATA;
+ } else {
ERR("Sync metadata, taking kernel snapshot failed.");
- goto end;
+ status = SYNC_METADATA_STATUS_ERROR;
}
- DBG("Sync metadata, no new kernel metadata");
- /* No new metadata, exit. */
- ret = ENODATA;
- goto end;
+ } else {
+ status = SYNC_METADATA_STATUS_NEW_DATA;
}
end:
- return ret;
+ return status;
}
static
return ret;
}
+static
+int get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuffer)
+{
+ int ret;
+ const char *addr;
+ bool coherent;
+
+ ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
+ &coherent);
+ if (ret) {
+ goto end;
+ }
+
+ ret = stream->read_subbuffer_ops.extract_subbuffer_info(
+ stream, subbuffer);
+ if (ret) {
+ goto end;
+ }
+
+ LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
+
+ ret = get_current_subbuf_addr(stream, &addr);
+ if (ret) {
+ goto end;
+ }
+
+ subbuffer->buffer.buffer = lttng_buffer_view_init(
+ addr, 0, subbuffer->info.data.padded_subbuf_size);
+ DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
+ subbuffer->info.metadata.padded_subbuf_size,
+ coherent ? "true" : "false");
+end:
+ return ret;
+}
+
static
int put_next_subbuffer(struct lttng_consumer_stream *stream,
struct stream_subbuffer *subbuffer)
return ret;
}
-static void lttng_kconsumer_set_stream_ops(
+static
+bool is_get_next_check_metadata_available(int tracer_fd)
+{
+ return kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL) !=
+ -ENOTTY;
+}
+
+static
+int lttng_kconsumer_set_stream_ops(
struct lttng_consumer_stream *stream)
{
- if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
- stream->read_subbuffer_ops.get_next_subbuffer =
- get_next_subbuffer_mmap;
- } else {
- stream->read_subbuffer_ops.get_next_subbuffer =
- get_next_subbuffer_splice;
+ int ret = 0;
+
+ if (stream->metadata_flag && stream->chan->is_live) {
+ DBG("Attempting to enable metadata bucketization for live consumers");
+ if (is_get_next_check_metadata_available(stream->wait_fd)) {
+ DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
+ stream->read_subbuffer_ops.get_next_subbuffer =
+ get_next_subbuffer_metadata_check;
+ ret = consumer_stream_enable_metadata_bucketization(
+ stream);
+ if (ret) {
+ goto end;
+ }
+ } else {
+ /*
+ * The kernel tracer version is too old to indicate
+ * when the metadata stream has reached a "coherent"
+ * (parseable) point.
+ *
+ * This means that a live viewer may see an incoherent
+ * sequence of metadata and fail to parse it.
+ */
+ WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
+ metadata_bucket_destroy(stream->metadata_bucket);
+ stream->metadata_bucket = NULL;
+ }
+ }
+
+ if (!stream->read_subbuffer_ops.get_next_subbuffer) {
+ if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
+ stream->read_subbuffer_ops.get_next_subbuffer =
+ get_next_subbuffer_mmap;
+ } else {
+ stream->read_subbuffer_ops.get_next_subbuffer =
+ get_next_subbuffer_splice;
+ }
}
if (stream->metadata_flag) {
}
stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
+end:
+ return ret;
}
int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
}
}
- lttng_kconsumer_set_stream_ops(stream);
+ ret = lttng_kconsumer_set_stream_ops(stream);
+ if (ret) {
+ goto error_close_fd;
+ }
/* we return 0 to let the library handle the FD internally */
return 0;