Fix: consumerd: double unlock on rotate channel error path
[lttng-tools.git] / src / common / consumer / consumer.c
index cbd99e7ec3ab08146142706383085b2933e314ba..d45771e7eb19fb45123041736853143704db3aff 100644 (file)
@@ -3310,11 +3310,34 @@ error_testpoint:
        return NULL;
 }
 
+static int post_consume(struct lttng_consumer_stream *stream,
+               const struct stream_subbuffer *subbuffer,
+               struct lttng_consumer_local_data *ctx)
+{
+       size_t i;
+       int ret = 0;
+       const size_t count = lttng_dynamic_array_get_count(
+                       &stream->read_subbuffer_ops.post_consume_cbs);
+
+       for (i = 0; i < count; i++) {
+               const post_consume_cb op = *(post_consume_cb *) lttng_dynamic_array_get_element(
+                               &stream->read_subbuffer_ops.post_consume_cbs,
+                               i);
+
+               ret = op(stream, subbuffer, ctx);
+               if (ret) {
+                       goto end;
+               }
+       }
+end:
+       return ret;
+}
+
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx,
                bool locked_by_caller)
 {
-       ssize_t ret, written_bytes;
+       ssize_t ret, written_bytes = 0;
        int rotation_ret;
        struct stream_subbuffer subbuffer = {};
 
@@ -3376,8 +3399,8 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                 * error.
                 */
                DBG("Failed to write to tracefile (written_bytes: %zd != padded subbuffer size: %lu, subbuffer size: %lu)",
-                               written_bytes, subbuffer.info.data.subbuf_size,
-                               subbuffer.info.data.padded_subbuf_size);
+                               written_bytes, subbuffer.info.data.padded_subbuf_size,
+                               subbuffer.info.data.subbuf_size);
        }
 
        ret = stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
@@ -3385,11 +3408,9 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                goto end;
        }
 
-       if (stream->read_subbuffer_ops.post_consume) {
-               ret = stream->read_subbuffer_ops.post_consume(stream, &subbuffer, ctx);
-               if (ret) {
-                       goto end;
-               }
+       ret = post_consume(stream, &subbuffer, ctx);
+       if (ret) {
+               goto end;
        }
 
        /*
@@ -3407,13 +3428,14 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                        ERR("Stream rotation error after consuming data");
                        goto end;
                }
+
        } else if (rotation_ret < 0) {
                ret = rotation_ret;
                ERR("Failed to check if stream was ready to rotate after consuming data");
                goto end;
        }
 
-stream_sleep:
+sleep_stream:
        if (stream->read_subbuffer_ops.on_sleep) {
                stream->read_subbuffer_ops.on_sleep(stream, ctx);
        }
@@ -3901,50 +3923,6 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
        return start_pos;
 }
 
-static
-int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
-{
-       int ret = 0;
-
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               if (producer_active) {
-                       ret = kernctl_buffer_flush(stream->wait_fd);
-                       if (ret < 0) {
-                               ERR("Failed to flush kernel stream");
-                               goto end;
-                       }
-               } else {
-                       ret = kernctl_buffer_flush_empty(stream->wait_fd);
-                       if (ret < 0) {
-                               /*
-                                * Doing a buffer flush which does not take into
-                                * account empty packets. This is not perfect,
-                                * but required as a fall-back when
-                                * "flush_empty" is not implemented by
-                                * lttng-modules.
-                                */
-                               ret = kernctl_buffer_flush(stream->wait_fd);
-                               if (ret < 0) {
-                                       ERR("Failed to flush kernel stream");
-                                       goto end;
-                               }
-                       }
-               }
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               lttng_ustconsumer_flush_buffer(stream, producer_active);
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               abort();
-       }
-
-end:
-       return ret;
-}
-
 /*
  * Sample the rotate position for all the streams of a channel. If a stream
  * is already at the rotate position (produced == consumed), we flag it as
@@ -3968,11 +3946,15 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
        const bool is_local_trace = relayd_id == -1ULL;
        struct consumer_relayd_sock_pair *relayd = NULL;
        bool rotating_to_new_chunk = true;
+       /* Array of `struct lttng_consumer_stream *` */
+       struct lttng_dynamic_pointer_array streams_packet_to_open;
+       size_t stream_idx;
 
        DBG("Consumer sample rotate position for channel %" PRIu64, key);
 
        lttng_dynamic_array_init(&stream_rotation_positions,
                        sizeof(struct relayd_stream_rotation_position), NULL);
+       lttng_dynamic_pointer_array_init(&streams_packet_to_open, NULL);
 
        rcu_read_lock();
 
@@ -4014,7 +3996,8 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                         * ensures we have at least one packet in each stream per trace
                         * chunk, even if no data was produced.
                         */
-                       ret = consumer_flush_buffer(stream, stream->metadata_flag ? 1 : 0);
+                       ret = consumer_stream_flush_buffer(
+                                       stream, stream->metadata_flag ? 1 : 0);
                        if (ret < 0) {
                                ERR("Failed to flush stream %" PRIu64 " during channel rotation",
                                                stream->key);
@@ -4102,36 +4085,135 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        }
                        stream_count++;
                }
+
+               stream->opened_packet_in_current_trace_chunk = false;
+
+               if (rotating_to_new_chunk && !stream->metadata_flag) {
+                       /*
+                        * Attempt to flush an empty packet as close to the
+                        * rotation point as possible. In the event where a
+                        * stream remains inactive after the rotation point,
+                        * this ensures that the new trace chunk has a
+                        * beginning timestamp set at the begining of the
+                        * trace chunk instead of only creating an empty
+                        * packet when the trace chunk is stopped.
+                        *
+                        * This indicates to the viewers that the stream
+                        * was being recorded, but more importantly it
+                        * allows viewers to determine a useable trace
+                        * intersection.
+                        *
+                        * This presents a problem in the case where the
+                        * ring-buffer is completely full.
+                        *
+                        * Consider the following scenario:
+                        *   - The consumption of data is slow (slow network,
+                        *     for instance),
+                        *   - The ring buffer is full,
+                        *   - A rotation is initiated,
+                        *     - The flush below does nothing (no space left to
+                        *       open a new packet),
+                        *   - The other streams rotate very soon, and new
+                        *     data is produced in the new chunk,
+                        *   - This stream completes its rotation long after the
+                        *     rotation was initiated
+                        *   - The session is stopped before any event can be
+                        *     produced in this stream's buffers.
+                        *
+                        * The resulting trace chunk will have a single packet
+                        * temporaly at the end of the trace chunk for this
+                        * stream making the stream intersection more narrow
+                        * than it should be.
+                        *
+                        * To work-around this, an empty flush is performed
+                        * after the first consumption of a packet during a
+                        * rotation if open_packet fails. The idea is that
+                        * consuming a packet frees enough space to switch
+                        * packets in this scenario and allows the tracer to
+                        * "stamp" the beginning of the new trace chunk at the
+                        * earliest possible point.
+                        *
+                        * The packet open is performed after the channel
+                        * rotation to ensure that no attempt to open a packet
+                        * is performed in a stream that has no active trace
+                        * chunk.
+                        */
+                       ret = lttng_dynamic_pointer_array_add_pointer(
+                                       &streams_packet_to_open, stream);
+                       if (ret) {
+                               PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
+                               ret = -1;
+                               goto end_unlock_stream;
+                       }
+               }
+
                pthread_mutex_unlock(&stream->lock);
        }
        stream = NULL;
-       pthread_mutex_unlock(&channel->lock);
 
-       if (is_local_trace) {
-               ret = 0;
-               goto end;
-       }
+       if (!is_local_trace) {
+               relayd = consumer_find_relayd(relayd_id);
+               if (!relayd) {
+                       ERR("Failed to find relayd %" PRIu64, relayd_id);
+                       ret = -1;
+                       goto end_unlock_channel;
+               }
 
-       relayd = consumer_find_relayd(relayd_id);
-       if (!relayd) {
-               ERR("Failed to find relayd %" PRIu64, relayd_id);
-               ret = -1;
-               goto end;
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+                               rotating_to_new_chunk ? &next_chunk_id : NULL,
+                               (const struct relayd_stream_rotation_position *)
+                                               stream_rotation_positions.buffer
+                                                               .data);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0) {
+                       ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
+                                       relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
+                       goto end_unlock_channel;
+               }
        }
 
-       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-       ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
-                       rotating_to_new_chunk ? &next_chunk_id : NULL,
-                       (const struct relayd_stream_rotation_position *)
-                                       stream_rotation_positions.buffer.data);
-       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-       if (ret < 0) {
-               ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
-                               relayd->net_seq_idx);
-               lttng_consumer_cleanup_relayd(relayd);
-               goto end;
+       for (stream_idx = 0;
+                       stream_idx < lttng_dynamic_pointer_array_get_count(
+                               &streams_packet_to_open);
+                       stream_idx++) {
+               enum consumer_stream_open_packet_status status;
+
+               stream = lttng_dynamic_pointer_array_get_pointer(
+                               &streams_packet_to_open, stream_idx);
+
+               pthread_mutex_lock(&stream->lock);
+               status = consumer_stream_open_packet(stream);
+               pthread_mutex_unlock(&stream->lock);
+               switch (status) {
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+                       DBG("Opened a packet after a rotation: stream id = %" PRIu64
+                           ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       break;
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+                       /*
+                        * Can't open a packet as there is no space left
+                        * in the buffer. A new packet will be opened
+                        * once one has been consumed.
+                        */
+                       DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
+                           ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       break;
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+                       /* Logged by callee. */
+                       ret = -1;
+                       goto end_unlock_channel;
+               default:
+                       abort();
+               }
        }
 
+       pthread_mutex_unlock(&channel->lock);
        ret = 0;
        goto end;
 
@@ -4142,6 +4224,7 @@ end_unlock_channel:
 end:
        rcu_read_unlock();
        lttng_dynamic_array_reset(&stream_rotation_positions);
+       lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
        return ret;
 }
 
@@ -4200,7 +4283,7 @@ int consumer_clear_stream(struct lttng_consumer_stream *stream)
 {
        int ret;
 
-       ret = consumer_flush_buffer(stream, 1);
+       ret = consumer_stream_flush_buffer(stream, 1);
        if (ret < 0) {
                ERR("Failed to flush stream %" PRIu64 " during channel clear",
                                stream->key);
@@ -4957,3 +5040,70 @@ int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
 end:
        return ret;
 }
+
+enum lttcomm_return_code lttng_consumer_open_channel_packets(
+               struct lttng_consumer_channel *channel)
+{
+       struct lttng_consumer_stream *stream;
+       enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
+
+       if (channel->metadata_stream) {
+               ERR("Open channel packets command attempted on a metadata channel");
+               ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
+               goto end;
+       }
+
+       rcu_read_lock();
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+               enum consumer_stream_open_packet_status status;
+
+               pthread_mutex_lock(&stream->lock);
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       goto next;
+               }
+
+               status = consumer_stream_open_packet(stream);
+               switch (status) {
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+                       DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+                                       ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       stream->opened_packet_in_current_trace_chunk = true;
+                       break;
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+                       DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+                                       ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       break;
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+                       /*
+                        * Only unexpected internal errors can lead to this
+                        * failing. Report an unknown error.
+                        */
+                       ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+                                       ", channel id = %" PRIu64
+                                       ", channel name = %s"
+                                       ", session id = %" PRIu64,
+                                       stream->key, channel->key,
+                                       channel->name, channel->session_id);
+                       ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+                       goto error_unlock;
+               default:
+                       abort();
+               }
+
+       next:
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+end_rcu_unlock:
+       rcu_read_unlock();
+end:
+       return ret;
+
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       goto end_rcu_unlock;
+}
This page took 0.027528 seconds and 4 git commands to generate.