Clean-up: consumer-stream: change space to tabs
[lttng-tools.git] / src / common / consumer / consumer-stream.c
index b262b54d56868c27d2704b7cadeb52880c3ec40e..1e3bfe7853f905addf8cee2efbefb3ccd6f9446f 100644 (file)
@@ -22,6 +22,7 @@
 #include <common/consumer/consumer.h>
 #include <common/consumer/consumer-timer.h>
 #include <common/consumer/metadata-bucket.h>
+#include <common/kernel-ctl/kernel-ctl.h>
 
 #include "consumer-stream.h"
 
@@ -69,8 +70,7 @@ static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
 {
        int ret = 0;
        uint64_t sequence_number;
-       const uint64_t discarded_events =
-                       LTTNG_OPTIONAL_GET(subbuf->info.data.sequence_number);
+       const uint64_t discarded_events = subbuf->info.data.events_discarded;
 
        if (!subbuf->info.data.sequence_number.is_set) {
                /* Command not supported by the tracer. */
@@ -152,9 +152,40 @@ static ssize_t consumer_stream_consume_mmap(
        const unsigned long padding_size =
                        subbuffer->info.data.padded_subbuf_size -
                        subbuffer->info.data.subbuf_size;
-
-       return lttng_consumer_on_read_subbuffer_mmap(
+       const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_mmap(
                        stream, &subbuffer->buffer.buffer, padding_size);
+
+       if (stream->net_seq_idx == -1ULL) {
+               /*
+                * When writing on disk, check that only the subbuffer (no
+                * padding) was written to disk.
+                */
+               if (written_bytes != subbuffer->info.data.padded_subbuf_size) {
+                       DBG("Failed to write the entire padded subbuffer on disk (written_bytes: %zd, padded subbuffer size %lu)",
+                                       written_bytes,
+                                       subbuffer->info.data.padded_subbuf_size);
+               }
+       } else {
+               /*
+                * When streaming over the network, check that the entire
+                * subbuffer including padding was successfully written.
+                */
+               if (written_bytes != subbuffer->info.data.subbuf_size) {
+                       DBG("Failed to write only the subbuffer over the network (written_bytes: %zd, subbuffer size %lu)",
+                                       written_bytes,
+                                       subbuffer->info.data.subbuf_size);
+               }
+       }
+
+       /*
+        * If `lttng_consumer_on_read_subbuffer_mmap()` returned an error, pass
+        * it along to the caller, else return zero.
+        */
+       if (written_bytes < 0) {
+               ERR("Error reading mmap subbuffer: %zd", written_bytes);
+       }
+
+       return written_bytes;
 }
 
 static ssize_t consumer_stream_consume_splice(
@@ -162,8 +193,24 @@ static ssize_t consumer_stream_consume_splice(
                struct lttng_consumer_stream *stream,
                const struct stream_subbuffer *subbuffer)
 {
-       return lttng_consumer_on_read_subbuffer_splice(ctx, stream,
-                       subbuffer->info.data.padded_subbuf_size, 0);
+       const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_splice(
+                       ctx, stream, subbuffer->info.data.padded_subbuf_size, 0);
+
+       if (written_bytes != subbuffer->info.data.padded_subbuf_size) {
+               DBG("Failed to write the entire padded subbuffer (written_bytes: %zd, padded subbuffer size %lu)",
+                               written_bytes,
+                               subbuffer->info.data.padded_subbuf_size);
+       }
+
+       /*
+        * If `lttng_consumer_on_read_subbuffer_splice()` returned an error,
+        * pass it along to the caller, else return zero.
+        */
+       if (written_bytes < 0) {
+               ERR("Error reading splice subbuffer: %zd", written_bytes);
+       }
+
+       return written_bytes;
 }
 
 static int consumer_stream_send_index(
@@ -197,6 +244,7 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata,
                struct lttng_consumer_local_data *ctx)
 {
        int ret;
+       enum sync_metadata_status status;
 
        assert(metadata);
        assert(metadata->metadata_flag);
@@ -244,7 +292,7 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata,
                        /*
                         * Empty the metadata cache and flush the current stream.
                         */
-                       ret = lttng_kconsumer_sync_metadata(metadata);
+                       status = lttng_kconsumer_sync_metadata(metadata);
                        break;
                case LTTNG_CONSUMER32_UST:
                case LTTNG_CONSUMER64_UST:
@@ -252,18 +300,23 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata,
                         * Ask the sessiond if we have new metadata waiting and update the
                         * consumer metadata cache.
                         */
-                       ret = lttng_ustconsumer_sync_metadata(ctx, metadata);
+                       status = lttng_ustconsumer_sync_metadata(ctx, metadata);
                        break;
                default:
-                       assert(0);
-                       ret = -1;
-                       break;
+                       abort();
                }
-               /*
-                * Error or no new metadata, we exit here.
-                */
-               if (ret <= 0 || ret == ENODATA) {
+
+               switch (status) {
+               case SYNC_METADATA_STATUS_NEW_DATA:
+                       break;
+               case SYNC_METADATA_STATUS_NO_DATA:
+                       ret = 0;
+                       goto end_unlock_mutex;
+               case SYNC_METADATA_STATUS_ERROR:
+                       ret = -1;
                        goto end_unlock_mutex;
+               default:
+                       abort();
                }
 
                /*
@@ -285,7 +338,7 @@ static int do_sync_metadata(struct lttng_consumer_stream *metadata,
                 */
                pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock);
                pthread_mutex_unlock(&metadata->metadata_rdv_lock);
-       } while (ret == EAGAIN);
+       } while (status == SYNC_METADATA_STATUS_NEW_DATA);
 
        /* Success */
        return 0;
@@ -404,6 +457,155 @@ end:
        return 0;
 }
 
+static
+bool stream_is_rotating_to_null_chunk(
+               const struct lttng_consumer_stream *stream)
+{
+       bool rotating_to_null_chunk = false;
+
+       if (stream->rotate_position == -1ULL) {
+               /* No rotation ongoing. */
+               goto end;
+       }
+
+       if (stream->trace_chunk == stream->chan->trace_chunk ||
+                       !stream->chan->trace_chunk) {
+               rotating_to_null_chunk = true;
+       }
+end:
+       return rotating_to_null_chunk;
+}
+
+enum consumer_stream_open_packet_status consumer_stream_open_packet(
+               struct lttng_consumer_stream *stream)
+{
+       int ret;
+       enum consumer_stream_open_packet_status status;
+       unsigned long produced_pos_before, produced_pos_after;
+
+       ret = lttng_consumer_sample_snapshot_positions(stream);
+       if (ret < 0) {
+               ERR("Failed to snapshot positions before post-rotation empty packet flush: stream id = %" PRIu64
+                               ", channel name = %s, session id = %" PRIu64,
+                               stream->key, stream->chan->name,
+                               stream->chan->session_id);
+               status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+               goto end;
+       }
+
+       ret = lttng_consumer_get_produced_snapshot(
+                       stream, &produced_pos_before);
+       if (ret < 0) {
+               ERR("Failed to read produced position before post-rotation empty packet flush: stream id = %" PRIu64
+                               ", channel name = %s, session id = %" PRIu64,
+                               stream->key, stream->chan->name,
+                               stream->chan->session_id);
+               status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+               goto end;
+       }
+
+       ret = consumer_stream_flush_buffer(stream, 0);
+       if (ret) {
+               ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64
+                               ", channel name = %s, session id = %" PRIu64,
+                               stream->key, stream->chan->name,
+                               stream->chan->session_id);
+               status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+               goto end;
+       }
+
+       ret = lttng_consumer_sample_snapshot_positions(stream);
+       if (ret < 0) {
+               ERR("Failed to snapshot positions after post-rotation empty packet flush: stream id = %" PRIu64
+                               ", channel name = %s, session id = %" PRIu64,
+                               stream->key, stream->chan->name,
+                               stream->chan->session_id);
+               status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+               goto end;
+       }
+
+       ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_after);
+       if (ret < 0) {
+               ERR("Failed to read produced position after post-rotation empty packet flush: stream id = %" PRIu64
+                               ", channel name = %s, session id = %" PRIu64,
+                               stream->key, stream->chan->name,
+                               stream->chan->session_id);
+               status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+               goto end;
+       }
+
+       /*
+        * Determine if the flush had an effect by comparing the produced
+        * positons before and after the flush.
+        */
+       status = produced_pos_before != produced_pos_after ?
+                       CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED :
+                       CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE;
+       if (status == CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED) {
+               stream->opened_packet_in_current_trace_chunk = true;
+       }
+
+end:
+       return status;
+}
+
+/*
+ * An attempt to open a new packet is performed after a rotation completes to
+ * get a begin timestamp as close as possible to the rotation point.
+ *
+ * However, that initial attempt at opening a packet can fail due to a full
+ * ring-buffer. In that case, a second attempt is performed after consuming
+ * a packet since that will have freed enough space in the ring-buffer.
+ */
+static
+int post_consume_open_new_packet(struct lttng_consumer_stream *stream,
+               const struct stream_subbuffer *subbuffer,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret = 0;
+
+       if (!stream->opened_packet_in_current_trace_chunk &&
+                       stream->trace_chunk &&
+                       !stream_is_rotating_to_null_chunk(stream)) {
+               const enum consumer_stream_open_packet_status status =
+                               consumer_stream_open_packet(stream);
+
+               switch (status) {
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+                       DBG("Opened a packet after consuming a packet rotation: stream id = %" PRIu64
+                                       ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       stream->opened_packet_in_current_trace_chunk = true;
+                       break;
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+                       /*
+                        * Can't open a packet as there is no space left.
+                        * This means that new events were produced, resulting
+                        * in a packet being opened, which is what we want
+                        * anyhow.
+                        */
+                       DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64
+                                       ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       stream->opened_packet_in_current_trace_chunk = true;
+                       break;
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+                       /* Logged by callee. */
+                       ret = -1;
+                       goto end;
+               default:
+                       abort();
+               }
+
+               stream->opened_packet_in_current_trace_chunk = true;
+       }
+
+end:
+       return ret;
+}
+
 struct lttng_consumer_stream *consumer_stream_create(
                struct lttng_consumer_channel *channel,
                uint64_t channel_key,
@@ -447,6 +649,8 @@ struct lttng_consumer_stream *consumer_stream_create(
        stream->index_file = NULL;
        stream->last_sequence_number = -1ULL;
        stream->rotate_position = -1ULL;
+       /* Buffer is created with an open packet. */
+       stream->opened_packet_in_current_trace_chunk = true;
        pthread_mutex_init(&stream->lock, NULL);
        pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
@@ -499,6 +703,9 @@ struct lttng_consumer_stream *consumer_stream_create(
 
        rcu_read_unlock();
 
+       lttng_dynamic_array_init(&stream->read_subbuffer_ops.post_consume_cbs,
+                       sizeof(post_consume_cb), NULL);
+
        if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
                stream->read_subbuffer_ops.lock =
                                consumer_stream_metadata_lock_all;
@@ -507,18 +714,31 @@ struct lttng_consumer_stream *consumer_stream_create(
                stream->read_subbuffer_ops.pre_consume_subbuffer =
                                metadata_stream_check_version;
        } else {
+               const post_consume_cb post_consume_index_op = channel->is_live ?
+                               consumer_stream_sync_metadata_index :
+                               consumer_stream_send_index;
+
+               ret = lttng_dynamic_array_add_element(
+                               &stream->read_subbuffer_ops.post_consume_cbs,
+                               &post_consume_index_op);
+               if (ret) {
+                       PERROR("Failed to add `send index` callback to stream's post consumption callbacks");
+                       goto error;
+               }
+
+               ret = lttng_dynamic_array_add_element(
+                               &stream->read_subbuffer_ops.post_consume_cbs,
+                               &(post_consume_cb) { post_consume_open_new_packet });
+               if (ret) {
+                       PERROR("Failed to add `open new packet` callback to stream's post consumption callbacks");
+                       goto error;
+               }
+
                stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all;
                stream->read_subbuffer_ops.unlock =
                                consumer_stream_data_unlock_all;
                stream->read_subbuffer_ops.pre_consume_subbuffer =
                                consumer_stream_update_stats;
-               if (channel->is_live) {
-                       stream->read_subbuffer_ops.post_consume =
-                                       consumer_stream_sync_metadata_index;
-               } else {
-                       stream->read_subbuffer_ops.post_consume =
-                                       consumer_stream_send_index;
-               }
        }
 
        if (channel->output == CONSUMER_CHANNEL_MMAP) {
@@ -534,6 +754,7 @@ struct lttng_consumer_stream *consumer_stream_create(
 error:
        rcu_read_unlock();
        lttng_trace_chunk_put(stream->trace_chunk);
+       lttng_dynamic_array_reset(&stream->read_subbuffer_ops.post_consume_cbs);
        free(stream);
 end:
        if (alloc_ret) {
@@ -845,6 +1066,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
        /* Free stream within a RCU call. */
        lttng_trace_chunk_put(stream->trace_chunk);
        stream->trace_chunk = NULL;
+       lttng_dynamic_array_reset(&stream->read_subbuffer_ops.post_consume_cbs);
        consumer_stream_free(stream);
 }
 
@@ -928,16 +1150,16 @@ int consumer_stream_create_output_files(struct lttng_consumer_stream *stream,
                        goto end;
                }
                stream->out_fd = -1;
-        }
+       }
 
        DBG("Opening stream output file \"%s\"", stream_path);
        chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path,
                        flags, mode, &stream->out_fd, false);
-        if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                ERR("Failed to open stream file \"%s\"", stream->name);
                ret = -1;
                goto end;
-        }
+       }
 
        if (!stream->metadata_flag && (create_index || stream->index_file)) {
                if (stream->index_file) {
@@ -1061,3 +1283,47 @@ void consumer_stream_metadata_set_version(
                metadata_bucket_reset(stream->metadata_bucket);
        }
 }
+
+int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream,
+               bool producer_active)
+{
+       int ret = 0;
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               if (producer_active) {
+                       ret = kernctl_buffer_flush(stream->wait_fd);
+                       if (ret < 0) {
+                               ERR("Failed to flush kernel stream");
+                               goto end;
+                       }
+               } else {
+                       ret = kernctl_buffer_flush_empty(stream->wait_fd);
+                       if (ret < 0) {
+                               /*
+                                * Doing a buffer flush which does not take into
+                                * account empty packets. This is not perfect,
+                                * but required as a fall-back when
+                                * "flush_empty" is not implemented by
+                                * lttng-modules.
+                                */
+                               ret = kernctl_buffer_flush(stream->wait_fd);
+                               if (ret < 0) {
+                                       ERR("Failed to flush kernel stream");
+                                       goto end;
+                               }
+                       }
+               }
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_flush_buffer(stream, (int) producer_active);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
+
+end:
+       return ret;
+}
This page took 0.027951 seconds and 4 git commands to generate.