Fix: rotation of a stopped session hangs indifinitely
[lttng-tools.git] / src / common / consumer / consumer.c
index 3896875f50e999076b2296829e19eab5f665ad5c..914eda8c5a1386f3650d6b47cc499e64986267a5 100644 (file)
@@ -52,6 +52,7 @@
 #include <common/trace-chunk.h>
 #include <common/trace-chunk-registry.h>
 #include <common/string-utils/format.h>
+#include <common/dynamic-array.h>
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -936,6 +937,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
                data_hdr.stream_id = htobe64(stream->relayd_stream_id);
                data_hdr.data_size = htobe32(data_size);
                data_hdr.padding_size = htobe32(padding);
+
                /*
                 * Note that net_seq_num below is assigned with the *current* value of
                 * next_net_seq_num and only after that the next_net_seq_num will be
@@ -1726,7 +1728,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        /* RCU lock for the relayd pointer */
        rcu_read_lock();
 
-       assert(stream->chan->trace_chunk);
+       assert(stream->net_seq_idx != (uint64_t) -1ULL ||
+                       stream->chan->trace_chunk);
 
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != (uint64_t) -1ULL) {
@@ -4011,12 +4014,28 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
        struct lttng_consumer_stream *stream;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+       struct lttng_dynamic_array stream_rotation_positions;
+       uint64_t next_chunk_id, stream_count = 0;
+       enum lttng_trace_chunk_status chunk_status;
+       const bool is_local_trace = relayd_id == -1ULL;
+       struct consumer_relayd_sock_pair *relayd = NULL;
+       bool rotating_to_new_chunk = true;
 
        DBG("Consumer sample rotate position for channel %" PRIu64, key);
 
+       lttng_dynamic_array_init(&stream_rotation_positions,
+                       sizeof(struct relayd_stream_rotation_position), NULL);
+
        rcu_read_lock();
 
        pthread_mutex_lock(&channel->lock);
+       assert(channel->trace_chunk);
+       chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
+                       &next_chunk_id);
+       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               ret = -1;
+               goto end_unlock_channel;
+       }
 
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct(&channel->key, lttng_ht_seed),
@@ -4031,6 +4050,10 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                 */
                pthread_mutex_lock(&stream->lock);
 
+               if (stream->trace_chunk == stream->chan->trace_chunk) {
+                       rotating_to_new_chunk = false;
+               }
+
                ret = lttng_consumer_sample_snapshot_positions(stream);
                if (ret < 0) {
                        ERR("Failed to sample snapshot position during channel rotation");
@@ -4050,6 +4073,10 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        stream->rotate_ready = true;
                }
 
+               /*
+                * Active flush; has no effect if the production position
+                * is at a packet boundary.
+                */
                ret = consumer_flush_buffer(stream, 1);
                if (ret < 0) {
                        ERR("Failed to flush stream %" PRIu64 " during channel rotation",
@@ -4057,18 +4084,86 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        goto end_unlock_stream;
                }
 
+               if (!is_local_trace) {
+                       /*
+                        * The relay daemon control protocol expects a rotation
+                        * position as "the sequence number of the first packet
+                        * _after_ the current trace chunk.
+                        *
+                        * At the moment when the positions of the buffers are
+                        * sampled, the production position does not necessarily
+                        * sit at a packet boundary. The 'active' flush
+                        * operation above will push the production position to
+                        * the next packet boundary _if_ it is not already
+                        * sitting at such a boundary.
+                        *
+                        * Assuming a current production position that is not
+                        * on the bound of a packet, the 'target' sequence
+                        * number is
+                        *   (consumed_pos / subbuffer_size) + 1
+                        * Note the '+ 1' to ensure the current packet is
+                        * part of the current trace chunk.
+                        *
+                        * However, if the production position is already at
+                        * a packet boundary, the '+ 1' is not necessary as the
+                        * last packet of the current chunk is already
+                        * 'complete'.
+                        */
+                       const struct relayd_stream_rotation_position position = {
+                               .stream_id = stream->relayd_stream_id,
+                               .rotate_at_seq_num = (stream->rotate_position / stream->max_sb_size) +
+                                       !!(stream->rotate_position % stream->max_sb_size),
+                       };
+
+                       ret = lttng_dynamic_array_add_element(
+                                       &stream_rotation_positions,
+                                       &position);
+                       if (ret) {
+                               ERR("Failed to allocate stream rotation position");
+                               goto end_unlock_stream;
+                       }
+                       stream_count++;
+               }
                pthread_mutex_unlock(&stream->lock);
        }
+       stream = NULL;
        pthread_mutex_unlock(&channel->lock);
 
+       if (is_local_trace) {
+               ret = 0;
+               goto end;
+       }
+
+       relayd = consumer_find_relayd(relayd_id);
+       if (!relayd) {
+               ERR("Failed to find relayd %" PRIu64, relayd_id);
+               ret = -1;
+               goto end;
+       }
+
+       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+       ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+                       rotating_to_new_chunk ? &next_chunk_id : NULL,
+                       (const struct relayd_stream_rotation_position *)
+                                       stream_rotation_positions.buffer.data);
+       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       if (ret < 0) {
+               ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
+                               relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
+               goto end;
+       }
+
        ret = 0;
        goto end;
 
 end_unlock_stream:
        pthread_mutex_unlock(&stream->lock);
+end_unlock_channel:
        pthread_mutex_unlock(&channel->lock);
 end:
        rcu_read_unlock();
+       lttng_dynamic_array_reset(&stream_rotation_positions);
        return ret;
 }
 
@@ -4167,52 +4262,6 @@ end:
        return ret;
 }
 
-/*
- * Perform the rotation a stream file on the relay.
- */
-int rotate_relay_stream(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream)
-{
-       int ret;
-       struct consumer_relayd_sock_pair *relayd;
-       uint64_t chunk_id;
-       enum lttng_trace_chunk_status chunk_status;
-
-       DBG("Rotate relay stream");
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (!relayd) {
-               ERR("Failed to find relayd");
-               ret = -1;
-               goto end;
-       }
-
-       chunk_status = lttng_trace_chunk_get_id(stream->chan->trace_chunk,
-                       &chunk_id);
-       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
-               ERR("Failed to retrieve the id of the current trace chunk of channel \"%s\"",
-                               stream->chan->name);
-               ret = -1;
-               goto end;
-       }
-
-       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-       ret = relayd_rotate_stream(&relayd->control_sock,
-                       stream->relayd_stream_id,
-                       chunk_id,
-                       stream->last_sequence_number);
-       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-       if (ret < 0) {
-               ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
-               lttng_consumer_cleanup_relayd(relayd);
-       }
-       if (ret) {
-               ERR("Rotate relay stream");
-       }
-
-end:
-       return ret;
-}
-
 /*
  * Performs the stream rotation for the rotate session feature if needed.
  * It must be called with the channel and stream locks held.
@@ -4254,14 +4303,12 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
                stream->trace_chunk = stream->chan->trace_chunk;
        }
 
-       if (stream->net_seq_idx != (uint64_t) -1ULL) {
-               ret = rotate_relay_stream(ctx, stream);
-       } else {
+       if (stream->net_seq_idx == (uint64_t) -1ULL) {
                ret = rotate_local_stream(ctx, stream);
-       }
-       if (ret < 0) {
-               ERR("Failed to rotate stream, ret = %i", ret);
-               goto error;
+               if (ret < 0) {
+                       ERR("Failed to rotate stream, ret = %i", ret);
+                       goto error;
+               }
        }
 
        if (stream->metadata_flag && stream->trace_chunk) {
@@ -4677,10 +4724,14 @@ enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
                const uint64_t *relayd_id, uint64_t session_id,
                uint64_t chunk_id)
 {
+       int ret;
        enum lttcomm_return_code ret_code;
        struct lttng_trace_chunk *chunk;
        char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
        const char *relayd_id_str = "(none)";
+       const bool is_local_trace = !relayd_id;
+       struct consumer_relayd_sock_pair *relayd = NULL;
+       bool chunk_exists_remote;
 
        if (relayd_id) {
                int ret;
@@ -4696,16 +4747,47 @@ enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
         }
 
        DBG("Consumer trace chunk exists command: relayd_id = %s"
-                       ", session_id = %" PRIu64
                        ", chunk_id = %" PRIu64, relayd_id_str,
-                       session_id, chunk_id);
+                       chunk_id);
        chunk = lttng_trace_chunk_registry_find_chunk(
                        consumer_data.chunk_registry, session_id,
                        chunk_id);
        DBG("Trace chunk %s locally", chunk ? "exists" : "does not exist");
-       ret_code = chunk ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL :
+       if (chunk) {
+               ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
+               lttng_trace_chunk_put(chunk);
+               goto end;
+       } else if (is_local_trace) {
+               ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+               goto end;
+       }
+
+       rcu_read_lock();
+       relayd = consumer_find_relayd(*relayd_id);
+       if (!relayd) {
+               ERR("Failed to find relayd %" PRIu64, *relayd_id);
+               ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
+               goto end_rcu_unlock;
+       }
+       DBG("Looking up existence of trace chunk on relay daemon");
+       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+       ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id,
+                       &chunk_exists_remote);
+       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       if (ret < 0) {
+               ERR("Failed to look-up the existence of trace chunk on relay daemon");
+               ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+               goto end_rcu_unlock;
+       }
+
+       ret_code = chunk_exists_remote ?
+                       LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
                        LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+       DBG("Trace chunk %s on relay daemon",
+                       chunk_exists_remote ? "exists" : "does not exist");
 
-       lttng_trace_chunk_put(chunk);
+end_rcu_unlock:
+       rcu_read_unlock();
+end:
        return ret_code;
 }
This page took 0.026432 seconds and 4 git commands to generate.