* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#define _GNU_SOURCE
#define _LGPL_SOURCE
#include <getopt.h>
#include <grp.h>
#include <sys/wait.h>
#include <urcu/uatomic.h>
#include <unistd.h>
-#include <config.h>
#include <common/common.h>
#include <common/compat/socket.h>
#include <common/relayd/relayd.h>
#include <common/utils.h>
#include <common/daemonize.h>
-#include <common/config/config.h>
+#include <common/config/session-config.h>
#include "lttng-sessiond.h"
#include "buffer-registry.h"
#include "save.h"
#include "load-session-thread.h"
#include "syscall.h"
+#include "agent.h"
#define CONSUMERD_FILE "lttng-consumerd"
/* Load session thread information to operate. */
struct load_session_thread_data *load_info;
+/* Global hash tables */
+struct lttng_ht *agent_apps_ht_by_sock = NULL;
+
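/*
 * Illustrative sketch, not part of this change: the agent hash table is
 * expected to be allocated once, before the agent threads start, keyed by
 * socket number. The helper name below is hypothetical; lttng_ht_new() and
 * LTTNG_HT_TYPE_ULONG come from the common/hashtable wrappers.
 */
static int allocate_agent_apps_ht(void)
{
	agent_apps_ht_by_sock = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
	return agent_apps_ht_by_sock ? 0 : -1;
}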
/*
* Whether sessiond is ready for commands/health check requests.
* NR_LTTNG_SESSIOND_READY must match the number of calls to
return ret;
}
+/*
+ * Wait on consumer process termination.
+ *
+ * Must be called with the consumer data lock held, or from a context
+ * that guarantees no concurrent access to the data (e.g. cleanup).
+ */
+static void wait_consumer(struct consumer_data *consumer_data)
+{
+ pid_t ret;
+ int status;
+
+ if (consumer_data->pid <= 0) {
+ return;
+ }
+
+ DBG("Waiting for complete teardown of consumerd (PID: %d)",
+ consumer_data->pid);
+ ret = waitpid(consumer_data->pid, &status, 0);
+ if (ret == -1) {
+ PERROR("consumerd waitpid pid: %d", consumer_data->pid);
+ } else if (WIFSIGNALED(status)) {
+ ERR("consumerd terminated by signal %d", WTERMSIG(status));
+ } else if (WIFEXITED(status) && WEXITSTATUS(status) != 0) {
+ ERR("consumerd exited with error status %d",
+ WEXITSTATUS(status));
+ }
+ consumer_data->pid = 0;
+}
+
/*
* Cleanup the session daemon's data structures.
*/
}
}
+ wait_consumer(&kconsumer_data);
+ wait_consumer(&ustconsumer64_data);
+ wait_consumer(&ustconsumer32_data);
+
DBG("Cleaning up all agent apps");
agent_app_ht_clean();
free(kmod_probes_list);
free(kmod_extra_probes_list);
+ run_as_destroy_worker();
+
/* <fun> */
DBG("%c[%d;%dm*** assert failed :-) *** ==> %c[%dm%c[%d;%dm"
"Matthew, BEET driven development works!%c[%dm",
}
/* Check for data on kernel pipe */
- if (pollfd == kernel_poll_pipe[0] && (revents & LPOLLIN)) {
- (void) lttng_read(kernel_poll_pipe[0],
- &tmp, 1);
- /*
- * Ret value is useless here, if this pipe gets any actions an
- * update is required anyway.
- */
- update_poll_flag = 1;
- continue;
- } else {
- /*
- * New CPU detected by the kernel. Adding kernel stream to
- * kernel session and updating the kernel consumer
- */
- if (revents & LPOLLIN) {
+ if (revents & LPOLLIN) {
+ if (pollfd == kernel_poll_pipe[0]) {
+ (void) lttng_read(kernel_poll_pipe[0],
+ &tmp, 1);
+ /*
+ * The return value is not checked; if this pipe sees any
+ * activity, an update is required anyway.
+ */
+ update_poll_flag = 1;
+ continue;
+ } else {
+ /*
+ * New CPU detected by the kernel. Add the kernel stream to
+ * the kernel session and update the kernel consumer.
+ */
ret = update_kernel_stream(&kconsumer_data, pollfd);
if (ret < 0) {
continue;
}
break;
- /*
- * TODO: We might want to handle the LPOLLERR | LPOLLHUP
- * and unregister kernel stream at this point.
- */
}
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ update_poll_flag = 1;
+ continue;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
}
}
DBG("[thread] Manage consumer started");
+ rcu_register_thread();
+ rcu_thread_online();
+
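/*
 * Editor's note (standard liburcu usage, assumed): a thread must be a
 * registered, "online" RCU reader before it may enter a read-side critical
 * section. These calls pair with the rcu_thread_offline() and
 * rcu_unregister_thread() calls in this thread's cleanup path below:
 *
 *	rcu_register_thread();
 *	rcu_thread_online();
 *	...
 *	rcu_read_lock();
 *	... dereference RCU-protected consumer data ...
 *	rcu_read_unlock();
 *	...
 *	rcu_thread_offline();
 *	rcu_unregister_thread();
 */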
health_register(health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);
health_code_update();
/* Event on the registration socket */
if (pollfd == consumer_data->err_sock) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ if (revents & LPOLLIN) {
+ continue;
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
ERR("consumer err socket poll error");
goto error;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
}
}
if (pollfd == sock) {
/* Event on the consumerd socket */
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+ && !(revents & LPOLLIN)) {
ERR("consumer err socket second poll error");
goto error;
}
goto exit;
} else if (pollfd == consumer_data->metadata_fd) {
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+ && !(revents & LPOLLIN)) {
+ ERR("consumer err metadata socket second poll error");
+ goto error;
+ }
/* UST metadata requests */
ret = ust_consumer_metadata_request(
&consumer_data->metadata_sock);
unlink(consumer_data->err_unix_sock_path);
unlink(consumer_data->cmd_unix_sock_path);
- consumer_data->pid = 0;
pthread_mutex_unlock(&consumer_data->lock);
/* Cleanup metadata socket mutex. */
health_unregister(health_sessiond);
DBG("consumer thread cleanup completed");
+ rcu_thread_offline();
+ rcu_unregister_thread();
+
return NULL;
}
/* Inspect the apps cmd pipe */
if (pollfd == apps_cmd_pipe[0]) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- ERR("Apps command pipe error");
- goto error;
- } else if (revents & LPOLLIN) {
+ if (revents & LPOLLIN) {
int sock;
/* Empty pipe */
health_code_update();
/*
- * We only monitor the error events of the socket. This
- * thread does not handle any incoming data from UST
- * (POLLIN).
+ * Since this is a command socket (write then read),
+ * we only monitor the error events of the socket.
*/
ret = lttng_poll_add(&events, sock,
LPOLLERR | LPOLLHUP | LPOLLRDHUP);
}
DBG("Apps with sock %d added to poll set", sock);
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ERR("Apps command pipe error");
+ goto error;
+ } else {
+ ERR("Unknown poll events %u for sock %d", revents, pollfd);
+ goto error;
}
} else {
/*
/* Socket closed on remote end. */
ust_app_unregister(pollfd);
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
}
wait_queue->count--;
ust_app_destroy(wait_node->app);
free(wait_node);
+ /*
+ * Silence a use-after-free warning from
+ * cds_list_for_each_entry_safe, which expands to code that
+ * references __typeof__(*wait_node).
+ */
+ wait_node = NULL;
break;
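/*
 * Illustrative sketch, paraphrased from urcu's list.h (exact macro text may
 * differ between liburcu versions): the "safe" iterator pre-computes the
 * next cursor from the current node on every step, roughly:
 *
 *	for (pos = ..., p = cds_list_entry(pos->member.next,
 *			__typeof__(*pos), member);
 *	     &pos->member != (head);
 *	     pos = p, p = cds_list_entry(pos->member.next,
 *			__typeof__(*pos), member))
 *
 * which is why analyzers see a reference to the freed wait_node; setting it
 * to NULL before the break documents that it is no longer used.
 */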
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
}
}
* Don't care about return value. Let the manage apps threads
* handle app unregistration upon socket close.
*/
- (void) ust_app_register_done(app->sock);
+ (void) ust_app_register_done(app);
/*
* Even if the application socket has been closed, send the app
free(wait_node);
}
+ /* Empty command queue. */
+ for (;;) {
+ /* Dequeue command for registration */
+ node = cds_wfcq_dequeue_blocking(&ust_cmd_queue.head, &ust_cmd_queue.tail);
+ if (node == NULL) {
+ break;
+ }
+ ust_cmd = caa_container_of(node, struct ust_command, node);
+ ret = close(ust_cmd->sock);
+ if (ret < 0) {
+ PERROR("close ust sock exit dispatch %d", ust_cmd->sock);
+ }
+ lttng_fd_put(LTTNG_FD_APPS, 1);
+ free(ust_cmd);
+ }
+
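/*
 * Editor's note: this drain runs once the dispatch loop has been told to
 * quit. Each remaining node still owns an application socket and an
 * LTTNG_FD_APPS slot, hence the close()/lttng_fd_put() pair above; without
 * it, late registrations would leak on shutdown.
 */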
error_testpoint:
DBG("Dispatch thread dying");
if (err) {
/* Event on the registration socket */
if (pollfd == apps_sock) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- ERR("Register apps socket poll error");
- goto error;
- } else if (revents & LPOLLIN) {
+ if (revents & LPOLLIN) {
sock = lttcomm_accept_unix_sock(apps_sock);
if (sock < 0) {
goto error;
ust_cmd = zmalloc(sizeof(struct ust_command));
if (ust_cmd == NULL) {
PERROR("ust command zmalloc");
+ ret = close(sock);
+ if (ret) {
+ PERROR("close");
+ }
+ lttng_fd_put(LTTNG_FD_APPS, 1);
goto error;
}
* barrier with the exchange in cds_wfcq_enqueue.
*/
futex_nto1_wake(&ust_cmd_queue.futex);
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ERR("Register apps socket poll error");
+ goto error;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
}
}
* domain.
*/
if (session->kernel_session->consumer) {
- consumer_destroy_output(session->kernel_session->consumer);
+ consumer_output_put(session->kernel_session->consumer);
}
session->kernel_session->consumer =
consumer_copy_output(session->consumer);
case LTTNG_DOMAIN_UST:
DBG3("Copying tracing session consumer output in UST session");
if (session->ust_session->consumer) {
- consumer_destroy_output(session->ust_session->consumer);
+ consumer_output_put(session->ust_session->consumer);
}
session->ust_session->consumer =
consumer_copy_output(session->consumer);
session->kernel_session->consumer->dst.trace_path,
S_IRWXU | S_IRWXG, session->uid, session->gid);
if (ret < 0) {
- if (ret != -EEXIST) {
+ if (errno != EEXIST) {
ERR("Trace directory creation error");
goto error;
}
DBG("Processing client command %d", cmd_ctx->lsm->cmd_type);
+ assert(!rcu_read_ongoing());
+
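/*
 * Editor's note (liburcu semantics, assumed): rcu_read_ongoing() reports
 * whether the calling thread is currently inside an RCU read-side critical
 * section, so this assert (and its twin at the exit label) catches command
 * handlers that leave an rcu_read_lock() unbalanced.
 */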
*sock_error = 0;
switch (cmd_ctx->lsm->cmd_type) {
switch (cmd_ctx->lsm->cmd_type) {
case LTTNG_ADD_CONTEXT:
{
- ret = cmd_add_context(cmd_ctx->session, cmd_ctx->lsm->domain.type,
+ /*
+ * An LTTNG_ADD_CONTEXT command might have a supplementary
+ * payload if the context being added is an application context.
+ */
+ if (cmd_ctx->lsm->u.context.ctx.ctx ==
+ LTTNG_EVENT_CONTEXT_APP_CONTEXT) {
+ char *provider_name = NULL, *context_name = NULL;
+ size_t provider_name_len =
+ cmd_ctx->lsm->u.context.provider_name_len;
+ size_t context_name_len =
+ cmd_ctx->lsm->u.context.context_name_len;
+
+ if (provider_name_len == 0 || context_name_len == 0) {
+ /*
+ * Application provider and context names MUST
+ * be provided.
+ */
+ ret = -LTTNG_ERR_INVALID;
+ goto error;
+ }
+
+ provider_name = zmalloc(provider_name_len + 1);
+ if (!provider_name) {
+ ret = -LTTNG_ERR_NOMEM;
+ goto error;
+ }
+
+ context_name = zmalloc(context_name_len + 1);
+ if (!context_name) {
+ ret = -LTTNG_ERR_NOMEM;
+ goto error_add_context;
+ }
+
+ ret = lttcomm_recv_unix_sock(sock, provider_name,
+ provider_name_len);
+ if (ret < 0) {
+ goto error_add_context;
+ }
+
+ ret = lttcomm_recv_unix_sock(sock, context_name,
+ context_name_len);
+ if (ret < 0) {
+ goto error_add_context;
+ }
+ cmd_ctx->lsm->u.context.ctx.u.app_ctx.provider_name =
+ provider_name;
+ cmd_ctx->lsm->u.context.ctx.u.app_ctx.ctx_name =
+ context_name;
+ }
+
+ /*
+ * cmd_add_context assumes ownership of the provider and context
+ * names.
+ */
+ ret = cmd_add_context(cmd_ctx->session,
+ cmd_ctx->lsm->domain.type,
cmd_ctx->lsm->u.context.channel_name,
- &cmd_ctx->lsm->u.context.ctx, kernel_poll_pipe[1]);
+ &cmd_ctx->lsm->u.context.ctx,
+ kernel_poll_pipe[1]);
+
+ cmd_ctx->lsm->u.context.ctx.u.app_ctx.provider_name = NULL;
+ cmd_ctx->lsm->u.context.ctx.u.app_ctx.ctx_name = NULL;
+error_add_context:
+ free(cmd_ctx->lsm->u.context.ctx.u.app_ctx.provider_name);
+ free(cmd_ctx->lsm->u.context.ctx.u.app_ctx.ctx_name);
+ if (ret < 0) {
+ goto error;
+ }
break;
}
case LTTNG_DISABLE_CHANNEL:
}
case LTTNG_DISABLE_EVENT:
{
+ /*
+ * FIXME: handle the filter; for now, we simply receive the
+ * filter's bytecode and expression (sent by liblttng-ctl) and
+ * discard them.
+ *
+ * This fixes an issue where the client may block while sending
+ * the filter payload and encounter an error because the session
+ * daemon closes the socket without ever handling this data.
+ */
+ size_t count = cmd_ctx->lsm->u.disable.expression_len +
+ cmd_ctx->lsm->u.disable.bytecode_len;
+
+ if (count) {
+ char data[LTTNG_FILTER_MAX_LEN];
+
+ DBG("Discarding disable event command payload of size %zu", count);
+ while (count) {
+ ret = lttcomm_recv_unix_sock(sock, data,
+ count > sizeof(data) ? sizeof(data) : count);
+ if (ret < 0) {
+ goto error;
+ }
+
+ count -= (size_t) ret;
+ }
+ }
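/*
 * Editor's note: the payload is drained in LTTNG_FILTER_MAX_LEN-sized
 * chunks, so the discard uses constant stack space no matter how large the
 * advertised expression and bytecode lengths are.
 */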
/* FIXME: passing packed structure to non-packed pointer */
- /* TODO: handle filter */
ret = cmd_disable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type,
cmd_ctx->lsm->u.disable.channel_name,
&cmd_ctx->lsm->u.disable.event);
}
case LTTNG_DATA_PENDING:
{
- ret = cmd_data_pending(cmd_ctx->session);
+ int pending_ret;
+
+ /* 1 byte to return whether or not data is pending */
+ ret = setup_lttng_msg(cmd_ctx, 1);
+ if (ret < 0) {
+ goto setup_error;
+ }
+
+ pending_ret = cmd_data_pending(cmd_ctx->session);
+ /*
+ * FIXME
+ *
+ * This function may return 0 or 1 to indicate whether or not
+ * there is data pending. In case of error, it should return an
+ * LTTNG_ERR code. However, some code paths may still return
+ * a nondescript error code, which we handle by returning an
+ * "unknown" error.
+ */
+ if (pending_ret == 0 || pending_ret == 1) {
+ ret = LTTNG_OK;
+ } else if (pending_ret < 0) {
+ ret = LTTNG_ERR_UNK;
+ goto setup_error;
+ } else {
+ ret = pending_ret;
+ goto setup_error;
+ }
+
+ *cmd_ctx->llm->payload = (uint8_t) pending_ret;
break;
}
case LTTNG_SNAPSHOT_ADD_OUTPUT:
session_unlock_list();
}
init_setup_error:
+ assert(!rcu_read_ongoing());
return ret;
}
/* Event on the registration socket */
if (pollfd == sock) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ if (revents & LPOLLIN) {
+ continue;
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
ERR("Health socket poll error");
goto error;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
}
}
/* Event on the registration socket */
if (pollfd == client_sock) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ if (revents & LPOLLIN) {
+ continue;
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
ERR("Client socket poll error");
goto error;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
}
}
health_code_update();
- DBG("Sending response (size: %d, retcode: %s)",
+ DBG("Sending response (size: %d, retcode: %s (%d))",
cmd_ctx->lttng_msg_size,
- lttng_strerror(-cmd_ctx->llm->ret_code));
+ lttng_strerror(-cmd_ctx->llm->ret_code),
+ cmd_ctx->llm->ret_code);
ret = send_unix_sock(sock, cmd_ctx->llm, cmd_ctx->lttng_msg_size);
if (ret < 0) {
ERR("Failed to send data back to client");
/*
* config_entry_handler_cb used to handle options read from a config file.
- * See config_entry_handler_cb comment in common/config/config.h for the
+ * See config_entry_handler_cb comment in common/config/session-config.h for the
* return value conventions.
*/
static int config_entry_handler(const struct config_entry *entry, void *unused)
}
}
+ if (run_as_create_worker(argv[0]) < 0) {
+ goto exit_create_run_as_worker_cleanup;
+ }
+
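/*
 * Editor's note: the run_as worker is a helper process that performs
 * filesystem operations (e.g. creating trace directories) with the
 * credentials of the tracing user. It is forked here, before any thread is
 * created, since forking a multithreaded process is hazard-prone.
 */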
/*
* Starting from here, we can create threads. This needs to be after
* lttng_daemonize due to RCU.
}
exit_reg_apps:
+ /*
+ * Join the dispatch thread after joining reg_apps_thread to ensure
+ * we don't leak applications in the queue.
+ */
ret = pthread_join(dispatch_thread, &status);
if (ret) {
errno = ret;
health_app_destroy(health_sessiond);
exit_health_sessiond_cleanup:
+exit_create_run_as_worker_cleanup:
exit_options:
+ /* Ensure all prior call_rcu are done. */
+ rcu_barrier();
+
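/*
 * Illustrative sketch (standard liburcu semantics): rcu_barrier() blocks
 * until every callback previously queued with call_rcu() has executed.
 *
 *	call_rcu(&obj->rcu_head, free_obj_cb);	// deferred reclamation
 *	rcu_barrier();	// free_obj_cb has run by the time this returns
 *
 * Running the barrier before the final option cleanup ensures no deferred
 * free is still in flight when the process tears down.
 */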
sessiond_cleanup_options();
exit_set_signal_handler:
+
if (!retval) {
exit(EXIT_SUCCESS);
} else {