Generate local kernel and UST indexes
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index 29ef3a4ce426d1db8755796fdafffceed43525b6..09ccda329526b69cf1e5aa147ed32f4dc8a67e81 100644 (file)
@@ -38,6 +38,7 @@
 #include <common/relayd/relayd.h>
 #include <common/utils.h>
 #include <common/consumer-stream.h>
+#include <common/index/index.h>
 
 #include "kernel-consumer.h"
 
@@ -57,8 +58,8 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
 
        ret = kernctl_snapshot(infd);
        if (ret != 0) {
-               errno = -ret;
                perror("Getting sub-buffer snapshot.");
+               ret = -errno;
        }
 
        return ret;
@@ -77,8 +78,8 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
 
        ret = kernctl_snapshot_get_produced(infd, pos);
        if (ret != 0) {
-               errno = -ret;
                perror("kernctl_snapshot_get_produced");
+               ret = -errno;
        }
 
        return ret;
@@ -97,95 +98,34 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
 
        ret = kernctl_snapshot_get_consumed(infd, pos);
        if (ret != 0) {
-               errno = -ret;
                perror("kernctl_snapshot_get_consumed");
+               ret = -errno;
        }
 
        return ret;
 }
 
-/*
- * Find a relayd and send the stream
- *
- * Returns 0 on success, < 0 on error
- */
-static
-int send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
-{
-       struct consumer_relayd_sock_pair *relayd;
-       int ret = 0;
-       char *stream_path;
-
-       if (path != NULL) {
-               stream_path = path;
-       } else {
-               stream_path = stream->chan->pathname;
-       }
-       /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               /* Add stream on the relayd */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_add_stream(&relayd->control_sock,
-                               stream->name, stream_path,
-                               &stream->relayd_stream_id,
-                               stream->chan->tracefile_size,
-                               stream->chan->tracefile_count);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       goto end;
-               }
-               uatomic_inc(&relayd->refcount);
-       } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
-               ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
-                               stream->net_seq_idx);
-               ret = -1;
-               goto end;
-       }
-
-end:
-       rcu_read_unlock();
-       return ret;
-}
-
-/*
- * Find a relayd and close the stream
- */
-static
-void close_relayd_stream(struct lttng_consumer_stream *stream)
-{
-       struct consumer_relayd_sock_pair *relayd;
-
-       /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               consumer_stream_relayd_close(stream, relayd);
-       }
-       rcu_read_unlock();
-}
-
 /*
  * Take a snapshot of all the stream of a channel
  *
  * Returns 0 on success, < 0 on error
  */
 int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
-               uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
+               uint64_t relayd_id, uint64_t max_stream_size,
+               struct lttng_consumer_local_data *ctx)
 {
        int ret;
        unsigned long consumed_pos, produced_pos;
        struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
 
-       DBG("Kernel consumer snapshot channel %lu", key);
+       DBG("Kernel consumer snapshot channel %" PRIu64, key);
 
        rcu_read_lock();
 
        channel = consumer_find_channel(key);
        if (!channel) {
-               ERR("No channel found for key %lu", key);
+               ERR("No channel found for key %" PRIu64, key);
                ret = -1;
                goto end;
        }
@@ -197,26 +137,29 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                goto end;
        }
 
-       cds_list_for_each_entry(stream, &channel->stream_no_monitor_list.head,
-                       no_monitor_node) {
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
                /*
                 * Lock stream because we are about to change its state.
                 */
                pthread_mutex_lock(&stream->lock);
 
+               /*
+                * Assign the received relayd ID so we can use it for streaming. The streams
+                * are not visible to anyone so this is OK to change it.
+                */
                stream->net_seq_idx = relayd_id;
                channel->relayd_id = relayd_id;
                if (relayd_id != (uint64_t) -1ULL) {
-                       ret = send_relayd_stream(stream, path);
+                       ret = consumer_send_relayd_stream(stream, path);
                        if (ret < 0) {
                                ERR("sending stream to relayd");
                                goto end_unlock;
                        }
-                       DBG("Stream %s sent to the relayd", stream->name);
                } else {
                        ret = utils_create_stream_file(path, stream->name,
-                                       stream->chan->tracefile_size, stream->tracefile_count_current,
-                                       stream->uid, stream->gid);
+                                       stream->chan->tracefile_size,
+                                       stream->tracefile_count_current,
+                                       stream->uid, stream->gid, NULL);
                        if (ret < 0) {
                                ERR("utils_create_stream_file");
                                goto end_unlock;
@@ -225,13 +168,14 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        stream->out_fd = ret;
                        stream->tracefile_size_current = 0;
 
-                       DBG("Kernel consumer snapshot stream %s/%s (%lu)", path,
-                                       stream->name, stream->key);
+                       DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")",
+                                       path, stream->name, stream->key);
                }
 
                ret = kernctl_buffer_flush(stream->wait_fd);
                if (ret < 0) {
-                       ERR("Failed to flush kernel metadata stream");
+                       ERR("Failed to flush kernel stream");
+                       ret = -errno;
                        goto end_unlock;
                }
 
@@ -258,10 +202,20 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                                        &stream->max_sb_size);
                        if (ret < 0) {
                                ERR("Getting kernel max_sb_size");
+                               ret = -errno;
                                goto end_unlock;
                        }
                }
 
+               /*
+                * The original value is sent back if max stream size is larger than
+                * the possible size of the snapshot. Also, we asume that the session
+                * daemon should never send a maximum stream size that is lower than
+                * subbuffer size.
+                */
+               consumed_pos = consumer_get_consumed_maxsize(consumed_pos,
+                               produced_pos, max_stream_size);
+
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
                        unsigned long len, padded_len;
@@ -272,6 +226,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        if (ret < 0) {
                                if (errno != EAGAIN) {
                                        PERROR("kernctl_get_subbuf snapshot");
+                                       ret = -errno;
                                        goto end_unlock;
                                }
                                DBG("Kernel consumer get subbuf failed. Skipping it.");
@@ -282,22 +237,23 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_subbuf_size");
-                               goto end_unlock;
+                               ret = -errno;
+                               goto error_put_subbuf;
                        }
 
                        ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_padded_subbuf_size");
-                               goto end_unlock;
+                               ret = -errno;
+                               goto error_put_subbuf;
                        }
 
                        read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
-                                       padded_len - len);
+                                       padded_len - len, NULL);
                        /*
-                        * We write the padded len in local tracefiles but the
-                        * data len when using a relay.
-                        * Display the error but continue processing to try to
-                        * release the subbuffer.
+                        * We write the padded len in local tracefiles but the data len
+                        * when using a relay. Display the error but continue processing
+                        * to try to release the subbuffer.
                         */
                        if (relayd_id != (uint64_t) -1ULL) {
                                if (read_len != len) {
@@ -314,18 +270,21 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ret = kernctl_put_subbuf(stream->wait_fd);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_put_subbuf");
+                               ret = -errno;
                                goto end_unlock;
                        }
                        consumed_pos += stream->max_sb_size;
                }
 
                if (relayd_id == (uint64_t) -1ULL) {
-                       ret = close(stream->out_fd);
-                       if (ret < 0) {
-                               PERROR("Kernel consumer snapshot close out_fd");
-                               goto end_unlock;
+                       if (stream->out_fd >= 0) {
+                               ret = close(stream->out_fd);
+                               if (ret < 0) {
+                                       PERROR("Kernel consumer snapshot close out_fd");
+                                       goto end_unlock;
+                               }
+                               stream->out_fd = -1;
                        }
-                       stream->out_fd = -1;
                } else {
                        close_relayd_stream(stream);
                        stream->net_seq_idx = (uint64_t) -1ULL;
@@ -337,6 +296,12 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
        ret = 0;
        goto end;
 
+error_put_subbuf:
+       ret = kernctl_put_subbuf(stream->wait_fd);
+       if (ret < 0) {
+               ret = -errno;
+               ERR("Snapshot kernctl_put_subbuf error path");
+       }
 end_unlock:
        pthread_mutex_unlock(&stream->lock);
 end:
@@ -350,11 +315,14 @@ end:
  * Returns 0 on success, < 0 on error
  */
 int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
-               struct lttng_consumer_local_data *ctx)
+               uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
 {
+       int ret, use_relayd = 0;
+       ssize_t ret_read;
        struct lttng_consumer_channel *metadata_channel;
        struct lttng_consumer_stream *metadata_stream;
-       int ret;
+
+       assert(ctx);
 
        DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
                        key, path);
@@ -363,38 +331,71 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
 
        metadata_channel = consumer_find_channel(key);
        if (!metadata_channel) {
-               ERR("Snapshot kernel metadata channel not found for key %lu", key);
+               ERR("Kernel snapshot metadata not found for key %" PRIu64, key);
                ret = -1;
-               goto end;
+               goto error;
        }
 
        metadata_stream = metadata_channel->metadata_stream;
        assert(metadata_stream);
 
-       ret = utils_create_stream_file(path, metadata_stream->name,
-                       metadata_stream->chan->tracefile_size,
-                       metadata_stream->tracefile_count_current,
-                       metadata_stream->uid, metadata_stream->gid);
-       if (ret < 0) {
-               goto end;
+       /* Flag once that we have a valid relayd for the stream. */
+       if (relayd_id != (uint64_t) -1ULL) {
+               use_relayd = 1;
        }
-       metadata_stream->out_fd = ret;
 
-       ret = 0;
-       while (ret >= 0) {
-               ret = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
+       if (use_relayd) {
+               ret = consumer_send_relayd_stream(metadata_stream, path);
+               if (ret < 0) {
+                       goto error;
+               }
+       } else {
+               ret = utils_create_stream_file(path, metadata_stream->name,
+                               metadata_stream->chan->tracefile_size,
+                               metadata_stream->tracefile_count_current,
+                               metadata_stream->uid, metadata_stream->gid, NULL);
                if (ret < 0) {
-                       if (ret != -EPERM) {
-                               ERR("Kernel snapshot reading subbuffer");
-                               goto end;
+                       goto error;
+               }
+               metadata_stream->out_fd = ret;
+       }
+
+       do {
+               ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
+               if (ret_read < 0) {
+                       if (ret_read != -EAGAIN) {
+                               ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
+                                               ret_read);
+                               goto error;
                        }
-                       /* "ret" is negative at this point so we will exit the loop. */
+                       /* ret_read is negative at this point so we will exit the loop. */
                        continue;
                }
+       } while (ret_read >= 0);
+
+       if (use_relayd) {
+               close_relayd_stream(metadata_stream);
+               metadata_stream->net_seq_idx = (uint64_t) -1ULL;
+       } else {
+               if (metadata_stream->out_fd >= 0) {
+                       ret = close(metadata_stream->out_fd);
+                       if (ret < 0) {
+                               PERROR("Kernel consumer snapshot metadata close out_fd");
+                               /*
+                                * Don't go on error here since the snapshot was successful at this
+                                * point but somehow the close failed.
+                                */
+                       }
+                       metadata_stream->out_fd = -1;
+               }
        }
 
        ret = 0;
-end:
+
+       cds_list_del(&metadata_stream->send_node);
+       consumer_stream_destroy(metadata_stream, NULL);
+       metadata_channel->metadata_stream = NULL;
+error:
        rcu_read_unlock();
        return ret;
 }
@@ -413,8 +414,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
        ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
        if (ret != sizeof(msg)) {
-               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
                if (ret > 0) {
+                       lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
                        ret = -1;
                }
                return ret;
@@ -460,13 +461,24 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid,
                                msg.u.channel.relayd_id, msg.u.channel.output,
                                msg.u.channel.tracefile_size,
-                               msg.u.channel.tracefile_count,
+                               msg.u.channel.tracefile_count, 0,
                                msg.u.channel.monitor);
                if (new_channel == NULL) {
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto end_nosignal;
                }
                new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
+               switch (msg.u.channel.output) {
+               case LTTNG_EVENT_SPLICE:
+                       new_channel->output = CONSUMER_CHANNEL_SPLICE;
+                       break;
+               case LTTNG_EVENT_MMAP:
+                       new_channel->output = CONSUMER_CHANNEL_MMAP;
+                       break;
+               default:
+                       ERR("Channel output unknown %d", msg.u.channel.output);
+                       goto end_nosignal;
+               }
 
                /* Translate and save channel type. */
                switch (msg.u.channel.type) {
@@ -526,20 +538,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
-                       /*
-                        * Somehow, the session daemon is not responding
-                        * anymore.
-                        */
+                       /* Somehow, the session daemon is not responding anymore. */
                        goto error_fatal;
                }
                if (ret_code != LTTNG_OK) {
-                       /*
-                        * Channel was not found.
-                        */
+                       /* Channel was not found. */
                        goto end_nosignal;
                }
 
-               /* block */
+               /* Blocking call */
                if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
                        rcu_read_unlock();
                        return -EINTR;
@@ -574,7 +581,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                channel->session_id,
                                msg.u.stream.cpu,
                                &alloc_ret,
-                               channel->type);
+                               channel->type,
+                               channel->monitor);
                if (new_stream == NULL) {
                        switch (alloc_ret) {
                        case -ENOMEM:
@@ -585,6 +593,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                        goto end_nosignal;
                }
+
                new_stream->chan = channel;
                new_stream->wait_fd = fd;
                switch (channel->output) {
@@ -619,16 +628,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 */
                new_stream->hangup_flush_done = 0;
 
-               ret = send_relayd_stream(new_stream, NULL);
-               if (ret < 0) {
-                       consumer_del_stream(new_stream, NULL);
-                       goto end_nosignal;
-               }
-
                if (ctx->on_recv_stream) {
                        ret = ctx->on_recv_stream(new_stream);
                        if (ret < 0) {
-                               consumer_del_stream(new_stream, NULL);
+                               consumer_stream_free(new_stream);
                                goto end_nosignal;
                        }
                }
@@ -639,27 +642,57 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* Do not monitor this stream. */
                if (!channel->monitor) {
-                       DBG("Kernel consumer add stream %s in no monitor mode with"
+                       DBG("Kernel consumer add stream %s in no monitor mode with "
                                        "relayd id %" PRIu64, new_stream->name,
-                                       new_stream->relayd_stream_id);
-                       cds_list_add(&new_stream->no_monitor_node,
-                                       &channel->stream_no_monitor_list.head);
+                                       new_stream->net_seq_idx);
+                       cds_list_add(&new_stream->send_node, &channel->streams.head);
                        break;
                }
 
+               /* Send stream to relayd if the stream has an ID. */
+               if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
+                       ret = consumer_send_relayd_stream(new_stream,
+                                       new_stream->chan->pathname);
+                       if (ret < 0) {
+                               consumer_stream_free(new_stream);
+                               goto end_nosignal;
+                       }
+               }
+
                /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
+                       ret = consumer_add_metadata_stream(new_stream);
+                       if (ret) {
+                               ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing",
+                                               new_stream->key);
+                               consumer_stream_free(new_stream);
+                               goto end_nosignal;
+                       }
                        stream_pipe = ctx->consumer_metadata_pipe;
                } else {
+                       ret = consumer_add_data_stream(new_stream);
+                       if (ret) {
+                               ERR("Consumer add stream %" PRIu64 " failed. Continuing",
+                                               new_stream->key);
+                               consumer_stream_free(new_stream);
+                               goto end_nosignal;
+                       }
                        stream_pipe = ctx->consumer_data_pipe;
                }
 
+               /* Vitible to other threads */
+               new_stream->globally_visible = 1;
+
                ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
                if (ret < 0) {
                        ERR("Consumer write %s stream to pipe %d",
                                        new_stream->metadata_flag ? "metadata" : "data",
                                        lttng_pipe_get_writefd(stream_pipe));
-                       consumer_del_stream(new_stream, NULL);
+                       if (new_stream->metadata_flag) {
+                               consumer_del_stream_for_metadata(new_stream);
+                       } else {
+                               consumer_del_stream_for_data(new_stream);
+                       }
                        goto end_nosignal;
                }
 
@@ -734,7 +767,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        {
                if (msg.u.snapshot_channel.metadata == 1) {
                        ret = lttng_kconsumer_snapshot_metadata(msg.u.snapshot_channel.key,
-                                       msg.u.snapshot_channel.pathname, ctx);
+                                       msg.u.snapshot_channel.pathname,
+                                       msg.u.snapshot_channel.relayd_id, ctx);
                        if (ret < 0) {
                                ERR("Snapshot metadata failed");
                                ret_code = LTTNG_ERR_KERN_META_FAIL;
@@ -742,7 +776,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                } else {
                        ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key,
                                        msg.u.snapshot_channel.pathname,
-                                       msg.u.snapshot_channel.relayd_id, ctx);
+                                       msg.u.snapshot_channel.relayd_id,
+                                       msg.u.snapshot_channel.max_stream_size,
+                                       ctx);
                        if (ret < 0) {
                                ERR("Snapshot channel failed");
                                ret_code = LTTNG_ERR_KERN_CHAN_FAIL;
@@ -808,6 +844,61 @@ error_fatal:
        return -1;
 }
 
+/*
+ * Populate index values of a kernel stream. Values are set in big endian order.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static int get_index_values(struct lttng_packet_index *index, int infd)
+{
+       int ret;
+
+       ret = kernctl_get_timestamp_begin(infd, &index->timestamp_begin);
+       if (ret < 0) {
+               PERROR("kernctl_get_timestamp_begin");
+               goto error;
+       }
+       index->timestamp_begin = htobe64(index->timestamp_begin);
+
+       ret = kernctl_get_timestamp_end(infd, &index->timestamp_end);
+       if (ret < 0) {
+               PERROR("kernctl_get_timestamp_end");
+               goto error;
+       }
+       index->timestamp_end = htobe64(index->timestamp_end);
+
+       ret = kernctl_get_events_discarded(infd, &index->events_discarded);
+       if (ret < 0) {
+               PERROR("kernctl_get_events_discarded");
+               goto error;
+       }
+       index->events_discarded = htobe64(index->events_discarded);
+
+       ret = kernctl_get_content_size(infd, &index->content_size);
+       if (ret < 0) {
+               PERROR("kernctl_get_content_size");
+               goto error;
+       }
+       index->content_size = htobe64(index->content_size);
+
+       ret = kernctl_get_packet_size(infd, &index->packet_size);
+       if (ret < 0) {
+               PERROR("kernctl_get_packet_size");
+               goto error;
+       }
+       index->packet_size = htobe64(index->packet_size);
+
+       ret = kernctl_get_stream_id(infd, &index->stream_id);
+       if (ret < 0) {
+               PERROR("kernctl_get_stream_id");
+               goto error;
+       }
+       index->stream_id = htobe64(index->stream_id);
+
+error:
+       return ret;
+}
+
 /*
  * Consume data on a file descriptor and write it on a trace file.
  */
@@ -815,15 +906,21 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len, subbuf_size, padding;
-       int err;
+       int err, write_index = 0;
        ssize_t ret = 0;
        int infd = stream->wait_fd;
+       struct lttng_packet_index index;
 
        DBG("In read_subbuffer (infd : %d)", infd);
+
+       /* Indicate that for this stream we have to write the index. */
+       if (stream->index_fd >= 0) {
+               write_index = 1;
+       }
+
        /* Get the next subbuffer */
        err = kernctl_get_next_subbuf(infd);
        if (err != 0) {
-               ret = err;
                /*
                 * This is a debug message even for single-threaded consumer,
                 * because poll() have more relaxed criterions than get subbuf,
@@ -832,18 +929,25 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                 */
                DBG("Reserving sub buffer failed (everything is normal, "
                                "it is due to concurrency)");
+               ret = -errno;
                goto end;
        }
 
        /* Get the full subbuffer size including padding */
        err = kernctl_get_padded_subbuf_size(infd, &len);
        if (err != 0) {
-               errno = -err;
                perror("Getting sub-buffer len failed.");
-               ret = err;
+               ret = -errno;
                goto end;
        }
 
+       if (!stream->metadata_flag && write_index) {
+               ret = get_index_values(&index, infd);
+               if (ret < 0) {
+                       goto end;
+               }
+       }
+
        switch (stream->chan->output) {
        case CONSUMER_CHANNEL_SPLICE:
                /*
@@ -856,7 +960,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
 
                /* splice the subbuffer to the tracefile */
                ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size,
-                               padding);
+                               padding, &index);
                /*
                 * XXX: Splice does not support network streaming so the return value
                 * is simply checked against subbuf_size and not like the mmap() op.
@@ -868,15 +972,15 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                         */
                        ERR("Error splicing to tracefile (ret: %zd != len: %lu)",
                                        ret, subbuf_size);
+                       write_index = 0;
                }
                break;
        case CONSUMER_CHANNEL_MMAP:
                /* Get subbuffer size without padding */
                err = kernctl_get_subbuf_size(infd, &subbuf_size);
                if (err != 0) {
-                       errno = -err;
                        perror("Getting sub-buffer len failed.");
-                       ret = err;
+                       ret = -errno;
                        goto end;
                }
 
@@ -887,7 +991,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
 
                /* write the subbuffer to the tracefile */
                ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size,
-                               padding);
+                               padding, &index);
                /*
                 * The mmap operation should write subbuf_size amount of data when
                 * network streaming or the full padding (len) size when we are _not_
@@ -902,27 +1006,35 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                        ERR("Error writing to tracefile "
                                        "(ret: %zd != len: %lu != subbuf_size: %lu)",
                                        ret, len, subbuf_size);
+                       write_index = 0;
                }
                break;
        default:
                ERR("Unknown output method");
-               ret = -1;
+               ret = -EPERM;
        }
 
        err = kernctl_put_next_subbuf(infd);
        if (err != 0) {
-               errno = -err;
                if (errno == EFAULT) {
                        perror("Error in unreserving sub buffer\n");
                } else if (errno == EIO) {
                        /* Should never happen with newer LTTng versions */
                        perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
                }
-
-               ret = -err;
+               ret = -errno;
                goto end;
        }
 
+       /* Write index if needed. */
+       if (write_index) {
+               err = index_write(stream->index_fd, &index, sizeof(index));
+               if (err < 0) {
+                       ret = -1;
+                       goto end;
+               }
+       }
+
 end:
        return ret;
 }
@@ -940,12 +1052,23 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
        if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
                ret = utils_create_stream_file(stream->chan->pathname, stream->name,
                                stream->chan->tracefile_size, stream->tracefile_count_current,
-                               stream->uid, stream->gid);
+                               stream->uid, stream->gid, NULL);
                if (ret < 0) {
                        goto error;
                }
                stream->out_fd = ret;
                stream->tracefile_size_current = 0;
+
+               if (!stream->metadata_flag) {
+                       ret = index_create_file(stream->chan->pathname,
+                                       stream->name, stream->uid, stream->gid,
+                                       stream->chan->tracefile_size,
+                                       stream->tracefile_count_current);
+                       if (ret < 0) {
+                               goto error;
+                       }
+                       stream->index_fd = ret;
+               }
        }
 
        if (stream->output == LTTNG_EVENT_MMAP) {
@@ -954,8 +1077,8 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
 
                ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
                if (ret != 0) {
-                       errno = -ret;
                        PERROR("kernctl_get_mmap_len");
+                       ret = -errno;
                        goto error_close_fd;
                }
                stream->mmap_len = (size_t) mmap_len;
@@ -973,11 +1096,12 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
        return 0;
 
 error_close_fd:
-       {
+       if (stream->out_fd >= 0) {
                int err;
 
                err = close(stream->out_fd);
                assert(!err);
+               stream->out_fd = -1;
        }
 error:
        return ret;
@@ -997,6 +1121,11 @@ int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
 
        assert(stream);
 
+       if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
+               ret = 0;
+               goto end;
+       }
+
        ret = kernctl_get_next_subbuf(stream->wait_fd);
        if (ret == 0) {
                /* There is still data so let's put back this subbuffer. */
This page took 0.033518 seconds and 4 git commands to generate.