Live timer set up
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 9b1675e8be94b89b560dcd77db1e36e5fedb8775..f0147af4eee84b83164ea692601cb19908729766 100644 (file)
@@ -40,6 +40,7 @@
 #include <common/consumer-stream.h>
 #include <common/consumer-timer.h>
 #include <common/utils.h>
+#include <common/index/index.h>
 
 #include "ust-consumer.h"
 
@@ -116,14 +117,15 @@ static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
                const char *pathname, const char *name, uid_t uid, gid_t gid,
                uint64_t relayd_id, uint64_t key, enum lttng_event_output output,
                uint64_t tracefile_size, uint64_t tracefile_count,
-               uint64_t session_id_per_pid, unsigned int monitor)
+               uint64_t session_id_per_pid, unsigned int monitor,
+               unsigned int live_timer_interval)
 {
        assert(pathname);
        assert(name);
 
        return consumer_allocate_channel(key, session_id, pathname, name, uid,
                        gid, relayd_id, output, tracefile_size,
-                       tracefile_count, session_id_per_pid, monitor);
+                       tracefile_count, session_id_per_pid, monitor, live_timer_interval);
 }
 
 /*
@@ -605,6 +607,45 @@ error:
        rcu_read_unlock();
        return ret;
 }
+/*
+ * Close metadata stream wakeup_fd using the given key to retrieve the channel.
+ * RCU read side lock MUST be acquired before calling this function.
+ *
+ * NOTE: This function does NOT take any channel nor stream lock.
+ *
+ * Return 0 on success else LTTng error code.
+ */
+static int _close_metadata(struct lttng_consumer_channel *channel)
+{
+       int ret = LTTNG_OK;
+
+       assert(channel);
+       assert(channel->type == CONSUMER_CHANNEL_TYPE_METADATA);
+
+       if (channel->switch_timer_enabled == 1) {
+               DBG("Deleting timer on metadata channel");
+               consumer_timer_switch_stop(channel);
+       }
+
+       if (channel->metadata_stream) {
+               ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
+               if (ret < 0) {
+                       ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
+                       ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+               }
+
+               if (channel->monitor) {
+                       /* Close the read-side in consumer_del_metadata_stream */
+                       ret = close(channel->metadata_stream->ust_metadata_poll_pipe[1]);
+                       if (ret < 0) {
+                               PERROR("Close UST metadata write-side poll pipe");
+                               ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
+                       }
+               }
+       }
+
+       return ret;
+}
 
 /*
  * Close metadata stream wakeup_fd using the given key to retrieve the channel.
@@ -639,26 +680,7 @@ static int close_metadata(uint64_t chan_key)
                goto error_unlock;
        }
 
-       if (channel->switch_timer_enabled == 1) {
-               DBG("Deleting timer on metadata channel");
-               consumer_timer_switch_stop(channel);
-       }
-
-       if (channel->metadata_stream) {
-               ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
-               if (ret < 0) {
-                       ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
-                       ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
-                       goto error_unlock;
-               }
-               if (channel->monitor) {
-                       /* close the read-side in consumer_del_metadata_stream */
-                       ret = close(channel->metadata_stream->ust_metadata_poll_pipe[1]);
-                       if (ret < 0) {
-                               PERROR("Close UST metadata write-side poll pipe");
-                       }
-               }
-       }
+       ret = _close_metadata(channel);
 
 error_unlock:
        pthread_mutex_unlock(&channel->lock);
@@ -805,7 +827,7 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
                ret = utils_create_stream_file(path, metadata_stream->name,
                                metadata_stream->chan->tracefile_size,
                                metadata_stream->tracefile_count_current,
-                               metadata_stream->uid, metadata_stream->gid);
+                               metadata_stream->uid, metadata_stream->gid, NULL);
                if (ret < 0) {
                        goto error_stream;
                }
@@ -885,7 +907,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                        ret = utils_create_stream_file(path, stream->name,
                                        stream->chan->tracefile_size,
                                        stream->tracefile_count_current,
-                                       stream->uid, stream->gid);
+                                       stream->uid, stream->gid, NULL);
                        if (ret < 0) {
                                goto error_unlock;
                        }
@@ -955,7 +977,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                        }
 
                        read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
-                                       padded_len - len);
+                                       padded_len - len, NULL);
                        if (use_relayd) {
                                if (read_len != len) {
                                        ret = -EPERM;
@@ -1173,7 +1195,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.ask_channel.tracefile_size,
                                msg.u.ask_channel.tracefile_count,
                                msg.u.ask_channel.session_id_per_pid,
-                               msg.u.ask_channel.monitor);
+                               msg.u.ask_channel.monitor,
+                               msg.u.ask_channel.live_timer_interval);
                if (!channel) {
                        goto end_channel_error;
                }
@@ -1580,14 +1603,72 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
        ustctl_destroy_stream(stream->ustream);
 }
 
+/*
+ * Populate index values of a UST stream. Values are set in big endian order.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static int get_index_values(struct lttng_packet_index *index,
+               struct ustctl_consumer_stream *ustream)
+{
+       int ret;
+
+       ret = ustctl_get_timestamp_begin(ustream, &index->timestamp_begin);
+       if (ret < 0) {
+               PERROR("ustctl_get_timestamp_begin");
+               goto error;
+       }
+       index->timestamp_begin = htobe64(index->timestamp_begin);
+
+       ret = ustctl_get_timestamp_end(ustream, &index->timestamp_end);
+       if (ret < 0) {
+               PERROR("ustctl_get_timestamp_end");
+               goto error;
+       }
+       index->timestamp_end = htobe64(index->timestamp_end);
+
+       ret = ustctl_get_events_discarded(ustream, &index->events_discarded);
+       if (ret < 0) {
+               PERROR("ustctl_get_events_discarded");
+               goto error;
+       }
+       index->events_discarded = htobe64(index->events_discarded);
+
+       ret = ustctl_get_content_size(ustream, &index->content_size);
+       if (ret < 0) {
+               PERROR("ustctl_get_content_size");
+               goto error;
+       }
+       index->content_size = htobe64(index->content_size);
+
+       ret = ustctl_get_packet_size(ustream, &index->packet_size);
+       if (ret < 0) {
+               PERROR("ustctl_get_packet_size");
+               goto error;
+       }
+       index->packet_size = htobe64(index->packet_size);
+
+       ret = ustctl_get_stream_id(ustream, &index->stream_id);
+       if (ret < 0) {
+               PERROR("ustctl_get_stream_id");
+               goto error;
+       }
+       index->stream_id = htobe64(index->stream_id);
+
+error:
+       return ret;
+}
+
+
 int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len, subbuf_size, padding;
-       int err;
+       int err, write_index = 0;
        long ret = 0;
        char dummy;
        struct ustctl_consumer_stream *ustream;
+       struct lttng_packet_index index;
 
        assert(stream);
        assert(stream->ustream);
@@ -1599,6 +1680,11 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        /* Ease our life for what's next. */
        ustream = stream->ustream;
 
+       /* Indicate that for this stream we have to write the index. */
+       if (stream->index_fd >= 0) {
+               write_index = 1;
+       }
+
        /* We can consume the 1 byte written into the wait_fd by UST */
        if (stream->monitor && !stream->hangup_flush_done) {
                ssize_t readlen;
@@ -1656,6 +1742,15 @@ retry:
                goto end;
        }
        assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
+
+       if (!stream->metadata_flag && write_index) {
+               index.offset = htobe64(stream->out_fd_offset);
+               ret = get_index_values(&index, ustream);
+               if (ret < 0) {
+                       goto end;
+               }
+       }
+
        /* Get the full padded subbuffer size */
        err = ustctl_get_padded_subbuf_size(ustream, &len);
        assert(err == 0);
@@ -1669,7 +1764,7 @@ retry:
 
        padding = len - subbuf_size;
        /* write the subbuffer to the tracefile */
-       ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding);
+       ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index);
        /*
         * The mmap operation should write subbuf_size amount of data when network
         * streaming or the full padding (len) size when we are _not_ streaming.
@@ -1687,10 +1782,20 @@ retry:
                DBG("Error writing to tracefile "
                                "(ret: %ld != len: %lu != subbuf_size: %lu)",
                                ret, len, subbuf_size);
+               write_index = 0;
        }
        err = ustctl_put_next_subbuf(ustream);
        assert(err == 0);
 
+       /* Write index if needed. */
+       if (write_index) {
+               err = index_write(stream->index_fd, &index, sizeof(index));
+               if (err < 0) {
+                       ret = -1;
+                       goto end;
+               }
+       }
+
 end:
        return ret;
 }
@@ -1710,12 +1815,23 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
        if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
                ret = utils_create_stream_file(stream->chan->pathname, stream->name,
                                stream->chan->tracefile_size, stream->tracefile_count_current,
-                               stream->uid, stream->gid);
+                               stream->uid, stream->gid, NULL);
                if (ret < 0) {
                        goto error;
                }
                stream->out_fd = ret;
                stream->tracefile_size_current = 0;
+
+               if (!stream->metadata_flag) {
+                       ret = index_create_file(stream->chan->pathname,
+                                       stream->name, stream->uid, stream->gid,
+                                       stream->chan->tracefile_size,
+                                       stream->tracefile_count_current);
+                       if (ret < 0) {
+                               goto error;
+                       }
+                       stream->index_fd = ret;
+               }
        }
        ret = 0;
 
@@ -1746,6 +1862,12 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
        }
 
        if (stream->chan->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+               uint64_t contiguous, pushed;
+
+               /* Ease our life a bit. */
+               contiguous = stream->chan->metadata_cache->contiguous;
+               pushed = stream->ust_metadata_pushed;
+
                /*
                 * We can simply check whether all contiguously available data
                 * has been pushed to the ring buffer, since the push operation
@@ -1757,10 +1879,10 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
                 * metadata has been consumed from the metadata stream.
                 */
                DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64,
-                       stream->chan->metadata_cache->contiguous,
-                       stream->ust_metadata_pushed);
-               if (stream->chan->metadata_cache->contiguous
-                               != stream->ust_metadata_pushed) {
+                               contiguous, pushed);
+               assert(((int64_t) contiguous - pushed) >= 0);
+               if ((contiguous != pushed) ||
+                               (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
                        ret = 1;        /* Data is pending */
                        goto end;
                }
@@ -1796,7 +1918,6 @@ end:
  */
 void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht)
 {
-       int ret;
        struct lttng_ht_iter iter;
        struct lttng_consumer_stream *stream;
 
@@ -1808,17 +1929,16 @@ void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht)
        rcu_read_lock();
        cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
                        node.node) {
-               int fd = stream->wait_fd;
-
+               pthread_mutex_lock(&stream->chan->lock);
                /*
-                * Whatever happens here we have to continue to try to close every
-                * streams. Let's report at least the error on failure.
+                * Whatever returned value, we must continue to try to close everything
+                * so ignore it.
                 */
-               ret = ustctl_stream_close_wakeup_fd(stream->ustream);
-               if (ret) {
-                       ERR("Unable to close metadata stream fd %d ret %d", fd, ret);
-               }
-               DBG("Metadata wait fd %d closed", fd);
+               (void) _close_metadata(stream->chan);
+               DBG("Metadata wait fd %d and poll pipe fd %d closed", stream->wait_fd,
+                               stream->ust_metadata_poll_pipe[1]);
+               pthread_mutex_unlock(&stream->chan->lock);
+
        }
        rcu_read_unlock();
 }
This page took 0.028324 seconds and 4 git commands to generate.