X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=a2df6c9587b5b047f6c71a286d3973f3a248f1f1;hb=95671f5349e87cdd2ea6cb47243608e9368ab8d5;hp=e37e9d4687ed2bcf4afad4d206fa4a886d1b5c28;hpb=90c106c686bee2d1dedf1496140f9291d3b16799;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index e37e9d468..a2df6c958 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -8,6 +8,7 @@ */ #include "common/index/ctf-index.h" +#include #define _LGPL_SOURCE #include #include @@ -178,7 +179,6 @@ static void clean_channel_stream_list(struct lttng_consumer_channel *channel) /* Delete streams that might have been left in the stream list. */ cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, send_node) { - cds_list_del(&stream->send_node); /* * Once a stream is added to this list, the buffers were created so we * have a guarantee that this call will succeed. Setting the monitor @@ -465,6 +465,8 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) { if (stream->net_seq_idx == net_seq_idx) { uatomic_set(&stream->endpoint_status, status); + lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue); + DBG("Delete flag set to metadata stream %d", stream->wait_fd); } } @@ -1047,6 +1049,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->is_live = is_in_live_session; pthread_mutex_init(&channel->lock, NULL); pthread_mutex_init(&channel->timer_lock, NULL); + lttng_wait_queue_init(&channel->metadata_pushed_wait_queue); switch (output) { case LTTNG_EVENT_SPLICE: @@ -2177,6 +2180,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, * pointer value. */ channel->metadata_stream = NULL; + lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue); if (channel->metadata_cache) { pthread_mutex_unlock(&channel->metadata_cache->lock); @@ -3563,18 +3567,22 @@ error: * This will create a relayd socket pair and add it to the relayd hash table. * The caller MUST acquire a RCU read side lock before calling it. */ - void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, - struct lttng_consumer_local_data *ctx, int sock, +void consumer_add_relayd_socket(uint64_t net_seq_idx, + int sock_type, + struct lttng_consumer_local_data *ctx, + int sock, struct pollfd *consumer_sockpoll, - struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id, - uint64_t relayd_session_id) + uint64_t sessiond_id, + uint64_t relayd_session_id, + uint32_t relayd_version_major, + uint32_t relayd_version_minor, + enum lttcomm_sock_proto relayd_socket_protocol) { int fd = -1, ret = -1, relayd_created = 0; enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct consumer_relayd_sock_pair *relayd = NULL; assert(ctx); - assert(relayd_sock); DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx); @@ -3643,54 +3651,25 @@ error: switch (sock_type) { case LTTNG_STREAM_CONTROL: /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock); - ret = lttcomm_create_sock(&relayd->control_sock.sock); - /* Handle create_sock error. */ - if (ret < 0) { - ret_code = LTTCOMM_CONSUMERD_ENOMEM; - goto error; - } - /* - * Close the socket created internally by - * lttcomm_create_sock, so we can replace it by the one - * received from sessiond. - */ - if (close(relayd->control_sock.sock.fd)) { - PERROR("close"); - } + ret = lttcomm_populate_sock_from_open_socket( + &relayd->control_sock.sock, fd, + relayd_socket_protocol); - /* Assign new file descriptor */ - relayd->control_sock.sock.fd = fd; /* Assign version values. */ - relayd->control_sock.major = relayd_sock->major; - relayd->control_sock.minor = relayd_sock->minor; + relayd->control_sock.major = relayd_version_major; + relayd->control_sock.minor = relayd_version_minor; relayd->relayd_session_id = relayd_session_id; break; case LTTNG_STREAM_DATA: /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock); - ret = lttcomm_create_sock(&relayd->data_sock.sock); - /* Handle create_sock error. */ - if (ret < 0) { - ret_code = LTTCOMM_CONSUMERD_ENOMEM; - goto error; - } - /* - * Close the socket created internally by - * lttcomm_create_sock, so we can replace it by the one - * received from sessiond. - */ - if (close(relayd->data_sock.sock.fd)) { - PERROR("close"); - } - - /* Assign new file descriptor */ - relayd->data_sock.sock.fd = fd; + ret = lttcomm_populate_sock_from_open_socket( + &relayd->data_sock.sock, fd, + relayd_socket_protocol); /* Assign version values. */ - relayd->data_sock.major = relayd_sock->major; - relayd->data_sock.minor = relayd_sock->minor; + relayd->data_sock.major = relayd_version_major; + relayd->data_sock.minor = relayd_version_minor; break; default: ERR("Unknown relayd socket type (%d)", sock_type); @@ -3698,6 +3677,11 @@ error: goto error; } + if (ret < 0) { + ret_code = LTTCOMM_CONSUMERD_FATAL; + goto error; + } + DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)", sock_type == LTTNG_STREAM_CONTROL ? "control" : "data", relayd->net_seq_idx, fd);