Clean-up: consumer: fix -Wshadow error in lttng_consumer_rotate_channel
[lttng-tools.git] / src / common / consumer / consumer.c
index d45771e7eb19fb45123041736853143704db3aff..20f4deb29bf9f61cfbea4b64b3373605948ff357 100644 (file)
@@ -877,6 +877,43 @@ error:
        return outfd;
 }
 
+/*
+ * Write a character on the metadata poll pipe to wake the metadata thread.
+ * Returns 0 on success, -1 on error.
+ */
+int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
+{
+       int ret = 0;
+
+       DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'",
+                       channel->name);
+       if (channel->monitor && channel->metadata_stream) {
+               const char dummy = 'c';
+               const ssize_t write_ret = lttng_write(
+                               channel->metadata_stream->ust_metadata_poll_pipe[1],
+                               &dummy, 1);
+
+               if (write_ret < 1) {
+                       if (errno == EWOULDBLOCK) {
+                               /*
+                                * This is fine, the metadata poll thread
+                                * is having a hard time keeping-up, but
+                                * it will eventually wake-up and consume
+                                * the available data.
+                                */
+                               ret = 0;
+                       } else {
+                               PERROR("Failed to write to UST metadata pipe while attempting to wake-up the metadata poll thread");
+                               ret = -1;
+                               goto end;
+                       }
+               }
+       }
+
+end:
+       return ret;
+}
+
 /*
  * Trigger a dump of the metadata content. Following/during the succesful
  * completion of this call, the metadata poll thread will start receiving
@@ -3253,7 +3290,7 @@ void *consumer_thread_sessiond_poll(void *data)
                        err = 0;        /* All is OK */
                        goto end;
                }
-               DBG("received command on sock");
+               DBG("Received command on sock");
        }
        /* All is OK */
        err = 0;
@@ -3383,24 +3420,10 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
 
        written_bytes = stream->read_subbuffer_ops.consume_subbuffer(
                        ctx, stream, &subbuffer);
-       /*
-        * Should write subbuf_size amount of data when network streaming or
-        * the full padded size when we are not streaming.
-        */
-       if ((written_bytes != subbuffer.info.data.subbuf_size &&
-                           stream->net_seq_idx != (uint64_t) -1ULL) ||
-                       (written_bytes != subbuffer.info.data.padded_subbuf_size &&
-                                       stream->net_seq_idx ==
-                                                       (uint64_t) -1ULL)) {
-               /*
-                * Display the error but continue processing to try to
-                * release the subbuffer. This is a DBG statement
-                * since this can happen without being a critical
-                * error.
-                */
-               DBG("Failed to write to tracefile (written_bytes: %zd != padded subbuffer size: %lu, subbuffer size: %lu)",
-                               written_bytes, subbuffer.info.data.padded_subbuf_size,
-                               subbuffer.info.data.subbuf_size);
+       if (written_bytes <= 0) {
+               ERR("Error consuming subbuffer: (%zd)", written_bytes);
+               ret = (int) written_bytes;
+               goto error_put_subbuf;
        }
 
        ret = stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
@@ -3923,6 +3946,36 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
        return start_pos;
 }
 
+/* Stream lock must be held by the caller. */
+static int sample_stream_positions(struct lttng_consumer_stream *stream,
+                           unsigned long *produced, unsigned long *consumed)
+{
+       int ret;
+
+       ASSERT_LOCKED(stream->lock);
+
+       ret = lttng_consumer_sample_snapshot_positions(stream);
+       if (ret < 0) {
+               ERR("Failed to sample snapshot positions");
+               goto end;
+       }
+
+       ret = lttng_consumer_get_produced_snapshot(stream, produced);
+       if (ret < 0) {
+               ERR("Failed to sample produced position");
+               goto end;
+       }
+
+       ret = lttng_consumer_get_consumed_snapshot(stream, consumed);
+       if (ret < 0) {
+               ERR("Failed to sample consumed position");
+               goto end;
+       }
+
+end:
+       return ret;
+}
+
 /*
  * Sample the rotate position for all the streams of a channel. If a stream
  * is already at the rotate position (produced == consumed), we flag it as
@@ -3985,19 +4038,96 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                }
 
                /*
-                * Do not flush an empty packet when rotating from a NULL trace
+                * Do not flush a packet when rotating from a NULL trace
                 * chunk. The stream has no means to output data, and the prior
-                * rotation which rotated to NULL performed that side-effect already.
+                * rotation which rotated to NULL performed that side-effect
+                * already. No new data can be produced when a stream has no
+                * associated trace chunk (e.g. a stop followed by a rotate).
                 */
                if (stream->trace_chunk) {
+                       bool flush_active;
+
+                       if (stream->metadata_flag) {
+                               /*
+                                * Don't produce an empty metadata packet,
+                                * simply close the current one.
+                                *
+                                * Metadata is regenerated on every trace chunk
+                                * switch; there is no concern that no data was
+                                * produced.
+                                */
+                               flush_active = true;
+                       } else {
+                               /*
+                                * Only flush an empty packet if the "packet
+                                * open" could not be performed on transition
+                                * to a new trace chunk and no packets were
+                                * consumed within the chunk's lifetime.
+                                */
+                               if (stream->opened_packet_in_current_trace_chunk) {
+                                       flush_active = true;
+                               } else {
+                                       /*
+                                        * Stream could have been full at the
+                                        * time of rotation, but then have had
+                                        * no activity at all.
+                                        *
+                                        * It is important to flush a packet
+                                        * to prevent 0-length files from being
+                                        * produced as most viewers choke on
+                                        * them.
+                                        *
+                                        * Unfortunately viewers will not be
+                                        * able to know that tracing was active
+                                        * for this stream during this trace
+                                        * chunk's lifetime.
+                                        */
+                                       ret = sample_stream_positions(stream, &produced_pos, &consumed_pos);
+                                       if (ret) {
+                                               goto end_unlock_stream;
+                                       }
+
+                                       /*
+                                        * Don't flush an empty packet if data
+                                        * was produced; it will be consumed
+                                        * before the rotation completes.
+                                        */
+                                       flush_active = produced_pos != consumed_pos;
+                                       if (!flush_active) {
+                                               const char *trace_chunk_name;
+                                               uint64_t trace_chunk_id;
+
+                                               chunk_status = lttng_trace_chunk_get_name(
+                                                               stream->trace_chunk,
+                                                               &trace_chunk_name,
+                                                               NULL);
+                                               if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
+                                                       trace_chunk_name = "none";
+                                               }
+
+                                               /*
+                                                * Consumer trace chunks are
+                                                * never anonymous.
+                                                */
+                                               chunk_status = lttng_trace_chunk_get_id(
+                                                               stream->trace_chunk,
+                                                               &trace_chunk_id);
+                                               assert(chunk_status ==
+                                                               LTTNG_TRACE_CHUNK_STATUS_OK);
+
+                                               DBG("Unable to open packet for stream during trace chunk's lifetime. "
+                                                               "Flushing an empty packet to prevent an empty file from being created: "
+                                                               "stream id = %" PRIu64 ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
+                                                               stream->key, trace_chunk_name, trace_chunk_id);
+                                       }
+                               }
+                       }
+
                        /*
-                        * For metadata stream, do an active flush, which does not
-                        * produce empty packets. For data streams, empty-flush;
-                        * ensures we have at least one packet in each stream per trace
-                        * chunk, even if no data was produced.
+                        * Close the current packet before sampling the
+                        * ring buffer positions.
                         */
-                       ret = consumer_stream_flush_buffer(
-                                       stream, stream->metadata_flag ? 1 : 0);
+                       ret = consumer_stream_flush_buffer(stream, flush_active);
                        if (ret < 0) {
                                ERR("Failed to flush stream %" PRIu64 " during channel rotation",
                                                stream->key);
@@ -4612,9 +4742,9 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk(
                } else {
                        relayd_id_str = "(formatting error)";
                }
-        }
+       }
 
-        /* Local protocol error. */
+       /* Local protocol error. */
        assert(chunk_creation_timestamp);
        ret = time_to_iso8601_str(chunk_creation_timestamp,
                        creation_timestamp_buffer,
@@ -4923,7 +5053,7 @@ enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
                } else {
                        relayd_id_str = "(formatting error)";
                }
-        }
+       }
 
        DBG("Consumer trace chunk exists command: relayd_id = %s"
                        ", chunk_id = %" PRIu64, relayd_id_str,
This page took 0.026707 seconds and 4 git commands to generate.