X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=6de72e2758b5086f35c36e96f69276eaa8dfbdf0;hb=92816cc33a1add3c8276839bd6335e17423577dd;hp=1d6f8b4cc4ce7cba975c85eb6a2c01fcb6e19e5b;hpb=bb586a6e7c1337041aab59ec04824ce9bf002d8c;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 1d6f8b4cc..6de72e275 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -2283,26 +2283,6 @@ static void validate_endpoint_status_metadata_stream( rcu_read_unlock(); } -static -int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx, - uint64_t key) -{ - ssize_t ret; - - do { - ret = write(ctx->channel_rotate_pipe, &key, sizeof(key)); - } while (ret == -1 && errno == EINTR); - if (ret == -1) { - PERROR("Failed to write to the channel rotation pipe"); - } else { - DBG("Sent channel rotation notification for channel key %" - PRIu64, key); - ret = 0; - } - - return (int) ret; -} - /* * Perform operations that need to be done after a stream has * rotated and released the stream lock. @@ -2339,13 +2319,7 @@ int consumer_post_rotation(struct lttng_consumer_stream *stream, abort(); } - if (--stream->chan->nr_stream_rotate_pending == 0) { - DBG("Rotation of channel \"%s\" completed, notifying the session daemon", - stream->chan->name); - ret = rotate_notify_sessiond(ctx, stream->chan->key); - } pthread_mutex_unlock(&stream->chan->lock); - return ret; } @@ -3739,21 +3713,6 @@ int consumer_data_pending(uint64_t id) /* Ease our life a bit */ ht = consumer_data.stream_list_ht; - relayd = find_relayd_by_session_id(id); - if (relayd) { - /* Send init command for data pending. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_begin_data_pending(&relayd->control_sock, - relayd->relayd_session_id); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0) { - /* Communication error thus the relayd so no data pending. */ - ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); - lttng_consumer_cleanup_relayd(relayd); - goto data_not_pending; - } - } - cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct, &id, @@ -3776,9 +3735,27 @@ int consumer_data_pending(uint64_t id) } } - /* Relayd check */ - if (relayd) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&stream->lock); + } + + relayd = find_relayd_by_session_id(id); + if (relayd) { + unsigned int is_data_inflight = 0; + + /* Send init command for data pending. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_begin_data_pending(&relayd->control_sock, + relayd->relayd_session_id); + if (ret < 0) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + /* Communication error thus the relayd so no data pending. */ + goto data_not_pending; + } + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct, &id, + &iter.iter, stream, node_session_id.node) { if (stream->metadata_flag) { ret = relayd_quiescent_control(&relayd->control_sock, stream->relayd_stream_id); @@ -3787,27 +3764,19 @@ int consumer_data_pending(uint64_t id) stream->relayd_stream_id, stream->next_net_seq_num - 1); } - if (ret < 0) { + + if (ret == 1) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + goto data_pending; + } else if (ret < 0) { ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); lttng_consumer_cleanup_relayd(relayd); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - pthread_mutex_unlock(&stream->lock); goto data_not_pending; } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret == 1) { - pthread_mutex_unlock(&stream->lock); - goto data_pending; - } } - pthread_mutex_unlock(&stream->lock); - } - - if (relayd) { - unsigned int is_data_inflight = 0; - /* Send init command for data pending. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); + /* Send end command for data pending. */ ret = relayd_end_data_pending(&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); @@ -4214,8 +4183,9 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, } else { ret = rotate_local_stream(ctx, stream); } + stream->trace_archive_id++; if (ret < 0) { - ERR("Rotate stream"); + ERR("Failed to rotate stream, ret = %i", ret); goto error; } @@ -4380,7 +4350,64 @@ int lttng_consumer_rotate_rename(const char *old_path, const char *new_path, } } -int lttng_consumer_rotate_pending_relay(uint64_t session_id, +/* Stream lock must be acquired by the caller. */ +static +bool check_stream_rotation_pending(const struct lttng_consumer_stream *stream, + uint64_t session_id, uint64_t chunk_id) +{ + bool pending = false; + + if (stream->session_id != session_id) { + /* Skip. */ + goto end; + } + + /* + * If the stream's archive_id belongs to the chunk being rotated (or an + * even older one), it means that the consumer has not consumed all the + * buffers that belong to the chunk being rotated. Therefore, the + * rotation is considered as ongoing/pending. + */ + pending = stream->trace_archive_id <= chunk_id; +end: + return pending; +} + +/* RCU read lock must be acquired by the caller. */ +int lttng_consumer_check_rotation_pending_local(uint64_t session_id, + uint64_t chunk_id) +{ + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + bool rotation_pending = false; + + /* Start with the metadata streams... */ + cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) { + pthread_mutex_lock(&stream->lock); + rotation_pending = check_stream_rotation_pending(stream, + session_id, chunk_id); + pthread_mutex_unlock(&stream->lock); + if (rotation_pending) { + goto end; + } + } + + /* ... followed by the data streams. */ + cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) { + pthread_mutex_lock(&stream->lock); + rotation_pending = check_stream_rotation_pending(stream, + session_id, chunk_id); + pthread_mutex_unlock(&stream->lock); + if (rotation_pending) { + goto end; + } + } + +end: + return !!rotation_pending; +} + +int lttng_consumer_check_rotation_pending_relay(uint64_t session_id, uint64_t relayd_id, uint64_t chunk_id) { int ret; @@ -4388,7 +4415,7 @@ int lttng_consumer_rotate_pending_relay(uint64_t session_id, relayd = consumer_find_relayd(relayd_id); if (!relayd) { - ERR("Failed to find relayd"); + ERR("Failed to find relayd id %" PRIu64, relayd_id); ret = -1; goto end; }