X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=15bb7255a882dab71abe546e9d8e95bc631f92ea;hb=ee69440bd276dbc8a5c78b0138c8e6944d571b4a;hp=2698fb46cb47e2d76ce1f0f22adba8155620f4a5;hpb=d0b96690836f4b876096f3dc14801f8e25281a77;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 2698fb46c..15bb7255a 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -45,6 +46,7 @@ #include #include "lttng-sessiond.h" +#include "buffer-registry.h" #include "channel.h" #include "cmd.h" #include "consumer.h" @@ -88,6 +90,7 @@ static struct consumer_data kconsumer_data = { .cmd_unix_sock_path = DEFAULT_KCONSUMERD_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .metadata_sock.fd = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -99,6 +102,7 @@ static struct consumer_data ustconsumer64_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD64_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .metadata_sock.fd = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -110,6 +114,7 @@ static struct consumer_data ustconsumer32_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .metadata_sock.fd = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, .cond = PTHREAD_COND_INITIALIZER, @@ -435,6 +440,7 @@ static void cleanup(void) DBG("Closing all UST sockets"); ust_app_clean_list(); + buffer_reg_destroy_registries(); if (is_root && !opt_no_kernel) { DBG2("Closing kernel fd"); @@ -863,10 +869,10 @@ static void *thread_manage_consumer(void *data) health_code_update(); /* - * Pass 2 as size here for the thread quit pipe and kconsumerd_err_sock. - * Nothing more will be added to this poll set. + * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the + * metadata_sock. Nothing more will be added to this poll set. */ - ret = sessiond_set_thread_pollset(&events, 2); + ret = sessiond_set_thread_pollset(&events, 3); if (ret < 0) { goto error_poll; } @@ -883,7 +889,7 @@ static void *thread_manage_consumer(void *data) health_code_update(); - /* Inifinite blocking call, waiting for transmission */ + /* Infinite blocking call, waiting for transmission */ restart: health_poll_entry(); @@ -953,87 +959,126 @@ restart: health_code_update(); if (code == LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) { + /* Connect both socket, command and metadata. */ consumer_data->cmd_sock = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); - if (consumer_data->cmd_sock < 0) { + consumer_data->metadata_sock.fd = + lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); + if (consumer_data->cmd_sock < 0 || + consumer_data->metadata_sock.fd < 0) { + PERROR("consumer connect cmd socket"); /* On error, signal condition and quit. */ signal_consumer_condition(consumer_data, -1); - PERROR("consumer connect"); goto error; } + /* Create metadata socket lock. */ + consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t)); + if (consumer_data->metadata_sock.lock == NULL) { + PERROR("zmalloc pthread mutex"); + ret = -1; + goto error; + } + pthread_mutex_init(consumer_data->metadata_sock.lock, NULL); + signal_consumer_condition(consumer_data, 1); - DBG("Consumer command socket ready"); + DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock); + DBG("Consumer metadata socket ready (fd: %d)", + consumer_data->metadata_sock.fd); } else { ERR("consumer error when waiting for SOCK_READY : %s", lttcomm_get_readable_code(-code)); goto error; } - /* Remove the kconsumerd error sock since we've established a connexion */ + /* Remove the consumerd error sock since we've established a connexion */ ret = lttng_poll_del(&events, consumer_data->err_sock); if (ret < 0) { goto error; } + /* Add new accepted error socket. */ ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP); if (ret < 0) { goto error; } + /* Add metadata socket that is successfully connected. */ + ret = lttng_poll_add(&events, consumer_data->metadata_sock.fd, + LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + goto error; + } + health_code_update(); - /* Inifinite blocking call, waiting for transmission */ + /* Infinite blocking call, waiting for transmission */ restart_poll: - health_poll_entry(); - ret = lttng_poll_wait(&events, -1); - health_poll_exit(); - if (ret < 0) { - /* - * Restart interrupted system call. - */ - if (errno == EINTR) { - goto restart_poll; + while (1) { + health_poll_entry(); + ret = lttng_poll_wait(&events, -1); + health_poll_exit(); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + goto restart_poll; + } + goto error; } - goto error; - } - nb_fd = ret; + nb_fd = ret; - for (i = 0; i < nb_fd; i++) { - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(); + health_code_update(); - /* Thread quit pipe has been closed. Killing thread. */ - ret = sessiond_check_thread_quit_pipe(pollfd, revents); - if (ret) { - err = 0; - goto exit; - } + /* Thread quit pipe has been closed. Killing thread. */ + ret = sessiond_check_thread_quit_pipe(pollfd, revents); + if (ret) { + err = 0; + goto exit; + } - /* Event on the kconsumerd socket */ - if (pollfd == sock) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("consumer err socket second poll error"); + if (pollfd == sock) { + /* Event on the consumerd socket */ + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("consumer err socket second poll error"); + goto error; + } + health_code_update(); + /* Wait for any kconsumerd error */ + ret = lttcomm_recv_unix_sock(sock, &code, + sizeof(enum lttcomm_return_code)); + if (ret <= 0) { + ERR("consumer closed the command socket"); + goto error; + } + + ERR("consumer return code : %s", + lttcomm_get_readable_code(-code)); + + goto exit; + } else if (pollfd == consumer_data->metadata_sock.fd) { + /* UST metadata requests */ + ret = ust_consumer_metadata_request( + &consumer_data->metadata_sock); + if (ret < 0) { + ERR("Handling metadata request"); + goto error; + } + break; + } else { + ERR("Unknown pollfd"); goto error; } } + health_code_update(); } - health_code_update(); - - /* Wait for any kconsumerd error */ - ret = lttcomm_recv_unix_sock(sock, &code, - sizeof(enum lttcomm_return_code)); - if (ret <= 0) { - ERR("consumer closed the command socket"); - goto error; - } - - ERR("consumer return code : %s", lttcomm_get_readable_code(-code)); - exit: error: /* Immediately set the consumerd state to stopped */ @@ -1059,6 +1104,16 @@ error: PERROR("close"); } } + if (consumer_data->metadata_sock.fd >= 0) { + ret = close(consumer_data->metadata_sock.fd); + if (ret) { + PERROR("close"); + } + } + /* Cleanup metadata socket mutex. */ + pthread_mutex_destroy(consumer_data->metadata_sock.lock); + free(consumer_data->metadata_sock.lock); + if (sock >= 0) { ret = close(sock); if (ret) { @@ -1244,41 +1299,25 @@ error_testpoint: } /* - * Send the application sockets (cmd and notify) to the respective threads. - * This is called from the dispatch UST registration thread once all sockets - * are set for the application. + * Send a socket to a thread This is called from the dispatch UST registration + * thread once all sockets are set for the application. * * On success, return 0 else a negative value being the errno message of the * write(). */ -static int send_app_sockets_to_threads(struct ust_app *app) +static int send_socket_to_thread(int fd, int sock) { int ret; - assert(app); /* Sockets MUST be set or else this should not have been called. */ - assert(app->sock >= 0); - assert(app->notify_sock >= 0); - assert(apps_cmd_pipe[1] >= 0); - assert(apps_cmd_notify_pipe[1] >= 0); + assert(fd >= 0); + assert(sock >= 0); do { - ret = write(apps_cmd_pipe[1], &app->sock, sizeof(app->sock)); + ret = write(fd, &sock, sizeof(sock)); } while (ret < 0 && errno == EINTR); - if (ret < 0 || ret != sizeof(app->sock)) { - PERROR("write apps cmd pipe %d", apps_cmd_pipe[1]); - if (ret < 0) { - ret = -errno; - } - goto error; - } - - do { - ret = write(apps_cmd_notify_pipe[1], &app->notify_sock, - sizeof(app->notify_sock)); - } while (ret < 0 && errno == EINTR); - if (ret < 0 || ret != sizeof(app->notify_sock)) { - PERROR("write apps notify cmd pipe %d", apps_cmd_notify_pipe[1]); + if (ret < 0 || ret != sizeof(sock)) { + PERROR("write apps pipe %d", fd); if (ret < 0) { ret = -errno; } @@ -1303,7 +1342,7 @@ static void *thread_dispatch_ust_registration(void *data) struct { struct ust_app *app; struct cds_list_head head; - } *wait_node = NULL; + } *wait_node = NULL, *tmp_wait_node; CDS_LIST_HEAD(wait_queue); @@ -1315,6 +1354,7 @@ static void *thread_dispatch_ust_registration(void *data) do { struct ust_app *app = NULL; + ust_cmd = NULL; /* Dequeue command for registration */ node = cds_wfq_dequeue_blocking(&ust_cmd_queue.queue); @@ -1337,6 +1377,7 @@ static void *thread_dispatch_ust_registration(void *data) wait_node = zmalloc(sizeof(*wait_node)); if (!wait_node) { PERROR("zmalloc wait_node dispatch"); + free(ust_cmd); goto error; } CDS_INIT_LIST_HEAD(&wait_node->head); @@ -1349,6 +1390,9 @@ static void *thread_dispatch_ust_registration(void *data) if (ret < 0) { PERROR("close ust sock dispatch %d", ust_cmd->sock); } + lttng_fd_put(1, LTTNG_FD_APPS); + free(wait_node); + free(ust_cmd); continue; } /* @@ -1357,6 +1401,7 @@ static void *thread_dispatch_ust_registration(void *data) */ cds_list_add(&wait_node->head, &wait_queue); + free(ust_cmd); /* * We have to continue here since we don't have the notify * socket and the application MUST be added to the hash table @@ -1368,7 +1413,8 @@ static void *thread_dispatch_ust_registration(void *data) * Look for the application in the local wait queue and set the * notify socket if found. */ - cds_list_for_each_entry(wait_node, &wait_queue, head) { + cds_list_for_each_entry_safe(wait_node, tmp_wait_node, + &wait_queue, head) { if (wait_node->app->pid == ust_cmd->reg_msg.pid) { wait_node->app->notify_sock = ust_cmd->sock; cds_list_del(&wait_node->head); @@ -1378,13 +1424,10 @@ static void *thread_dispatch_ust_registration(void *data) break; } } + free(ust_cmd); } if (app) { - ret = send_app_sockets_to_threads(app); - if (ret < 0) { - goto error; - } /* * @session_lock_list * @@ -1394,29 +1437,52 @@ static void *thread_dispatch_ust_registration(void *data) */ session_lock_list(); rcu_read_lock(); + /* * Add application to the global hash table. This needs to be * done before the update to the UST registry can locate the * application. */ ust_app_add(app); - /* - * Get app version. - */ - ret = ust_app_version(app); - if (ret) { - ERR("Unable to get app version"); + + /* Set app version. This call will print an error if needed. */ + (void) ust_app_version(app); + + /* Send notify socket through the notify pipe. */ + ret = send_socket_to_thread(apps_cmd_notify_pipe[1], + app->notify_sock); + if (ret < 0) { + rcu_read_unlock(); + session_unlock_list(); + /* No notify thread, stop the UST tracing. */ + goto error; } + /* * Update newly registered application with the tracing * registry info already enabled information. */ update_ust_app(app->sock); - ret = ust_app_register_done(app->sock); + + /* + * Don't care about return value. Let the manage apps threads + * handle app unregistration upon socket close. + */ + (void) ust_app_register_done(app->sock); + + /* + * Even if the application socket has been closed, send the app + * to the thread and unregistration will take place at that + * place. + */ + ret = send_socket_to_thread(apps_cmd_pipe[1], app->sock); if (ret < 0) { - /* Remove application from the registry. */ - ust_app_unregister(app->sock); + rcu_read_unlock(); + session_unlock_list(); + /* No apps. thread, stop the UST tracing. */ + goto error; } + rcu_read_unlock(); session_unlock_list(); } else { @@ -1425,8 +1491,8 @@ static void *thread_dispatch_ust_registration(void *data) if (ret < 0) { PERROR("close ust_cmd sock"); } + lttng_fd_put(1, LTTNG_FD_APPS); } - free(ust_cmd); } while (node != NULL); /* Futex wait on queue. Blocking call on futex() */ @@ -1434,6 +1500,13 @@ static void *thread_dispatch_ust_registration(void *data) } error: + /* Clean up wait queue. */ + cds_list_for_each_entry_safe(wait_node, tmp_wait_node, + &wait_queue, head) { + cds_list_del(&wait_node->head); + free(wait_node); + } + DBG("Dispatch thread dying"); return NULL; } @@ -1561,6 +1634,7 @@ static void *thread_registration_apps(void *data) sock = -1; continue; } + health_code_update(); ret = ust_app_recv_registration(sock, &ust_cmd->reg_msg); if (ret < 0) { @@ -1864,6 +1938,7 @@ static pid_t spawn_consumerd(struct consumer_data *consumer_data) ret = putenv(tmpnew); if (ret) { ret = -errno; + free(tmpnew); goto error; } } @@ -1908,6 +1983,7 @@ static pid_t spawn_consumerd(struct consumer_data *consumer_data) ret = putenv(tmpnew); if (ret) { ret = -errno; + free(tmpnew); goto error; } } @@ -1988,7 +2064,7 @@ end: return 0; error: - /* Cleanup already created socket on error. */ + /* Cleanup already created sockets on error. */ if (consumer_data->err_sock >= 0) { int err; @@ -2595,13 +2671,13 @@ skip_domain: } case LTTNG_ENABLE_CHANNEL: { - ret = cmd_enable_channel(cmd_ctx->session, cmd_ctx->lsm->domain.type, + ret = cmd_enable_channel(cmd_ctx->session, &cmd_ctx->lsm->domain, &cmd_ctx->lsm->u.channel.chan, kernel_poll_pipe[1]); break; } case LTTNG_ENABLE_EVENT: { - ret = cmd_enable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type, + ret = cmd_enable_event(cmd_ctx->session, &cmd_ctx->lsm->domain, cmd_ctx->lsm->u.enable.channel_name, &cmd_ctx->lsm->u.enable.event, NULL, kernel_poll_pipe[1]); break; @@ -2610,7 +2686,7 @@ skip_domain: { DBG("Enabling all events"); - ret = cmd_enable_event_all(cmd_ctx->session, cmd_ctx->lsm->domain.type, + ret = cmd_enable_event_all(cmd_ctx->session, &cmd_ctx->lsm->domain, cmd_ctx->lsm->u.enable.channel_name, cmd_ctx->lsm->u.enable.event.type, NULL, kernel_poll_pipe[1]); break; @@ -2959,7 +3035,7 @@ skip_domain: goto error; } - ret = cmd_enable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type, + ret = cmd_enable_event(cmd_ctx->session, &cmd_ctx->lsm->domain, cmd_ctx->lsm->u.enable.channel_name, &cmd_ctx->lsm->u.enable.event, bytecode, kernel_poll_pipe[1]); break; @@ -4202,6 +4278,10 @@ int main(int argc, char **argv) goto exit; } + /* Initialize global buffer per UID and PID registry. */ + buffer_reg_init_uid_registry(); + buffer_reg_init_pid_registry(); + /* Init UST command queue. */ cds_wfq_init(&ust_cmd_queue.queue);