consumerd: tag metadata channel as being part of a live session
[lttng-tools.git] / src / common / consumer / consumer.c
index 64057c785f9512d734ccbd78649214125fad824d..e2e7438f62d29a942740f37dcd255c6e9242cbf7 100644 (file)
@@ -568,7 +568,9 @@ void consumer_stream_update_channel_attributes(
                        channel->tracefile_size;
 }
 
-struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+struct lttng_consumer_stream *consumer_allocate_stream(
+               struct lttng_consumer_channel *channel,
+               uint64_t channel_key,
                uint64_t stream_key,
                const char *channel_name,
                uint64_t relayd_id,
@@ -596,6 +598,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        }
 
        rcu_read_lock();
+       stream->chan = channel;
        stream->key = stream_key;
        stream->trace_chunk = trace_chunk;
        stream->out_fd = -1;
@@ -1063,6 +1066,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t session_id_per_pid,
                unsigned int monitor,
                unsigned int live_timer_interval,
+               bool is_in_live_session,
                const char *root_shm_path,
                const char *shm_path)
 {
@@ -1094,6 +1098,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->tracefile_count = tracefile_count;
        channel->monitor = monitor;
        channel->live_timer_interval = live_timer_interval;
+       channel->is_live = is_in_live_session;
        pthread_mutex_init(&channel->lock, NULL);
        pthread_mutex_init(&channel->timer_lock, NULL);
 
@@ -1671,8 +1676,7 @@ end:
 ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream,
-               const char *buffer,
-               unsigned long len,
+               const struct lttng_buffer_view *buffer,
                unsigned long padding,
                struct ctf_packet_index *index)
 {
@@ -1682,6 +1686,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        int outfd = stream->out_fd;
        struct consumer_relayd_sock_pair *relayd = NULL;
        unsigned int relayd_hang_up = 0;
+       const size_t subbuf_content_size = buffer->size - padding;
+       size_t write_len;
 
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
@@ -1699,7 +1705,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
 
        /* Handle stream on the relayd if the output is on the network */
        if (relayd) {
-               unsigned long netlen = len;
+               unsigned long netlen = subbuf_content_size;
 
                /*
                 * Lock the control socket for the complete duration of the function
@@ -1737,10 +1743,10 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                                goto write_error;
                        }
                }
-       } else {
-               /* No streaming, we have to set the len with the full padding */
-               len += padding;
 
+               write_len = subbuf_content_size;
+       } else {
+               /* No streaming; we have to write the full padding. */
                if (stream->metadata_flag && stream->reset_metadata_flag) {
                        ret = utils_truncate_stream_file(stream->out_fd, 0);
                        if (ret < 0) {
@@ -1754,7 +1760,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                 * Check if we need to change the tracefile before writing the packet.
                 */
                if (stream->chan->tracefile_size > 0 &&
-                               (stream->tracefile_size_current + len) >
+                               (stream->tracefile_size_current + buffer->size) >
                                stream->chan->tracefile_size) {
                        ret = consumer_stream_rotate_output_files(stream);
                        if (ret) {
@@ -1763,19 +1769,21 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        outfd = stream->out_fd;
                        orig_offset = 0;
                }
-               stream->tracefile_size_current += len;
+               stream->tracefile_size_current += buffer->size;
                if (index) {
                        index->offset = htobe64(stream->out_fd_offset);
                }
+
+               write_len = buffer->size;
        }
 
        /*
         * This call guarantee that len or less is returned. It's impossible to
         * receive a ret value that is bigger than len.
         */
-       ret = lttng_write(outfd, bufferlen);
-       DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
-       if (ret < 0 || ((size_t) ret != len)) {
+       ret = lttng_write(outfd, buffer->data, write_len);
+       DBG("Consumer mmap write() ret %zd (len %lu)", ret, write_len);
+       if (ret < 0 || ((size_t) ret != write_len)) {
                /*
                 * Report error to caller if nothing was written else at least send the
                 * amount written.
@@ -1796,7 +1804,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        DBG("Consumer mmap write detected relayd hang up");
                } else {
                        /* Unhandled error, print it and stop function right now. */
-                       PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
+                       PERROR("Error in write mmap (ret %zd != write_len %zu)", ret,
+                                       write_len);
                }
                goto write_error;
        }
@@ -1805,9 +1814,9 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        /* This call is useless on a socket so better save a syscall. */
        if (!relayd) {
                /* This won't block, but will start writeout asynchronously */
-               lttng_sync_file_range(outfd, stream->out_fd_offset, len,
+               lttng_sync_file_range(outfd, stream->out_fd_offset, write_len,
                                SYNC_FILE_RANGE_WRITE);
-               stream->out_fd_offset += len;
+               stream->out_fd_offset += write_len;
                lttng_consumer_sync_trace_file(stream, orig_offset);
        }
 
This page took 0.024866 seconds and 4 git commands to generate.