Generate local kernel and UST indexes
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index 21ac08f1b4d278c3a4fd35226508e274903673ce..09ccda329526b69cf1e5aa147ed32f4dc8a67e81 100644 (file)
@@ -38,6 +38,7 @@
 #include <common/relayd/relayd.h>
 #include <common/utils.h>
 #include <common/consumer-stream.h>
+#include <common/index/index.h>
 
 #include "kernel-consumer.h"
 
@@ -57,8 +58,8 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
 
        ret = kernctl_snapshot(infd);
        if (ret != 0) {
-               errno = -ret;
                perror("Getting sub-buffer snapshot.");
+               ret = -errno;
        }
 
        return ret;
@@ -77,8 +78,8 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
 
        ret = kernctl_snapshot_get_produced(infd, pos);
        if (ret != 0) {
-               errno = -ret;
                perror("kernctl_snapshot_get_produced");
+               ret = -errno;
        }
 
        return ret;
@@ -97,8 +98,8 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
 
        ret = kernctl_snapshot_get_consumed(infd, pos);
        if (ret != 0) {
-               errno = -ret;
                perror("kernctl_snapshot_get_consumed");
+               ret = -errno;
        }
 
        return ret;
@@ -118,13 +119,13 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
        struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
 
-       DBG("Kernel consumer snapshot channel %lu", key);
+       DBG("Kernel consumer snapshot channel %" PRIu64, key);
 
        rcu_read_lock();
 
        channel = consumer_find_channel(key);
        if (!channel) {
-               ERR("No channel found for key %lu", key);
+               ERR("No channel found for key %" PRIu64, key);
                ret = -1;
                goto end;
        }
@@ -158,7 +159,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ret = utils_create_stream_file(path, stream->name,
                                        stream->chan->tracefile_size,
                                        stream->tracefile_count_current,
-                                       stream->uid, stream->gid);
+                                       stream->uid, stream->gid, NULL);
                        if (ret < 0) {
                                ERR("utils_create_stream_file");
                                goto end_unlock;
@@ -167,13 +168,14 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        stream->out_fd = ret;
                        stream->tracefile_size_current = 0;
 
-                       DBG("Kernel consumer snapshot stream %s/%s (%lu)", path,
-                                       stream->name, stream->key);
+                       DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")",
+                                       path, stream->name, stream->key);
                }
 
                ret = kernctl_buffer_flush(stream->wait_fd);
                if (ret < 0) {
                        ERR("Failed to flush kernel stream");
+                       ret = -errno;
                        goto end_unlock;
                }
 
@@ -200,6 +202,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                                        &stream->max_sb_size);
                        if (ret < 0) {
                                ERR("Getting kernel max_sb_size");
+                               ret = -errno;
                                goto end_unlock;
                        }
                }
@@ -223,6 +226,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        if (ret < 0) {
                                if (errno != EAGAIN) {
                                        PERROR("kernctl_get_subbuf snapshot");
+                                       ret = -errno;
                                        goto end_unlock;
                                }
                                DBG("Kernel consumer get subbuf failed. Skipping it.");
@@ -233,17 +237,19 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_subbuf_size");
+                               ret = -errno;
                                goto error_put_subbuf;
                        }
 
                        ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_padded_subbuf_size");
+                               ret = -errno;
                                goto error_put_subbuf;
                        }
 
                        read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
-                                       padded_len - len);
+                                       padded_len - len, NULL);
                        /*
                         * We write the padded len in local tracefiles but the data len
                         * when using a relay. Display the error but continue processing
@@ -264,6 +270,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ret = kernctl_put_subbuf(stream->wait_fd);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_put_subbuf");
+                               ret = -errno;
                                goto end_unlock;
                        }
                        consumed_pos += stream->max_sb_size;
@@ -292,6 +299,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
 error_put_subbuf:
        ret = kernctl_put_subbuf(stream->wait_fd);
        if (ret < 0) {
+               ret = -errno;
                ERR("Snapshot kernctl_put_subbuf error path");
        }
 end_unlock:
@@ -345,7 +353,7 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
                ret = utils_create_stream_file(path, metadata_stream->name,
                                metadata_stream->chan->tracefile_size,
                                metadata_stream->tracefile_count_current,
-                               metadata_stream->uid, metadata_stream->gid);
+                               metadata_stream->uid, metadata_stream->gid, NULL);
                if (ret < 0) {
                        goto error;
                }
@@ -355,8 +363,8 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
        do {
                ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
                if (ret_read < 0) {
-                       if (ret_read != -EPERM) {
-                               ERR("Kernel snapshot reading metadata subbuffer (ret: %ld)",
+                       if (ret_read != -EAGAIN) {
+                               ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
                                                ret_read);
                                goto error;
                        }
@@ -653,21 +661,40 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
+                       ret = consumer_add_metadata_stream(new_stream);
+                       if (ret) {
+                               ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing",
+                                               new_stream->key);
+                               consumer_stream_free(new_stream);
+                               goto end_nosignal;
+                       }
                        stream_pipe = ctx->consumer_metadata_pipe;
                } else {
+                       ret = consumer_add_data_stream(new_stream);
+                       if (ret) {
+                               ERR("Consumer add stream %" PRIu64 " failed. Continuing",
+                                               new_stream->key);
+                               consumer_stream_free(new_stream);
+                               goto end_nosignal;
+                       }
                        stream_pipe = ctx->consumer_data_pipe;
                }
 
+               /* Vitible to other threads */
+               new_stream->globally_visible = 1;
+
                ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
                if (ret < 0) {
                        ERR("Consumer write %s stream to pipe %d",
                                        new_stream->metadata_flag ? "metadata" : "data",
                                        lttng_pipe_get_writefd(stream_pipe));
-                       consumer_stream_free(new_stream);
+                       if (new_stream->metadata_flag) {
+                               consumer_del_stream_for_metadata(new_stream);
+                       } else {
+                               consumer_del_stream_for_data(new_stream);
+                       }
                        goto end_nosignal;
                }
-               /* Successfully sent to the right thread. */
-               new_stream->globally_visible = 1;
 
                DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64,
                                new_stream->name, fd, new_stream->relayd_stream_id);
@@ -817,6 +844,61 @@ error_fatal:
        return -1;
 }
 
+/*
+ * Populate index values of a kernel stream. Values are set in big endian order.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static int get_index_values(struct lttng_packet_index *index, int infd)
+{
+       int ret;
+
+       ret = kernctl_get_timestamp_begin(infd, &index->timestamp_begin);
+       if (ret < 0) {
+               PERROR("kernctl_get_timestamp_begin");
+               goto error;
+       }
+       index->timestamp_begin = htobe64(index->timestamp_begin);
+
+       ret = kernctl_get_timestamp_end(infd, &index->timestamp_end);
+       if (ret < 0) {
+               PERROR("kernctl_get_timestamp_end");
+               goto error;
+       }
+       index->timestamp_end = htobe64(index->timestamp_end);
+
+       ret = kernctl_get_events_discarded(infd, &index->events_discarded);
+       if (ret < 0) {
+               PERROR("kernctl_get_events_discarded");
+               goto error;
+       }
+       index->events_discarded = htobe64(index->events_discarded);
+
+       ret = kernctl_get_content_size(infd, &index->content_size);
+       if (ret < 0) {
+               PERROR("kernctl_get_content_size");
+               goto error;
+       }
+       index->content_size = htobe64(index->content_size);
+
+       ret = kernctl_get_packet_size(infd, &index->packet_size);
+       if (ret < 0) {
+               PERROR("kernctl_get_packet_size");
+               goto error;
+       }
+       index->packet_size = htobe64(index->packet_size);
+
+       ret = kernctl_get_stream_id(infd, &index->stream_id);
+       if (ret < 0) {
+               PERROR("kernctl_get_stream_id");
+               goto error;
+       }
+       index->stream_id = htobe64(index->stream_id);
+
+error:
+       return ret;
+}
+
 /*
  * Consume data on a file descriptor and write it on a trace file.
  */
@@ -824,15 +906,21 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len, subbuf_size, padding;
-       int err;
+       int err, write_index = 0;
        ssize_t ret = 0;
        int infd = stream->wait_fd;
+       struct lttng_packet_index index;
 
        DBG("In read_subbuffer (infd : %d)", infd);
+
+       /* Indicate that for this stream we have to write the index. */
+       if (stream->index_fd >= 0) {
+               write_index = 1;
+       }
+
        /* Get the next subbuffer */
        err = kernctl_get_next_subbuf(infd);
        if (err != 0) {
-               ret = err;
                /*
                 * This is a debug message even for single-threaded consumer,
                 * because poll() have more relaxed criterions than get subbuf,
@@ -841,18 +929,25 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                 */
                DBG("Reserving sub buffer failed (everything is normal, "
                                "it is due to concurrency)");
+               ret = -errno;
                goto end;
        }
 
        /* Get the full subbuffer size including padding */
        err = kernctl_get_padded_subbuf_size(infd, &len);
        if (err != 0) {
-               errno = -err;
                perror("Getting sub-buffer len failed.");
-               ret = err;
+               ret = -errno;
                goto end;
        }
 
+       if (!stream->metadata_flag && write_index) {
+               ret = get_index_values(&index, infd);
+               if (ret < 0) {
+                       goto end;
+               }
+       }
+
        switch (stream->chan->output) {
        case CONSUMER_CHANNEL_SPLICE:
                /*
@@ -865,7 +960,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
 
                /* splice the subbuffer to the tracefile */
                ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size,
-                               padding);
+                               padding, &index);
                /*
                 * XXX: Splice does not support network streaming so the return value
                 * is simply checked against subbuf_size and not like the mmap() op.
@@ -877,15 +972,15 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                         */
                        ERR("Error splicing to tracefile (ret: %zd != len: %lu)",
                                        ret, subbuf_size);
+                       write_index = 0;
                }
                break;
        case CONSUMER_CHANNEL_MMAP:
                /* Get subbuffer size without padding */
                err = kernctl_get_subbuf_size(infd, &subbuf_size);
                if (err != 0) {
-                       errno = -err;
                        perror("Getting sub-buffer len failed.");
-                       ret = err;
+                       ret = -errno;
                        goto end;
                }
 
@@ -896,7 +991,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
 
                /* write the subbuffer to the tracefile */
                ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size,
-                               padding);
+                               padding, &index);
                /*
                 * The mmap operation should write subbuf_size amount of data when
                 * network streaming or the full padding (len) size when we are _not_
@@ -911,27 +1006,35 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                        ERR("Error writing to tracefile "
                                        "(ret: %zd != len: %lu != subbuf_size: %lu)",
                                        ret, len, subbuf_size);
+                       write_index = 0;
                }
                break;
        default:
                ERR("Unknown output method");
-               ret = -1;
+               ret = -EPERM;
        }
 
        err = kernctl_put_next_subbuf(infd);
        if (err != 0) {
-               errno = -err;
                if (errno == EFAULT) {
                        perror("Error in unreserving sub buffer\n");
                } else if (errno == EIO) {
                        /* Should never happen with newer LTTng versions */
                        perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
                }
-
-               ret = -err;
+               ret = -errno;
                goto end;
        }
 
+       /* Write index if needed. */
+       if (write_index) {
+               err = index_write(stream->index_fd, &index, sizeof(index));
+               if (err < 0) {
+                       ret = -1;
+                       goto end;
+               }
+       }
+
 end:
        return ret;
 }
@@ -949,12 +1052,23 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
        if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
                ret = utils_create_stream_file(stream->chan->pathname, stream->name,
                                stream->chan->tracefile_size, stream->tracefile_count_current,
-                               stream->uid, stream->gid);
+                               stream->uid, stream->gid, NULL);
                if (ret < 0) {
                        goto error;
                }
                stream->out_fd = ret;
                stream->tracefile_size_current = 0;
+
+               if (!stream->metadata_flag) {
+                       ret = index_create_file(stream->chan->pathname,
+                                       stream->name, stream->uid, stream->gid,
+                                       stream->chan->tracefile_size,
+                                       stream->tracefile_count_current);
+                       if (ret < 0) {
+                               goto error;
+                       }
+                       stream->index_fd = ret;
+               }
        }
 
        if (stream->output == LTTNG_EVENT_MMAP) {
@@ -963,8 +1077,8 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
 
                ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
                if (ret != 0) {
-                       errno = -ret;
                        PERROR("kernctl_get_mmap_len");
+                       ret = -errno;
                        goto error_close_fd;
                }
                stream->mmap_len = (size_t) mmap_len;
@@ -1007,6 +1121,11 @@ int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
 
        assert(stream);
 
+       if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
+               ret = 0;
+               goto end;
+       }
+
        ret = kernctl_get_next_subbuf(stream->wait_fd);
        if (ret == 0) {
                /* There is still data so let's put back this subbuffer. */
This page took 0.028863 seconds and 4 git commands to generate.