projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Fix: consumerd: slow metadata push slows down application registration
[lttng-tools.git]
/
src
/
common
/
ust-consumer
/
ust-consumer.c
diff --git
a/src/common/ust-consumer/ust-consumer.c
b/src/common/ust-consumer/ust-consumer.c
index e27e15ca573ade45605606ebbff0a01448ae67a2..b43ae58ffd84b4be5c836130341d6c8b200b672c 100644
(file)
--- a/
src/common/ust-consumer/ust-consumer.c
+++ b/
src/common/ust-consumer/ust-consumer.c
@@
-950,6
+950,8
@@
error:
*/
consumer_stream_destroy(metadata->metadata_stream, NULL);
metadata->metadata_stream = NULL;
*/
consumer_stream_destroy(metadata->metadata_stream, NULL);
metadata->metadata_stream = NULL;
+ lttng_wait_queue_wake_all(&metadata->metadata_pushed_wait_queue);
+
send_streams_error:
error_no_stream:
end:
send_streams_error:
error_no_stream:
end:
@@
-985,7
+987,7
@@
static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
* Ask the sessiond if we have new metadata waiting and update the
* consumer metadata cache.
*/
* Ask the sessiond if we have new metadata waiting and update the
* consumer metadata cache.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel,
0
, 1);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel,
false
, 1);
if (ret < 0) {
goto error;
}
if (ret < 0) {
goto error;
}
@@
-1032,6
+1034,7
@@
error_stream:
*/
consumer_stream_destroy(metadata_stream, NULL);
metadata_channel->metadata_stream = NULL;
*/
consumer_stream_destroy(metadata_stream, NULL);
metadata_channel->metadata_stream = NULL;
+ lttng_wait_queue_wake_all(&metadata_channel->metadata_pushed_wait_queue);
error:
rcu_read_unlock();
error:
rcu_read_unlock();
@@
-1275,7
+1278,7
@@
void metadata_stream_reset_cache_consumed_position(
*/
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
uint64_t len, uint64_t version,
*/
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
uint64_t len, uint64_t version,
- struct lttng_consumer_channel *channel,
int
timer, int wait)
+ struct lttng_consumer_channel *channel,
bool invoked_by_
timer, int wait)
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
char *metadata_str;
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
char *metadata_str;
@@
-1364,13
+1367,8
@@
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
if (!wait) {
goto end_free;
}
if (!wait) {
goto end_free;
}
- while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
- DBG("Waiting for metadata to be flushed");
-
- health_code_update();
- usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
- }
+ consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer);
end_free:
free(metadata_str);
end_free:
free(metadata_str);
@@
-1821,7
+1819,7
@@
end_get_channel_nosignal:
health_code_update();
ret = lttng_ustconsumer_recv_metadata(sock, key, offset, len,
health_code_update();
ret = lttng_ustconsumer_recv_metadata(sock, key, offset, len,
- version, found_channel,
0
, 1);
+ version, found_channel,
false
, 1);
if (ret < 0) {
/* error receiving from sessiond */
goto error_push_metadata_fatal;
if (ret < 0) {
/* error receiving from sessiond */
goto error_push_metadata_fatal;
@@
-2613,6
+2611,7
@@
int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
goto end;
}
stream->ust_metadata_pushed += write_len;
goto end;
}
stream->ust_metadata_pushed += write_len;
+ lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
assert(stream->chan->metadata_cache->contents.size >=
stream->ust_metadata_pushed);
assert(stream->chan->metadata_cache->contents.size >=
stream->ust_metadata_pushed);
@@
-2662,7
+2661,7
@@
enum sync_metadata_status lttng_ustconsumer_sync_metadata(
* Request metadata from the sessiond, but don't wait for the flush
* because we locked the metadata thread.
*/
* Request metadata from the sessiond, but don't wait for the flush
* because we locked the metadata thread.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel,
0
, 0);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel,
false
, 0);
pthread_mutex_lock(&metadata_stream->lock);
if (ret < 0) {
status = SYNC_METADATA_STATUS_ERROR;
pthread_mutex_lock(&metadata_stream->lock);
if (ret < 0) {
status = SYNC_METADATA_STATUS_ERROR;
@@
-3312,7
+3311,7
@@
void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
* pushed out due to concurrent interaction with the session daemon.
*/
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
* pushed out due to concurrent interaction with the session daemon.
*/
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel,
int
timer, int wait)
+ struct lttng_consumer_channel *channel,
bool invoked_by_
timer, int wait)
{
struct lttcomm_metadata_request_msg request;
struct lttcomm_consumer_msg msg;
{
struct lttcomm_metadata_request_msg request;
struct lttcomm_consumer_msg msg;
@@
-3420,7
+3419,7
@@
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
health_code_update();
ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
health_code_update();
ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
- key, offset, len, version, channel, timer, wait);
+ key, offset, len, version, channel,
invoked_by_
timer, wait);
if (ret >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive
if (ret >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive
This page took
0.025221 seconds
and
4
git commands to generate.