Implement the relayd live features
[lttng-tools.git] / src / common / consumer.c
index 221c348df055722918793e32617b6812dcfbc680..b8695698da7301b69b114fe373680b248294b486 100644 (file)
 #include <common/common.h>
 #include <common/utils.h>
 #include <common/compat/poll.h>
+#include <common/index/index.h>
 #include <common/kernel-ctl/kernel-ctl.h>
 #include <common/sessiond-comm/relayd.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/kernel-consumer/kernel-consumer.h>
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
+#include <common/consumer-timer.h>
 
 #include "consumer.h"
 #include "consumer-stream.h"
@@ -304,6 +306,10 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                consumer_stream_destroy(stream, NULL);
        }
 
+       if (channel->live_timer_enabled == 1) {
+               consumer_timer_live_stop(channel);
+       }
+
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                break;
@@ -498,6 +504,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->key = stream_key;
        stream->out_fd = -1;
        stream->out_fd_offset = 0;
+       stream->output_written = 0;
        stream->state = state;
        stream->uid = uid;
        stream->gid = gid;
@@ -505,6 +512,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->session_id = session_id;
        stream->monitor = monitor;
        stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
+       stream->index_fd = -1;
        pthread_mutex_init(&stream->lock, NULL);
 
        /* If channel is the metadata, flag this stream as metadata. */
@@ -726,6 +734,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
                if (ret < 0) {
                        goto end;
                }
+
                uatomic_inc(&relayd->refcount);
                stream->sent_to_relayd = 1;
        } else {
@@ -836,7 +845,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t tracefile_size,
                uint64_t tracefile_count,
                uint64_t session_id_per_pid,
-               unsigned int monitor)
+               unsigned int monitor,
+               unsigned int live_timer_interval)
 {
        struct lttng_consumer_channel *channel;
 
@@ -857,6 +867,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->tracefile_size = tracefile_size;
        channel->tracefile_count = tracefile_count;
        channel->monitor = monitor;
+       channel->live_timer_interval = live_timer_interval;
        pthread_mutex_init(&channel->lock, NULL);
        pthread_mutex_init(&channel->timer_lock, NULL);
 
@@ -1316,7 +1327,8 @@ end:
 ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding)
+               unsigned long padding,
+               struct lttng_packet_index *index)
 {
        unsigned long mmap_offset;
        void *mmap_base;
@@ -1423,16 +1435,34 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        ret = utils_rotate_stream_file(stream->chan->pathname,
                                        stream->name, stream->chan->tracefile_size,
                                        stream->chan->tracefile_count, stream->uid, stream->gid,
-                                       stream->out_fd, &(stream->tracefile_count_current));
+                                       stream->out_fd, &(stream->tracefile_count_current),
+                                       &stream->out_fd);
                        if (ret < 0) {
                                ERR("Rotating output file");
                                goto end;
                        }
-                       outfd = stream->out_fd = ret;
+                       outfd = stream->out_fd;
+
+                       if (stream->index_fd >= 0) {
+                               ret = index_create_file(stream->chan->pathname,
+                                               stream->name, stream->uid, stream->gid,
+                                               stream->chan->tracefile_size,
+                                               stream->tracefile_count_current);
+                               if (ret < 0) {
+                                       goto end;
+                               }
+                               stream->index_fd = ret;
+                       }
+
                        /* Reset current size because we just perform a rotation. */
                        stream->tracefile_size_current = 0;
+                       stream->out_fd_offset = 0;
+                       orig_offset = 0;
                }
                stream->tracefile_size_current += len;
+               if (index) {
+                       index->offset = htobe64(stream->out_fd_offset);
+               }
        }
 
        while (len > 0) {
@@ -1473,6 +1503,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                                        SYNC_FILE_RANGE_WRITE);
                        stream->out_fd_offset += ret;
                }
+               stream->output_written += ret;
                written += ret;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
@@ -1506,7 +1537,8 @@ end:
 ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding)
+               unsigned long padding,
+               struct lttng_packet_index *index)
 {
        ssize_t ret = 0, written = 0, ret_splice = 0;
        loff_t offset = 0;
@@ -1606,16 +1638,32 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        ret = utils_rotate_stream_file(stream->chan->pathname,
                                        stream->name, stream->chan->tracefile_size,
                                        stream->chan->tracefile_count, stream->uid, stream->gid,
-                                       stream->out_fd, &(stream->tracefile_count_current));
+                                       stream->out_fd, &(stream->tracefile_count_current),
+                                       &stream->out_fd);
                        if (ret < 0) {
                                ERR("Rotating output file");
                                goto end;
                        }
-                       outfd = stream->out_fd = ret;
+                       outfd = stream->out_fd;
+
+                       if (stream->index_fd >= 0) {
+                               ret = index_create_file(stream->chan->pathname,
+                                               stream->name, stream->uid, stream->gid,
+                                               stream->chan->tracefile_size,
+                                               stream->tracefile_count_current);
+                               if (ret < 0) {
+                                       goto end;
+                               }
+                               stream->index_fd = ret;
+                       }
+
                        /* Reset current size because we just perform a rotation. */
                        stream->tracefile_size_current = 0;
+                       stream->out_fd_offset = 0;
+                       orig_offset = 0;
                }
                stream->tracefile_size_current += len;
+               index->offset = htobe64(stream->out_fd_offset);
        }
 
        while (len > 0) {
@@ -1684,6 +1732,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                        SYNC_FILE_RANGE_WRITE);
                        stream->out_fd_offset += ret_splice;
                }
+               stream->output_written += ret_splice;
                written += ret_splice;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
@@ -2190,11 +2239,6 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Just don't waste time if no returned events for the fd */
-                       if (!revents) {
-                               continue;
-                       }
-
                        if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
                                if (revents & (LPOLLERR | LPOLLHUP )) {
                                        DBG("Metadata thread pipe hung up");
@@ -3073,7 +3117,8 @@ void lttng_consumer_init(void)
 int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
                struct pollfd *consumer_sockpoll,
-               struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id)
+               struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
+               uint64_t relayd_session_id)
 {
        int fd = -1, ret = -1, relayd_created = 0;
        enum lttng_error_code ret_code = LTTNG_OK;
@@ -3173,29 +3218,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                relayd->control_sock.major = relayd_sock->major;
                relayd->control_sock.minor = relayd_sock->minor;
 
-               /*
-                * Create a session on the relayd and store the returned id. Lock the
-                * control socket mutex if the relayd was NOT created before.
-                */
-               if (!relayd_created) {
-                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               }
-               ret = relayd_create_session(&relayd->control_sock,
-                               &relayd->relayd_session_id);
-               if (!relayd_created) {
-                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               }
-               if (ret < 0) {
-                       /*
-                        * Close all sockets of a relayd object. It will be freed if it was
-                        * created at the error code path or else it will be garbage
-                        * collect.
-                        */
-                       (void) relayd_close(&relayd->control_sock);
-                       (void) relayd_close(&relayd->data_sock);
-                       ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
-                       goto error;
-               }
+               relayd->relayd_session_id = relayd_session_id;
 
                break;
        case LTTNG_STREAM_DATA:
@@ -3396,6 +3419,15 @@ int consumer_data_pending(uint64_t id)
                 */
                ret = cds_lfht_is_node_deleted(&stream->node.node);
                if (!ret) {
+                       /*
+                        * An empty output file is not valid. We need at least one packet
+                        * generated per stream, even if it contains no event, so it
+                        * contains at least one packet header.
+                        */
+                       if (stream->output_written == 0) {
+                               pthread_mutex_unlock(&stream->lock);
+                               goto data_pending;
+                       }
                        /* Check the stream if there is data in the buffers. */
                        ret = data_pending(stream);
                        if (ret == 1) {
This page took 0.026171 seconds and 4 git commands to generate.