Clean-up: consumer: move open packet to post_consume
[lttng-tools.git] / src / common / consumer / consumer.c
index be440e69492b020b4281f83cc94c12fe29c9c38f..b2c5eb3ddbd1484e5aa2f44f64bc48f17a8dceda 100644 (file)
@@ -63,12 +63,6 @@ struct consumer_channel_msg {
        uint64_t key;                           /* del */
 };
 
-enum open_packet_status {
-       OPEN_PACKET_STATUS_OPENED,
-       OPEN_PACKET_STATUS_NO_SPACE,
-       OPEN_PACKET_STATUS_ERROR,
-};
-
 /* Flag used to temporarily pause data consumption from testpoints. */
 int data_consumption_paused;
 
@@ -3316,140 +3310,29 @@ error_testpoint:
        return NULL;
 }
 
-static
-int consumer_flush_buffer(struct lttng_consumer_stream *stream,
-               int producer_active)
+static int post_consume(struct lttng_consumer_stream *stream,
+               const struct stream_subbuffer *subbuffer,
+               struct lttng_consumer_local_data *ctx)
 {
+       size_t i;
        int ret = 0;
+       const size_t count = lttng_dynamic_array_get_count(
+                       &stream->read_subbuffer_ops.post_consume_cbs);
 
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               if (producer_active) {
-                       ret = kernctl_buffer_flush(stream->wait_fd);
-                       if (ret < 0) {
-                               ERR("Failed to flush kernel stream");
-                               goto end;
-                       }
-               } else {
-                       ret = kernctl_buffer_flush_empty(stream->wait_fd);
-                       if (ret < 0) {
-                               /*
-                                * Doing a buffer flush which does not take into
-                                * account empty packets. This is not perfect,
-                                * but required as a fall-back when
-                                * "flush_empty" is not implemented by
-                                * lttng-modules.
-                                */
-                               ret = kernctl_buffer_flush(stream->wait_fd);
-                               if (ret < 0) {
-                                       ERR("Failed to flush kernel stream");
-                                       goto end;
-                               }
-                       }
+       for (i = 0; i < count; i++) {
+               const post_consume_cb op = *(post_consume_cb *) lttng_dynamic_array_get_element(
+                               &stream->read_subbuffer_ops.post_consume_cbs,
+                               i);
+
+               ret = op(stream, subbuffer, ctx);
+               if (ret) {
+                       goto end;
                }
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               lttng_ustconsumer_flush_buffer(stream, producer_active);
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               abort();
        }
-
 end:
        return ret;
 }
 
-static enum open_packet_status open_packet(struct lttng_consumer_stream *stream)
-{
-       int ret;
-       enum open_packet_status status;
-       unsigned long produced_pos_before, produced_pos_after;
-
-       ret = lttng_consumer_sample_snapshot_positions(stream);
-       if (ret < 0) {
-               ERR("Failed to snapshot positions before post-rotation empty packet flush: stream id = %" PRIu64
-                               ", channel name = %s, session id = %" PRIu64,
-                               stream->key, stream->chan->name,
-                               stream->chan->session_id);
-               status = OPEN_PACKET_STATUS_ERROR;
-               goto end;
-       }
-
-       ret = lttng_consumer_get_produced_snapshot(
-                       stream, &produced_pos_before);
-       if (ret < 0) {
-               ERR("Failed to read produced position before post-rotation empty packet flush: stream id = %" PRIu64
-                               ", channel name = %s, session id = %" PRIu64,
-                               stream->key, stream->chan->name,
-                               stream->chan->session_id);
-               status = OPEN_PACKET_STATUS_ERROR;
-               goto end;
-       }
-
-       ret = consumer_flush_buffer(stream, 0);
-       if (ret) {
-               ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64
-                               ", channel name = %s, session id = %" PRIu64,
-                               stream->key, stream->chan->name,
-                               stream->chan->session_id);
-               status = OPEN_PACKET_STATUS_ERROR;
-               goto end;
-       }
-
-       ret = lttng_consumer_sample_snapshot_positions(stream);
-       if (ret < 0) {
-               ERR("Failed to snapshot positions after post-rotation empty packet flush: stream id = %" PRIu64
-                               ", channel name = %s, session id = %" PRIu64,
-                               stream->key, stream->chan->name,
-                               stream->chan->session_id);
-               status = OPEN_PACKET_STATUS_ERROR;
-               goto end;
-       }
-
-       ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_after);
-       if (ret < 0) {
-               ERR("Failed to read produced position after post-rotation empty packet flush: stream id = %" PRIu64
-                               ", channel name = %s, session id = %" PRIu64,
-                               stream->key, stream->chan->name,
-                               stream->chan->session_id);
-               status = OPEN_PACKET_STATUS_ERROR;
-               goto end;
-       }
-
-       /*
-        * Determine if the flush had an effect by comparing the produced
-        * positons before and after the flush.
-        */
-       status = produced_pos_before != produced_pos_after ?
-                       OPEN_PACKET_STATUS_OPENED :
-                       OPEN_PACKET_STATUS_NO_SPACE;
-       if (status == OPEN_PACKET_STATUS_OPENED) {
-               stream->opened_packet_in_current_trace_chunk = true;
-       }
-end:
-       return status;
-}
-
-static bool stream_is_rotating_to_null_chunk(
-               const struct lttng_consumer_stream *stream)
-{
-       bool rotating_to_null_chunk = false;
-
-       if (stream->rotate_position == -1ULL) {
-               /* No rotation ongoing. */
-               goto end;
-       }
-
-       if (stream->trace_chunk == stream->chan->trace_chunk ||
-                       !stream->chan->trace_chunk) {
-               rotating_to_null_chunk = true;
-       }
-end:
-       return rotating_to_null_chunk;
-}
-
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx,
                bool locked_by_caller)
@@ -3525,11 +3408,9 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                goto end;
        }
 
-       if (stream->read_subbuffer_ops.post_consume) {
-               ret = stream->read_subbuffer_ops.post_consume(stream, &subbuffer, ctx);
-               if (ret) {
-                       goto end;
-               }
+       ret = post_consume(stream, &subbuffer, ctx);
+       if (ret) {
+               goto end;
        }
 
        /*
@@ -3547,50 +3428,13 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                        ERR("Stream rotation error after consuming data");
                        goto end;
                }
+
        } else if (rotation_ret < 0) {
                ret = rotation_ret;
                ERR("Failed to check if stream was ready to rotate after consuming data");
                goto end;
        }
 
-       /*
-        * TODO roll into a post_consume op as this doesn't apply to metadata
-        * streams.
-        */
-       if (!stream->opened_packet_in_current_trace_chunk &&
-                       stream->trace_chunk && !stream->metadata_flag &&
-                       !stream_is_rotating_to_null_chunk(stream)) {
-               const enum open_packet_status status = open_packet(stream);
-
-               switch (status) {
-               case OPEN_PACKET_STATUS_OPENED:
-                       DBG("Opened a packet after consuming a packet rotation: stream id = %" PRIu64
-                                       ", channel name = %s, session id = %" PRIu64,
-                                       stream->key, stream->chan->name,
-                                       stream->chan->session_id);
-                       break;
-               case OPEN_PACKET_STATUS_NO_SPACE:
-                       /*
-                        * Can't open a packet as there is no space left.
-                        * This means that new events were produced, resulting
-                        * in a packet being opened, which is what we wanted
-                        * anyhow.
-                        */
-                       DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64
-                                       ", channel name = %s, session id = %" PRIu64,
-                                       stream->key, stream->chan->name,
-                                       stream->chan->session_id);
-                       stream->opened_packet_in_current_trace_chunk = true;
-                       break;
-               case OPEN_PACKET_STATUS_ERROR:
-                       /* Logged by callee. */
-                       ret = -1;
-                       goto end;
-               default:
-                       abort();
-               }
-       }
-
 sleep_stream:
        if (stream->read_subbuffer_ops.on_sleep) {
                stream->read_subbuffer_ops.on_sleep(stream, ctx);
@@ -4148,7 +3992,8 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                         * ensures we have at least one packet in each stream per trace
                         * chunk, even if no data was produced.
                         */
-                       ret = consumer_flush_buffer(stream, stream->metadata_flag ? 1 : 0);
+                       ret = consumer_stream_flush_buffer(
+                                       stream, stream->metadata_flag ? 1 : 0);
                        if (ret < 0) {
                                ERR("Failed to flush stream %" PRIu64 " during channel rotation",
                                                stream->key);
@@ -4284,17 +4129,17 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                         * "stamp" the beginning of the new trace chunk at the
                         * earliest possible point.
                         */
-                       const enum open_packet_status status =
-                               open_packet(stream);
+                       const enum consumer_stream_open_packet_status status =
+                               consumer_stream_open_packet(stream);
 
                        switch (status) {
-                       case OPEN_PACKET_STATUS_OPENED:
+                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
                                DBG("Opened a packet after a rotation: stream id = %" PRIu64
                                                ", channel name = %s, session id = %" PRIu64,
                                                stream->key, stream->chan->name,
                                                stream->chan->session_id);
                                break;
-                       case OPEN_PACKET_STATUS_NO_SPACE:
+                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
                                /*
                                 * Can't open a packet as there is no space left
                                 * in the buffer. A new packet will be opened
@@ -4305,7 +4150,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                                stream->key, stream->chan->name,
                                                stream->chan->session_id);
                                break;
-                       case OPEN_PACKET_STATUS_ERROR:
+                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
                                /* Logged by callee. */
                                ret = -1;
                                goto end_unlock_stream;
@@ -4412,7 +4257,7 @@ int consumer_clear_stream(struct lttng_consumer_stream *stream)
 {
        int ret;
 
-       ret = consumer_flush_buffer(stream, 1);
+       ret = consumer_stream_flush_buffer(stream, 1);
        if (ret < 0) {
                ERR("Failed to flush stream %" PRIu64 " during channel clear",
                                stream->key);
@@ -5184,29 +5029,29 @@ enum lttcomm_return_code lttng_consumer_open_channel_packets(
 
        rcu_read_lock();
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
-               enum open_packet_status status;
+               enum consumer_stream_open_packet_status status;
 
                pthread_mutex_lock(&stream->lock);
                if (cds_lfht_is_node_deleted(&stream->node.node)) {
                        goto next;
                }
 
-               status = open_packet(stream);
+               status = consumer_stream_open_packet(stream);
                switch (status) {
-               case OPEN_PACKET_STATUS_OPENED:
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
                        DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
                                        ", channel name = %s, session id = %" PRIu64,
                                        stream->key, stream->chan->name,
                                        stream->chan->session_id);
                        stream->opened_packet_in_current_trace_chunk = true;
                        break;
-               case OPEN_PACKET_STATUS_NO_SPACE:
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
                        DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
                                        ", channel name = %s, session id = %" PRIu64,
                                        stream->key, stream->chan->name,
                                        stream->chan->session_id);
                        break;
-               case OPEN_PACKET_STATUS_ERROR:
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
                        /*
                         * Only unexpected internal errors can lead to this
                         * failing. Report an unknown error.
This page took 0.03144 seconds and 4 git commands to generate.