#include "lttng-syscall.hpp"
#include "notification-thread-commands.hpp"
#include "notification-thread.hpp"
-#include "rotate.hpp"
#include "rotation-thread.hpp"
#include "session.hpp"
#include "timer.hpp"
#include <common/sessiond-comm/sessiond-comm.hpp>
#include <common/string-utils/string-utils.hpp>
#include <common/trace-chunk.hpp>
+#include <common/urcu.hpp>
#include <common/utils.hpp>
#include <lttng/action/action-internal.hpp>
DBG3("Listing agent events");
- rcu_read_lock();
agent_event_count = lttng_ht_get_count(agt->events);
if (agent_event_count == 0) {
/* Early exit. */
local_nb_events = (unsigned int) agent_event_count;
- cds_lfht_for_each_entry (agt->events->ht, &iter.iter, event, node.node) {
- struct lttng_event *tmp_event = lttng_event_create();
+ {
+ lttng::urcu::read_lock_guard read_lock;
- if (!tmp_event) {
- ret_code = LTTNG_ERR_NOMEM;
- goto error;
- }
+ cds_lfht_for_each_entry (agt->events->ht, &iter.iter, event, node.node) {
+ struct lttng_event *tmp_event = lttng_event_create();
- if (lttng_strncpy(tmp_event->name, event->name, sizeof(tmp_event->name))) {
- lttng_event_destroy(tmp_event);
- ret_code = LTTNG_ERR_FATAL;
- goto error;
- }
+ if (!tmp_event) {
+ ret_code = LTTNG_ERR_NOMEM;
+ goto error;
+ }
+
+ if (lttng_strncpy(tmp_event->name, event->name, sizeof(tmp_event->name))) {
+ lttng_event_destroy(tmp_event);
+ ret_code = LTTNG_ERR_FATAL;
+ goto error;
+ }
- tmp_event->name[sizeof(tmp_event->name) - 1] = '\0';
- tmp_event->enabled = !!event->enabled_count;
- tmp_event->loglevel = event->loglevel_value;
- tmp_event->loglevel_type = event->loglevel_type;
+ tmp_event->name[sizeof(tmp_event->name) - 1] = '\0';
+ tmp_event->enabled = !!event->enabled_count;
+ tmp_event->loglevel = event->loglevel_value;
+ tmp_event->loglevel_type = event->loglevel_type;
- ret = lttng_event_serialize(
- tmp_event, 0, nullptr, event->filter_expression, 0, nullptr, reply_payload);
- lttng_event_destroy(tmp_event);
- if (ret) {
- ret_code = LTTNG_ERR_FATAL;
- goto error;
+ ret = lttng_event_serialize(tmp_event,
+ 0,
+ nullptr,
+ event->filter_expression,
+ 0,
+ nullptr,
+ reply_payload);
+ lttng_event_destroy(tmp_event);
+ if (ret) {
+ ret_code = LTTNG_ERR_FATAL;
+ goto error;
+ }
}
}
-
end:
ret_code = LTTNG_OK;
*nb_events = local_nb_events;
error:
- rcu_read_unlock();
return ret_code;
}
DBG("Listing UST global events for channel %s", channel_name);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
lttng_ht_lookup(ust_global->channels, (void *) channel_name, &iter);
node = lttng_ht_iter_get_node_str(&iter);
ret_code = LTTNG_OK;
*nb_events = local_nb_events;
error:
- rcu_read_unlock();
return ret_code;
}
LTTNG_ASSERT(session);
- rcu_read_lock();
-
if (session->consumer_fds_sent == 0 && session->consumer != nullptr) {
+ lttng::urcu::read_lock_guard read_lock;
+
cds_lfht_for_each_entry (
session->consumer->socks->ht, &iter.iter, socket, node.node) {
pthread_mutex_lock(socket->lock);
}
error:
- rcu_read_unlock();
return ret;
}
DBG("Setting relayd for session %s", session->name);
- rcu_read_lock();
if (session->current_trace_chunk) {
enum lttng_trace_chunk_status status = lttng_trace_chunk_get_id(
session->current_trace_chunk, ¤t_chunk_id.value);
if (usess && usess->consumer && usess->consumer->type == CONSUMER_DST_NET &&
usess->consumer->enabled) {
/* For each consumer socket, send relayd sockets */
+ lttng::urcu::read_lock_guard read_lock;
+
cds_lfht_for_each_entry (
usess->consumer->socks->ht, &iter.iter, socket, node.node) {
pthread_mutex_lock(socket->lock);
/* Session is now ready for network streaming. */
session->net_handle = 1;
}
+
session->consumer->relay_major_version = usess->consumer->relay_major_version;
session->consumer->relay_minor_version = usess->consumer->relay_minor_version;
session->consumer->relay_allows_clear = usess->consumer->relay_allows_clear;
if (ksess && ksess->consumer && ksess->consumer->type == CONSUMER_DST_NET &&
ksess->consumer->enabled) {
+ lttng::urcu::read_lock_guard read_lock;
+
cds_lfht_for_each_entry (
ksess->consumer->socks->ht, &iter.iter, socket, node.node) {
pthread_mutex_lock(socket->lock);
/* Session is now ready for network streaming. */
session->net_handle = 1;
}
+
session->consumer->relay_major_version = ksess->consumer->relay_major_version;
session->consumer->relay_minor_version = ksess->consumer->relay_minor_version;
session->consumer->relay_allows_clear = ksess->consumer->relay_allows_clear;
}
error:
- rcu_read_unlock();
return ret;
}
usess = session->ust_session;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
switch (domain) {
case LTTNG_DOMAIN_KERNEL:
ret = LTTNG_OK;
error:
- rcu_read_unlock();
return ret;
}
LTTNG_ASSERT(_attr);
LTTNG_ASSERT(domain);
+ lttng::urcu::read_lock_guard read_lock;
+
attr = lttng_channel_copy(_attr);
if (!attr) {
ret_code = LTTNG_ERR_NOMEM;
DBG("Enabling channel %s for session %s", attr->name, session->name);
- rcu_read_lock();
-
/*
* If the session is a live session, remove the switch timer, the
* live timer does the same thing but sends also synchronisation
session->has_non_mmap_channel = true;
}
error:
- rcu_read_unlock();
end:
lttng_channel_destroy(attr);
return ret_code;
event_name = event->name;
+ lttng::urcu::read_lock_guard read_lock;
+
/* Error out on unhandled search criteria */
if (event->loglevel_type || event->loglevel != -1 || event->enabled || event->pid ||
event->filter || event->exclusion) {
goto error;
}
- rcu_read_lock();
-
switch (domain) {
case LTTNG_DOMAIN_KERNEL:
{
ret = LTTNG_OK;
error_unlock:
- rcu_read_unlock();
error:
free(exclusion);
free(bytecode);
DBG("Enable event command for event \'%s\'", event->name);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
switch (domain->type) {
case LTTNG_DOMAIN_KERNEL:
free(filter);
free(exclusion);
channel_attr_destroy(attr);
- rcu_read_unlock();
return ret;
}
*
* Called with session lock held.
*/
-int cmd_destroy_session(struct ltt_session *session,
- struct notification_thread_handle *notification_thread_handle,
- int *sock_fd)
+int cmd_destroy_session(struct ltt_session *session, int *sock_fd)
{
int ret;
enum lttng_error_code destruction_last_error = LTTNG_OK;
}
if (session->rotate_size) {
- unsubscribe_session_consumed_size_rotation(session, notification_thread_handle);
+ try {
+ the_rotation_thread_handle->unsubscribe_session_consumed_size_rotation(
+ *session);
+ } catch (const std::exception& e) {
+ /* Continue the destruction of the session anyway. */
+ ERR("Failed to unsubscribe rotation thread notification channel from consumed size condition during session destruction: %s",
+ e.what());
+ }
+
session->rotate_size = 0;
}
pthread_mutex_init(socket->lock, nullptr);
socket->registered = 1;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
consumer_add_socket(socket, ksess->consumer);
- rcu_read_unlock();
pthread_mutex_lock(&cdata->pid_mutex);
cdata->pid = -1;
DBG3("Listing domains found UST global domain");
nb_dom++;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
+
cds_lfht_for_each_entry (
session->ust_session->agents->ht, &iter.iter, agt, node.node) {
if (agt->being_used) {
nb_dom++;
}
}
- rcu_read_unlock();
}
if (!nb_dom) {
(*domains)[index].buf_type = session->ust_session->buffer_type;
index++;
- rcu_read_lock();
- cds_lfht_for_each_entry (
- session->ust_session->agents->ht, &iter.iter, agt, node.node) {
- if (agt->being_used) {
- (*domains)[index].type = agt->domain;
- (*domains)[index].buf_type = session->ust_session->buffer_type;
- index++;
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (
+ session->ust_session->agents->ht, &iter.iter, agt, node.node) {
+ if (agt->being_used) {
+ (*domains)[index].type = agt->domain;
+ (*domains)[index].buf_type =
+ session->ust_session->buffer_type;
+ index++;
+ }
}
}
- rcu_read_unlock();
}
end:
return nb_dom;
struct lttng_ht_iter iter;
struct ltt_ust_channel *uchan;
- rcu_read_lock();
- cds_lfht_for_each_entry (session->ust_session->domain_global.channels->ht,
- &iter.iter,
- uchan,
- node.node) {
- uint64_t discarded_events = 0, lost_packets = 0;
- struct lttng_channel *channel = nullptr;
- struct lttng_channel_extended *extended;
-
- channel = trace_ust_channel_to_lttng_channel(uchan);
- if (!channel) {
- ret_code = LTTNG_ERR_NOMEM;
- goto end;
- }
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (session->ust_session->domain_global.channels->ht,
+ &iter.iter,
+ uchan,
+ node.node) {
+ uint64_t discarded_events = 0, lost_packets = 0;
+ struct lttng_channel *channel = nullptr;
+ struct lttng_channel_extended *extended;
- extended = (struct lttng_channel_extended *) channel->attr.extended.ptr;
+ channel = trace_ust_channel_to_lttng_channel(uchan);
+ if (!channel) {
+ ret_code = LTTNG_ERR_NOMEM;
+ goto end;
+ }
- ret = get_ust_runtime_stats(
- session, uchan, &discarded_events, &lost_packets);
- if (ret < 0) {
- lttng_channel_destroy(channel);
- ret_code = LTTNG_ERR_UNK;
- goto end;
- }
+ extended = (struct lttng_channel_extended *)
+ channel->attr.extended.ptr;
+
+ ret = get_ust_runtime_stats(
+ session, uchan, &discarded_events, &lost_packets);
+ if (ret < 0) {
+ lttng_channel_destroy(channel);
+ ret_code = LTTNG_ERR_UNK;
+ goto end;
+ }
- extended->discarded_events = discarded_events;
- extended->lost_packets = lost_packets;
+ extended->discarded_events = discarded_events;
+ extended->lost_packets = lost_packets;
+
+ ret = lttng_channel_serialize(channel, &payload->buffer);
+ if (ret) {
+ ERR("Failed to serialize lttng_channel: channel name = '%s'",
+ channel->name);
+ lttng_channel_destroy(channel);
+ ret_code = LTTNG_ERR_UNK;
+ goto end;
+ }
- ret = lttng_channel_serialize(channel, &payload->buffer);
- if (ret) {
- ERR("Failed to serialize lttng_channel: channel name = '%s'",
- channel->name);
lttng_channel_destroy(channel);
- ret_code = LTTNG_ERR_UNK;
- goto end;
+ i++;
}
-
- lttng_channel_destroy(channel);
- i++;
}
- rcu_read_unlock();
+
break;
}
default:
struct lttng_ht_iter iter;
struct agent *agt;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
+
cds_lfht_for_each_entry (
session->ust_session->agents->ht, &iter.iter, agt, node.node) {
if (agt->domain == domain) {
break;
}
}
-
- rcu_read_unlock();
}
break;
default:
goto free_error;
}
- rcu_read_lock();
snapshot_add_output(&session->snapshot, new_output);
if (id) {
*id = new_output->id;
}
- rcu_read_unlock();
return LTTNG_OK;
LTTNG_ASSERT(session);
LTTNG_ASSERT(output);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/*
* Permission denied to create an output if the session is not
ret = LTTNG_OK;
error:
- rcu_read_unlock();
return ret;
}
}
/* Copy list from session to the new list object. */
- rcu_read_lock();
- cds_lfht_for_each_entry (session->snapshot.output_ht->ht, &iter.iter, output, node.node) {
- LTTNG_ASSERT(output->consumer);
- list[idx].id = output->id;
- list[idx].max_size = output->max_size;
- if (lttng_strncpy(list[idx].name, output->name, sizeof(list[idx].name))) {
- ret = -LTTNG_ERR_INVALID;
- goto error;
- }
- if (output->consumer->type == CONSUMER_DST_LOCAL) {
- if (lttng_strncpy(list[idx].ctrl_url,
- output->consumer->dst.session_root_path,
- sizeof(list[idx].ctrl_url))) {
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (
+ session->snapshot.output_ht->ht, &iter.iter, output, node.node) {
+ LTTNG_ASSERT(output->consumer);
+ list[idx].id = output->id;
+ list[idx].max_size = output->max_size;
+ if (lttng_strncpy(list[idx].name, output->name, sizeof(list[idx].name))) {
ret = -LTTNG_ERR_INVALID;
goto error;
}
- } else {
- /* Control URI. */
- ret = uri_to_str_url(&output->consumer->dst.net.control,
- list[idx].ctrl_url,
- sizeof(list[idx].ctrl_url));
- if (ret < 0) {
- ret = -LTTNG_ERR_NOMEM;
- goto error;
- }
- /* Data URI. */
- ret = uri_to_str_url(&output->consumer->dst.net.data,
- list[idx].data_url,
- sizeof(list[idx].data_url));
- if (ret < 0) {
- ret = -LTTNG_ERR_NOMEM;
- goto error;
+ if (output->consumer->type == CONSUMER_DST_LOCAL) {
+ if (lttng_strncpy(list[idx].ctrl_url,
+ output->consumer->dst.session_root_path,
+ sizeof(list[idx].ctrl_url))) {
+ ret = -LTTNG_ERR_INVALID;
+ goto error;
+ }
+ } else {
+ /* Control URI. */
+ ret = uri_to_str_url(&output->consumer->dst.net.control,
+ list[idx].ctrl_url,
+ sizeof(list[idx].ctrl_url));
+ if (ret < 0) {
+ ret = -LTTNG_ERR_NOMEM;
+ goto error;
+ }
+
+ /* Data URI. */
+ ret = uri_to_str_url(&output->consumer->dst.net.data,
+ list[idx].data_url,
+ sizeof(list[idx].data_url));
+ if (ret < 0) {
+ ret = -LTTNG_ERR_NOMEM;
+ goto error;
+ }
}
+
+ idx++;
}
- idx++;
}
*outputs = list;
list = nullptr;
ret = session->snapshot.nb_output;
error:
- rcu_read_unlock();
free(list);
end:
return ret;
(int) trigger_owner,
ret_code);
}
+
+ goto end_unlock_session_list;
}
break;
}
* For each consumer socket, create and send the relayd object of the
* snapshot output.
*/
- rcu_read_lock();
- cds_lfht_for_each_entry (output->socks->ht, &iter.iter, socket, node.node) {
- pthread_mutex_lock(socket->lock);
- status = send_consumer_relayd_sockets(
- session->id,
- output,
- socket,
- session->name,
- session->hostname,
- base_path,
- session->live_timer,
- current_chunk_id.is_set ? ¤t_chunk_id.value : nullptr,
- session->creation_time,
- session->name_contains_creation_time);
- pthread_mutex_unlock(socket->lock);
- if (status != LTTNG_OK) {
- rcu_read_unlock();
- goto error;
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (output->socks->ht, &iter.iter, socket, node.node) {
+ pthread_mutex_lock(socket->lock);
+ status = send_consumer_relayd_sockets(
+ session->id,
+ output,
+ socket,
+ session->name,
+ session->hostname,
+ base_path,
+ session->live_timer,
+ current_chunk_id.is_set ? ¤t_chunk_id.value : nullptr,
+ session->creation_time,
+ session->name_contains_creation_time);
+ pthread_mutex_unlock(socket->lock);
+ if (status != LTTNG_OK) {
+ goto error;
+ }
}
}
- rcu_read_unlock();
error:
return status;
struct snapshot_output *sout;
struct lttng_ht_iter iter;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
+
cds_lfht_for_each_entry (
session->snapshot.output_ht->ht, &iter.iter, sout, node.node) {
struct snapshot_output output_copy;
output->name,
sizeof(output_copy.name))) {
cmd_ret = LTTNG_ERR_INVALID;
- rcu_read_unlock();
goto error;
}
}
cmd_ret = snapshot_record(session, &output_copy);
if (cmd_ret != LTTNG_OK) {
- rcu_read_unlock();
goto error;
}
+
snapshot_success = 1;
}
- rcu_read_unlock();
}
if (snapshot_success) {
if (tmp_output) {
snapshot_output_destroy(tmp_output);
}
+
return cmd_ret;
}
ret = (cmd_ret == LTTNG_OK) ? cmd_ret : -((int) cmd_ret);
return ret;
error:
- if (session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR)) {
+ if (session_reset_rotation_state(*session, LTTNG_ROTATION_STATE_ERROR)) {
ERR("Failed to reset rotation state of session \"%s\"", session->name);
}
goto end;
int cmd_rotation_set_schedule(struct ltt_session *session,
bool activate,
enum lttng_rotation_schedule_type schedule_type,
- uint64_t new_value,
- struct notification_thread_handle *notification_thread_handle)
+ uint64_t new_value)
{
int ret;
uint64_t *parameter_value;
break;
case LTTNG_ROTATION_SCHEDULE_TYPE_SIZE_THRESHOLD:
if (activate) {
- ret = subscribe_session_consumed_size_rotation(
- session, new_value, notification_thread_handle);
- if (ret) {
- ERR("Failed to enable consumed-size notification in ROTATION_SET_SCHEDULE command");
+ try {
+ the_rotation_thread_handle->subscribe_session_consumed_size_rotation(
+ *session, new_value);
+ } catch (const std::exception& e) {
+ ERR("Failed to enable consumed-size notification in ROTATION_SET_SCHEDULE command: %s",
+ e.what());
ret = LTTNG_ERR_UNK;
goto end;
}
} else {
- ret = unsubscribe_session_consumed_size_rotation(
- session, notification_thread_handle);
- if (ret) {
- ERR("Failed to disable consumed-size notification in ROTATION_SET_SCHEDULE command");
+ try {
+ the_rotation_thread_handle
+ ->unsubscribe_session_consumed_size_rotation(*session);
+ } catch (const std::exception& e) {
+ ERR("Failed to disable consumed-size notification in ROTATION_SET_SCHEDULE command: %s",
+ e.what());
ret = LTTNG_ERR_UNK;
goto end;
}