#include <common/defaults.hpp>
#include <common/relayd/relayd.hpp>
#include <common/string-utils/format.hpp>
+#include <common/urcu.hpp>
#include <common/uri.hpp>
#include <inttypes.h>
/* Destroy any relayd connection */
if (consumer->type == CONSUMER_DST_NET) {
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
+
cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
- int ret;
+ /* Send destroy relayd command. */
+ const int ret = consumer_send_destroy_relayd(socket, consumer);
- /* Send destroy relayd command */
- ret = consumer_send_destroy_relayd(socket, consumer);
if (ret < 0) {
DBG("Unable to send destroy relayd command to consumer");
/* Continue since we MUST delete everything at this point. */
}
}
- rcu_read_unlock();
}
}
LTTNG_ASSERT(data);
+ lttng::urcu::read_lock_guard read_lock;
+
if (output == nullptr || data->cmd_sock < 0) {
/*
* Not an error. Possible there is simply not spawned consumer or it's
goto error;
}
- rcu_read_lock();
socket = consumer_find_socket(data->cmd_sock, output);
- rcu_read_unlock();
if (socket == nullptr) {
socket = consumer_allocate_socket(&data->cmd_sock);
if (socket == nullptr) {
socket->registered = 0;
socket->lock = &data->lock;
- rcu_read_lock();
consumer_add_socket(socket, output);
- rcu_read_unlock();
}
socket->type = data->type;
}
/* By default, consumer output is enabled */
- output->enabled = 1;
+ output->enabled = true;
output->type = type;
output->net_seq_index = (uint64_t) -1ULL;
urcu_ref_init(&output->ref);
return;
}
- rcu_read_lock();
- cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) {
- consumer_del_socket(socket, obj);
- consumer_destroy_socket(socket);
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) {
+ consumer_del_socket(socket, obj);
+ consumer_destroy_socket(socket);
+ }
}
- rcu_read_unlock();
}
/*
LTTNG_ASSERT(dst);
LTTNG_ASSERT(src);
- rcu_read_lock();
- cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) {
- /* Ignore socket that are already there. */
- copy_sock = consumer_find_socket(*socket->fd_ptr, dst);
- if (copy_sock) {
- continue;
- }
+ {
+ lttng::urcu::read_lock_guard read_lock;
- /* Create new socket object. */
- copy_sock = consumer_allocate_socket(socket->fd_ptr);
- if (copy_sock == nullptr) {
- rcu_read_unlock();
- ret = -ENOMEM;
- goto error;
- }
+ cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) {
+ /* Ignore socket that are already there. */
+ copy_sock = consumer_find_socket(*socket->fd_ptr, dst);
+ if (copy_sock) {
+ continue;
+ }
- copy_sock->registered = socket->registered;
- /*
- * This is valid because this lock is shared accross all consumer
- * object being the global lock of the consumer data structure of the
- * session daemon.
- */
- copy_sock->lock = socket->lock;
- consumer_add_socket(copy_sock, dst);
+ /* Create new socket object. */
+ copy_sock = consumer_allocate_socket(socket->fd_ptr);
+ if (copy_sock == nullptr) {
+ ret = -ENOMEM;
+ goto error;
+ }
+
+ copy_sock->registered = socket->registered;
+ /*
+			 * This is valid because this lock is shared across all consumer
+			 * objects, being the global lock of the consumer data structure of the
+ * session daemon.
+ */
+ copy_sock->lock = socket->lock;
+ consumer_add_socket(copy_sock, dst);
+ }
}
- rcu_read_unlock();
error:
return ret;
msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
msg.u.data_pending.session_id = session_id;
- /* Send command for each consumer */
- rcu_read_lock();
- cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
- pthread_mutex_lock(socket->lock);
- ret = consumer_socket_send(socket, &msg, sizeof(msg));
- if (ret < 0) {
- pthread_mutex_unlock(socket->lock);
- goto error_unlock;
- }
+ {
+ /* Send command for each consumer. */
+ lttng::urcu::read_lock_guard read_lock;
- /*
- * No need for a recv reply status because the answer to the command is
- * the reply status message.
- */
+ cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_socket_send(socket, &msg, sizeof(msg));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto error_unlock;
+ }
+
+ /*
+ * No need for a recv reply status because the answer to the command is
+ * the reply status message.
+ */
+ ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto error_unlock;
+ }
- ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
- if (ret < 0) {
pthread_mutex_unlock(socket->lock);
- goto error_unlock;
- }
- pthread_mutex_unlock(socket->lock);
- if (ret_code == 1) {
- break;
+ if (ret_code == 1) {
+ break;
+ }
}
}
- rcu_read_unlock();
DBG("Consumer data is %s pending for session id %" PRIu64,
ret_code == 1 ? "" : "NOT",
return ret_code;
error_unlock:
- rcu_read_unlock();
return -1;
}
*discarded = 0;
- /* Send command for each consumer */
- rcu_read_lock();
- cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
- uint64_t consumer_discarded = 0;
- pthread_mutex_lock(socket->lock);
- ret = consumer_socket_send(socket, &msg, sizeof(msg));
- if (ret < 0) {
- pthread_mutex_unlock(socket->lock);
- goto end;
- }
+ /* Send command for each consumer. */
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
+ uint64_t consumer_discarded = 0;
+
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_socket_send(socket, &msg, sizeof(msg));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+
+ /*
+ * No need for a recv reply status because the answer to the
+ * command is the reply status message.
+ */
+ ret = consumer_socket_recv(
+ socket, &consumer_discarded, sizeof(consumer_discarded));
+ if (ret < 0) {
+ ERR("get discarded events");
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
- /*
- * No need for a recv reply status because the answer to the
- * command is the reply status message.
- */
- ret = consumer_socket_recv(socket, &consumer_discarded, sizeof(consumer_discarded));
- if (ret < 0) {
- ERR("get discarded events");
pthread_mutex_unlock(socket->lock);
- goto end;
+ *discarded += consumer_discarded;
}
- pthread_mutex_unlock(socket->lock);
- *discarded += consumer_discarded;
}
+
ret = 0;
DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, *discarded, session_id);
end:
- rcu_read_unlock();
return ret;
}
*lost = 0;
- /* Send command for each consumer */
- rcu_read_lock();
- cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
- uint64_t consumer_lost = 0;
- pthread_mutex_lock(socket->lock);
- ret = consumer_socket_send(socket, &msg, sizeof(msg));
- if (ret < 0) {
- pthread_mutex_unlock(socket->lock);
- goto end;
- }
+ /* Send command for each consumer. */
+ {
+ lttng::urcu::read_lock_guard read_lock;
- /*
- * No need for a recv reply status because the answer to the
- * command is the reply status message.
- */
- ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost));
- if (ret < 0) {
- ERR("get lost packets");
+ cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
+ uint64_t consumer_lost = 0;
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_socket_send(socket, &msg, sizeof(msg));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+
+ /*
+ * No need for a recv reply status because the answer to the
+ * command is the reply status message.
+ */
+ ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost));
+ if (ret < 0) {
+ ERR("get lost packets");
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
pthread_mutex_unlock(socket->lock);
- goto end;
+ *lost += consumer_lost;
}
- pthread_mutex_unlock(socket->lock);
- *lost += consumer_lost;
}
+
ret = 0;
DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, *lost, session_id);
end:
- rcu_read_unlock();
return ret;
}