#include <common/defaults.hpp>
#include <common/relayd/relayd.hpp>
#include <common/string-utils/format.hpp>
+#include <common/urcu.hpp>
#include <common/uri.hpp>
#include <inttypes.h>
return pathname;
error:
free(pathname);
- return NULL;
+ return nullptr;
}
/*
/* Destroy any relayd connection */
if (consumer->type == CONSUMER_DST_NET) {
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
+
cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
- int ret;
+ /* Send destroy relayd command. */
+ const int ret = consumer_send_destroy_relayd(socket, consumer);
- /* Send destroy relayd command */
- ret = consumer_send_destroy_relayd(socket, consumer);
if (ret < 0) {
DBG("Unable to send destroy relayd command to consumer");
/* Continue since we MUST delete everything at this point. */
}
}
- rcu_read_unlock();
}
}
LTTNG_ASSERT(data);
- if (output == NULL || data->cmd_sock < 0) {
+ lttng::urcu::read_lock_guard read_lock;
+
+ if (output == nullptr || data->cmd_sock < 0) {
/*
* Not an error. Possible there is simply not spawned consumer or it's
* disabled for the tracing session asking the socket.
goto error;
}
- rcu_read_lock();
socket = consumer_find_socket(data->cmd_sock, output);
- rcu_read_unlock();
- if (socket == NULL) {
+ if (socket == nullptr) {
socket = consumer_allocate_socket(&data->cmd_sock);
- if (socket == NULL) {
+ if (socket == nullptr) {
ret = -1;
goto error;
}
socket->registered = 0;
socket->lock = &data->lock;
- rcu_read_lock();
consumer_add_socket(socket, output);
- rcu_read_unlock();
}
socket->type = data->type;
const struct consumer_output *consumer)
{
int consumer_fd;
- struct consumer_socket *socket = NULL;
+ struct consumer_socket *socket = nullptr;
ASSERT_RCU_READ_LOCKED();
{
struct lttng_ht_iter iter;
struct lttng_ht_node_ulong *node;
- struct consumer_socket *socket = NULL;
+ struct consumer_socket *socket = nullptr;
ASSERT_RCU_READ_LOCKED();
/* Negative keys are lookup failures */
- if (key < 0 || consumer == NULL) {
- return NULL;
+ if (key < 0 || consumer == nullptr) {
+ return nullptr;
}
lttng_ht_lookup(consumer->socks, (void *) ((unsigned long) key), &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
- if (node != NULL) {
+ if (node != nullptr) {
socket = lttng::utils::container_of(node, &consumer_socket::node);
}
*/
struct consumer_socket *consumer_allocate_socket(int *fd)
{
- struct consumer_socket *socket = NULL;
+ struct consumer_socket *socket = nullptr;
LTTNG_ASSERT(fd);
socket = zmalloc<consumer_socket>();
- if (socket == NULL) {
+ if (socket == nullptr) {
PERROR("zmalloc consumer socket");
goto error;
}
*/
struct consumer_output *consumer_create_output(enum consumer_dst_type type)
{
- struct consumer_output *output = NULL;
+ struct consumer_output *output = nullptr;
output = zmalloc<consumer_output>();
- if (output == NULL) {
+ if (output == nullptr) {
PERROR("zmalloc consumer_output");
goto error;
}
/* By default, consumer output is enabled */
- output->enabled = 1;
+ output->enabled = true;
output->type = type;
output->net_seq_index = (uint64_t) -1ULL;
urcu_ref_init(&output->ref);
return;
}
- rcu_read_lock();
- cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) {
- consumer_del_socket(socket, obj);
- consumer_destroy_socket(socket);
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) {
+ consumer_del_socket(socket, obj);
+ consumer_destroy_socket(socket);
+ }
}
- rcu_read_unlock();
}
/*
LTTNG_ASSERT(src);
output = consumer_create_output(src->type);
- if (output == NULL) {
+ if (output == nullptr) {
goto end;
}
output->enabled = src->enabled;
error_put:
consumer_output_put(output);
- return NULL;
+ return nullptr;
}
/*
LTTNG_ASSERT(dst);
LTTNG_ASSERT(src);
- rcu_read_lock();
- cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) {
- /* Ignore socket that are already there. */
- copy_sock = consumer_find_socket(*socket->fd_ptr, dst);
- if (copy_sock) {
- continue;
- }
+ {
+ lttng::urcu::read_lock_guard read_lock;
- /* Create new socket object. */
- copy_sock = consumer_allocate_socket(socket->fd_ptr);
- if (copy_sock == NULL) {
- rcu_read_unlock();
- ret = -ENOMEM;
- goto error;
- }
+ cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) {
+ /* Ignore socket that are already there. */
+ copy_sock = consumer_find_socket(*socket->fd_ptr, dst);
+ if (copy_sock) {
+ continue;
+ }
- copy_sock->registered = socket->registered;
- /*
- * This is valid because this lock is shared accross all consumer
- * object being the global lock of the consumer data structure of the
- * session daemon.
- */
- copy_sock->lock = socket->lock;
- consumer_add_socket(copy_sock, dst);
+ /* Create new socket object. */
+ copy_sock = consumer_allocate_socket(socket->fd_ptr);
+ if (copy_sock == nullptr) {
+ ret = -ENOMEM;
+ goto error;
+ }
+
+ copy_sock->registered = socket->registered;
+ /*
+ * This is valid because this lock is shared accross all consumer
+ * object being the global lock of the consumer data structure of the
+ * session daemon.
+ */
+ copy_sock->lock = socket->lock;
+ consumer_add_socket(copy_sock, dst);
+ }
}
- rcu_read_unlock();
error:
return ret;
struct lttng_uri *uri)
{
int ret;
- struct lttng_uri *dst_uri = NULL;
+ struct lttng_uri *dst_uri = nullptr;
/* Code flow error safety net. */
LTTNG_ASSERT(output);
msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
msg.u.data_pending.session_id = session_id;
- /* Send command for each consumer */
- rcu_read_lock();
- cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
- pthread_mutex_lock(socket->lock);
- ret = consumer_socket_send(socket, &msg, sizeof(msg));
- if (ret < 0) {
- pthread_mutex_unlock(socket->lock);
- goto error_unlock;
- }
+ {
+ /* Send command for each consumer. */
+ lttng::urcu::read_lock_guard read_lock;
- /*
- * No need for a recv reply status because the answer to the command is
- * the reply status message.
- */
+ cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_socket_send(socket, &msg, sizeof(msg));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto error_unlock;
+ }
+
+ /*
+ * No need for a recv reply status because the answer to the command is
+ * the reply status message.
+ */
+ ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto error_unlock;
+ }
- ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
- if (ret < 0) {
pthread_mutex_unlock(socket->lock);
- goto error_unlock;
- }
- pthread_mutex_unlock(socket->lock);
- if (ret_code == 1) {
- break;
+ if (ret_code == 1) {
+ break;
+ }
}
}
- rcu_read_unlock();
DBG("Consumer data is %s pending for session id %" PRIu64,
ret_code == 1 ? "" : "NOT",
return ret_code;
error_unlock:
- rcu_read_unlock();
return -1;
}
*discarded = 0;
- /* Send command for each consumer */
- rcu_read_lock();
- cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
- uint64_t consumer_discarded = 0;
- pthread_mutex_lock(socket->lock);
- ret = consumer_socket_send(socket, &msg, sizeof(msg));
- if (ret < 0) {
- pthread_mutex_unlock(socket->lock);
- goto end;
- }
+ /* Send command for each consumer. */
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
+ uint64_t consumer_discarded = 0;
+
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_socket_send(socket, &msg, sizeof(msg));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+
+ /*
+ * No need for a recv reply status because the answer to the
+ * command is the reply status message.
+ */
+ ret = consumer_socket_recv(
+ socket, &consumer_discarded, sizeof(consumer_discarded));
+ if (ret < 0) {
+ ERR("get discarded events");
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
- /*
- * No need for a recv reply status because the answer to the
- * command is the reply status message.
- */
- ret = consumer_socket_recv(socket, &consumer_discarded, sizeof(consumer_discarded));
- if (ret < 0) {
- ERR("get discarded events");
pthread_mutex_unlock(socket->lock);
- goto end;
+ *discarded += consumer_discarded;
}
- pthread_mutex_unlock(socket->lock);
- *discarded += consumer_discarded;
}
+
ret = 0;
DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, *discarded, session_id);
end:
- rcu_read_unlock();
return ret;
}
*lost = 0;
- /* Send command for each consumer */
- rcu_read_lock();
- cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
- uint64_t consumer_lost = 0;
- pthread_mutex_lock(socket->lock);
- ret = consumer_socket_send(socket, &msg, sizeof(msg));
- if (ret < 0) {
- pthread_mutex_unlock(socket->lock);
- goto end;
- }
+ /* Send command for each consumer. */
+ {
+ lttng::urcu::read_lock_guard read_lock;
- /*
- * No need for a recv reply status because the answer to the
- * command is the reply status message.
- */
- ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost));
- if (ret < 0) {
- ERR("get lost packets");
+ cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
+ uint64_t consumer_lost = 0;
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_socket_send(socket, &msg, sizeof(msg));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+
+ /*
+ * No need for a recv reply status because the answer to the
+ * command is the reply status message.
+ */
+ ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost));
+ if (ret < 0) {
+ ERR("get lost packets");
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
pthread_mutex_unlock(socket->lock);
- goto end;
+ *lost += consumer_lost;
}
- pthread_mutex_unlock(socket->lock);
- *lost += consumer_lost;
}
+
ret = 0;
DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, *lost, session_id);
end:
- rcu_read_unlock();
return ret;
}
int ret;
enum lttng_trace_chunk_status chunk_status;
struct lttng_credentials chunk_credentials;
- const struct lttng_directory_handle *chunk_directory_handle = NULL;
- struct lttng_directory_handle *domain_handle = NULL;
+ const struct lttng_directory_handle *chunk_directory_handle = nullptr;
+ struct lttng_directory_handle *domain_handle = nullptr;
int domain_dirfd;
const char *chunk_name;
bool chunk_name_overridden;