+ /*
+ * Update the stream's trace chunk to its parent channel's
+ * current trace chunk.
+ */
+ stream->trace_chunk = stream->chan->trace_chunk;
+ }
+
+ if (stream->net_seq_idx != (uint64_t) -1ULL) {
+ ret = rotate_relay_stream(ctx, stream);
+ } else {
+ ret = rotate_local_stream(ctx, stream);
+ }
+ if (ret < 0) {
+ ERR("Failed to rotate stream, ret = %i", ret);
+ goto error;
+ }
+
+ if (stream->metadata_flag && stream->trace_chunk) {
+ /*
+ * If the stream has transitioned to a new trace
+ * chunk, the metadata should be re-dumped to the
+ * newest chunk.
+ *
+ * However, it is possible for a stream to transition to
+ * a "no-chunk" state. This can happen if a rotation
+ * occurs on an inactive session. In such cases, the metadata
+ * regeneration will happen when the next trace chunk is
+ * created.
+ */
+ ret = consumer_metadata_stream_dump(stream);
+ if (ret) {
+ goto error;
+ }
+ }
+ lttng_consumer_reset_stream_rotate_state(stream);
+
+ ret = 0;
+
+error:
+ return ret;
+}
+
+/*
+ * Rotate all the ready streams now.
+ *
+ * This is especially important for low throughput streams that have already
+ * been consumed, we cannot wait for their next packet to perform the
+ * rotation.
+ * Need to be called with RCU read-side lock held to ensure existence of
+ * channel.
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
+ uint64_t key, struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ struct lttng_consumer_stream *stream;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+
+ rcu_read_lock();
+
+ DBG("Consumer rotate ready streams in channel %" PRIu64, key);
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key, &iter.iter,
+ stream, node_channel_id.node) {
+ health_code_update();
+
+ pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->lock);
+
+ if (!stream->rotate_ready) {
+ pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
+ continue;
+ }
+ DBG("Consumer rotate ready stream %" PRIu64, stream->key);
+
+ ret = lttng_consumer_rotate_stream(ctx, stream);
+ pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
+ if (ret) {
+ goto end;
+ }
+ }
+
+ ret = 0;
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+enum lttcomm_return_code lttng_consumer_init_command(
+ struct lttng_consumer_local_data *ctx,
+ const lttng_uuid sessiond_uuid)
+{
+ enum lttcomm_return_code ret;
+ char uuid_str[UUID_STR_LEN];
+
+ if (ctx->sessiond_uuid.is_set) {
+ ret = LTTCOMM_CONSUMERD_ALREADY_SET;
+ goto end;
+ }
+
+ ctx->sessiond_uuid.is_set = true;
+ memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid));
+ ret = LTTCOMM_CONSUMERD_SUCCESS;
+ lttng_uuid_to_str(sessiond_uuid, uuid_str);
+ DBG("Received session daemon UUID: %s", uuid_str);
+end:
+ return ret;
+}
+
+enum lttcomm_return_code lttng_consumer_create_trace_chunk(
+ const uint64_t *relayd_id, uint64_t session_id,
+ uint64_t chunk_id,
+ time_t chunk_creation_timestamp,
+ const char *chunk_override_name,
+ const struct lttng_credentials *credentials,
+ struct lttng_directory_handle *chunk_directory_handle)
+{
+ int ret;
+ enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ struct lttng_trace_chunk *created_chunk, *published_chunk;
+ enum lttng_trace_chunk_status chunk_status;
+ char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
+ char creation_timestamp_buffer[ISO8601_STR_LEN];
+ const char *relayd_id_str = "(none)";
+ const char *creation_timestamp_str;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_channel *channel;
+
+ if (relayd_id) {
+ /* Only used for logging purposes. */
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
+ "%" PRIu64, *relayd_id);
+ if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
+ relayd_id_str = relayd_id_buffer;
+ } else {
+ relayd_id_str = "(formatting error)";
+ }
+ }
+
+ /* Local protocol error. */
+ assert(chunk_creation_timestamp);
+ ret = time_to_iso8601_str(chunk_creation_timestamp,
+ creation_timestamp_buffer,
+ sizeof(creation_timestamp_buffer));
+ creation_timestamp_str = !ret ? creation_timestamp_buffer :
+ "(formatting error)";
+
+ DBG("Consumer create trace chunk command: relay_id = %s"
+ ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+ ", chunk_override_name = %s"
+ ", chunk_creation_timestamp = %s",
+ relayd_id_str, session_id, chunk_id,
+ chunk_override_name ? : "(none)",
+ creation_timestamp_str);
+
+ /*
+ * The trace chunk registry, as used by the consumer daemon, implicitly
+ * owns the trace chunks. This is only needed in the consumer since
+ * the consumer has no notion of a session beyond session IDs being
+ * used to identify other objects.
+ *
+ * The lttng_trace_chunk_registry_publish() call below provides a
+ * reference which is not released; it implicitly becomes the session
+ * daemon's reference to the chunk in the consumer daemon.
+ *
+ * The lifetime of trace chunks in the consumer daemon is managed by
+ * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
+ * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
+ */
+ created_chunk = lttng_trace_chunk_create(chunk_id,
+ chunk_creation_timestamp);
+ if (!created_chunk) {
+ ERR("Failed to create trace chunk");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto end;
+ }
+
+ if (chunk_override_name) {
+ chunk_status = lttng_trace_chunk_override_name(created_chunk,
+ chunk_override_name);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto end;
+ }
+ }
+
+ if (chunk_directory_handle) {
+ chunk_status = lttng_trace_chunk_set_credentials(created_chunk,
+ credentials);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to set trace chunk credentials");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto end;
+ }
+ /*
+ * The consumer daemon has no ownership of the chunk output
+ * directory.
+ */
+ chunk_status = lttng_trace_chunk_set_as_user(created_chunk,
+ chunk_directory_handle);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to set trace chunk's directory handle");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto end;
+ }
+ }
+
+ published_chunk = lttng_trace_chunk_registry_publish_chunk(
+ consumer_data.chunk_registry, session_id,
+ created_chunk);
+ lttng_trace_chunk_put(created_chunk);
+ created_chunk = NULL;
+ if (!published_chunk) {
+ ERR("Failed to publish trace chunk");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto end;
+ }
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry_duplicate(consumer_data.channels_by_session_id_ht->ht,
+ consumer_data.channels_by_session_id_ht->hash_fct(
+ &session_id, lttng_ht_seed),
+ consumer_data.channels_by_session_id_ht->match_fct,
+ &session_id, &iter.iter, channel,
+ channels_by_session_id_ht_node.node) {
+ ret = lttng_consumer_channel_set_trace_chunk(channel,
+ published_chunk);
+ if (ret) {
+ /*
+ * Roll-back the creation of this chunk.
+ *
+ * This is important since the session daemon will
+ * assume that the creation of this chunk failed and
+ * will never ask for it to be closed, resulting
+ * in a leak and an inconsistent state for some
+ * channels.
+ */
+ enum lttcomm_return_code close_ret;
+
+ DBG("Failed to set new trace chunk on existing channels, rolling back");
+ close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+ session_id, chunk_id,
+ chunk_creation_timestamp);
+ if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ session_id, chunk_id);
+ }
+
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ break;
+ }