Clean-up: consumer: move open packet to post_consume
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Wed, 8 Jul 2020 18:57:40 +0000 (14:57 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 16 Jul 2020 16:30:55 +0000 (12:30 -0400)
Move the "open packet" step of read_subbuffer to a post-consume callback
as this only needs to be done for data streams; it does not belong in
the core of the read_subbuffer template method.

Change-Id: Ia4d3f8f833e213a8d0e39bcf5ec766c2c05bcf80
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/common/consumer/consumer-stream.c
src/common/consumer/consumer-stream.h
src/common/consumer/consumer.c
src/common/consumer/consumer.h

index 398d71ae05f92f6d9c0437b4fb203532870eb5d6..8c21d6ae1edd3afe71967d6bf5e5cb32cb998675 100644 (file)
@@ -22,6 +22,7 @@
 #include <common/consumer/consumer.h>
 #include <common/consumer/consumer-timer.h>
 #include <common/consumer/metadata-bucket.h>
 #include <common/consumer/consumer.h>
 #include <common/consumer/consumer-timer.h>
 #include <common/consumer/metadata-bucket.h>
+#include <common/kernel-ctl/kernel-ctl.h>
 
 #include "consumer-stream.h"
 
 
 #include "consumer-stream.h"
 
@@ -409,6 +410,155 @@ end:
        return 0;
 }
 
        return 0;
 }
 
+static
+bool stream_is_rotating_to_null_chunk(
+               const struct lttng_consumer_stream *stream)
+{
+       bool rotating_to_null_chunk = false;
+
+       if (stream->rotate_position == -1ULL) {
+               /* No rotation ongoing. */
+               goto end;
+       }
+
+       if (stream->trace_chunk == stream->chan->trace_chunk ||
+                       !stream->chan->trace_chunk) {
+               rotating_to_null_chunk = true;
+       }
+end:
+       return rotating_to_null_chunk;
+}
+
+enum consumer_stream_open_packet_status consumer_stream_open_packet(
+               struct lttng_consumer_stream *stream)
+{
+       int ret;
+       enum consumer_stream_open_packet_status status;
+       unsigned long produced_pos_before, produced_pos_after;
+
+       ret = lttng_consumer_sample_snapshot_positions(stream);
+       if (ret < 0) {
+               ERR("Failed to snapshot positions before post-rotation empty packet flush: stream id = %" PRIu64
+                               ", channel name = %s, session id = %" PRIu64,
+                               stream->key, stream->chan->name,
+                               stream->chan->session_id);
+               status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+               goto end;
+       }
+
+       ret = lttng_consumer_get_produced_snapshot(
+                       stream, &produced_pos_before);
+       if (ret < 0) {
+               ERR("Failed to read produced position before post-rotation empty packet flush: stream id = %" PRIu64
+                               ", channel name = %s, session id = %" PRIu64,
+                               stream->key, stream->chan->name,
+                               stream->chan->session_id);
+               status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+               goto end;
+       }
+
+       ret = consumer_stream_flush_buffer(stream, 0);
+       if (ret) {
+               ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64
+                               ", channel name = %s, session id = %" PRIu64,
+                               stream->key, stream->chan->name,
+                               stream->chan->session_id);
+               status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+               goto end;
+       }
+
+       ret = lttng_consumer_sample_snapshot_positions(stream);
+       if (ret < 0) {
+               ERR("Failed to snapshot positions after post-rotation empty packet flush: stream id = %" PRIu64
+                               ", channel name = %s, session id = %" PRIu64,
+                               stream->key, stream->chan->name,
+                               stream->chan->session_id);
+               status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+               goto end;
+       }
+
+       ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_after);
+       if (ret < 0) {
+               ERR("Failed to read produced position after post-rotation empty packet flush: stream id = %" PRIu64
+                               ", channel name = %s, session id = %" PRIu64,
+                               stream->key, stream->chan->name,
+                               stream->chan->session_id);
+               status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR;
+               goto end;
+       }
+
+       /*
+        * Determine if the flush had an effect by comparing the produced
+        * positons before and after the flush.
+        */
+       status = produced_pos_before != produced_pos_after ?
+                       CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED :
+                       CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE;
+       if (status == CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED) {
+               stream->opened_packet_in_current_trace_chunk = true;
+       }
+
+end:
+       return status;
+}
+
+/*
+ * An attempt to open a new packet is performed after a rotation completes to
+ * get a begin timestamp as close as possible to the rotation point.
+ *
+ * However, that initial attempt at opening a packet can fail due to a full
+ * ring-buffer. In that case, a second attempt is performed after consuming
+ * a packet since that will have freed enough space in the ring-buffer.
+ */
+static
+int post_consume_open_new_packet(struct lttng_consumer_stream *stream,
+               const struct stream_subbuffer *subbuffer,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret = 0;
+
+       if (!stream->opened_packet_in_current_trace_chunk &&
+                       stream->trace_chunk &&
+                       !stream_is_rotating_to_null_chunk(stream)) {
+               const enum consumer_stream_open_packet_status status =
+                               consumer_stream_open_packet(stream);
+
+               switch (status) {
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+                       DBG("Opened a packet after consuming a packet rotation: stream id = %" PRIu64
+                                       ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       stream->opened_packet_in_current_trace_chunk = true;
+                       break;
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+                       /*
+                        * Can't open a packet as there is no space left.
+                        * This means that new events were produced, resulting
+                        * in a packet being opened, which is what we want
+                        * anyhow.
+                        */
+                       DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64
+                                       ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       stream->opened_packet_in_current_trace_chunk = true;
+                       break;
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+                       /* Logged by callee. */
+                       ret = -1;
+                       goto end;
+               default:
+                       abort();
+               }
+
+               stream->opened_packet_in_current_trace_chunk = true;
+       }
+
+end:
+       return ret;
+}
+
 struct lttng_consumer_stream *consumer_stream_create(
                struct lttng_consumer_channel *channel,
                uint64_t channel_key,
 struct lttng_consumer_stream *consumer_stream_create(
                struct lttng_consumer_channel *channel,
                uint64_t channel_key,
@@ -506,6 +656,9 @@ struct lttng_consumer_stream *consumer_stream_create(
 
        rcu_read_unlock();
 
 
        rcu_read_unlock();
 
+       lttng_dynamic_array_init(&stream->read_subbuffer_ops.post_consume_cbs,
+                       sizeof(post_consume_cb), NULL);
+
        if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
                stream->read_subbuffer_ops.lock =
                                consumer_stream_metadata_lock_all;
        if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
                stream->read_subbuffer_ops.lock =
                                consumer_stream_metadata_lock_all;
@@ -514,18 +667,31 @@ struct lttng_consumer_stream *consumer_stream_create(
                stream->read_subbuffer_ops.pre_consume_subbuffer =
                                metadata_stream_check_version;
        } else {
                stream->read_subbuffer_ops.pre_consume_subbuffer =
                                metadata_stream_check_version;
        } else {
+               const post_consume_cb post_consume_index_op = channel->is_live ?
+                               consumer_stream_sync_metadata_index :
+                               consumer_stream_send_index;
+
+               ret = lttng_dynamic_array_add_element(
+                               &stream->read_subbuffer_ops.post_consume_cbs,
+                               &post_consume_index_op);
+               if (ret) {
+                       PERROR("Failed to add `send index` callback to stream's post consumption callbacks");
+                       goto error;
+               }
+
+               ret = lttng_dynamic_array_add_element(
+                               &stream->read_subbuffer_ops.post_consume_cbs,
+                               &(post_consume_cb) { post_consume_open_new_packet });
+               if (ret) {
+                       PERROR("Failed to add `open new packet` callback to stream's post consumption callbacks");
+                       goto error;
+               }
+
                stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all;
                stream->read_subbuffer_ops.unlock =
                                consumer_stream_data_unlock_all;
                stream->read_subbuffer_ops.pre_consume_subbuffer =
                                consumer_stream_update_stats;
                stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all;
                stream->read_subbuffer_ops.unlock =
                                consumer_stream_data_unlock_all;
                stream->read_subbuffer_ops.pre_consume_subbuffer =
                                consumer_stream_update_stats;
-               if (channel->is_live) {
-                       stream->read_subbuffer_ops.post_consume =
-                                       consumer_stream_sync_metadata_index;
-               } else {
-                       stream->read_subbuffer_ops.post_consume =
-                                       consumer_stream_send_index;
-               }
        }
 
        if (channel->output == CONSUMER_CHANNEL_MMAP) {
        }
 
        if (channel->output == CONSUMER_CHANNEL_MMAP) {
@@ -541,6 +707,7 @@ struct lttng_consumer_stream *consumer_stream_create(
 error:
        rcu_read_unlock();
        lttng_trace_chunk_put(stream->trace_chunk);
 error:
        rcu_read_unlock();
        lttng_trace_chunk_put(stream->trace_chunk);
+       lttng_dynamic_array_reset(&stream->read_subbuffer_ops.post_consume_cbs);
        free(stream);
 end:
        if (alloc_ret) {
        free(stream);
 end:
        if (alloc_ret) {
@@ -852,6 +1019,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
        /* Free stream within a RCU call. */
        lttng_trace_chunk_put(stream->trace_chunk);
        stream->trace_chunk = NULL;
        /* Free stream within a RCU call. */
        lttng_trace_chunk_put(stream->trace_chunk);
        stream->trace_chunk = NULL;
+       lttng_dynamic_array_reset(&stream->read_subbuffer_ops.post_consume_cbs);
        consumer_stream_free(stream);
 }
 
        consumer_stream_free(stream);
 }
 
@@ -1068,3 +1236,47 @@ void consumer_stream_metadata_set_version(
                metadata_bucket_reset(stream->metadata_bucket);
        }
 }
                metadata_bucket_reset(stream->metadata_bucket);
        }
 }
+
+int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream,
+               bool producer_active)
+{
+       int ret = 0;
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               if (producer_active) {
+                       ret = kernctl_buffer_flush(stream->wait_fd);
+                       if (ret < 0) {
+                               ERR("Failed to flush kernel stream");
+                               goto end;
+                       }
+               } else {
+                       ret = kernctl_buffer_flush_empty(stream->wait_fd);
+                       if (ret < 0) {
+                               /*
+                                * Doing a buffer flush which does not take into
+                                * account empty packets. This is not perfect,
+                                * but required as a fall-back when
+                                * "flush_empty" is not implemented by
+                                * lttng-modules.
+                                */
+                               ret = kernctl_buffer_flush(stream->wait_fd);
+                               if (ret < 0) {
+                                       ERR("Failed to flush kernel stream");
+                                       goto end;
+                               }
+                       }
+               }
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_flush_buffer(stream, (int) producer_active);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
+
+end:
+       return ret;
+}
index 50e402fe200f1c90f0e06c62622c12f6f29b38d5..c9af63cd83841c10a542e015aba3d6c3eb5fd69d 100644 (file)
 
 #include "consumer.h"
 
 
 #include "consumer.h"
 
+enum consumer_stream_open_packet_status {
+       CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED,
+       CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE,
+       CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR,
+};
+
 /*
  * Create a consumer stream.
  *
 /*
  * Create a consumer stream.
  *
@@ -130,4 +136,34 @@ int consumer_stream_enable_metadata_bucketization(
 void consumer_stream_metadata_set_version(
                struct lttng_consumer_stream *stream, uint64_t new_version);
 
 void consumer_stream_metadata_set_version(
                struct lttng_consumer_stream *stream, uint64_t new_version);
 
+/*
+ * Set the version of a metadata stream (i.e. following a metadata
+ * regeneration).
+ *
+ * Changing the version of a metadata stream will cause any bucketized metadata
+ * to be discarded and will mark the metadata stream for future `reset`.
+ */
+void consumer_stream_metadata_set_version(
+               struct lttng_consumer_stream *stream, uint64_t new_version);
+
+/*
+ * Attempt to open a packet in a stream.
+ *
+ * This function must be called with the stream and channel locks held.
+ */
+enum consumer_stream_open_packet_status consumer_stream_open_packet(
+               struct lttng_consumer_stream *stream);
+
+/*
+ * Flush a stream's buffer.
+ *
+ * producer_active: if true, causes a flush to occur only if there is
+ * content present in the current sub-buffer. If false, forces a flush to take
+ * place (otherwise known as "flush_empty").
+ *
+ * This function must be called with the stream and channel locks held.
+ */
+int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream,
+               bool producer_active);
+
 #endif /* LTTNG_CONSUMER_STREAM_H */
 #endif /* LTTNG_CONSUMER_STREAM_H */
index be440e69492b020b4281f83cc94c12fe29c9c38f..b2c5eb3ddbd1484e5aa2f44f64bc48f17a8dceda 100644 (file)
@@ -63,12 +63,6 @@ struct consumer_channel_msg {
        uint64_t key;                           /* del */
 };
 
        uint64_t key;                           /* del */
 };
 
-enum open_packet_status {
-       OPEN_PACKET_STATUS_OPENED,
-       OPEN_PACKET_STATUS_NO_SPACE,
-       OPEN_PACKET_STATUS_ERROR,
-};
-
 /* Flag used to temporarily pause data consumption from testpoints. */
 int data_consumption_paused;
 
 /* Flag used to temporarily pause data consumption from testpoints. */
 int data_consumption_paused;
 
@@ -3316,140 +3310,29 @@ error_testpoint:
        return NULL;
 }
 
        return NULL;
 }
 
-static
-int consumer_flush_buffer(struct lttng_consumer_stream *stream,
-               int producer_active)
+static int post_consume(struct lttng_consumer_stream *stream,
+               const struct stream_subbuffer *subbuffer,
+               struct lttng_consumer_local_data *ctx)
 {
 {
+       size_t i;
        int ret = 0;
        int ret = 0;
+       const size_t count = lttng_dynamic_array_get_count(
+                       &stream->read_subbuffer_ops.post_consume_cbs);
 
 
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               if (producer_active) {
-                       ret = kernctl_buffer_flush(stream->wait_fd);
-                       if (ret < 0) {
-                               ERR("Failed to flush kernel stream");
-                               goto end;
-                       }
-               } else {
-                       ret = kernctl_buffer_flush_empty(stream->wait_fd);
-                       if (ret < 0) {
-                               /*
-                                * Doing a buffer flush which does not take into
-                                * account empty packets. This is not perfect,
-                                * but required as a fall-back when
-                                * "flush_empty" is not implemented by
-                                * lttng-modules.
-                                */
-                               ret = kernctl_buffer_flush(stream->wait_fd);
-                               if (ret < 0) {
-                                       ERR("Failed to flush kernel stream");
-                                       goto end;
-                               }
-                       }
+       for (i = 0; i < count; i++) {
+               const post_consume_cb op = *(post_consume_cb *) lttng_dynamic_array_get_element(
+                               &stream->read_subbuffer_ops.post_consume_cbs,
+                               i);
+
+               ret = op(stream, subbuffer, ctx);
+               if (ret) {
+                       goto end;
                }
                }
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               lttng_ustconsumer_flush_buffer(stream, producer_active);
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               abort();
        }
        }
-
 end:
        return ret;
 }
 
 end:
        return ret;
 }
 
-static enum open_packet_status open_packet(struct lttng_consumer_stream *stream)
-{
-       int ret;
-       enum open_packet_status status;
-       unsigned long produced_pos_before, produced_pos_after;
-
-       ret = lttng_consumer_sample_snapshot_positions(stream);
-       if (ret < 0) {
-               ERR("Failed to snapshot positions before post-rotation empty packet flush: stream id = %" PRIu64
-                               ", channel name = %s, session id = %" PRIu64,
-                               stream->key, stream->chan->name,
-                               stream->chan->session_id);
-               status = OPEN_PACKET_STATUS_ERROR;
-               goto end;
-       }
-
-       ret = lttng_consumer_get_produced_snapshot(
-                       stream, &produced_pos_before);
-       if (ret < 0) {
-               ERR("Failed to read produced position before post-rotation empty packet flush: stream id = %" PRIu64
-                               ", channel name = %s, session id = %" PRIu64,
-                               stream->key, stream->chan->name,
-                               stream->chan->session_id);
-               status = OPEN_PACKET_STATUS_ERROR;
-               goto end;
-       }
-
-       ret = consumer_flush_buffer(stream, 0);
-       if (ret) {
-               ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64
-                               ", channel name = %s, session id = %" PRIu64,
-                               stream->key, stream->chan->name,
-                               stream->chan->session_id);
-               status = OPEN_PACKET_STATUS_ERROR;
-               goto end;
-       }
-
-       ret = lttng_consumer_sample_snapshot_positions(stream);
-       if (ret < 0) {
-               ERR("Failed to snapshot positions after post-rotation empty packet flush: stream id = %" PRIu64
-                               ", channel name = %s, session id = %" PRIu64,
-                               stream->key, stream->chan->name,
-                               stream->chan->session_id);
-               status = OPEN_PACKET_STATUS_ERROR;
-               goto end;
-       }
-
-       ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_after);
-       if (ret < 0) {
-               ERR("Failed to read produced position after post-rotation empty packet flush: stream id = %" PRIu64
-                               ", channel name = %s, session id = %" PRIu64,
-                               stream->key, stream->chan->name,
-                               stream->chan->session_id);
-               status = OPEN_PACKET_STATUS_ERROR;
-               goto end;
-       }
-
-       /*
-        * Determine if the flush had an effect by comparing the produced
-        * positons before and after the flush.
-        */
-       status = produced_pos_before != produced_pos_after ?
-                       OPEN_PACKET_STATUS_OPENED :
-                       OPEN_PACKET_STATUS_NO_SPACE;
-       if (status == OPEN_PACKET_STATUS_OPENED) {
-               stream->opened_packet_in_current_trace_chunk = true;
-       }
-end:
-       return status;
-}
-
-static bool stream_is_rotating_to_null_chunk(
-               const struct lttng_consumer_stream *stream)
-{
-       bool rotating_to_null_chunk = false;
-
-       if (stream->rotate_position == -1ULL) {
-               /* No rotation ongoing. */
-               goto end;
-       }
-
-       if (stream->trace_chunk == stream->chan->trace_chunk ||
-                       !stream->chan->trace_chunk) {
-               rotating_to_null_chunk = true;
-       }
-end:
-       return rotating_to_null_chunk;
-}
-
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx,
                bool locked_by_caller)
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx,
                bool locked_by_caller)
@@ -3525,11 +3408,9 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                goto end;
        }
 
                goto end;
        }
 
-       if (stream->read_subbuffer_ops.post_consume) {
-               ret = stream->read_subbuffer_ops.post_consume(stream, &subbuffer, ctx);
-               if (ret) {
-                       goto end;
-               }
+       ret = post_consume(stream, &subbuffer, ctx);
+       if (ret) {
+               goto end;
        }
 
        /*
        }
 
        /*
@@ -3547,50 +3428,13 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                        ERR("Stream rotation error after consuming data");
                        goto end;
                }
                        ERR("Stream rotation error after consuming data");
                        goto end;
                }
+
        } else if (rotation_ret < 0) {
                ret = rotation_ret;
                ERR("Failed to check if stream was ready to rotate after consuming data");
                goto end;
        }
 
        } else if (rotation_ret < 0) {
                ret = rotation_ret;
                ERR("Failed to check if stream was ready to rotate after consuming data");
                goto end;
        }
 
-       /*
-        * TODO roll into a post_consume op as this doesn't apply to metadata
-        * streams.
-        */
-       if (!stream->opened_packet_in_current_trace_chunk &&
-                       stream->trace_chunk && !stream->metadata_flag &&
-                       !stream_is_rotating_to_null_chunk(stream)) {
-               const enum open_packet_status status = open_packet(stream);
-
-               switch (status) {
-               case OPEN_PACKET_STATUS_OPENED:
-                       DBG("Opened a packet after consuming a packet rotation: stream id = %" PRIu64
-                                       ", channel name = %s, session id = %" PRIu64,
-                                       stream->key, stream->chan->name,
-                                       stream->chan->session_id);
-                       break;
-               case OPEN_PACKET_STATUS_NO_SPACE:
-                       /*
-                        * Can't open a packet as there is no space left.
-                        * This means that new events were produced, resulting
-                        * in a packet being opened, which is what we wanted
-                        * anyhow.
-                        */
-                       DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64
-                                       ", channel name = %s, session id = %" PRIu64,
-                                       stream->key, stream->chan->name,
-                                       stream->chan->session_id);
-                       stream->opened_packet_in_current_trace_chunk = true;
-                       break;
-               case OPEN_PACKET_STATUS_ERROR:
-                       /* Logged by callee. */
-                       ret = -1;
-                       goto end;
-               default:
-                       abort();
-               }
-       }
-
 sleep_stream:
        if (stream->read_subbuffer_ops.on_sleep) {
                stream->read_subbuffer_ops.on_sleep(stream, ctx);
 sleep_stream:
        if (stream->read_subbuffer_ops.on_sleep) {
                stream->read_subbuffer_ops.on_sleep(stream, ctx);
@@ -4148,7 +3992,8 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                         * ensures we have at least one packet in each stream per trace
                         * chunk, even if no data was produced.
                         */
                         * ensures we have at least one packet in each stream per trace
                         * chunk, even if no data was produced.
                         */
-                       ret = consumer_flush_buffer(stream, stream->metadata_flag ? 1 : 0);
+                       ret = consumer_stream_flush_buffer(
+                                       stream, stream->metadata_flag ? 1 : 0);
                        if (ret < 0) {
                                ERR("Failed to flush stream %" PRIu64 " during channel rotation",
                                                stream->key);
                        if (ret < 0) {
                                ERR("Failed to flush stream %" PRIu64 " during channel rotation",
                                                stream->key);
@@ -4284,17 +4129,17 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                         * "stamp" the beginning of the new trace chunk at the
                         * earliest possible point.
                         */
                         * "stamp" the beginning of the new trace chunk at the
                         * earliest possible point.
                         */
-                       const enum open_packet_status status =
-                               open_packet(stream);
+                       const enum consumer_stream_open_packet_status status =
+                               consumer_stream_open_packet(stream);
 
                        switch (status) {
 
                        switch (status) {
-                       case OPEN_PACKET_STATUS_OPENED:
+                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
                                DBG("Opened a packet after a rotation: stream id = %" PRIu64
                                                ", channel name = %s, session id = %" PRIu64,
                                                stream->key, stream->chan->name,
                                                stream->chan->session_id);
                                break;
                                DBG("Opened a packet after a rotation: stream id = %" PRIu64
                                                ", channel name = %s, session id = %" PRIu64,
                                                stream->key, stream->chan->name,
                                                stream->chan->session_id);
                                break;
-                       case OPEN_PACKET_STATUS_NO_SPACE:
+                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
                                /*
                                 * Can't open a packet as there is no space left
                                 * in the buffer. A new packet will be opened
                                /*
                                 * Can't open a packet as there is no space left
                                 * in the buffer. A new packet will be opened
@@ -4305,7 +4150,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                                stream->key, stream->chan->name,
                                                stream->chan->session_id);
                                break;
                                                stream->key, stream->chan->name,
                                                stream->chan->session_id);
                                break;
-                       case OPEN_PACKET_STATUS_ERROR:
+                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
                                /* Logged by callee. */
                                ret = -1;
                                goto end_unlock_stream;
                                /* Logged by callee. */
                                ret = -1;
                                goto end_unlock_stream;
@@ -4412,7 +4257,7 @@ int consumer_clear_stream(struct lttng_consumer_stream *stream)
 {
        int ret;
 
 {
        int ret;
 
-       ret = consumer_flush_buffer(stream, 1);
+       ret = consumer_stream_flush_buffer(stream, 1);
        if (ret < 0) {
                ERR("Failed to flush stream %" PRIu64 " during channel clear",
                                stream->key);
        if (ret < 0) {
                ERR("Failed to flush stream %" PRIu64 " during channel clear",
                                stream->key);
@@ -5184,29 +5029,29 @@ enum lttcomm_return_code lttng_consumer_open_channel_packets(
 
        rcu_read_lock();
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
 
        rcu_read_lock();
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
-               enum open_packet_status status;
+               enum consumer_stream_open_packet_status status;
 
                pthread_mutex_lock(&stream->lock);
                if (cds_lfht_is_node_deleted(&stream->node.node)) {
                        goto next;
                }
 
 
                pthread_mutex_lock(&stream->lock);
                if (cds_lfht_is_node_deleted(&stream->node.node)) {
                        goto next;
                }
 
-               status = open_packet(stream);
+               status = consumer_stream_open_packet(stream);
                switch (status) {
                switch (status) {
-               case OPEN_PACKET_STATUS_OPENED:
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
                        DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
                                        ", channel name = %s, session id = %" PRIu64,
                                        stream->key, stream->chan->name,
                                        stream->chan->session_id);
                        stream->opened_packet_in_current_trace_chunk = true;
                        break;
                        DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
                                        ", channel name = %s, session id = %" PRIu64,
                                        stream->key, stream->chan->name,
                                        stream->chan->session_id);
                        stream->opened_packet_in_current_trace_chunk = true;
                        break;
-               case OPEN_PACKET_STATUS_NO_SPACE:
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
                        DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
                                        ", channel name = %s, session id = %" PRIu64,
                                        stream->key, stream->chan->name,
                                        stream->chan->session_id);
                        break;
                        DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
                                        ", channel name = %s, session id = %" PRIu64,
                                        stream->key, stream->chan->name,
                                        stream->chan->session_id);
                        break;
-               case OPEN_PACKET_STATUS_ERROR:
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
                        /*
                         * Only unexpected internal errors can lead to this
                         * failing. Report an unknown error.
                        /*
                         * Only unexpected internal errors can lead to this
                         * failing. Report an unknown error.
index 73189660ce4eb888bf3acc12a0f17ea0c3f9071e..b45f88b756b2032dbf90daa74e91f31ed9ffc940 100644 (file)
@@ -27,6 +27,7 @@
 #include <common/trace-chunk-registry.h>
 #include <common/credentials.h>
 #include <common/buffer-view.h>
 #include <common/trace-chunk-registry.h>
 #include <common/credentials.h>
 #include <common/buffer-view.h>
+#include <common/dynamic-array.h>
 
 struct lttng_consumer_local_data;
 
 
 struct lttng_consumer_local_data;
 
@@ -636,7 +637,7 @@ struct lttng_consumer_stream {
                reset_metadata_cb reset_metadata;
                consume_subbuffer_cb consume_subbuffer;
                put_next_subbuffer_cb put_next_subbuffer;
                reset_metadata_cb reset_metadata;
                consume_subbuffer_cb consume_subbuffer;
                put_next_subbuffer_cb put_next_subbuffer;
-               post_consume_cb post_consume;
+               struct lttng_dynamic_array post_consume_cbs;
                send_live_beacon_cb send_live_beacon;
                on_sleep_cb on_sleep;
                unlock_cb unlock;
                send_live_beacon_cb send_live_beacon;
                on_sleep_cb on_sleep;
                unlock_cb unlock;
This page took 0.033807 seconds and 4 git commands to generate.