X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.cpp;fp=src%2Fbin%2Flttng-sessiond%2Fconsumer.cpp;h=13c18325d75d363e5da76674734b4e3aedcd0c07;hp=e8434702532273e5fd5e06093d15d7417eed8ccb;hb=56047f5a23df5c2c583a102b8015bbec5a7da9f1;hpb=66cefebdc240cbae0bc79594305f509b0779fa98 diff --git a/src/bin/lttng-sessiond/consumer.cpp b/src/bin/lttng-sessiond/consumer.cpp index e84347025..13c18325d 100644 --- a/src/bin/lttng-sessiond/consumer.cpp +++ b/src/bin/lttng-sessiond/consumer.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -291,18 +292,17 @@ void consumer_output_send_destroy_relayd(struct consumer_output *consumer) /* Destroy any relayd connection */ if (consumer->type == CONSUMER_DST_NET) { - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { - int ret; + /* Send destroy relayd command. */ + const int ret = consumer_send_destroy_relayd(socket, consumer); - /* Send destroy relayd command */ - ret = consumer_send_destroy_relayd(socket, consumer); if (ret < 0) { DBG("Unable to send destroy relayd command to consumer"); /* Continue since we MUST delete everything at this point. */ } } - rcu_read_unlock(); } } @@ -319,6 +319,8 @@ int consumer_create_socket(struct consumer_data *data, struct consumer_output *o LTTNG_ASSERT(data); + lttng::urcu::read_lock_guard read_lock; + if (output == nullptr || data->cmd_sock < 0) { /* * Not an error. Possible there is simply not spawned consumer or it's @@ -327,9 +329,7 @@ int consumer_create_socket(struct consumer_data *data, struct consumer_output *o goto error; } - rcu_read_lock(); socket = consumer_find_socket(data->cmd_sock, output); - rcu_read_unlock(); if (socket == nullptr) { socket = consumer_allocate_socket(&data->cmd_sock); if (socket == nullptr) { @@ -339,9 +339,7 @@ int consumer_create_socket(struct consumer_data *data, struct consumer_output *o socket->registered = 0; socket->lock = &data->lock; - rcu_read_lock(); consumer_add_socket(socket, output); - rcu_read_unlock(); } socket->type = data->type; @@ -544,12 +542,14 @@ void consumer_destroy_output_sockets(struct consumer_output *obj) return; } - rcu_read_lock(); - cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) { - consumer_del_socket(socket, obj); - consumer_destroy_socket(socket); + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) { + consumer_del_socket(socket, obj); + consumer_destroy_socket(socket); + } } - rcu_read_unlock(); } /* @@ -636,32 +636,33 @@ int consumer_copy_sockets(struct consumer_output *dst, struct consumer_output *s LTTNG_ASSERT(dst); LTTNG_ASSERT(src); - rcu_read_lock(); - cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) { - /* Ignore socket that are already there. */ - copy_sock = consumer_find_socket(*socket->fd_ptr, dst); - if (copy_sock) { - continue; - } + { + lttng::urcu::read_lock_guard read_lock; - /* Create new socket object. */ - copy_sock = consumer_allocate_socket(socket->fd_ptr); - if (copy_sock == nullptr) { - rcu_read_unlock(); - ret = -ENOMEM; - goto error; - } + cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) { + /* Ignore socket that are already there. */ + copy_sock = consumer_find_socket(*socket->fd_ptr, dst); + if (copy_sock) { + continue; + } - copy_sock->registered = socket->registered; - /* - * This is valid because this lock is shared accross all consumer - * object being the global lock of the consumer data structure of the - * session daemon. - */ - copy_sock->lock = socket->lock; - consumer_add_socket(copy_sock, dst); + /* Create new socket object. */ + copy_sock = consumer_allocate_socket(socket->fd_ptr); + if (copy_sock == nullptr) { + ret = -ENOMEM; + goto error; + } + + copy_sock->registered = socket->registered; + /* + * This is valid because this lock is shared accross all consumer + * object being the global lock of the consumer data structure of the + * session daemon. + */ + copy_sock->lock = socket->lock; + consumer_add_socket(copy_sock, dst); + } } - rcu_read_unlock(); error: return ret; @@ -1268,33 +1269,35 @@ int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consum msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING; msg.u.data_pending.session_id = session_id; - /* Send command for each consumer */ - rcu_read_lock(); - cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { - pthread_mutex_lock(socket->lock); - ret = consumer_socket_send(socket, &msg, sizeof(msg)); - if (ret < 0) { - pthread_mutex_unlock(socket->lock); - goto error_unlock; - } + { + /* Send command for each consumer. */ + lttng::urcu::read_lock_guard read_lock; - /* - * No need for a recv reply status because the answer to the command is - * the reply status message. - */ + cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto error_unlock; + } + + /* + * No need for a recv reply status because the answer to the command is + * the reply status message. + */ + ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto error_unlock; + } - ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code)); - if (ret < 0) { pthread_mutex_unlock(socket->lock); - goto error_unlock; - } - pthread_mutex_unlock(socket->lock); - if (ret_code == 1) { - break; + if (ret_code == 1) { + break; + } } } - rcu_read_unlock(); DBG("Consumer data is %s pending for session id %" PRIu64, ret_code == 1 ? "" : "NOT", @@ -1302,7 +1305,6 @@ int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consum return ret_code; error_unlock: - rcu_read_unlock(); return -1; } @@ -1580,35 +1582,41 @@ int consumer_get_discarded_events(uint64_t session_id, *discarded = 0; - /* Send command for each consumer */ - rcu_read_lock(); - cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { - uint64_t consumer_discarded = 0; - pthread_mutex_lock(socket->lock); - ret = consumer_socket_send(socket, &msg, sizeof(msg)); - if (ret < 0) { - pthread_mutex_unlock(socket->lock); - goto end; - } + /* Send command for each consumer. */ + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { + uint64_t consumer_discarded = 0; + + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv( + socket, &consumer_discarded, sizeof(consumer_discarded)); + if (ret < 0) { + ERR("get discarded events"); + pthread_mutex_unlock(socket->lock); + goto end; + } - /* - * No need for a recv reply status because the answer to the - * command is the reply status message. - */ - ret = consumer_socket_recv(socket, &consumer_discarded, sizeof(consumer_discarded)); - if (ret < 0) { - ERR("get discarded events"); pthread_mutex_unlock(socket->lock); - goto end; + *discarded += consumer_discarded; } - pthread_mutex_unlock(socket->lock); - *discarded += consumer_discarded; } + ret = 0; DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, *discarded, session_id); end: - rcu_read_unlock(); return ret; } @@ -1636,35 +1644,38 @@ int consumer_get_lost_packets(uint64_t session_id, *lost = 0; - /* Send command for each consumer */ - rcu_read_lock(); - cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { - uint64_t consumer_lost = 0; - pthread_mutex_lock(socket->lock); - ret = consumer_socket_send(socket, &msg, sizeof(msg)); - if (ret < 0) { - pthread_mutex_unlock(socket->lock); - goto end; - } + /* Send command for each consumer. */ + { + lttng::urcu::read_lock_guard read_lock; - /* - * No need for a recv reply status because the answer to the - * command is the reply status message. - */ - ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost)); - if (ret < 0) { - ERR("get lost packets"); + cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { + uint64_t consumer_lost = 0; + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost)); + if (ret < 0) { + ERR("get lost packets"); + pthread_mutex_unlock(socket->lock); + goto end; + } pthread_mutex_unlock(socket->lock); - goto end; + *lost += consumer_lost; } - pthread_mutex_unlock(socket->lock); - *lost += consumer_lost; } + ret = 0; DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, *lost, session_id); end: - rcu_read_unlock(); return ret; }