consumerd: move rotation logic to domain-agnostic read path
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index 275c963df2305dee886327378f0a6a78e977e8db..4a2e4e07d7c0b8e5eac8e34927afad5f8762da4a 100644 (file)
@@ -7,6 +7,7 @@
  *
  */
 
+#include "common/buffer-view.h"
 #include <stdint.h>
 #define _LGPL_SOURCE
 #include <assert.h>
@@ -249,9 +250,9 @@ static int lttng_kconsumer_snapshot_channel(
                        ssize_t read_len;
                        unsigned long len, padded_len;
                        const char *subbuf_addr;
+                       struct lttng_buffer_view subbuf_view;
 
                        health_code_update();
-
                        DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
 
                        ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
@@ -283,8 +284,10 @@ static int lttng_kconsumer_snapshot_channel(
                                goto error_put_subbuf;
                        }
 
+                       subbuf_view = lttng_buffer_view_init(
+                                       subbuf_addr, 0, padded_len);
                        read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
-                                       stream, subbuf_addr, len,
+                                       stream, &subbuf_view,
                                        padded_len - len, NULL);
                        /*
                         * We write the padded len in local tracefiles but the data len
@@ -510,6 +513,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.channel.tracefile_count, 0,
                                msg.u.channel.monitor,
                                msg.u.channel.live_timer_interval,
+                               msg.u.channel.is_live,
                                NULL, NULL);
                if (new_channel == NULL) {
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
@@ -651,7 +655,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                health_code_update();
 
                pthread_mutex_lock(&channel->lock);
-               new_stream = consumer_allocate_stream(channel->key,
+               new_stream = consumer_allocate_stream(
+                               channel,
+                               channel->key,
                                fd,
                                channel->name,
                                channel->relayd_id,
@@ -673,7 +679,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto error_add_stream_nosignal;
                }
 
-               new_stream->chan = channel;
                new_stream->wait_fd = fd;
                ret = kernctl_get_max_subbuf_size(new_stream->wait_fd,
                                &new_stream->max_sb_size);
@@ -1585,26 +1590,14 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len, subbuf_size, padding;
-       int err, write_index = 1, rotation_ret;
+       int err, write_index = 1;
        ssize_t ret = 0;
        int infd = stream->wait_fd;
        struct ctf_packet_index index = {};
+       bool in_error_state = false;
 
        DBG("In read_subbuffer (infd : %d)", infd);
 
-       /*
-        * If the stream was flagged to be ready for rotation before we extract the
-        * next packet, rotate it now.
-        */
-       if (stream->rotate_ready) {
-               DBG("Rotate stream before extracting data");
-               rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
-               if (rotation_ret < 0) {
-                       ERR("Stream rotation error");
-                       ret = -1;
-                       goto error;
-               }
-       }
 
        /* Get the next subbuffer */
        err = kernctl_get_next_subbuf(infd);
@@ -1720,6 +1713,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        case CONSUMER_CHANNEL_MMAP:
        {
                const char *subbuf_addr;
+               struct lttng_buffer_view subbuf_view;
 
                /* Get subbuffer size without padding */
                err = kernctl_get_subbuf_size(infd, &subbuf_size);
@@ -1750,15 +1744,15 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
 
                padding = len - subbuf_size;
 
+               subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len);
+
                /* write the subbuffer to the tracefile */
-               ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream,
-                               subbuf_addr,
-                               subbuf_size,
-                               padding, &index);
+               ret = lttng_consumer_on_read_subbuffer_mmap(
+                               ctx, stream, &subbuf_view, padding, &index);
                /*
-                * The mmap operation should write subbuf_size amount of data when
-                * network streaming or the full padding (len) size when we are _not_
-                * streaming.
+                * The mmap operation should write subbuf_size amount of data
+                * when network streaming or the full padding (len) size when we
+                * are _not_ streaming.
                 */
                if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
                                (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
@@ -1789,11 +1783,13 @@ error_put_subbuf:
                }
                ret = err;
                goto error;
+       } else if (in_error_state) {
+               goto error;
        }
 
        /* Write index if needed. */
        if (!write_index) {
-               goto rotate;
+               goto end;
        }
 
        if (stream->chan->live_timer_interval && !stream->metadata_flag) {
@@ -1826,25 +1822,7 @@ error_put_subbuf:
                goto error;
        }
 
-rotate:
-       /*
-        * After extracting the packet, we check if the stream is now ready to be
-        * rotated and perform the action immediately.
-        */
-       rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
-       if (rotation_ret == 1) {
-               rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
-               if (rotation_ret < 0) {
-                       ERR("Stream rotation error");
-                       ret = -1;
-                       goto error;
-               }
-       } else if (rotation_ret < 0) {
-               ERR("Checking if stream is ready to rotate");
-               ret = -1;
-               goto error;
-       }
-
+end:
 error:
        return ret;
 }
This page took 0.025943 seconds and 4 git commands to generate.