#include <common/futex.hpp>
#include <common/hashtable/utils.hpp>
#include <common/macros.hpp>
+#include <common/pthread-lock.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
#include <common/unix.hpp>
+#include <common/urcu.hpp>
#include <lttng/action/action-internal.hpp>
#include <lttng/action/list-internal.hpp>
}
lttng_session_trigger_list_destroy(session_info->trigger_list);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_lfht_del(session_info->sessions_ht, &session_info->sessions_ht_node);
- rcu_read_unlock();
free(session_info->name);
lttng_trace_archive_location_put(session_info->last_state_sample.rotation.location);
call_rcu(&session_info->rcu_node, free_session_info_rcu);
static void session_info_add_channel(struct session_info *session_info,
struct channel_info *channel_info)
{
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_lfht_add(session_info->channel_infos_ht,
hash_channel_key(&channel_info->key),
&channel_info->session_info_channels_ht_node);
- rcu_read_unlock();
}
static void session_info_remove_channel(struct session_info *session_info,
struct channel_info *channel_info)
{
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_lfht_del(session_info->channel_infos_ht, &channel_info->session_info_channels_ht_node);
- rcu_read_unlock();
}
static struct channel_info *channel_info_create(const char *channel_name,
lttng_condition_put(list->condition);
if (list->notification_trigger_clients_ht) {
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_lfht_del(list->notification_trigger_clients_ht,
&list->notification_trigger_clients_ht_node);
- rcu_read_unlock();
list->notification_trigger_clients_ht = nullptr;
}
cds_list_for_each_entry_safe (client_list_element, tmp, &list->clients_list, node) {
*/
client_list->condition = lttng_condition_copy(condition);
- /* Build a list of clients to which this new condition applies. */
- cds_lfht_for_each_entry (state->client_socket_ht, &iter, client, client_socket_ht_node) {
- struct notification_client_list_element *client_list_element;
+ {
+ /* Build a list of clients to which this new condition applies. */
+ lttng::urcu::read_lock_guard read_lock;
- if (!condition_applies_to_client(condition, client)) {
- continue;
- }
+ cds_lfht_for_each_entry (
+ state->client_socket_ht, &iter, client, client_socket_ht_node) {
+ struct notification_client_list_element *client_list_element;
- client_list_element = zmalloc<notification_client_list_element>();
- if (!client_list_element) {
- goto error_put_client_list;
- }
+ if (!condition_applies_to_client(condition, client)) {
+ continue;
+ }
+
+ client_list_element = zmalloc<notification_client_list_element>();
+ if (!client_list_element) {
+ goto error_put_client_list;
+ }
- CDS_INIT_LIST_HEAD(&client_list_element->node);
- client_list_element->client = client;
- cds_list_add(&client_list_element->node, &client_list->clients_list);
+ CDS_INIT_LIST_HEAD(&client_list_element->node);
+ client_list_element->client = client;
+ cds_list_add(&client_list_element->node, &client_list->clients_list);
+ }
}
client_list->notification_trigger_clients_ht = state->notification_trigger_clients_ht;
- rcu_read_lock();
/*
* Add the client list to the global list of client list.
*/
- cds_lfht_add_unique(state->notification_trigger_clients_ht,
- lttng_condition_hash(client_list->condition),
- match_client_list_condition,
- client_list->condition,
- &client_list->notification_trigger_clients_ht_node);
- rcu_read_unlock();
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_add_unique(state->notification_trigger_clients_ht,
+ lttng_condition_hash(client_list->condition),
+ match_client_list_condition,
+ client_list->condition,
+ &client_list->notification_trigger_clients_ht_node);
+ }
goto end;
error_put_client_list:
struct cds_lfht_iter iter;
struct notification_client_list *list = nullptr;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_lfht_lookup(state->notification_trigger_clients_ht,
lttng_condition_hash(condition),
match_client_list_condition,
list = notification_client_list_get(list) ? list : nullptr;
}
- rcu_read_unlock();
return list;
}
struct channel_state_sample *last_sample = nullptr;
struct lttng_channel_trigger_list *channel_trigger_list = nullptr;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/* Find the channel associated with the condition. */
cds_lfht_for_each_entry (
*session_uid = channel_info->session_info->uid;
*session_gid = channel_info->session_info->gid;
end:
- rcu_read_unlock();
return ret;
}
status = lttng_condition_buffer_usage_get_channel_name(condition, &condition_channel_name);
LTTNG_ASSERT((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
- if (strcmp(channel_info->session_info->name, condition_session_name)) {
+ if (strcmp(channel_info->session_info->name, condition_session_name) != 0) {
goto fail;
}
- if (strcmp(channel_info->name, condition_channel_name)) {
+ if (strcmp(channel_info->name, condition_channel_name) != 0) {
goto fail;
}
cds_lfht_node_init(&list->session_triggers_ht_node);
list->session_triggers_ht = session_triggers_ht;
- rcu_read_lock();
/* Publish the list through the session_triggers_ht. */
- cds_lfht_add(session_triggers_ht,
- hash_key_str(session_name, lttng_ht_seed),
- &list->session_triggers_ht_node);
- rcu_read_unlock();
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_add(session_triggers_ht,
+ hash_key_str(session_name, lttng_ht_seed),
+ &list->session_triggers_ht_node);
+ }
end:
return list;
}
cds_list_del(&trigger_list_element->node);
free(trigger_list_element);
}
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/* Unpublish the list from the session_triggers_ht. */
cds_lfht_del(list->session_triggers_ht, &list->session_triggers_ht_node);
- rcu_read_unlock();
call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
}
session_trigger_list =
lttng_session_trigger_list_create(session_name, state->session_triggers_ht);
- /* Add all triggers applying to the session named 'session_name'. */
- cds_lfht_for_each_entry (state->triggers_ht, &iter, trigger_ht_element, node) {
- int ret;
+ {
+ /* Add all triggers applying to the session named 'session_name'. */
+ lttng::urcu::read_lock_guard read_lock;
- if (!trigger_applies_to_session(trigger_ht_element->trigger, session_name)) {
- continue;
- }
+ cds_lfht_for_each_entry (state->triggers_ht, &iter, trigger_ht_element, node) {
+ int ret;
- ret = lttng_session_trigger_list_add(session_trigger_list,
- trigger_ht_element->trigger);
- if (ret) {
- goto error;
- }
+ if (!trigger_applies_to_session(trigger_ht_element->trigger,
+ session_name)) {
+ continue;
+ }
- trigger_count++;
+ ret = lttng_session_trigger_list_add(session_trigger_list,
+ trigger_ht_element->trigger);
+ if (ret) {
+ goto error;
+ }
+
+ trigger_count++;
+ }
}
DBG("Found %i triggers that apply to newly created session", trigger_count);
struct session_info *session = nullptr;
struct lttng_session_trigger_list *trigger_list;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
trigger_list = lttng_session_trigger_list_build(state, name);
if (!trigger_list) {
goto error;
goto error;
}
- rcu_read_unlock();
return session;
error:
- rcu_read_unlock();
session_info_put(session);
return nullptr;
}
int trigger_count = 0;
struct cds_lfht_iter iter;
struct session_info *session_info = nullptr;
+ lttng::urcu::read_lock_guard read_lock;
DBG("Adding channel: channel name = `%s`, session id = %" PRIu64 ", channel key = %" PRIu64
", domain = %s",
goto error;
}
- rcu_read_lock();
/* Build a list of all triggers applying to the new channel. */
cds_lfht_for_each_entry (state->triggers_ht, &iter, trigger_ht_element, node) {
struct lttng_trigger_list_element *new_element;
new_element = zmalloc<lttng_trigger_list_element>();
if (!new_element) {
- rcu_read_unlock();
goto error;
}
CDS_INIT_LIST_HEAD(&new_element->node);
cds_list_add(&new_element->node, &trigger_list);
trigger_count++;
}
- rcu_read_unlock();
DBG("Found %i triggers that apply to newly added channel", trigger_count);
channel_trigger_list = zmalloc<lttng_channel_trigger_list>();
cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
cds_list_splice(&trigger_list, &channel_trigger_list->list);
- rcu_read_lock();
/* Add channel to the channel_ht which owns the channel_infos. */
cds_lfht_add(state->channels_ht,
hash_channel_key(&new_channel_info->key),
cds_lfht_add(state->channel_triggers_ht,
hash_channel_key(&new_channel_info->key),
&channel_trigger_list->channel_triggers_ht_node);
- rcu_read_unlock();
session_info_put(session_info);
*cmd_result = LTTNG_OK;
return 0;
channel_key,
lttng_domain_type_str(domain));
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_lfht_lookup(state->channel_triggers_ht,
hash_channel_key(&key),
cds_lfht_del(state->channels_ht, node);
channel_info_destroy(channel_info);
end:
- rcu_read_unlock();
*cmd_result = LTTNG_OK;
return 0;
}
struct lttng_credentials session_creds;
struct session_state_sample new_session_state;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
session_info = get_session_info_by_id(state, session_id);
if (!session_info) {
session_info_put(session_info);
*_cmd_result = cmd_result;
- rcu_read_unlock();
return ret;
}
struct lttng_triggers *local_triggers = nullptr;
const struct lttng_credentials *creds;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
local_triggers = lttng_triggers_create();
if (!local_triggers) {
local_triggers = nullptr;
end:
- rcu_read_unlock();
lttng_triggers_destroy(local_triggers);
*_cmd_result = cmd_result;
return ret;
const char *trigger_name;
uid_t trigger_owner_uid;
- rcu_read_lock();
+ {
+ lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry (state->triggers_ht, &iter, trigger_ht_element, node) {
- if (lttng_trigger_is_equal(trigger, trigger_ht_element->trigger)) {
- /* Take one reference on the return trigger. */
- *registered_trigger = trigger_ht_element->trigger;
- lttng_trigger_get(*registered_trigger);
- ret = 0;
- cmd_result = LTTNG_OK;
- goto end;
+ cds_lfht_for_each_entry (state->triggers_ht, &iter, trigger_ht_element, node) {
+ if (lttng_trigger_is_equal(trigger, trigger_ht_element->trigger)) {
+ /* Take one reference on the return trigger. */
+ *registered_trigger = trigger_ht_element->trigger;
+ lttng_trigger_get(*registered_trigger);
+ ret = 0;
+ cmd_result = LTTNG_OK;
+ goto end;
+ }
}
}
ret = 0;
end:
- rcu_read_unlock();
*_cmd_result = cmd_result;
return ret;
}
static bool is_trigger_action_notify(const struct lttng_trigger *trigger)
{
bool is_notify = false;
- unsigned int i, count;
- enum lttng_action_status action_status;
const struct lttng_action *action = lttng_trigger_get_const_action(trigger);
enum lttng_action_type action_type;
goto end;
}
- action_status = lttng_action_list_get_count(action, &count);
- LTTNG_ASSERT(action_status == LTTNG_ACTION_STATUS_OK);
-
- for (i = 0; i < count; i++) {
- const struct lttng_action *inner_action = lttng_action_list_get_at_index(action, i);
-
- action_type = lttng_action_get_type(inner_action);
- if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
+ for (auto inner_action : lttng::ctl::const_action_list_view(action)) {
+ if (lttng_action_get_type(inner_action) == LTTNG_ACTION_TYPE_NOTIFY) {
is_notify = true;
goto end;
}
enum action_executor_status executor_status;
const uint64_t trigger_tracer_token = state->trigger_id.next_tracer_token++;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/* Set the trigger's tracer token. */
lttng_trigger_set_tracer_token(trigger, trigger_tracer_token);
lttng_trigger_destroy(trigger);
}
end:
- rcu_read_unlock();
return ret;
}
struct cds_lfht_iter iter;
struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element;
- cds_lfht_for_each_entry (state->trigger_tokens_ht, &iter, trigger_tokens_ht_element, node) {
- if (!lttng_trigger_is_equal(trigger, trigger_tokens_ht_element->trigger)) {
- continue;
- }
+ {
+ lttng::urcu::read_lock_guard read_lock;
- event_notifier_error_accounting_unregister_event_notifier(
- trigger_tokens_ht_element->trigger);
+ cds_lfht_for_each_entry (
+ state->trigger_tokens_ht, &iter, trigger_tokens_ht_element, node) {
+ if (!lttng_trigger_is_equal(trigger, trigger_tokens_ht_element->trigger)) {
+ continue;
+ }
- /* TODO talk to all app and remove it */
- DBG("Removed trigger from tokens_ht");
- cds_lfht_del(state->trigger_tokens_ht, &trigger_tokens_ht_element->node);
+ event_notifier_error_accounting_unregister_event_notifier(
+ trigger_tokens_ht_element->trigger);
- call_rcu(&trigger_tokens_ht_element->rcu_node,
- free_notification_trigger_tokens_ht_element_rcu);
+ /* TODO talk to all app and remove it */
+ DBG("Removed trigger from tokens_ht");
+ cds_lfht_del(state->trigger_tokens_ht, &trigger_tokens_ht_element->node);
- break;
+ call_rcu(&trigger_tokens_ht_element->rcu_node,
+ free_notification_trigger_tokens_ht_element_rcu);
+
+ break;
+ }
}
}
const struct lttng_condition *condition = lttng_trigger_get_const_condition(trigger);
enum lttng_error_code cmd_reply;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_lfht_lookup(
state->triggers_ht, lttng_condition_hash(condition), match_trigger, trigger, &iter);
lttng_trigger_destroy(trigger_ht_element->trigger);
call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
end:
- rcu_read_unlock();
if (_cmd_reply) {
*_cmd_reply = cmd_reply;
}
return 0;
}
-static int pop_cmd_queue(struct notification_thread_handle *handle,
- struct notification_thread_command **cmd)
+static notification_thread_command *pop_cmd_queue(notification_thread_handle *handle)
{
- int ret;
- uint64_t counter;
+ lttng::pthread::lock_guard queue_lock(handle->cmd_queue.lock);
- pthread_mutex_lock(&handle->cmd_queue.lock);
- ret = lttng_read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
- if (ret != sizeof(counter)) {
- ret = -1;
- goto error_unlock;
+ uint64_t counter;
+ const auto read_ret = lttng_read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
+ if (read_ret != sizeof(counter)) {
+ if (read_ret < 0) {
+ LTTNG_THROW_POSIX("Failed to read counter value from event_fd", errno);
+ } else {
+ LTTNG_THROW_ERROR(lttng::format(
+ "Failed to read counter value from event_fd because of a truncated read: ret={}, expected read size={}",
+ read_ret,
+ sizeof(counter)));
+ }
}
- *cmd = cds_list_first_entry(
+ auto command = cds_list_first_entry(
&handle->cmd_queue.list, struct notification_thread_command, cmd_list_node);
- cds_list_del(&((*cmd)->cmd_list_node));
- ret = 0;
-
-error_unlock:
- pthread_mutex_unlock(&handle->cmd_queue.lock);
- return ret;
+ cds_list_del(&((command)->cmd_list_node));
+ return command;
}
/* Returns 0 on success, 1 on exit requested, negative value on error. */
struct notification_thread_state *state)
{
int ret;
- struct notification_thread_command *cmd;
+ struct notification_thread_command *cmd = nullptr;
- ret = pop_cmd_queue(handle, &cmd);
- if (ret) {
+ try {
+ cmd = pop_cmd_queue(handle);
+ } catch (const std::exception& ex) {
+ ERR("Failed to get next notification thread command: %s", ex.what());
goto error;
}
cmd->parameters.client_communication_update.id;
struct notification_client *client;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
client = get_client_from_id(client_id, state);
if (!client) {
} else {
ret = client_handle_transmission_status(client, client_status, state);
}
- rcu_read_unlock();
break;
}
default:
if (ret) {
goto error_unlock;
}
+
end:
- if (cmd->is_async) {
- free(cmd);
- cmd = nullptr;
- } else {
- lttng_waiter_wake_up(&cmd->reply_waiter);
+ if (cmd) {
+ if (cmd->is_async) {
+ delete cmd;
+ cmd = nullptr;
+ } else {
+ cmd->command_completed_waker->wake();
+ }
}
+
return ret;
+
error_unlock:
/* Wake-up and return a fatal error to the calling thread. */
- lttng_waiter_wake_up(&cmd->reply_waiter);
cmd->reply_code = LTTNG_ERR_FATAL;
+
error:
- /* Indicate a fatal error to the caller. */
- return -1;
+ ret = -1;
+ goto end;
}
static int socket_set_non_blocking(int socket)
}
DBG("Added new notification channel client socket (%i) to poll set", client->socket);
- rcu_read_lock();
- cds_lfht_add(state->client_socket_ht,
- hash_client_socket(client->socket),
- &client->client_socket_ht_node);
- cds_lfht_add(state->client_id_ht, hash_client_id(client->id), &client->client_id_ht_node);
- rcu_read_unlock();
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_add(state->client_socket_ht,
+ hash_client_socket(client->socket),
+ &client->client_socket_ht_node);
+ cds_lfht_add(state->client_id_ht,
+ hash_client_id(client->id),
+ &client->client_id_ht_node);
+ }
return ret;
int ret = 0;
struct notification_client *client;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
+
DBG("Closing client connection (socket fd = %i)", client_socket);
client = get_client_from_socket(client_socket, state);
if (!client) {
ret = notification_thread_client_disconnect(client, state);
end:
- rcu_read_unlock();
return ret;
}
struct notification_client *client;
bool error_encoutered = false;
- rcu_read_lock();
DBG("Closing all client connections");
- cds_lfht_for_each_entry (state->client_socket_ht, &iter, client, client_socket_ht_node) {
- int ret;
- ret = notification_thread_client_disconnect(client, state);
- if (ret) {
- error_encoutered = true;
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (
+ state->client_socket_ht, &iter, client, client_socket_ht_node) {
+ int ret;
+
+ ret = notification_thread_client_disconnect(client, state);
+ if (ret) {
+ error_encoutered = true;
+ }
}
}
- rcu_read_unlock();
+
return error_encoutered ? 1 : 0;
}
struct cds_lfht_iter iter;
struct lttng_trigger_ht_element *trigger_ht_element;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry (state->triggers_ht, &iter, trigger_ht_element, node) {
int ret = handle_notification_thread_command_unregister_trigger(
state, trigger_ht_element->trigger, nullptr);
error_occurred = true;
}
}
- rcu_read_unlock();
return error_occurred ? -1 : 0;
}
ssize_t recv_ret;
size_t offset;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
client = get_client_from_socket(socket, state);
if (!client) {
/* Internal error, abort. */
}
end:
- rcu_read_unlock();
return ret;
error_disconnect_client:
struct notification_client *client;
enum client_transmission_status transmission_status;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
client = get_client_from_socket(socket, state);
if (!client) {
/* Internal error, abort. */
goto end;
}
end:
- rcu_read_unlock();
return ret;
}
unsigned int capture_count = 0;
/* Find triggers associated with this token. */
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_lfht_lookup(state->trigger_tokens_ht,
hash_key_u64(¬ification->tracer_token, lttng_ht_seed),
match_trigger_token,
end_unlock:
notification_client_list_put(client_list);
- rcu_read_unlock();
end:
return ret;
}
struct lttng_credentials channel_creds = {};
struct lttng_credentials session_creds = {};
struct session_info *session;
+ lttng::urcu::read_lock_guard read_lock;
/*
* The monitoring pipe only holds messages smaller than PIPE_BUF,
channel_new_sample.highest_usage = sample_msg.highest;
channel_new_sample.lowest_usage = sample_msg.lowest;
- rcu_read_lock();
-
session = get_session_info_by_id(state, sample_msg.session_id);
if (!session) {
DBG("Received a sample for an unknown session from consumerd: session id = %" PRIu64,
session->last_state_sample = session_new_sample;
}
session_info_put(session);
- rcu_read_unlock();
end:
return ret;
}