+
+/* Stream lock must be held by the caller. */
+static int sample_stream_positions(struct lttng_consumer_stream *stream,
+ unsigned long *produced, unsigned long *consumed)
+{
+ int ret;
+
+ ASSERT_LOCKED(stream->lock);
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Failed to sample snapshot positions");
+ goto end;
+ }
+
+ ret = lttng_consumer_get_produced_snapshot(stream, produced);
+ if (ret < 0) {
+ ERR("Failed to sample produced position");
+ goto end;
+ }
+
+ ret = lttng_consumer_get_consumed_snapshot(stream, consumed);
+ if (ret < 0) {
+ ERR("Failed to sample consumed position");
+ goto end;
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * Sample the rotate position for all the streams of a channel. If a stream
+ * is already at the rotate position (produced == consumed), we flag it as
+ * ready for rotation. The rotation of ready streams occurs after we have
+ * replied to the session daemon that we have finished sampling the positions.
+ * Must be called with RCU read-side lock held to ensure existence of channel.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
+ uint64_t key, uint64_t relayd_id, uint32_t metadata,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ struct lttng_consumer_stream *stream;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+ struct lttng_dynamic_array stream_rotation_positions;
+ uint64_t next_chunk_id, stream_count = 0;
+ enum lttng_trace_chunk_status chunk_status;
+ const bool is_local_trace = relayd_id == -1ULL;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ bool rotating_to_new_chunk = true;
+ /* Array of `struct lttng_consumer_stream *` */
+ struct lttng_dynamic_pointer_array streams_packet_to_open;
+ size_t stream_idx;
+
+ DBG("Consumer sample rotate position for channel %" PRIu64, key);
+
+ lttng_dynamic_array_init(&stream_rotation_positions,
+ sizeof(struct relayd_stream_rotation_position), NULL);
+ lttng_dynamic_pointer_array_init(&streams_packet_to_open, NULL);
+
+ rcu_read_lock();
+
+ pthread_mutex_lock(&channel->lock);
+ assert(channel->trace_chunk);
+ chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
+ &next_chunk_id);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ goto end_unlock_channel;
+ }
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key, &iter.iter,
+ stream, node_channel_id.node) {
+ unsigned long produced_pos = 0, consumed_pos = 0;
+
+ health_code_update();
+
+ /*
+ * Lock stream because we are about to change its state.
+ */
+ pthread_mutex_lock(&stream->lock);
+
+ if (stream->trace_chunk == stream->chan->trace_chunk) {
+ rotating_to_new_chunk = false;
+ }
+
+ /*
+ * Do not flush a packet when rotating from a NULL trace
+ * chunk. The stream has no means to output data, and the prior
+ * rotation which rotated to NULL performed that side-effect
+ * already. No new data can be produced when a stream has no
+ * associated trace chunk (e.g. a stop followed by a rotate).
+ */
+ if (stream->trace_chunk) {
+ bool flush_active;
+
+ if (stream->metadata_flag) {
+ /*
+ * Don't produce an empty metadata packet,
+ * simply close the current one.
+ *
+ * Metadata is regenerated on every trace chunk
+ * switch; there is no concern that no data was
+ * produced.
+ */
+ flush_active = true;
+ } else {
+ /*
+ * Only flush an empty packet if the "packet
+ * open" could not be performed on transition
+ * to a new trace chunk and no packets were
+ * consumed within the chunk's lifetime.
+ */
+ if (stream->opened_packet_in_current_trace_chunk) {
+ flush_active = true;
+ } else {
+ /*
+ * Stream could have been full at the
+ * time of rotation, but then have had
+ * no activity at all.
+ *
+ * It is important to flush a packet
+ * to prevent 0-length files from being
+ * produced as most viewers choke on
+ * them.
+ *
+ * Unfortunately viewers will not be
+ * able to know that tracing was active
+ * for this stream during this trace
+ * chunk's lifetime.
+ */
+ ret = sample_stream_positions(stream, &produced_pos, &consumed_pos);
+ if (ret) {
+ goto end_unlock_stream;
+ }
+
+ /*
+ * Don't flush an empty packet if data
+ * was produced; it will be consumed
+ * before the rotation completes.
+ */
+ flush_active = produced_pos != consumed_pos;
+ if (!flush_active) {
+ const char *trace_chunk_name;
+ uint64_t trace_chunk_id;
+
+ chunk_status = lttng_trace_chunk_get_name(
+ stream->trace_chunk,
+ &trace_chunk_name,
+ NULL);
+ if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
+ trace_chunk_name = "none";
+ }
+
+ /*
+ * Consumer trace chunks are
+ * never anonymous.
+ */
+ chunk_status = lttng_trace_chunk_get_id(
+ stream->trace_chunk,
+ &trace_chunk_id);
+ assert(chunk_status ==
+ LTTNG_TRACE_CHUNK_STATUS_OK);
+
+ DBG("Unable to open packet for stream during trace chunk's lifetime. "
+ "Flushing an empty packet to prevent an empty file from being created: "
+ "stream id = %" PRIu64 ", trace chunk name = `%s`, trace chunk id = %" PRIu64,
+ stream->key, trace_chunk_name, trace_chunk_id);
+ }
+ }
+ }
+
+ /*
+ * Close the current packet before sampling the
+ * ring buffer positions.
+ */
+ ret = consumer_stream_flush_buffer(stream, flush_active);
+ if (ret < 0) {
+ ERR("Failed to flush stream %" PRIu64 " during channel rotation",
+ stream->key);
+ goto end_unlock_stream;
+ }
+ }
+
+ ret = lttng_consumer_take_snapshot(stream);
+ if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
+ ERR("Failed to sample snapshot position during channel rotation");
+ goto end_unlock_stream;
+ }
+ if (!ret) {
+ ret = lttng_consumer_get_produced_snapshot(stream,
+ &produced_pos);
+ if (ret < 0) {
+ ERR("Failed to sample produced position during channel rotation");
+ goto end_unlock_stream;
+ }
+
+ ret = lttng_consumer_get_consumed_snapshot(stream,
+ &consumed_pos);
+ if (ret < 0) {
+ ERR("Failed to sample consumed position during channel rotation");
+ goto end_unlock_stream;
+ }
+ }
+ /*
+ * Align produced position on the start-of-packet boundary of the first
+ * packet going into the next trace chunk.
+ */
+ produced_pos = ALIGN_FLOOR(produced_pos, stream->max_sb_size);
+ if (consumed_pos == produced_pos) {
+ DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
+ stream->key, produced_pos, consumed_pos);
+ stream->rotate_ready = true;
+ } else {
+ DBG("Different consumed and produced positions "
+ "for stream %" PRIu64 " produced = %lu consumed = %lu",
+ stream->key, produced_pos, consumed_pos);
+ }
+ /*
+ * The rotation position is based on the packet_seq_num of the
+ * packet following the last packet that was consumed for this
+ * stream, incremented by the offset between produced and
+ * consumed positions. This rotation position is a lower bound
+ * (inclusive) at which the next trace chunk starts. Since it
+ * is a lower bound, it is OK if the packet_seq_num does not
+ * correspond exactly to the same packet identified by the
+ * consumed_pos, which can happen in overwrite mode.
+ */
+ if (stream->sequence_number_unavailable) {
+ /*
+ * Rotation should never be performed on a session which
+ * interacts with a pre-2.8 lttng-modules, which does
+ * not implement packet sequence number.
+ */
+ ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
+ stream->key);
+ ret = -1;
+ goto end_unlock_stream;
+ }
+ stream->rotate_position = stream->last_sequence_number + 1 +
+ ((produced_pos - consumed_pos) / stream->max_sb_size);
+ DBG("Set rotation position for stream %" PRIu64 " at position %" PRIu64,
+ stream->key, stream->rotate_position);
+
+ if (!is_local_trace) {
+ /*
+ * The relay daemon control protocol expects a rotation
+ * position as "the sequence number of the first packet
+ * _after_ the current trace chunk".
+ */
+ const struct relayd_stream_rotation_position position = {
+ .stream_id = stream->relayd_stream_id,
+ .rotate_at_seq_num = stream->rotate_position,
+ };
+
+ ret = lttng_dynamic_array_add_element(
+ &stream_rotation_positions,
+ &position);
+ if (ret) {
+ ERR("Failed to allocate stream rotation position");
+ goto end_unlock_stream;
+ }
+ stream_count++;
+ }
+
+ stream->opened_packet_in_current_trace_chunk = false;
+
+ if (rotating_to_new_chunk && !stream->metadata_flag) {
+ /*
+ * Attempt to flush an empty packet as close to the
+ * rotation point as possible. In the event where a
+ * stream remains inactive after the rotation point,
+ * this ensures that the new trace chunk has a
+ * beginning timestamp set at the begining of the
+ * trace chunk instead of only creating an empty
+ * packet when the trace chunk is stopped.
+ *
+ * This indicates to the viewers that the stream
+ * was being recorded, but more importantly it
+ * allows viewers to determine a useable trace
+ * intersection.
+ *
+ * This presents a problem in the case where the
+ * ring-buffer is completely full.
+ *
+ * Consider the following scenario:
+ * - The consumption of data is slow (slow network,
+ * for instance),
+ * - The ring buffer is full,
+ * - A rotation is initiated,
+ * - The flush below does nothing (no space left to
+ * open a new packet),
+ * - The other streams rotate very soon, and new
+ * data is produced in the new chunk,
+ * - This stream completes its rotation long after the
+ * rotation was initiated
+ * - The session is stopped before any event can be
+ * produced in this stream's buffers.
+ *
+ * The resulting trace chunk will have a single packet
+ * temporaly at the end of the trace chunk for this
+ * stream making the stream intersection more narrow
+ * than it should be.
+ *
+ * To work-around this, an empty flush is performed
+ * after the first consumption of a packet during a
+ * rotation if open_packet fails. The idea is that
+ * consuming a packet frees enough space to switch
+ * packets in this scenario and allows the tracer to
+ * "stamp" the beginning of the new trace chunk at the
+ * earliest possible point.
+ *
+ * The packet open is performed after the channel
+ * rotation to ensure that no attempt to open a packet
+ * is performed in a stream that has no active trace
+ * chunk.
+ */
+ ret = lttng_dynamic_pointer_array_add_pointer(
+ &streams_packet_to_open, stream);
+ if (ret) {
+ PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
+ ret = -1;
+ goto end_unlock_stream;
+ }
+ }
+
+ pthread_mutex_unlock(&stream->lock);
+ }
+ stream = NULL;
+
+ if (!is_local_trace) {
+ relayd = consumer_find_relayd(relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd %" PRIu64, relayd_id);
+ ret = -1;
+ goto end_unlock_channel;
+ }
+
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+ rotating_to_new_chunk ? &next_chunk_id : NULL,
+ (const struct relayd_stream_rotation_position *)
+ stream_rotation_positions.buffer
+ .data);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
+ relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ goto end_unlock_channel;
+ }
+ }
+
+ for (stream_idx = 0;
+ stream_idx < lttng_dynamic_pointer_array_get_count(
+ &streams_packet_to_open);
+ stream_idx++) {
+ enum consumer_stream_open_packet_status status;
+
+ stream = lttng_dynamic_pointer_array_get_pointer(
+ &streams_packet_to_open, stream_idx);
+
+ pthread_mutex_lock(&stream->lock);
+ status = consumer_stream_open_packet(stream);
+ pthread_mutex_unlock(&stream->lock);
+ switch (status) {
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet after a rotation: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+ /*
+ * Can't open a packet as there is no space left
+ * in the buffer. A new packet will be opened
+ * once one has been consumed.
+ */
+ DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+ /* Logged by callee. */
+ ret = -1;
+ goto end_unlock_channel;
+ default:
+ abort();
+ }
+ }
+
+ pthread_mutex_unlock(&channel->lock);
+ ret = 0;
+ goto end;
+
+end_unlock_stream:
+ pthread_mutex_unlock(&stream->lock);
+end_unlock_channel:
+ pthread_mutex_unlock(&channel->lock);
+end:
+ rcu_read_unlock();
+ lttng_dynamic_array_reset(&stream_rotation_positions);
+ lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
+ return ret;
+}
+
+static
+int consumer_clear_buffer(struct lttng_consumer_stream *stream)
+{
+ int ret = 0;
+ unsigned long consumed_pos_before, consumed_pos_after;
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Taking snapshot positions");
+ goto end;
+ }
+
+ ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_before);
+ if (ret < 0) {
+ ERR("Consumed snapshot position");
+ goto end;
+ }
+
+ switch (the_consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ ret = kernctl_buffer_clear(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to clear kernel stream (ret = %d)", ret);
+ goto end;
+ }
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ lttng_ustconsumer_clear_buffer(stream);
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ abort();
+ }
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Taking snapshot positions");
+ goto end;
+ }
+ ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_after);
+ if (ret < 0) {
+ ERR("Consumed snapshot position");
+ goto end;
+ }
+ DBG("clear: before: %lu after: %lu", consumed_pos_before, consumed_pos_after);
+end:
+ return ret;
+}
+
+static
+int consumer_clear_stream(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ ret = consumer_stream_flush_buffer(stream, 1);
+ if (ret < 0) {
+ ERR("Failed to flush stream %" PRIu64 " during channel clear",
+ stream->key);
+ ret = LTTCOMM_CONSUMERD_FATAL;
+ goto error;
+ }
+
+ ret = consumer_clear_buffer(stream);
+ if (ret < 0) {
+ ERR("Failed to clear stream %" PRIu64 " during channel clear",
+ stream->key);
+ ret = LTTCOMM_CONSUMERD_FATAL;
+ goto error;
+ }
+
+ ret = LTTCOMM_CONSUMERD_SUCCESS;
+error:
+ return ret;
+}
+
+static
+int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
+{
+ int ret;
+ struct lttng_consumer_stream *stream;
+
+ rcu_read_lock();
+ pthread_mutex_lock(&channel->lock);
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ health_code_update();
+ pthread_mutex_lock(&stream->lock);
+ ret = consumer_clear_stream(stream);
+ if (ret) {
+ goto error_unlock;
+ }
+ pthread_mutex_unlock(&stream->lock);
+ }
+ pthread_mutex_unlock(&channel->lock);
+ rcu_read_unlock();
+ return 0;
+
+error_unlock:
+ pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&channel->lock);
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Check if a stream is ready to be rotated after extracting it.
+ *
+ * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
+ * error. Stream lock must be held.
+ */
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
+{
+ DBG("Check is rotate ready for stream %" PRIu64
+ " ready %u rotate_position %" PRIu64
+ " last_sequence_number %" PRIu64,
+ stream->key, stream->rotate_ready,
+ stream->rotate_position, stream->last_sequence_number);
+ if (stream->rotate_ready) {
+ return 1;
+ }
+
+ /*
+ * If packet seq num is unavailable, it means we are interacting
+ * with a pre-2.8 lttng-modules which does not implement the
+ * sequence number. Rotation should never be used by sessiond in this
+ * scenario.
+ */
+ if (stream->sequence_number_unavailable) {
+ ERR("Internal error: rotation used on stream %" PRIu64
+ " with unavailable sequence number",
+ stream->key);
+ return -1;
+ }
+
+ if (stream->rotate_position == -1ULL ||
+ stream->last_sequence_number == -1ULL) {
+ return 0;
+ }
+
+ /*
+ * Rotate position not reached yet. The stream rotate position is
+ * the position of the next packet belonging to the next trace chunk,
+ * but consumerd considers rotation ready when reaching the last
+ * packet of the current chunk, hence the "rotate_position - 1".
+ */
+
+ DBG("Check is rotate ready for stream %" PRIu64
+ " last_sequence_number %" PRIu64
+ " rotate_position %" PRIu64,
+ stream->key, stream->last_sequence_number,
+ stream->rotate_position);
+ if (stream->last_sequence_number >= stream->rotate_position - 1) {
+ return 1;
+ }
+
+ return 0;
+}
+
+/*
+ * Reset the state for a stream after a rotation occurred.
+ */
+void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
+{
+ DBG("lttng_consumer_reset_stream_rotate_state for stream %" PRIu64,
+ stream->key);
+ stream->rotate_position = -1ULL;
+ stream->rotate_ready = false;
+}
+
+/*
+ * Perform the rotation a local stream file.
+ */
+static
+int rotate_local_stream(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream)
+{
+ int ret = 0;
+
+ DBG("Rotate local stream: stream key %" PRIu64 ", channel key %" PRIu64,
+ stream->key,
+ stream->chan->key);
+ stream->tracefile_size_current = 0;
+ stream->tracefile_count_current = 0;
+
+ if (stream->out_fd >= 0) {
+ ret = close(stream->out_fd);
+ if (ret) {
+ PERROR("Failed to close stream out_fd of channel \"%s\"",
+ stream->chan->name);
+ }
+ stream->out_fd = -1;
+ }
+
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
+
+ if (!stream->trace_chunk) {
+ goto end;
+ }
+
+ ret = consumer_stream_create_output_files(stream, true);
+end:
+ return ret;
+}
+
+/*
+ * Performs the stream rotation for the rotate session feature if needed.
+ * It must be called with the channel and stream locks held.
+ *
+ * Return 0 on success, a negative number of error.
+ */
+int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ DBG("Consumer rotate stream %" PRIu64, stream->key);
+
+ /*
+ * Update the stream's 'current' chunk to the session's (channel)
+ * now-current chunk.
+ */
+ lttng_trace_chunk_put(stream->trace_chunk);
+ if (stream->chan->trace_chunk == stream->trace_chunk) {
+ /*
+ * A channel can be rotated and not have a "next" chunk
+ * to transition to. In that case, the channel's "current chunk"
+ * has not been closed yet, but it has not been updated to
+ * a "next" trace chunk either. Hence, the stream, like its
+ * parent channel, becomes part of no chunk and can't output
+ * anything until a new trace chunk is created.
+ */
+ stream->trace_chunk = NULL;
+ } else if (stream->chan->trace_chunk &&
+ !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
+ ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
+ ret = -1;
+ goto error;
+ } else {
+ /*
+ * Update the stream's trace chunk to its parent channel's
+ * current trace chunk.
+ */
+ stream->trace_chunk = stream->chan->trace_chunk;
+ }
+
+ if (stream->net_seq_idx == (uint64_t) -1ULL) {
+ ret = rotate_local_stream(ctx, stream);
+ if (ret < 0) {
+ ERR("Failed to rotate stream, ret = %i", ret);
+ goto error;
+ }
+ }
+
+ if (stream->metadata_flag && stream->trace_chunk) {
+ /*
+ * If the stream has transitioned to a new trace
+ * chunk, the metadata should be re-dumped to the
+ * newest chunk.
+ *
+ * However, it is possible for a stream to transition to
+ * a "no-chunk" state. This can happen if a rotation
+ * occurs on an inactive session. In such cases, the metadata
+ * regeneration will happen when the next trace chunk is
+ * created.
+ */
+ ret = consumer_metadata_stream_dump(stream);
+ if (ret) {
+ goto error;
+ }
+ }
+ lttng_consumer_reset_stream_rotate_state(stream);
+
+ ret = 0;
+
+error:
+ return ret;
+}
+
+/*
+ * Rotate all the ready streams now.
+ *
+ * This is especially important for low throughput streams that have already
+ * been consumed, we cannot wait for their next packet to perform the
+ * rotation.
+ * Need to be called with RCU read-side lock held to ensure existence of
+ * channel.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
+ uint64_t key, struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ struct lttng_consumer_stream *stream;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+
+ rcu_read_lock();
+
+ DBG("Consumer rotate ready streams in channel %" PRIu64, key);
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key, &iter.iter,
+ stream, node_channel_id.node) {
+ health_code_update();
+
+ pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->lock);
+
+ if (!stream->rotate_ready) {
+ pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
+ continue;
+ }
+ DBG("Consumer rotate ready stream %" PRIu64, stream->key);
+
+ ret = lttng_consumer_rotate_stream(ctx, stream);
+ pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
+ if (ret) {
+ goto end;
+ }
+ }
+
+ ret = 0;
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+enum lttcomm_return_code lttng_consumer_init_command(
+ struct lttng_consumer_local_data *ctx,
+ const lttng_uuid sessiond_uuid)
+{
+ enum lttcomm_return_code ret;
+ char uuid_str[LTTNG_UUID_STR_LEN];
+
+ if (ctx->sessiond_uuid.is_set) {
+ ret = LTTCOMM_CONSUMERD_ALREADY_SET;
+ goto end;
+ }
+
+ ctx->sessiond_uuid.is_set = true;
+ memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid));
+ ret = LTTCOMM_CONSUMERD_SUCCESS;
+ lttng_uuid_to_str(sessiond_uuid, uuid_str);
+ DBG("Received session daemon UUID: %s", uuid_str);
+end:
+ return ret;
+}
+
+enum lttcomm_return_code lttng_consumer_create_trace_chunk(
+ const uint64_t *relayd_id, uint64_t session_id,
+ uint64_t chunk_id,
+ time_t chunk_creation_timestamp,
+ const char *chunk_override_name,
+ const struct lttng_credentials *credentials,
+ struct lttng_directory_handle *chunk_directory_handle)
+{
+ int ret;
+ enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ struct lttng_trace_chunk *created_chunk = NULL, *published_chunk = NULL;
+ enum lttng_trace_chunk_status chunk_status;
+ char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
+ char creation_timestamp_buffer[ISO8601_STR_LEN];
+ const char *relayd_id_str = "(none)";
+ const char *creation_timestamp_str;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_channel *channel;
+
+ if (relayd_id) {
+ /* Only used for logging purposes. */
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
+ "%" PRIu64, *relayd_id);
+ if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
+ relayd_id_str = relayd_id_buffer;
+ } else {
+ relayd_id_str = "(formatting error)";
+ }
+ }
+
+ /* Local protocol error. */
+ assert(chunk_creation_timestamp);
+ ret = time_to_iso8601_str(chunk_creation_timestamp,
+ creation_timestamp_buffer,
+ sizeof(creation_timestamp_buffer));
+ creation_timestamp_str = !ret ? creation_timestamp_buffer :
+ "(formatting error)";
+
+ DBG("Consumer create trace chunk command: relay_id = %s"
+ ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+ ", chunk_override_name = %s"
+ ", chunk_creation_timestamp = %s",
+ relayd_id_str, session_id, chunk_id,
+ chunk_override_name ? : "(none)",
+ creation_timestamp_str);
+
+ /*
+ * The trace chunk registry, as used by the consumer daemon, implicitly
+ * owns the trace chunks. This is only needed in the consumer since
+ * the consumer has no notion of a session beyond session IDs being
+ * used to identify other objects.
+ *
+ * The lttng_trace_chunk_registry_publish() call below provides a
+ * reference which is not released; it implicitly becomes the session
+ * daemon's reference to the chunk in the consumer daemon.
+ *
+ * The lifetime of trace chunks in the consumer daemon is managed by
+ * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
+ * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
+ */
+ created_chunk = lttng_trace_chunk_create(chunk_id,
+ chunk_creation_timestamp, NULL);
+ if (!created_chunk) {
+ ERR("Failed to create trace chunk");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+
+ if (chunk_override_name) {
+ chunk_status = lttng_trace_chunk_override_name(created_chunk,
+ chunk_override_name);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+ }
+
+ if (chunk_directory_handle) {
+ chunk_status = lttng_trace_chunk_set_credentials(created_chunk,
+ credentials);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to set trace chunk credentials");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+ /*
+ * The consumer daemon has no ownership of the chunk output
+ * directory.
+ */
+ chunk_status = lttng_trace_chunk_set_as_user(created_chunk,
+ chunk_directory_handle);
+ chunk_directory_handle = NULL;
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to set trace chunk's directory handle");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+ }
+
+ published_chunk = lttng_trace_chunk_registry_publish_chunk(
+ the_consumer_data.chunk_registry, session_id,
+ created_chunk);
+ lttng_trace_chunk_put(created_chunk);
+ created_chunk = NULL;
+ if (!published_chunk) {
+ ERR("Failed to publish trace chunk");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry_duplicate(
+ the_consumer_data.channels_by_session_id_ht->ht,
+ the_consumer_data.channels_by_session_id_ht->hash_fct(
+ &session_id, lttng_ht_seed),
+ the_consumer_data.channels_by_session_id_ht->match_fct,
+ &session_id, &iter.iter, channel,
+ channels_by_session_id_ht_node.node) {
+ ret = lttng_consumer_channel_set_trace_chunk(channel,
+ published_chunk);
+ if (ret) {
+ /*
+ * Roll-back the creation of this chunk.
+ *
+ * This is important since the session daemon will
+ * assume that the creation of this chunk failed and
+ * will never ask for it to be closed, resulting
+ * in a leak and an inconsistent state for some
+ * channels.
+ */
+ enum lttcomm_return_code close_ret;
+ char path[LTTNG_PATH_MAX];
+
+ DBG("Failed to set new trace chunk on existing channels, rolling back");
+ close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+ session_id, chunk_id,
+ chunk_creation_timestamp, NULL,
+ path);
+ if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ session_id, chunk_id);
+ }
+
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ break;
+ }
+ }
+
+ if (relayd_id) {
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(*relayd_id);
+ if (relayd) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_create_trace_chunk(
+ &relayd->control_sock, published_chunk);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ } else {
+ ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
+ }
+
+ if (!relayd || ret) {
+ enum lttcomm_return_code close_ret;
+ char path[LTTNG_PATH_MAX];
+
+ close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+ session_id,
+ chunk_id,
+ chunk_creation_timestamp,
+ NULL, path);
+ if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ session_id,
+ chunk_id);
+ }
+
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error_unlock;
+ }
+ }
+error_unlock:
+ rcu_read_unlock();
+error:
+ /* Release the reference returned by the "publish" operation. */
+ lttng_trace_chunk_put(published_chunk);
+ lttng_trace_chunk_put(created_chunk);
+ return ret_code;
+}
+
+enum lttcomm_return_code lttng_consumer_close_trace_chunk(
+ const uint64_t *relayd_id, uint64_t session_id,
+ uint64_t chunk_id, time_t chunk_close_timestamp,
+ const enum lttng_trace_chunk_command_type *close_command,
+ char *path)
+{
+ enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ struct lttng_trace_chunk *chunk;
+ char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
+ const char *relayd_id_str = "(none)";
+ const char *close_command_name = "none";
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_channel *channel;
+ enum lttng_trace_chunk_status chunk_status;
+
+ if (relayd_id) {
+ int ret;
+
+ /* Only used for logging purposes. */
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
+ "%" PRIu64, *relayd_id);
+ if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
+ relayd_id_str = relayd_id_buffer;
+ } else {
+ relayd_id_str = "(formatting error)";
+ }
+ }
+ if (close_command) {
+ close_command_name = lttng_trace_chunk_command_type_get_name(
+ *close_command);
+ }
+
+ DBG("Consumer close trace chunk command: relayd_id = %s"
+ ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+ ", close command = %s",
+ relayd_id_str, session_id, chunk_id,
+ close_command_name);
+
+ chunk = lttng_trace_chunk_registry_find_chunk(
+ the_consumer_data.chunk_registry, session_id, chunk_id);
+ if (!chunk) {
+ ERR("Failed to find chunk: session_id = %" PRIu64
+ ", chunk_id = %" PRIu64,
+ session_id, chunk_id);
+ ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+ goto end;
+ }
+
+ chunk_status = lttng_trace_chunk_set_close_timestamp(chunk,
+ chunk_close_timestamp);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ goto end;
+ }
+
+ if (close_command) {
+ chunk_status = lttng_trace_chunk_set_close_command(
+ chunk, *close_command);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ goto end;
+ }
+ }
+
+ /*
+ * chunk is now invalid to access as we no longer hold a reference to
+ * it; it is only kept around to compare it (by address) to the
+ * current chunk found in the session's channels.
+ */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(the_consumer_data.channel_ht->ht, &iter.iter,
+ channel, node.node) {
+ int ret;
+
+ /*
+ * Only change the channel's chunk to NULL if it still
+ * references the chunk being closed. The channel may
+ * reference a newer channel in the case of a session
+ * rotation. When a session rotation occurs, the "next"
+ * chunk is created before the "current" chunk is closed.
+ */
+ if (channel->trace_chunk != chunk) {
+ continue;
+ }
+ ret = lttng_consumer_channel_set_trace_chunk(channel, NULL);
+ if (ret) {
+ /*
+ * Attempt to close the chunk on as many channels as
+ * possible.
+ */
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ }
+ }
+
+ if (relayd_id) {
+ int ret;
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(*relayd_id);
+ if (relayd) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_close_trace_chunk(
+ &relayd->control_sock, chunk,
+ path);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ } else {
+ ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64,
+ *relayd_id);
+ }
+
+ if (!relayd || ret) {
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ goto error_unlock;
+ }
+ }
+error_unlock:
+ rcu_read_unlock();
+end:
+ /*
+ * Release the reference returned by the "find" operation and
+ * the session daemon's implicit reference to the chunk.
+ */
+ lttng_trace_chunk_put(chunk);
+ lttng_trace_chunk_put(chunk);
+
+ return ret_code;
+}
+
+enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
+ const uint64_t *relayd_id, uint64_t session_id,
+ uint64_t chunk_id)
+{
+ int ret;
+ enum lttcomm_return_code ret_code;
+ char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
+ const char *relayd_id_str = "(none)";
+ const bool is_local_trace = !relayd_id;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ bool chunk_exists_local, chunk_exists_remote;
+
+ if (relayd_id) {
+ /* Only used for logging purposes. */
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
+ "%" PRIu64, *relayd_id);
+ if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
+ relayd_id_str = relayd_id_buffer;
+ } else {
+ relayd_id_str = "(formatting error)";
+ }
+ }
+
+ DBG("Consumer trace chunk exists command: relayd_id = %s"
+ ", chunk_id = %" PRIu64, relayd_id_str,
+ chunk_id);
+ ret = lttng_trace_chunk_registry_chunk_exists(
+ the_consumer_data.chunk_registry, session_id, chunk_id,
+ &chunk_exists_local);
+ if (ret) {
+ /* Internal error. */
+ ERR("Failed to query the existence of a trace chunk");
+ ret_code = LTTCOMM_CONSUMERD_FATAL;
+ goto end;
+ }
+ DBG("Trace chunk %s locally",
+ chunk_exists_local ? "exists" : "does not exist");
+ if (chunk_exists_local) {
+ ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
+ goto end;
+ } else if (is_local_trace) {
+ ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+ goto end;
+ }
+
+ rcu_read_lock();
+ relayd = consumer_find_relayd(*relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd %" PRIu64, *relayd_id);
+ ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
+ goto end_rcu_unlock;
+ }
+ DBG("Looking up existence of trace chunk on relay daemon");
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id,
+ &chunk_exists_remote);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ ERR("Failed to look-up the existence of trace chunk on relay daemon");
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+ goto end_rcu_unlock;
+ }
+
+ ret_code = chunk_exists_remote ?
+ LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
+ LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+ DBG("Trace chunk %s on relay daemon",
+ chunk_exists_remote ? "exists" : "does not exist");
+
+end_rcu_unlock:
+ rcu_read_unlock();
+end:
+ return ret_code;
+}
+
+static
+int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
+{
+ struct lttng_ht *ht;
+ struct lttng_consumer_stream *stream;
+ struct lttng_ht_iter iter;
+ int ret;
+
+ ht = the_consumer_data.stream_per_chan_id_ht;
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key,
+ &iter.iter, stream, node_channel_id.node) {
+ /*
+ * Protect against teardown with mutex.
+ */
+ pthread_mutex_lock(&stream->lock);
+ if (cds_lfht_is_node_deleted(&stream->node.node)) {
+ goto next;
+ }
+ ret = consumer_clear_stream(stream);
+ if (ret) {
+ goto error_unlock;
+ }
+ next:
+ pthread_mutex_unlock(&stream->lock);
+ }
+ rcu_read_unlock();
+ return LTTCOMM_CONSUMERD_SUCCESS;
+
+error_unlock:
+ pthread_mutex_unlock(&stream->lock);
+ rcu_read_unlock();
+ return ret;
+}
+
+int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
+{
+ int ret;
+
+ DBG("Consumer clear channel %" PRIu64, channel->key);
+
+ if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+ /*
+ * Nothing to do for the metadata channel/stream.
+ * Snapshot mechanism already take care of the metadata
+ * handling/generation, and monitored channels only need to
+ * have their data stream cleared..
+ */
+ ret = LTTCOMM_CONSUMERD_SUCCESS;
+ goto end;
+ }
+
+ if (!channel->monitor) {
+ ret = consumer_clear_unmonitored_channel(channel);
+ } else {
+ ret = consumer_clear_monitored_channel(channel);
+ }
+end:
+ return ret;
+}
+
+enum lttcomm_return_code lttng_consumer_open_channel_packets(
+ struct lttng_consumer_channel *channel)
+{
+ struct lttng_consumer_stream *stream;
+ enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
+
+ if (channel->metadata_stream) {
+ ERR("Open channel packets command attempted on a metadata channel");
+ ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
+ goto end;
+ }
+
+ rcu_read_lock();
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ enum consumer_stream_open_packet_status status;
+
+ pthread_mutex_lock(&stream->lock);
+ if (cds_lfht_is_node_deleted(&stream->node.node)) {
+ goto next;
+ }
+
+ status = consumer_stream_open_packet(stream);
+ switch (status) {
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk = true;
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+ DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+ /*
+ * Only unexpected internal errors can lead to this
+ * failing. Report an unknown error.
+ */
+ ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel id = %" PRIu64
+ ", channel name = %s"
+ ", session id = %" PRIu64,
+ stream->key, channel->key,
+ channel->name, channel->session_id);
+ ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+ goto error_unlock;
+ default:
+ abort();
+ }
+
+ next:
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+end_rcu_unlock:
+ rcu_read_unlock();
+end:
+ return ret;
+
+error_unlock:
+ pthread_mutex_unlock(&stream->lock);
+ goto end_rcu_unlock;
+}