Clean-up: consumer: fix -Wshadow error in lttng_consumer_rotate_channel
[lttng-tools.git] / src / common / consumer / consumer.c
index b2c5eb3ddbd1484e5aa2f44f64bc48f17a8dceda..20f4deb29bf9f61cfbea4b64b3373605948ff357 100644 (file)
@@ -877,6 +877,43 @@ error:
        return outfd;
 }
 
+/*
+ * Write a character on the metadata poll pipe to wake the metadata thread.
+ * Returns 0 on success, -1 on error.
+ */
+int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
+{
+       int ret = 0;
+
+       DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'",
+                       channel->name);
+       if (channel->monitor && channel->metadata_stream) {
+               const char dummy = 'c';
+               const ssize_t write_ret = lttng_write(
+                               channel->metadata_stream->ust_metadata_poll_pipe[1],
+                               &dummy, 1);
+
+               if (write_ret < 1) {
+                       if (errno == EWOULDBLOCK) {
+                               /*
+                                * This is fine, the metadata poll thread
+                                * is having a hard time keeping-up, but
+                                * it will eventually wake-up and consume
+                                * the available data.
+                                */
+                               ret = 0;
+                       } else {
+                               PERROR("Failed to write to UST metadata pipe while attempting to wake-up the metadata poll thread");
+                               ret = -1;
+                               goto end;
+                       }
+               }
+       }
+
+end:
+       return ret;
+}
+
 /*
  * Trigger a dump of the metadata content. Following/during the succesful
  * completion of this call, the metadata poll thread will start receiving
@@ -3253,7 +3290,7 @@ void *consumer_thread_sessiond_poll(void *data)
                        err = 0;        /* All is OK */
                        goto end;
                }
-               DBG("received command on sock");
+               DBG("Received command on sock");
        }
        /* All is OK */
        err = 0;
@@ -3383,24 +3420,10 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
 
        written_bytes = stream->read_subbuffer_ops.consume_subbuffer(
                        ctx, stream, &subbuffer);
-       /*
-        * Should write subbuf_size amount of data when network streaming or
-        * the full padded size when we are not streaming.
-        */
-       if ((written_bytes != subbuffer.info.data.subbuf_size &&
-                           stream->net_seq_idx != (uint64_t) -1ULL) ||
-                       (written_bytes != subbuffer.info.data.padded_subbuf_size &&
-                                       stream->net_seq_idx ==
-                                                       (uint64_t) -1ULL)) {
-               /*
-                * Display the error but continue processing to try to
-                * release the subbuffer. This is a DBG statement
-                * since this can happen without being a critical
-                * error.
-                */
-               DBG("Failed to write to tracefile (written_bytes: %zd != padded subbuffer size: %lu, subbuffer size: %lu)",
-                               written_bytes, subbuffer.info.data.padded_subbuf_size,
-                               subbuffer.info.data.subbuf_size);
+       if (written_bytes <= 0) {
+               ERR("Error consuming subbuffer: (%zd)", written_bytes);
+               ret = (int) written_bytes;
+               goto error_put_subbuf;
        }
 
        ret = stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
@@ -3923,6 +3946,36 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
        return start_pos;
 }
 
+/* Stream lock must be held by the caller. */
+static int sample_stream_positions(struct lttng_consumer_stream *stream,
+                           unsigned long *produced, unsigned long *consumed)
+{
+       int ret;
+
+       ASSERT_LOCKED(stream->lock);
+
+       ret = lttng_consumer_sample_snapshot_positions(stream);
+       if (ret < 0) {
+               ERR("Failed to sample snapshot positions");
+               goto end;
+       }
+
+       ret = lttng_consumer_get_produced_snapshot(stream, produced);
+       if (ret < 0) {
+               ERR("Failed to sample produced position");
+               goto end;
+       }
+
+       ret = lttng_consumer_get_consumed_snapshot(stream, consumed);
+       if (ret < 0) {
+               ERR("Failed to sample consumed position");
+               goto end;
+       }
+
+end:
+       return ret;
+}
+
 /*
  * Sample the rotate position for all the streams of a channel. If a stream
  * is already at the rotate position (produced == consumed), we flag it as
@@ -3946,11 +3999,15 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
        const bool is_local_trace = relayd_id == -1ULL;
        struct consumer_relayd_sock_pair *relayd = NULL;
        bool rotating_to_new_chunk = true;
+       /* Array of `struct lttng_consumer_stream *` */
+       struct lttng_dynamic_pointer_array streams_packet_to_open;
+       size_t stream_idx;
 
        DBG("Consumer sample rotate position for channel %" PRIu64, key);
 
        lttng_dynamic_array_init(&stream_rotation_positions,
                        sizeof(struct relayd_stream_rotation_position), NULL);
+       lttng_dynamic_pointer_array_init(&streams_packet_to_open, NULL);
 
        rcu_read_lock();
 
@@ -3981,19 +4038,96 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                }
 
                /*
-                * Do not flush an empty packet when rotating from a NULL trace
+                * Do not flush a packet when rotating from a NULL trace
                 * chunk. The stream has no means to output data, and the prior
-                * rotation which rotated to NULL performed that side-effect already.
+                * rotation which rotated to NULL performed that side-effect
+                * already. No new data can be produced when a stream has no
+                * associated trace chunk (e.g. a stop followed by a rotate).
                 */
                if (stream->trace_chunk) {
+                       bool flush_active;
+
+                       if (stream->metadata_flag) {
+                               /*
+                                * Don't produce an empty metadata packet,
+                                * simply close the current one.
+                                *
+                                * Metadata is regenerated on every trace chunk
+                                * switch; there is no concern that no data was
+                                * produced.
+                                */
+                               flush_active = true;
+                       } else {
+                               /*
+                                * Only flush an empty packet if the "packet
+                                * open" could not be performed on transition
+                                * to a new trace chunk and no packets were
+                                * consumed within the chunk's lifetime.
+                                */
+                               if (stream->opened_packet_in_current_trace_chunk) {
+                                       flush_active = true;
+                               } else {
+                                       /*
+                                        * Stream could have been full at the
+                                        * time of rotation, but then have had
+                                        * no activity at all.
+                                        *
+                                        * It is important to flush a packet
+                                        * to prevent 0-length files from being
+                                        * produced as most viewers choke on
+                                        * them.
+                                        *
+                                        * Unfortunately viewers will not be
+                                        * able to know that tracing was active
+                                        * for this stream during this trace
+                                        * chunk's lifetime.
+                                        */
+                                       ret = sample_stream_positions(stream, &produced_pos, &consumed_pos);
+                                       if (ret) {
+                                               goto end_unlock_stream;
+                                       }
+
+                                       /*
+                                        * Don't flush an empty packet if data
+                                        * was produced; it will be consumed
+                                        * before the rotation completes.
+                                        */
+                                       flush_active = produced_pos != consumed_pos;
+                                       if (!flush_active) {
+                                               const char *trace_chunk_name;
+                                               uint64_t trace_chunk_id;
+
+                                               chunk_status = lttng_trace_chunk_get_name(
+                                                               stream->trace_chunk,
+                                                               &trace_chunk_name,
+                                                               NULL);
+                                               if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
+                                                       trace_chunk_name = "none";
+                                               }
+
+                                               /*
+                                                * Consumer trace chunks are
+                                                * never anonymous.
+                                                */
+                                               chunk_status = lttng_trace_chunk_get_id(
+                                                               stream->trace_chunk,
+                                                               &trace_chunk_id);
+                                               assert(chunk_status ==
+                                                               LTTNG_TRACE_CHUNK_STATUS_OK);
+
+                                               DBG("Unable to open packet for stream during trace chunk's lifetime. "
+                                                               "Flushing an empty packet to prevent an empty file from being created: "
+                                                               "stream id = %" PRIu64 ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
+                                                               stream->key, trace_chunk_name, trace_chunk_id);
+                                       }
+                               }
+                       }
+
                        /*
-                        * For metadata stream, do an active flush, which does not
-                        * produce empty packets. For data streams, empty-flush;
-                        * ensures we have at least one packet in each stream per trace
-                        * chunk, even if no data was produced.
+                        * Close the current packet before sampling the
+                        * ring buffer positions.
                         */
-                       ret = consumer_stream_flush_buffer(
-                                       stream, stream->metadata_flag ? 1 : 0);
+                       ret = consumer_stream_flush_buffer(stream, flush_active);
                        if (ret < 0) {
                                ERR("Failed to flush stream %" PRIu64 " during channel rotation",
                                                stream->key);
@@ -4128,67 +4262,88 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                         * packets in this scenario and allows the tracer to
                         * "stamp" the beginning of the new trace chunk at the
                         * earliest possible point.
+                        *
+                        * The packet open is performed after the channel
+                        * rotation to ensure that no attempt to open a packet
+                        * is performed in a stream that has no active trace
+                        * chunk.
                         */
-                       const enum consumer_stream_open_packet_status status =
-                               consumer_stream_open_packet(stream);
-
-                       switch (status) {
-                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
-                               DBG("Opened a packet after a rotation: stream id = %" PRIu64
-                                               ", channel name = %s, session id = %" PRIu64,
-                                               stream->key, stream->chan->name,
-                                               stream->chan->session_id);
-                               break;
-                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
-                               /*
-                                * Can't open a packet as there is no space left
-                                * in the buffer. A new packet will be opened
-                                * once one has been consumed.
-                                */
-                               DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
-                                               ", channel name = %s, session id = %" PRIu64,
-                                               stream->key, stream->chan->name,
-                                               stream->chan->session_id);
-                               break;
-                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
-                               /* Logged by callee. */
+                       ret = lttng_dynamic_pointer_array_add_pointer(
+                                       &streams_packet_to_open, stream);
+                       if (ret) {
+                               PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
                                ret = -1;
                                goto end_unlock_stream;
-                       default:
-                               abort();
                        }
                }
 
                pthread_mutex_unlock(&stream->lock);
        }
        stream = NULL;
-       pthread_mutex_unlock(&channel->lock);
 
-       if (is_local_trace) {
-               ret = 0;
-               goto end;
-       }
+       if (!is_local_trace) {
+               relayd = consumer_find_relayd(relayd_id);
+               if (!relayd) {
+                       ERR("Failed to find relayd %" PRIu64, relayd_id);
+                       ret = -1;
+                       goto end_unlock_channel;
+               }
 
-       relayd = consumer_find_relayd(relayd_id);
-       if (!relayd) {
-               ERR("Failed to find relayd %" PRIu64, relayd_id);
-               ret = -1;
-               goto end;
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+                               rotating_to_new_chunk ? &next_chunk_id : NULL,
+                               (const struct relayd_stream_rotation_position *)
+                                               stream_rotation_positions.buffer
+                                                               .data);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0) {
+                       ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
+                                       relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
+                       goto end_unlock_channel;
+               }
        }
 
-       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-       ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
-                       rotating_to_new_chunk ? &next_chunk_id : NULL,
-                       (const struct relayd_stream_rotation_position *)
-                                       stream_rotation_positions.buffer.data);
-       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-       if (ret < 0) {
-               ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
-                               relayd->net_seq_idx);
-               lttng_consumer_cleanup_relayd(relayd);
-               goto end;
+       for (stream_idx = 0;
+                       stream_idx < lttng_dynamic_pointer_array_get_count(
+                               &streams_packet_to_open);
+                       stream_idx++) {
+               enum consumer_stream_open_packet_status status;
+
+               stream = lttng_dynamic_pointer_array_get_pointer(
+                               &streams_packet_to_open, stream_idx);
+
+               pthread_mutex_lock(&stream->lock);
+               status = consumer_stream_open_packet(stream);
+               pthread_mutex_unlock(&stream->lock);
+               switch (status) {
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+                       DBG("Opened a packet after a rotation: stream id = %" PRIu64
+                           ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       break;
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+                       /*
+                        * Can't open a packet as there is no space left
+                        * in the buffer. A new packet will be opened
+                        * once one has been consumed.
+                        */
+                       DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
+                           ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       break;
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+                       /* Logged by callee. */
+                       ret = -1;
+                       goto end_unlock_channel;
+               default:
+                       abort();
+               }
        }
 
+       pthread_mutex_unlock(&channel->lock);
        ret = 0;
        goto end;
 
@@ -4199,6 +4354,7 @@ end_unlock_channel:
 end:
        rcu_read_unlock();
        lttng_dynamic_array_reset(&stream_rotation_positions);
+       lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
        return ret;
 }
 
@@ -4586,9 +4742,9 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk(
                } else {
                        relayd_id_str = "(formatting error)";
                }
-        }
+       }
 
-        /* Local protocol error. */
+       /* Local protocol error. */
        assert(chunk_creation_timestamp);
        ret = time_to_iso8601_str(chunk_creation_timestamp,
                        creation_timestamp_buffer,
@@ -4897,7 +5053,7 @@ enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
                } else {
                        relayd_id_str = "(formatting error)";
                }
-        }
+       }
 
        DBG("Consumer trace chunk exists command: relayd_id = %s"
                        ", chunk_id = %" PRIu64, relayd_id_str,
This page took 0.028158 seconds and 4 git commands to generate.