#define _LGPL_SOURCE
-#include <common/common.hpp>
-#include <common/sessiond-comm/sessiond-comm.hpp>
-#include <common/uri.hpp>
-#include <common/utils.hpp>
-
-#include <common/compat/endian.hpp>
-
-#include "fd-limit.hpp"
#include "agent-thread.hpp"
#include "agent.hpp"
+#include "fd-limit.hpp"
#include "lttng-sessiond.hpp"
#include "session.hpp"
-#include "utils.hpp"
#include "thread.hpp"
+#include "utils.hpp"
+#include <common/common.hpp>
+#include <common/compat/endian.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/urcu.hpp>
+#include <common/uri.hpp>
+#include <common/utils.hpp>
+
+#include <fcntl.h>
+
+namespace {
struct thread_notifiers {
struct lttng_pipe *quit_pipe;
sem_t ready;
unsigned int major, minor;
};
-static int agent_tracing_enabled = -1;
+int agent_tracing_enabled = -1;
/*
* Note that there is not port here. It's set after this URI is parsed so we
* can let the user define a custom one. However, localhost is ALWAYS the
* default listening address.
*/
-static const char *default_reg_uri =
- "tcp://" DEFAULT_NETWORK_VIEWER_BIND_ADDRESS;
+const char *default_reg_uri = "tcp://" DEFAULT_NETWORK_VIEWER_BIND_ADDRESS;
+} /* namespace */
/*
* Update agent application using the given socket. This is done just after
list = session_get_list();
LTTNG_ASSERT(list);
- cds_list_for_each_entry_safe(session, stmp, &list->head, list) {
+ cds_list_for_each_entry_safe (session, stmp, &list->head, list) {
if (!session_get(session)) {
continue;
}
if (session->ust_session) {
const struct agent *agt;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
agt = trace_ust_find_agent(session->ust_session, app->domain);
if (agt) {
agent_update(agt, app);
}
- rcu_read_unlock();
}
session_unlock(session);
session_put(session);
}
- rcu_read_lock();
- /*
- * We are protected against the addition of new events by the session
- * list lock being held.
- */
- cds_lfht_for_each_entry(the_trigger_agents_ht_by_domain->ht,
- &iter.iter, trigger_agent, node.node) {
- agent_update(trigger_agent, app);
+ {
+ /*
+ * We are protected against the addition of new events by the session
+ * list lock being held.
+ */
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (
+ the_trigger_agents_ht_by_domain->ht, &iter.iter, trigger_agent, node.node) {
+ agent_update(trigger_agent, app);
+ }
}
- rcu_read_unlock();
}
/*
* Create and init socket from uri.
*/
-static struct lttcomm_sock *init_tcp_socket(void)
+static struct lttcomm_sock *init_tcp_socket()
{
int ret;
- struct lttng_uri *uri = NULL;
- struct lttcomm_sock *sock = NULL;
+ struct lttng_uri *uri = nullptr;
+ struct lttcomm_sock *sock = nullptr;
unsigned int port;
bool bind_succeeded = false;
sock = lttcomm_alloc_sock_from_uri(uri);
uri_free(uri);
- if (sock == NULL) {
+ if (sock == nullptr) {
ERR("agent allocating TCP socket");
goto error;
}
goto error;
}
- for (port = the_config.agent_tcp_port.begin;
- port <= the_config.agent_tcp_port.end; port++) {
+ for (port = the_config.agent_tcp_port.begin; port <= the_config.agent_tcp_port.end;
+ port++) {
ret = lttcomm_sock_set_port(sock, (uint16_t) port);
if (ret) {
- ERR("Failed to set port %u on socket",
- port);
+ ERR("Failed to set port %u on socket", port);
goto error;
}
DBG3("Trying to bind on port %u", port);
}
if (errno == EADDRINUSE) {
- DBG("Failed to bind to port %u since it is already in use",
- port);
+ DBG("Failed to bind to port %u since it is already in use", port);
} else {
PERROR("Failed to bind to port %u", port);
goto error;
}
if (!bind_succeeded) {
- if (the_config.agent_tcp_port.begin ==
- the_config.agent_tcp_port.end) {
+ if (the_config.agent_tcp_port.begin == the_config.agent_tcp_port.end) {
WARN("Another process is already using the agent port %i. "
"Agent support will be deactivated.",
- the_config.agent_tcp_port.begin);
+ the_config.agent_tcp_port.begin);
goto error;
} else {
WARN("All ports in the range [%i, %i] are already in use. "
"Agent support will be deactivated.",
- the_config.agent_tcp_port.begin,
- the_config.agent_tcp_port.end);
+ the_config.agent_tcp_port.begin,
+ the_config.agent_tcp_port.end);
goto error;
}
}
goto error;
}
- DBG("Listening on TCP port %u and socket %d",
- port, sock->fd);
+ DBG("Listening on TCP port %u and socket %d", port, sock->fd);
return sock;
if (sock) {
lttcomm_destroy_sock(sock);
}
- return NULL;
+ return nullptr;
}
/*
port = 0;
}
- DBG3("Destroy TCP socket on port %" PRIu16,
- port);
+ DBG3("Destroy TCP socket on port %" PRIu16, port);
/* This will return gracefully if fd is invalid. */
sock->ops->close(sock);
}
}
-static bool is_agent_protocol_version_supported(
- const struct agent_protocol_version *version)
+static bool is_agent_protocol_version_supported(const struct agent_protocol_version *version)
{
const bool is_supported = version->major == AGENT_MAJOR_VERSION &&
- version->minor == AGENT_MINOR_VERSION;
+ version->minor == AGENT_MINOR_VERSION;
if (!is_supported) {
WARN("Refusing agent connection: unsupported protocol version %ui.%ui, expected %i.%i",
- version->major, version->minor,
- AGENT_MAJOR_VERSION, AGENT_MINOR_VERSION);
+ version->major,
+ version->minor,
+ AGENT_MAJOR_VERSION,
+ AGENT_MINOR_VERSION);
}
return is_supported;
* On success, the resulting socket is returned through `agent_app_socket`
* and the application's reported id is updated through `agent_app_id`.
*/
-static int accept_agent_connection(
- struct lttcomm_sock *reg_sock,
- struct agent_app_id *agent_app_id,
- struct lttcomm_sock **agent_app_socket)
+static int accept_agent_connection(struct lttcomm_sock *reg_sock,
+ struct agent_app_id *agent_app_id,
+ struct lttcomm_sock **agent_app_socket)
{
int ret;
struct agent_protocol_version agent_version;
PERROR("Failed to register new agent application");
} else if (size != 0) {
ERR("Failed to register new agent application: invalid registration message length: expected length = %zu, message length = %zd",
- sizeof(msg), size);
+ sizeof(msg),
+ size);
} else {
DBG("Failed to register new agent application: connection closed");
}
goto error_close_socket;
}
- agent_version = (struct agent_protocol_version) {
+ agent_version = (struct agent_protocol_version){
be32toh(msg.major_version),
be32toh(msg.minor_version),
};
goto error_close_socket;
}
- *agent_app_id = (struct agent_app_id) {
+ *agent_app_id = (struct agent_app_id){
.pid = (pid_t) be32toh(msg.pid),
.domain = (lttng_domain_type) be32toh(msg.domain),
};
DBG2("New registration for agent application: pid = %ld, domain = %s, socket fd = %d",
- (long) agent_app_id->pid,
- domain_type_str(agent_app_id->domain), new_sock->fd);
+ (long) agent_app_id->pid,
+ domain_type_str(agent_app_id->domain),
+ new_sock->fd);
*agent_app_socket = new_sock;
- new_sock = NULL;
+ new_sock = nullptr;
ret = 0;
goto end;
return ret;
}
-bool agent_tracing_is_enabled(void)
+bool agent_tracing_is_enabled()
{
int enabled;
*/
static int write_agent_port(uint16_t port)
{
- return utils_create_pid_file(
- (pid_t) port, the_config.agent_port_file_path.value);
+ return utils_create_pid_file((pid_t) port, the_config.agent_port_file_path.value);
}
-static
-void mark_thread_as_ready(struct thread_notifiers *notifiers)
+static void mark_thread_as_ready(struct thread_notifiers *notifiers)
{
DBG("Marking agent management thread as ready");
sem_post(¬ifiers->ready);
}
-static
-void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
+static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
{
DBG("Waiting for agent management thread to be ready");
sem_wait(¬ifiers->ready);
*/
static void *thread_agent_management(void *data)
{
- int i, ret, pollfd;
- uint32_t revents, nb_fd;
+ int i, ret;
+ uint32_t nb_fd;
struct lttng_poll_event events;
struct lttcomm_sock *reg_sock;
struct thread_notifiers *notifiers = (thread_notifiers *) data;
- const int quit_pipe_read_fd = lttng_pipe_get_readfd(
- notifiers->quit_pipe);
+ const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
DBG("Manage agent application registration.");
goto error_poll_create;
}
- ret = lttng_poll_add(&events, quit_pipe_read_fd,
- LPOLLIN | LPOLLERR);
+ ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN);
if (ret < 0) {
goto error_tcp_socket;
}
mark_thread_as_ready(notifiers);
/* Add TCP socket to the poll set. */
- ret = lttng_poll_add(&events, reg_sock->fd,
- LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
+ ret = lttng_poll_add(&events, reg_sock->fd, LPOLLIN | LPOLLRDHUP);
if (ret < 0) {
goto error;
}
- while (1) {
+ while (true) {
DBG3("Manage agent polling");
/* Inifinite blocking call, waiting for transmission */
-restart:
+ restart:
ret = lttng_poll_wait(&events, -1);
- DBG3("Manage agent return from poll on %d fds",
- LTTNG_POLL_GETNB(&events));
+ DBG3("Manage agent return from poll on %d fds", LTTNG_POLL_GETNB(&events));
if (ret < 0) {
/*
* Restart interrupted system call.
for (i = 0; i < nb_fd; i++) {
/* Fetch once the poll data */
- revents = LTTNG_POLL_GETEV(&events, i);
- pollfd = LTTNG_POLL_GETFD(&events, i);
+ const auto revents = LTTNG_POLL_GETEV(&events, i);
+ const auto pollfd = LTTNG_POLL_GETFD(&events, i);
- /* Thread quit pipe has been closed. Killing thread. */
- if (pollfd == quit_pipe_read_fd) {
+ /* Activity on thread quit pipe, exiting. */
+ if (pollfd == thread_quit_pipe_fd) {
+ DBG("Activity on thread quit pipe");
goto exit;
}
/* Activity on the registration socket. */
if (revents & LPOLLIN) {
struct agent_app_id new_app_id;
- struct agent_app *new_app = NULL;
+ struct agent_app *new_app = nullptr;
struct lttcomm_sock *new_app_socket;
int new_app_socket_fd;
* new_app_socket's ownership has been
* transferred to the new agent app.
*/
- new_app = agent_create_app(new_app_id.pid,
- new_app_id.domain,
- new_app_socket);
+ new_app = agent_create_app(
+ new_app_id.pid, new_app_id.domain, new_app_socket);
if (!new_app) {
- new_app_socket->ops->close(
- new_app_socket);
+ new_app_socket->ops->close(new_app_socket);
continue;
}
new_app_socket_fd = new_app_socket->fd;
- new_app_socket = NULL;
+ new_app_socket = nullptr;
/*
* Since this is a command socket (write then
* read), only add poll error event to only
* detect shutdown.
*/
- ret = lttng_poll_add(&events, new_app_socket_fd,
- LPOLLERR | LPOLLHUP | LPOLLRDHUP);
+ ret = lttng_poll_add(&events, new_app_socket_fd, LPOLLRDHUP);
if (ret < 0) {
agent_destroy_app(new_app);
continue;
if (ret < 0) {
agent_destroy_app(new_app);
/* Removing from the poll set. */
- ret = lttng_poll_del(&events,
- new_app_socket_fd);
+ ret = lttng_poll_del(&events, new_app_socket_fd);
if (ret < 0) {
session_unlock_list();
goto error;
DBG("Cleaning up and stopping.");
rcu_thread_offline();
rcu_unregister_thread();
- return NULL;
+ return nullptr;
}
static bool shutdown_agent_management_thread(void *data)
free(notifiers);
}
-bool launch_agent_management_thread(void)
+bool launch_agent_management_thread()
{
struct thread_notifiers *notifiers;
struct lttng_thread *thread;
goto error;
}
thread = lttng_thread_create("Agent management",
- thread_agent_management,
- shutdown_agent_management_thread,
- cleanup_agent_management_thread,
- notifiers);
+ thread_agent_management,
+ shutdown_agent_management_thread,
+ cleanup_agent_management_thread,
+ notifiers);
if (!thread) {
goto error;
}