X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=2420d110b3e8f1449a01a3d41ca941e1099bf4d1;hb=94d4914075c61cd1ee2ec00d8b61eacff105fc47;hp=2c2b79cf0f501fc864211e1e63b7ec6abc242dbb;hpb=309167d2a6f59d0c8cbf64eb23ba912cdea76a34;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 2c2b79cf0..2420d110b 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -40,6 +40,7 @@ #include #include #include +#include #include "consumer.h" #include "consumer-stream.h" @@ -305,6 +306,10 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) consumer_stream_destroy(stream, NULL); } + if (channel->live_timer_enabled == 1) { + consumer_timer_live_stop(channel); + } + switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; @@ -515,6 +520,9 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->metadata_flag = 1; /* Metadata is flat out. */ strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name)); + /* Live rendez-vous point. */ + pthread_cond_init(&stream->metadata_rdv, NULL); + pthread_mutex_init(&stream->metadata_rdv_lock, NULL); } else { /* Format stream name to _ */ ret = snprintf(stream->name, sizeof(stream->name), "%s_%d", @@ -729,6 +737,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, if (ret < 0) { goto end; } + uatomic_inc(&relayd->refcount); stream->sent_to_relayd = 1; } else { @@ -839,7 +848,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t tracefile_size, uint64_t tracefile_count, uint64_t session_id_per_pid, - unsigned int monitor) + unsigned int monitor, + unsigned int live_timer_interval) { struct lttng_consumer_channel *channel; @@ -860,6 +870,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->tracefile_size = tracefile_size; channel->tracefile_count = tracefile_count; channel->monitor = monitor; + channel->live_timer_interval = live_timer_interval; pthread_mutex_init(&channel->lock, NULL); pthread_mutex_init(&channel->timer_lock, NULL); @@ -3054,6 +3065,9 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, ssize_t ret; pthread_mutex_lock(&stream->lock); + if (stream->metadata_flag) { + pthread_mutex_lock(&stream->metadata_rdv_lock); + } switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -3070,6 +3084,10 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, break; } + if (stream->metadata_flag) { + pthread_cond_broadcast(&stream->metadata_rdv); + pthread_mutex_unlock(&stream->metadata_rdv_lock); + } pthread_mutex_unlock(&stream->lock); return ret; } @@ -3109,7 +3127,8 @@ void lttng_consumer_init(void) int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, - struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id) + struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id, + uint64_t relayd_session_id) { int fd = -1, ret = -1, relayd_created = 0; enum lttng_error_code ret_code = LTTNG_OK; @@ -3209,29 +3228,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, relayd->control_sock.major = relayd_sock->major; relayd->control_sock.minor = relayd_sock->minor; - /* - * Create a session on the relayd and store the returned id. Lock the - * control socket mutex if the relayd was NOT created before. - */ - if (!relayd_created) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - } - ret = relayd_create_session(&relayd->control_sock, - &relayd->relayd_session_id); - if (!relayd_created) { - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - } - if (ret < 0) { - /* - * Close all sockets of a relayd object. It will be freed if it was - * created at the error code path or else it will be garbage - * collect. - */ - (void) relayd_close(&relayd->control_sock); - (void) relayd_close(&relayd->data_sock); - ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; - goto error; - } + relayd->relayd_session_id = relayd_session_id; break; case LTTNG_STREAM_DATA: