relayd: implement file and session rotation on top of trace chunks
[lttng-tools.git] / src / common / consumer / consumer.c
index 948a81d804428a01582b0466cb2e7433eb87dc4e..b2f4c2686d2f7f81b5bc21c4249a681e92a5bad3 100644 (file)
@@ -52,6 +52,7 @@
 #include <common/trace-chunk.h>
 #include <common/trace-chunk-registry.h>
 #include <common/string-utils/format.h>
+#include <common/dynamic-array.h>
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -936,6 +937,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
                data_hdr.stream_id = htobe64(stream->relayd_stream_id);
                data_hdr.data_size = htobe32(data_size);
                data_hdr.padding_size = htobe32(padding);
+
                /*
                 * Note that net_seq_num below is assigned with the *current* value of
                 * next_net_seq_num and only after that the next_net_seq_num will be
@@ -4012,12 +4014,28 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
        struct lttng_consumer_stream *stream;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+       struct lttng_dynamic_array stream_rotation_positions;
+       uint64_t next_chunk_id, stream_count = 0;
+       enum lttng_trace_chunk_status chunk_status;
+       const bool is_local_trace = relayd_id == -1ULL;
+       struct consumer_relayd_sock_pair *relayd = NULL;
+       bool rotating_to_new_chunk = true;
 
        DBG("Consumer sample rotate position for channel %" PRIu64, key);
 
+       lttng_dynamic_array_init(&stream_rotation_positions,
+                       sizeof(struct relayd_stream_rotation_position), NULL);
+
        rcu_read_lock();
 
        pthread_mutex_lock(&channel->lock);
+       assert(channel->trace_chunk);
+       chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
+                       &next_chunk_id);
+       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               ret = -1;
+               goto end_unlock_channel;
+       }
 
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct(&channel->key, lttng_ht_seed),
@@ -4032,6 +4050,10 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                 */
                pthread_mutex_lock(&stream->lock);
 
+               if (stream->trace_chunk == stream->chan->trace_chunk) {
+                       rotating_to_new_chunk = false;
+               }
+
                ret = lttng_consumer_sample_snapshot_positions(stream);
                if (ret < 0) {
                        ERR("Failed to sample snapshot position during channel rotation");
@@ -4058,18 +4080,62 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        goto end_unlock_stream;
                }
 
+               if (!is_local_trace) {
+                       const struct relayd_stream_rotation_position position = {
+                               .stream_id = stream->relayd_stream_id,
+                               .rotate_at_seq_num = (stream->rotate_position /
+                                               stream->max_sb_size) + 1,
+                       };
+
+                       ret = lttng_dynamic_array_add_element(
+                                       &stream_rotation_positions,
+                                       &position);
+                       if (ret) {
+                               ERR("Failed to allocate stream rotation position");
+                               goto end_unlock_stream;
+                       }
+                       stream_count++;
+               }
                pthread_mutex_unlock(&stream->lock);
        }
+       stream = NULL;
        pthread_mutex_unlock(&channel->lock);
 
+       if (is_local_trace) {
+               ret = 0;
+               goto end;
+       }
+
+       relayd = consumer_find_relayd(relayd_id);
+       if (!relayd) {
+               ERR("Failed to find relayd %" PRIu64, relayd_id);
+               ret = -1;
+               goto end;
+       }
+
+       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+       ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+                       rotating_to_new_chunk ? &next_chunk_id : NULL,
+                       (const struct relayd_stream_rotation_position *)
+                                       stream_rotation_positions.buffer.data);
+       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       if (ret < 0) {
+               ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
+                               relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
+               goto end;
+       }
+
        ret = 0;
        goto end;
 
 end_unlock_stream:
        pthread_mutex_unlock(&stream->lock);
+end_unlock_channel:
        pthread_mutex_unlock(&channel->lock);
 end:
        rcu_read_unlock();
+       lttng_dynamic_array_reset(&stream_rotation_positions);
        return ret;
 }
 
@@ -4168,52 +4234,6 @@ end:
        return ret;
 }
 
-/*
- * Perform the rotation a stream file on the relay.
- */
-int rotate_relay_stream(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream)
-{
-       int ret;
-       struct consumer_relayd_sock_pair *relayd;
-       uint64_t chunk_id;
-       enum lttng_trace_chunk_status chunk_status;
-
-       DBG("Rotate relay stream");
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (!relayd) {
-               ERR("Failed to find relayd");
-               ret = -1;
-               goto end;
-       }
-
-       chunk_status = lttng_trace_chunk_get_id(stream->chan->trace_chunk,
-                       &chunk_id);
-       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
-               ERR("Failed to retrieve the id of the current trace chunk of channel \"%s\"",
-                               stream->chan->name);
-               ret = -1;
-               goto end;
-       }
-
-       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-       ret = relayd_rotate_stream(&relayd->control_sock,
-                       stream->relayd_stream_id,
-                       chunk_id,
-                       stream->last_sequence_number);
-       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-       if (ret < 0) {
-               ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
-               lttng_consumer_cleanup_relayd(relayd);
-       }
-       if (ret) {
-               ERR("Rotate relay stream");
-       }
-
-end:
-       return ret;
-}
-
 /*
  * Performs the stream rotation for the rotate session feature if needed.
  * It must be called with the channel and stream locks held.
@@ -4255,14 +4275,12 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
                stream->trace_chunk = stream->chan->trace_chunk;
        }
 
-       if (stream->net_seq_idx != (uint64_t) -1ULL) {
-               ret = rotate_relay_stream(ctx, stream);
-       } else {
+       if (stream->net_seq_idx == (uint64_t) -1ULL) {
                ret = rotate_local_stream(ctx, stream);
-       }
-       if (ret < 0) {
-               ERR("Failed to rotate stream, ret = %i", ret);
-               goto error;
+               if (ret < 0) {
+                       ERR("Failed to rotate stream, ret = %i", ret);
+                       goto error;
+               }
        }
 
        if (stream->metadata_flag && stream->trace_chunk) {
@@ -4678,10 +4696,14 @@ enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
                const uint64_t *relayd_id, uint64_t session_id,
                uint64_t chunk_id)
 {
+       int ret;
        enum lttcomm_return_code ret_code;
        struct lttng_trace_chunk *chunk;
        char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
        const char *relayd_id_str = "(none)";
+       const bool is_local_trace = !relayd_id;
+       struct consumer_relayd_sock_pair *relayd = NULL;
+       bool chunk_exists_remote;
 
        if (relayd_id) {
                int ret;
@@ -4697,16 +4719,47 @@ enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
         }
 
        DBG("Consumer trace chunk exists command: relayd_id = %s"
-                       ", session_id = %" PRIu64
                        ", chunk_id = %" PRIu64, relayd_id_str,
-                       session_id, chunk_id);
+                       chunk_id);
        chunk = lttng_trace_chunk_registry_find_chunk(
                        consumer_data.chunk_registry, session_id,
                        chunk_id);
        DBG("Trace chunk %s locally", chunk ? "exists" : "does not exist");
-       ret_code = chunk ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL :
+       if (chunk) {
+               ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
+               lttng_trace_chunk_put(chunk);
+               goto end;
+       } else if (is_local_trace) {
+               ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+               goto end;
+       }
+
+       rcu_read_lock();
+       relayd = consumer_find_relayd(*relayd_id);
+       if (!relayd) {
+               ERR("Failed to find relayd %" PRIu64, *relayd_id);
+               ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
+               goto end_rcu_unlock;
+       }
+       DBG("Looking up existence of trace chunk on relay daemon");
+       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+       ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id,
+                       &chunk_exists_remote);
+       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       if (ret < 0) {
+               ERR("Failed to look-up the existence of trace chunk on relay daemon");
+               ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+               goto end_rcu_unlock;
+       }
+
+       ret_code = chunk_exists_remote ?
+                       LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
                        LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+       DBG("Trace chunk %s on relay daemon",
+                       chunk_exists_remote ? "exists" : "does not exist");
 
-       lttng_trace_chunk_put(chunk);
+end_rcu_unlock:
+       rcu_read_unlock();
+end:
        return ret_code;
 }
This page took 0.025949 seconds and 4 git commands to generate.