Live timer set up
[lttng-tools.git] / src / common / consumer.c
index a0fb2384cd566fe59b58bbca771b3a279f8670c2..0661f1264fa0e7fa136a9b4772f302558287ab56 100644 (file)
@@ -33,6 +33,7 @@
 #include <common/common.h>
 #include <common/utils.h>
 #include <common/compat/poll.h>
+#include <common/index/index.h>
 #include <common/kernel-ctl/kernel-ctl.h>
 #include <common/sessiond-comm/relayd.h>
 #include <common/sessiond-comm/sessiond-comm.h>
@@ -292,7 +293,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
 
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&channel->lock);
-       pthread_mutex_lock(&channel->timer_lock);
 
        /* Delete streams that might have been left in the stream list. */
        cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
@@ -326,7 +326,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
 
        call_rcu(&channel->node.head, free_channel_rcu);
 end:
-       pthread_mutex_unlock(&channel->timer_lock);
        pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 }
@@ -459,6 +458,19 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
        consumer_stream_destroy(stream, ht);
 }
 
+/*
+ * XXX naming of del vs destroy is all mixed up.
+ */
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
+{
+       consumer_stream_destroy(stream, data_ht);
+}
+
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
+{
+       consumer_stream_destroy(stream, metadata_ht);
+}
+
 struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                uint64_t stream_key,
                enum lttng_consumer_stream_state state,
@@ -487,6 +499,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->key = stream_key;
        stream->out_fd = -1;
        stream->out_fd_offset = 0;
+       stream->output_written = 0;
        stream->state = state;
        stream->uid = uid;
        stream->gid = gid;
@@ -494,6 +507,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->session_id = session_id;
        stream->monitor = monitor;
        stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
+       stream->index_fd = -1;
        pthread_mutex_init(&stream->lock, NULL);
 
        /* If channel is the metadata, flag this stream as metadata. */
@@ -541,9 +555,9 @@ end:
 /*
  * Add a stream to the global list protected by a mutex.
  */
-static int add_stream(struct lttng_consumer_stream *stream,
-               struct lttng_ht *ht)
+int consumer_add_data_stream(struct lttng_consumer_stream *stream)
 {
+       struct lttng_ht *ht = data_ht;
        int ret = 0;
 
        assert(stream);
@@ -598,6 +612,11 @@ static int add_stream(struct lttng_consumer_stream *stream,
        return ret;
 }
 
+void consumer_del_data_stream(struct lttng_consumer_stream *stream)
+{
+       consumer_del_stream(stream, data_ht);
+}
+
 /*
  * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
  * be acquired before calling this.
@@ -820,7 +839,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t tracefile_size,
                uint64_t tracefile_count,
                uint64_t session_id_per_pid,
-               unsigned int monitor)
+               unsigned int monitor,
+               unsigned int live_timer_interval)
 {
        struct lttng_consumer_channel *channel;
 
@@ -841,6 +861,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->tracefile_size = tracefile_size;
        channel->tracefile_count = tracefile_count;
        channel->monitor = monitor;
+       channel->live_timer_interval = live_timer_interval;
        pthread_mutex_init(&channel->lock, NULL);
        pthread_mutex_init(&channel->timer_lock, NULL);
 
@@ -1300,7 +1321,8 @@ end:
 ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding)
+               unsigned long padding,
+               struct lttng_packet_index *index)
 {
        unsigned long mmap_offset;
        void *mmap_base;
@@ -1318,6 +1340,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        if (stream->net_seq_idx != (uint64_t) -1ULL) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
                if (relayd == NULL) {
+                       ret = -EPIPE;
                        goto end;
                }
        }
@@ -1327,28 +1350,31 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        case LTTNG_CONSUMER_KERNEL:
                mmap_base = stream->mmap_base;
                ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
+               if (ret != 0) {
+                       PERROR("tracer ctl get_mmap_read_offset");
+                       written = -errno;
+                       goto end;
+               }
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
                mmap_base = lttng_ustctl_get_mmap_base(stream);
                if (!mmap_base) {
                        ERR("read mmap get mmap base for stream %s", stream->name);
-                       written = -1;
+                       written = -EPERM;
                        goto end;
                }
                ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
-
+               if (ret != 0) {
+                       PERROR("tracer ctl get_mmap_read_offset");
+                       written = ret;
+                       goto end;
+               }
                break;
        default:
                ERR("Unknown consumer_data type");
                assert(0);
        }
-       if (ret != 0) {
-               errno = -ret;
-               PERROR("tracer ctl get_mmap_read_offset");
-               written = ret;
-               goto end;
-       }
 
        /* Handle stream on the relayd if the output is on the network */
        if (relayd) {
@@ -1403,16 +1429,34 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        ret = utils_rotate_stream_file(stream->chan->pathname,
                                        stream->name, stream->chan->tracefile_size,
                                        stream->chan->tracefile_count, stream->uid, stream->gid,
-                                       stream->out_fd, &(stream->tracefile_count_current));
+                                       stream->out_fd, &(stream->tracefile_count_current),
+                                       &stream->out_fd);
                        if (ret < 0) {
                                ERR("Rotating output file");
                                goto end;
                        }
-                       outfd = stream->out_fd = ret;
+                       outfd = stream->out_fd;
+
+                       if (stream->index_fd >= 0) {
+                               ret = index_create_file(stream->chan->pathname,
+                                               stream->name, stream->uid, stream->gid,
+                                               stream->chan->tracefile_size,
+                                               stream->tracefile_count_current);
+                               if (ret < 0) {
+                                       goto end;
+                               }
+                               stream->index_fd = ret;
+                       }
+
                        /* Reset current size because we just perform a rotation. */
                        stream->tracefile_size_current = 0;
+                       stream->out_fd_offset = 0;
+                       orig_offset = 0;
                }
                stream->tracefile_size_current += len;
+               if (index) {
+                       index->offset = htobe64(stream->out_fd_offset);
+               }
        }
 
        while (len > 0) {
@@ -1429,7 +1473,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                         */
                        DBG("Error in file write mmap");
                        if (written == 0) {
-                               written = ret;
+                               written = -errno;
                        }
                        /* Socket operation failed. We consider the relayd dead */
                        if (errno == EPIPE || errno == EINVAL) {
@@ -1453,6 +1497,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                                        SYNC_FILE_RANGE_WRITE);
                        stream->out_fd_offset += ret;
                }
+               stream->output_written += ret;
                written += ret;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
@@ -1486,7 +1531,8 @@ end:
 ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding)
+               unsigned long padding,
+               struct lttng_packet_index *index)
 {
        ssize_t ret = 0, written = 0, ret_splice = 0;
        loff_t offset = 0;
@@ -1517,6 +1563,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        if (stream->net_seq_idx != (uint64_t) -1ULL) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
                if (relayd == NULL) {
+                       ret = -EPIPE;
                        goto end;
                }
        }
@@ -1585,16 +1632,32 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        ret = utils_rotate_stream_file(stream->chan->pathname,
                                        stream->name, stream->chan->tracefile_size,
                                        stream->chan->tracefile_count, stream->uid, stream->gid,
-                                       stream->out_fd, &(stream->tracefile_count_current));
+                                       stream->out_fd, &(stream->tracefile_count_current),
+                                       &stream->out_fd);
                        if (ret < 0) {
                                ERR("Rotating output file");
                                goto end;
                        }
-                       outfd = stream->out_fd = ret;
+                       outfd = stream->out_fd;
+
+                       if (stream->index_fd >= 0) {
+                               ret = index_create_file(stream->chan->pathname,
+                                               stream->name, stream->uid, stream->gid,
+                                               stream->chan->tracefile_size,
+                                               stream->tracefile_count_current);
+                               if (ret < 0) {
+                                       goto end;
+                               }
+                               stream->index_fd = ret;
+                       }
+
                        /* Reset current size because we just perform a rotation. */
                        stream->tracefile_size_current = 0;
+                       stream->out_fd_offset = 0;
+                       orig_offset = 0;
                }
                stream->tracefile_size_current += len;
+               index->offset = htobe64(stream->out_fd_offset);
        }
 
        while (len > 0) {
@@ -1663,6 +1726,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                        SYNC_FILE_RANGE_WRITE);
                        stream->out_fd_offset += ret_splice;
                }
+               stream->output_written += ret_splice;
                written += ret_splice;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
@@ -1869,7 +1933,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
 
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&stream->chan->lock);
-       pthread_mutex_lock(&stream->chan->timer_lock);
        pthread_mutex_lock(&stream->lock);
 
        switch (consumer_data.type) {
@@ -1964,13 +2027,12 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
 end:
        /*
         * Nullify the stream reference so it is not used after deletion. The
-        * consumer data lock MUST be acquired before being able to check for a
-        * NULL pointer value.
+        * channel lock MUST be acquired before being able to check for
+        * NULL pointer value.
         */
        stream->chan->metadata_stream = NULL;
 
        pthread_mutex_unlock(&stream->lock);
-       pthread_mutex_unlock(&stream->chan->timer_lock);
        pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
@@ -1986,9 +2048,9 @@ free_stream_rcu:
  * Action done with the metadata stream when adding it to the consumer internal
  * data structures to handle it.
  */
-static int add_metadata_stream(struct lttng_consumer_stream *stream,
-               struct lttng_ht *ht)
+int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
 {
+       struct lttng_ht *ht = metadata_ht;
        int ret = 0;
        struct lttng_ht_iter iter;
        struct lttng_ht_node_u64 *node;
@@ -2171,11 +2233,6 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Just don't waste time if no returned events for the fd */
-                       if (!revents) {
-                               continue;
-                       }
-
                        if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
                                if (revents & (LPOLLERR | LPOLLHUP )) {
                                        DBG("Metadata thread pipe hung up");
@@ -2193,7 +2250,7 @@ restart:
                                        pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
                                                        &stream, sizeof(stream));
                                        if (pipe_len < 0) {
-                                               ERR("read metadata stream, ret: %ld", pipe_len);
+                                               ERR("read metadata stream, ret: %zd", pipe_len);
                                                /*
                                                 * Continue here to handle the rest of the streams.
                                                 */
@@ -2210,14 +2267,6 @@ restart:
                                        DBG("Adding metadata stream %d to poll set",
                                                        stream->wait_fd);
 
-                                       ret = add_metadata_stream(stream, metadata_ht);
-                                       if (ret) {
-                                               ERR("Unable to add metadata stream");
-                                               /* Stream was not setup properly. Continuing. */
-                                               consumer_del_metadata_stream(stream, NULL);
-                                               continue;
-                                       }
-
                                        /* Add metadata stream to the global poll events list */
                                        lttng_poll_add(&events, stream->wait_fd,
                                                        LPOLLIN | LPOLLPRI);
@@ -2416,7 +2465,7 @@ void *consumer_thread_data_poll(void *data)
                        pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
                                        &new_stream, sizeof(new_stream));
                        if (pipe_readlen < 0) {
-                               ERR("Consumer data pipe ret %ld", pipe_readlen);
+                               ERR("Consumer data pipe ret %zd", pipe_readlen);
                                /* Continue so we can at least handle the current stream(s). */
                                continue;
                        }
@@ -2431,17 +2480,6 @@ void *consumer_thread_data_poll(void *data)
                                continue;
                        }
 
-                       ret = add_stream(new_stream, data_ht);
-                       if (ret) {
-                               ERR("Consumer add stream %" PRIu64 " failed. Continuing",
-                                               new_stream->key);
-                               /*
-                                * At this point, if the add_stream fails, it is not in the
-                                * hash table thus passing the NULL value here.
-                                */
-                               consumer_del_stream(new_stream, NULL);
-                       }
-
                        /* Continue to update the local streams and handle prio ones */
                        continue;
                }
@@ -3396,6 +3434,15 @@ int consumer_data_pending(uint64_t id)
                 */
                ret = cds_lfht_is_node_deleted(&stream->node.node);
                if (!ret) {
+                       /*
+                        * An empty output file is not valid. We need at least one packet
+                        * generated per stream, even if it contains no event, so it
+                        * contains at least one packet header.
+                        */
+                       if (stream->output_written == 0) {
+                               pthread_mutex_unlock(&stream->lock);
+                               goto data_pending;
+                       }
                        /* Check the stream if there is data in the buffers. */
                        ret = data_pending(stream);
                        if (ret == 1) {
This page took 0.028398 seconds and 4 git commands to generate.