X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=ltt-sessiond%2Fmain.c;h=d2240d9dd89a1f9cec011e233955b9c33a441b7e;hp=094fe4adf75fd68b80cfed7e3665309340efc753;hb=5eb91c9837a7b379a74e99358f0f9fb10011ef74;hpb=71615260f4e0d7a58bf64a837c081af2d155ef9d diff --git a/ltt-sessiond/main.c b/ltt-sessiond/main.c index 094fe4adf..d2240d9dd 100644 --- a/ltt-sessiond/main.c +++ b/ltt-sessiond/main.c @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include @@ -44,6 +43,7 @@ #include #include +#include "compat/poll.h" #include "context.h" #include "futex.h" #include "kernel-ctl.h" @@ -72,7 +72,6 @@ static int opt_daemon; static int is_root; /* Set to 1 if the daemon is running as root */ static pid_t ppid; /* Parent PID for --sig-parent option */ static pid_t kconsumerd_pid; -static struct pollfd *kernel_pollfd; static int dispatch_thread_exit; static char apps_unix_sock_path[PATH_MAX]; /* Global application Unix socket path */ @@ -110,7 +109,9 @@ static pthread_t kernel_thread; static pthread_t dispatch_thread; static sem_t kconsumerd_sem; -static pthread_mutex_t kconsumerd_pid_mutex; /* Mutex to control kconsumerd pid assignation */ + +/* Mutex to control kconsumerd pid assignation */ +static pthread_mutex_t kconsumerd_pid_mutex; /* * UST registration command queue. This queue is tied with a futex and uses a N @@ -133,6 +134,50 @@ static struct ust_cmd_queue ust_cmd_queue; */ static struct ltt_session_list *session_list_ptr; +/* + * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. + */ +static int create_thread_poll_set(struct lttng_poll_event *events, + unsigned int size) +{ + int ret; + + if (events == NULL || size == 0) { + ret = -1; + goto error; + } + + ret = lttng_poll_create(events, size, LTTNG_CLOEXEC); + if (ret < 0) { + goto error; + } + + /* Add quit pipe */ + ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN); + if (ret < 0) { + goto error; + } + + return 0; + +error: + return ret; +} + +/* + * Check if the thread quit pipe was triggered. + * + * Return 1 if it was triggered else 0; + */ +static int check_thread_quit_pipe(int fd, uint32_t events) +{ + if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) { + return 1; + } + + return 0; +} + /* * Remove modules in reverse load order. */ @@ -188,7 +233,7 @@ static gid_t allowed_group(void) } /* - * Init quit pipe. + * Init thread quit pipe. * * Return -1 on error or 0 if all pipes are created. */ @@ -234,10 +279,15 @@ static void teardown_kernel_session(struct ltt_session *session) */ static void stop_threads(void) { + int ret; + /* Stopping all threads */ DBG("Terminating all threads"); - close(thread_quit_pipe[0]); - close(thread_quit_pipe[1]); + ret = write(thread_quit_pipe[1], "!", 1); + if (ret < 0) { + ERR("write error on thread quit pipe"); + } + /* Dispatch thread */ dispatch_thread_exit = 1; futex_nto1_wake(&ust_cmd_queue.futex); @@ -260,16 +310,18 @@ static void cleanup(void) 27, 1, 31, 27, 0, 27, 1, 33, 27, 0); /* */ - DBG("Removing %s directory", LTTNG_RUNDIR); - ret = asprintf(&cmd, "rm -rf " LTTNG_RUNDIR); - if (ret < 0) { - ERR("asprintf failed. Something is really wrong!"); - } + if (is_root) { + DBG("Removing %s directory", LTTNG_RUNDIR); + ret = asprintf(&cmd, "rm -rf " LTTNG_RUNDIR); + if (ret < 0) { + ERR("asprintf failed. Something is really wrong!"); + } - /* Remove lttng run directory */ - ret = system(cmd); - if (ret < 0) { - ERR("Unable to clean " LTTNG_RUNDIR); + /* Remove lttng run directory */ + ret = system(cmd); + if (ret < 0) { + ERR("Unable to clean " LTTNG_RUNDIR); + } } DBG("Cleaning up all session"); @@ -297,6 +349,9 @@ static void cleanup(void) DBG("Unloading kernel modules"); modprobe_remove_kernel_modules(); } + + close(thread_quit_pipe[0]); + close(thread_quit_pipe[1]); } /* @@ -505,23 +560,17 @@ error: } /* - * Update the kernel pollfd set of all channel fd available over all tracing + * Update the kernel poll set of all channel fd available over all tracing * session. Add the wakeup pipe at the end of the set. */ -static int update_kernel_pollfd(void) +static int update_kernel_poll(struct lttng_poll_event *events) { - int i = 0; - /* - * The wakup pipe and the quit pipe are needed so the number of fds starts - * at 2 for those pipes. - */ - unsigned int nb_fd = 2; + int ret; struct ltt_session *session; struct ltt_kernel_channel *channel; - DBG("Updating kernel_pollfd"); + DBG("Updating kernel poll set"); - /* Get the number of channel of all kernel session */ lock_session_list(); cds_list_for_each_entry(session, &session_list_ptr->head, list) { lock_session(session); @@ -529,48 +578,21 @@ static int update_kernel_pollfd(void) unlock_session(session); continue; } - nb_fd += session->kernel_session->channel_count; - unlock_session(session); - } - - DBG("Resizing kernel_pollfd to size %d", nb_fd); - kernel_pollfd = realloc(kernel_pollfd, nb_fd * sizeof(struct pollfd)); - if (kernel_pollfd == NULL) { - perror("malloc kernel_pollfd"); - goto error; - } - - cds_list_for_each_entry(session, &session_list_ptr->head, list) { - lock_session(session); - if (session->kernel_session == NULL) { - unlock_session(session); - continue; - } - if (i >= nb_fd) { - ERR("To much channel for kernel_pollfd size"); - unlock_session(session); - break; - } cds_list_for_each_entry(channel, &session->kernel_session->channel_list.head, list) { - kernel_pollfd[i].fd = channel->fd; - kernel_pollfd[i].events = POLLIN | POLLRDNORM; - i++; + /* Add channel fd to the kernel poll set */ + ret = lttng_poll_add(events, channel->fd, LPOLLIN | LPOLLRDNORM); + if (ret < 0) { + unlock_session(session); + goto error; + } + DBG("Channel fd %d added to kernel set", channel->fd); } unlock_session(session); } unlock_session_list(); - /* Adding wake up pipe */ - kernel_pollfd[nb_fd - 2].fd = kernel_poll_pipe[0]; - kernel_pollfd[nb_fd - 2].events = POLLIN; - - /* Adding the quit pipe */ - kernel_pollfd[nb_fd - 1].fd = thread_quit_pipe[0]; - kernel_pollfd[nb_fd - 1].events = - POLLHUP | POLLNVAL | POLLERR | POLLIN | POLLRDHUP | POLLPRI; - - return nb_fd; + return 0; error: unlock_session_list(); @@ -604,7 +626,8 @@ static int update_kernel_stream(int fd) session->kernel_session->consumer_fd = kconsumerd_cmd_sock; } - cds_list_for_each_entry(channel, &session->kernel_session->channel_list.head, list) { + cds_list_for_each_entry(channel, + &session->kernel_session->channel_list.head, list) { if (channel->fd == fd) { DBG("Channel found, updating kernel streams"); ret = kernel_open_channel_stream(channel); @@ -613,12 +636,13 @@ static int update_kernel_stream(int fd) } /* - * Have we already sent fds to the consumer? If yes, it means that - * tracing is started so it is safe to send our updated stream fds. + * Have we already sent fds to the consumer? If yes, it means + * that tracing is started so it is safe to send our updated + * stream fds. */ if (session->kernel_session->kconsumer_fds_sent == 1) { - ret = send_kconsumerd_channel_fds(session->kernel_session->consumer_fd, - channel); + ret = send_kconsumerd_channel_fds( + session->kernel_session->consumer_fd, channel); if (ret < 0) { goto error; } @@ -645,27 +669,42 @@ error: */ static void *thread_manage_kernel(void *data) { - int ret, i, nb_fd = 0; + int ret, i, pollfd, update_poll_flag = 1; + uint32_t revents, nb_fd; char tmp; - int update_poll_flag = 1; + struct lttng_poll_event events; DBG("Thread manage kernel started"); + ret = create_thread_poll_set(&events, 2); + if (ret < 0) { + goto error; + } + + ret = lttng_poll_add(&events, kernel_poll_pipe[0], LPOLLIN); + if (ret < 0) { + goto error; + } + while (1) { if (update_poll_flag == 1) { - nb_fd = update_kernel_pollfd(); - if (nb_fd < 0) { + ret = update_kernel_poll(&events); + if (ret < 0) { goto error; } update_poll_flag = 0; } - DBG("Polling on %d fds", nb_fd); + nb_fd = LTTNG_POLL_GETNB(&events); + + DBG("Thread kernel polling on %d fds", nb_fd); + + /* Zeroed the poll events */ + lttng_poll_reset(&events); /* Poll infinite value of time */ - ret = poll(kernel_pollfd, nb_fd, -1); + ret = lttng_poll_wait(&events, -1); if (ret < 0) { - perror("poll kernel thread"); goto error; } else if (ret == 0) { /* Should not happen since timeout is infinite */ @@ -674,52 +713,49 @@ static void *thread_manage_kernel(void *data) continue; } - /* Thread quit pipe has been closed. Killing thread. */ - if (kernel_pollfd[nb_fd - 1].revents == POLLNVAL) { - goto error; - } - - DBG("Kernel poll event triggered"); + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); - /* - * Check if the wake up pipe was triggered. If so, the kernel_pollfd - * must be updated. - */ - switch (kernel_pollfd[nb_fd - 2].revents) { - case POLLIN: - ret = read(kernel_poll_pipe[0], &tmp, 1); - update_poll_flag = 1; - continue; - case POLLERR: - goto error; - default: - break; - } + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { + goto error; + } - for (i = 0; i < nb_fd; i++) { - switch (kernel_pollfd[i].revents) { - /* - * New CPU detected by the kernel. Adding kernel stream to kernel - * session and updating the kernel consumer - */ - case POLLIN | POLLRDNORM: - ret = update_kernel_stream(kernel_pollfd[i].fd); - if (ret < 0) { - continue; + /* Check for data on kernel pipe */ + if (pollfd == kernel_poll_pipe[0] && (revents & LPOLLIN)) { + ret = read(kernel_poll_pipe[0], &tmp, 1); + update_poll_flag = 1; + continue; + } else { + /* + * New CPU detected by the kernel. Adding kernel stream to + * kernel session and updating the kernel consumer + */ + if (revents & LPOLLIN) { + ret = update_kernel_stream(pollfd); + if (ret < 0) { + continue; + } + break; + /* + * TODO: We might want to handle the LPOLLERR | LPOLLHUP + * and unregister kernel stream at this point. + */ } - break; } } } error: DBG("Kernel thread dying"); - if (kernel_pollfd) { - free(kernel_pollfd); - } - close(kernel_poll_pipe[0]); close(kernel_poll_pipe[1]); + + lttng_poll_clean(&events); + return NULL; } @@ -728,9 +764,10 @@ error: */ static void *thread_manage_kconsumerd(void *data) { - int sock = 0, ret; + int sock = 0, i, ret, pollfd; + uint32_t revents, nb_fd; enum lttcomm_return_code code; - struct pollfd pollfd[2]; + struct lttng_poll_event events; DBG("[thread] Manage kconsumerd started"); @@ -739,26 +776,46 @@ static void *thread_manage_kconsumerd(void *data) goto error; } - /* First fd is always the quit pipe */ - pollfd[0].fd = thread_quit_pipe[0]; + /* + * Pass 2 as size here for the thread quit pipe and kconsumerd_err_sock. + * Nothing more will be added to this poll set. + */ + ret = create_thread_poll_set(&events, 2); + if (ret < 0) { + goto error; + } - /* Apps socket */ - pollfd[1].fd = kconsumerd_err_sock; - pollfd[1].events = POLLIN; + ret = lttng_poll_add(&events, kconsumerd_err_sock, LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + goto error; + } + + nb_fd = LTTNG_POLL_GETNB(&events); /* Inifinite blocking call, waiting for transmission */ - ret = poll(pollfd, 2, -1); + ret = lttng_poll_wait(&events, -1); if (ret < 0) { - perror("poll kconsumerd thread"); goto error; } - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd[0].revents == POLLNVAL) { - goto error; - } else if (pollfd[1].revents == POLLERR) { - ERR("Kconsumerd err socket poll error"); - goto error; + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { + goto error; + } + + /* Event on the registration socket */ + if (pollfd == kconsumerd_err_sock) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Kconsumerd err socket poll error"); + goto error; + } + } } sock = lttcomm_accept_unix_sock(kconsumerd_err_sock); @@ -788,25 +845,46 @@ static void *thread_manage_kconsumerd(void *data) goto error; } - /* Kconsumerd err socket */ - pollfd[1].fd = sock; - pollfd[1].events = POLLIN; - - /* Inifinite blocking call, waiting for transmission */ - ret = poll(pollfd, 2, -1); + /* Remove the kconsumerd error socket since we have established a connexion */ + ret = lttng_poll_del(&events, kconsumerd_err_sock); if (ret < 0) { - perror("poll kconsumerd thread"); goto error; } - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd[0].revents == POLLNVAL) { + ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP); + if (ret < 0) { goto error; - } else if (pollfd[1].revents == POLLERR) { - ERR("Kconsumerd err socket second poll error"); + } + + /* Update number of fd */ + nb_fd = LTTNG_POLL_GETNB(&events); + + /* Inifinite blocking call, waiting for transmission */ + ret = lttng_poll_wait(&events, -1); + if (ret < 0) { goto error; } + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { + goto error; + } + + /* Event on the kconsumerd socket */ + if (pollfd == sock) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Kconsumerd err socket second poll error"); + goto error; + } + } + } + /* Wait for any kconsumerd error */ ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code)); if (ret <= 0) { @@ -818,97 +896,17 @@ static void *thread_manage_kconsumerd(void *data) error: DBG("Kconsumerd thread dying"); - if (kconsumerd_err_sock) { - close(kconsumerd_err_sock); - } - if (kconsumerd_cmd_sock) { - close(kconsumerd_cmd_sock); - } - if (sock) { - close(sock); - } + close(kconsumerd_err_sock); + close(kconsumerd_cmd_sock); + close(sock); unlink(kconsumerd_err_unix_sock_path); unlink(kconsumerd_cmd_unix_sock_path); - kconsumerd_pid = 0; - return NULL; -} -/* - * Reallocate the apps command pollfd structure of nb_fd size. - * - * The first two fds must be there at all time. - */ -static int update_apps_cmd_pollfd(unsigned int nb_fd, unsigned int old_nb_fd, - struct pollfd **pollfd) -{ - int i, count; - struct pollfd *old_pollfd = NULL; + lttng_poll_clean(&events); - /* Can't accept pollfd less than 2 */ - if (nb_fd < 2) { - goto end; - } - - if (*pollfd) { - /* Save pointer */ - old_pollfd = *pollfd; - } - - *pollfd = malloc(nb_fd * sizeof(struct pollfd)); - if (*pollfd == NULL) { - perror("malloc manage apps pollfd"); - goto error; - } - - /* First fd is always the quit pipe */ - (*pollfd)[0].fd = thread_quit_pipe[0]; - (*pollfd)[0].events = - POLLHUP | POLLNVAL | POLLERR | POLLIN | POLLRDHUP | POLLPRI; - - /* Apps command pipe */ - (*pollfd)[1].fd = apps_cmd_pipe[0]; - (*pollfd)[1].events = POLLIN; - - /* Start count after the two pipes below */ - count = 2; - for (i = 2; i < old_nb_fd; i++) { - /* Add to new pollfd */ - if (old_pollfd[i].fd != -1) { - (*pollfd)[count].fd = old_pollfd[i].fd; - (*pollfd)[count].events = POLLHUP | POLLNVAL | POLLERR; - count++; - } - - if (count > nb_fd) { - ERR("Updating poll fd wrong size"); - goto error; - } - } - - if (nb_fd < 2) { - /* - * There should *always* be at least two fds in the pollfd. This safety - * check make sure the poll() will actually try on those two pipes at - * best which are the thread_quit_pipe and apps_cmd_pipe. - */ - nb_fd = 2; - MSG("nb_fd < 2 --> Not good! Continuing..."); - } - - /* Destroy old pollfd */ - free(old_pollfd); - - DBG("Apps cmd pollfd realloc of size %d", nb_fd); - -end: - return 0; - -error: - /* Destroy old pollfd */ - free(old_pollfd); - return -1; + return NULL; } /* @@ -916,105 +914,107 @@ error: */ static void *thread_manage_apps(void *data) { - int i, ret, current_nb_fd; - unsigned int nb_fd = 2; - int update_poll_flag = 1; - struct pollfd *pollfd = NULL; + int i, ret, pollfd; + uint32_t revents, nb_fd; struct ust_command ust_cmd; + struct lttng_poll_event events; DBG("[thread] Manage application started"); - ust_cmd.sock = -1; - current_nb_fd = nb_fd; + ret = create_thread_poll_set(&events, 2); + if (ret < 0) { + goto error; + } - while (1) { - /* See if we have a valid socket to add to pollfd */ - if (ust_cmd.sock != -1) { - nb_fd++; - update_poll_flag = 1; - } + ret = lttng_poll_add(&events, apps_cmd_pipe[0], LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + goto error; + } - /* The pollfd struct must be updated */ - if (update_poll_flag) { - ret = update_apps_cmd_pollfd(nb_fd, current_nb_fd, &pollfd); - if (ret < 0) { - /* malloc failed so we quit */ - goto error; - } + while (1) { + /* Zeroed the events structure */ + lttng_poll_reset(&events); - if (ust_cmd.sock != -1) { - /* Update pollfd with the new UST socket */ - DBG("Adding sock %d to apps cmd pollfd", ust_cmd.sock); - pollfd[nb_fd - 1].fd = ust_cmd.sock; - pollfd[nb_fd - 1].events = POLLHUP | POLLNVAL | POLLERR; - ust_cmd.sock = -1; - } - } + nb_fd = LTTNG_POLL_GETNB(&events); DBG("Apps thread polling on %d fds", nb_fd); /* Inifinite blocking call, waiting for transmission */ - ret = poll(pollfd, nb_fd, -1); + ret = lttng_poll_wait(&events, -1); if (ret < 0) { - perror("poll apps thread"); goto error; } - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd[0].revents == POLLNVAL) { - goto error; - } else { - /* apps_cmd_pipe pipe events */ - switch (pollfd[1].revents) { - case POLLERR: - ERR("Apps command pipe poll error"); + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { goto error; - case POLLIN: - /* Empty pipe */ - ret = read(apps_cmd_pipe[0], &ust_cmd, sizeof(ust_cmd)); - if (ret < 0 || ret < sizeof(ust_cmd)) { - perror("read apps cmd pipe"); - goto error; - } + } - /* Register applicaton to the session daemon */ - ret = register_traceable_app(&ust_cmd.reg_msg, ust_cmd.sock); - if (ret < 0) { - /* Only critical ENOMEM error can be returned here */ + /* Inspect the apps cmd pipe */ + if (pollfd == apps_cmd_pipe[0]) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Apps command pipe error"); goto error; - } + } else if (revents & LPOLLIN) { + /* Empty pipe */ + ret = read(apps_cmd_pipe[0], &ust_cmd, sizeof(ust_cmd)); + if (ret < 0 || ret < sizeof(ust_cmd)) { + perror("read apps cmd pipe"); + goto error; + } - ret = ustctl_register_done(ust_cmd.sock); - if (ret < 0) { - /* - * If the registration is not possible, we simply unregister - * the apps and continue - */ - unregister_traceable_app(ust_cmd.sock); + /* Register applicaton to the session daemon */ + ret = register_traceable_app(&ust_cmd.reg_msg, ust_cmd.sock); + if (ret < 0) { + /* Only critical ENOMEM error can be returned here */ + goto error; + } + + ret = ustctl_register_done(ust_cmd.sock); + if (ret < 0) { + /* + * If the registration is not possible, we simply + * unregister the apps and continue + */ + unregister_traceable_app(ust_cmd.sock); + } else { + /* + * We just need here to monitor the close of the UST + * socket and poll set monitor those by default. + */ + ret = lttng_poll_add(&events, ust_cmd.sock, 0); + if (ret < 0) { + goto error; + } + + DBG("Apps with sock %d added to poll set", ust_cmd.sock); + } + break; } - break; - } - } + } else { + /* + * At this point, we know that a registered application made the + * event at poll_wait. + */ + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + /* Removing from the poll set */ + ret = lttng_poll_del(&events, pollfd); + if (ret < 0) { + goto error; + } - current_nb_fd = nb_fd; - for (i = 2; i < current_nb_fd; i++) { - /* Apps socket is closed/hungup */ - switch (pollfd[i].revents) { - case POLLERR: - case POLLHUP: - case POLLNVAL: - /* Pipe closed */ - unregister_traceable_app(pollfd[i].fd); - /* Indicate to remove this fd from the pollfd */ - pollfd[i].fd = -1; - nb_fd--; - break; + /* Socket closed */ + unregister_traceable_app(pollfd); + break; + } } } - - if (nb_fd != current_nb_fd) { - update_poll_flag = 1; - } } error: @@ -1022,7 +1022,7 @@ error: close(apps_cmd_pipe[0]); close(apps_cmd_pipe[1]); - free(pollfd); + lttng_poll_clean(&events); return NULL; } @@ -1096,8 +1096,9 @@ error: */ static void *thread_registration_apps(void *data) { - int sock = 0, ret; - struct pollfd pollfd[2]; + int sock = 0, i, ret, pollfd; + uint32_t revents, nb_fd; + struct lttng_poll_event events; /* * Get allocated in this thread, enqueued to a global queue, dequeued and * freed in the manage apps thread. @@ -1111,14 +1112,20 @@ static void *thread_registration_apps(void *data) goto error; } - /* First fd is always the quit pipe */ - pollfd[0].fd = thread_quit_pipe[0]; - pollfd[0].events = - POLLHUP | POLLNVAL | POLLERR | POLLIN | POLLRDHUP | POLLPRI; + /* + * Pass 2 as size here for the thread quit pipe and apps socket. Nothing + * more will be added to this poll set. + */ + ret = create_thread_poll_set(&events, 2); + if (ret < 0) { + goto error; + } - /* Apps socket */ - pollfd[1].fd = apps_sock; - pollfd[1].events = POLLIN; + /* Add the application registration socket */ + ret = lttng_poll_add(&events, apps_sock, LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + goto error; + } /* Notify all applications to register */ ret = notify_ust_apps(1); @@ -1131,75 +1138,81 @@ static void *thread_registration_apps(void *data) while (1) { DBG("Accepting application registration"); + nb_fd = LTTNG_POLL_GETNB(&events); + /* Inifinite blocking call, waiting for transmission */ - ret = poll(pollfd, 2, -1); + ret = lttng_poll_wait(&events, -1); if (ret < 0) { - perror("poll register apps thread"); goto error; } - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd[0].revents == POLLNVAL) { - goto error; - } + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); - switch (pollfd[1].revents) { - case POLLNVAL: - case POLLHUP: - case POLLRDHUP: - case POLLERR: - ERR("Register apps socket poll error"); - goto error; - case POLLIN: - sock = lttcomm_accept_unix_sock(apps_sock); - if (sock < 0) { + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { goto error; } - /* Create UST registration command for enqueuing */ - ust_cmd = malloc(sizeof(struct ust_command)); - if (ust_cmd == NULL) { - perror("ust command malloc"); - goto error; - } + /* Event on the registration socket */ + if (pollfd == apps_sock) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Register apps socket poll error"); + goto error; + } else if (revents & LPOLLIN) { + sock = lttcomm_accept_unix_sock(apps_sock); + if (sock < 0) { + goto error; + } - /* - * Using message-based transmissions to ensure we don't have to deal - * with partially received messages. - */ - ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg, - sizeof(struct ust_register_msg)); - if (ret < 0 || ret < sizeof(struct ust_register_msg)) { - if (ret < 0) { - perror("lttcomm_recv_unix_sock register apps"); - } else { - ERR("Wrong size received on apps register"); - } - free(ust_cmd); - close(sock); - continue; - } + /* Create UST registration command for enqueuing */ + ust_cmd = malloc(sizeof(struct ust_command)); + if (ust_cmd == NULL) { + perror("ust command malloc"); + goto error; + } - ust_cmd->sock = sock; + /* + * Using message-based transmissions to ensure we don't + * have to deal with partially received messages. + */ + ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg, + sizeof(struct ust_register_msg)); + if (ret < 0 || ret < sizeof(struct ust_register_msg)) { + if (ret < 0) { + perror("lttcomm_recv_unix_sock register apps"); + } else { + ERR("Wrong size received on apps register"); + } + free(ust_cmd); + close(sock); + continue; + } - DBG("UST registration received with pid:%d ppid:%d uid:%d" - " gid:%d sock:%d name:%s (version %d.%d)", - ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid, - ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid, - ust_cmd->sock, ust_cmd->reg_msg.name, - ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor); - /* - * Lock free enqueue the registration request. - * The red pill has been taken! This apps will be part of the *system* - */ - cds_wfq_enqueue(&ust_cmd_queue.queue, &ust_cmd->node); + ust_cmd->sock = sock; - /* - * Wake the registration queue futex. - * Implicit memory barrier with the exchange in cds_wfq_enqueue. - */ - futex_nto1_wake(&ust_cmd_queue.futex); - break; + DBG("UST registration received with pid:%d ppid:%d uid:%d" + " gid:%d sock:%d name:%s (version %d.%d)", + ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid, + ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid, + ust_cmd->sock, ust_cmd->reg_msg.name, + ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor); + /* + * Lock free enqueue the registration request. The red pill + * has been taken! This apps will be part of the *system* :) + */ + cds_wfq_enqueue(&ust_cmd_queue.queue, &ust_cmd->node); + + /* + * Wake the registration queue futex. Implicit memory + * barrier with the exchange in cds_wfq_enqueue. + */ + futex_nto1_wake(&ust_cmd_queue.futex); + } + } } } @@ -1211,9 +1224,10 @@ error: close(apps_sock); close(sock); - unlink(apps_unix_sock_path); + lttng_poll_clean(&events); + return NULL; } @@ -1526,9 +1540,9 @@ error: } /* - * Notify kernel thread to update it's pollfd. + * Notify kernel thread to update it's poll set. */ -static int notify_kernel_pollfd(void) +static int notify_kernel_channels_update(void) { int ret; @@ -1999,11 +2013,13 @@ static int process_client_msg(struct command_ctx *cmd_ctx) switch (cmd_ctx->lsm->domain.type) { case LTTNG_DOMAIN_KERNEL: - kchan = trace_kernel_get_channel_by_name(cmd_ctx->lsm->u.enable.channel_name, + kchan = trace_kernel_get_channel_by_name( + cmd_ctx->lsm->u.enable.channel_name, cmd_ctx->session->kernel_session); if (kchan == NULL) { /* Channel not found, creating it */ - DBG("Creating kernel channel"); + DBG("Creating kernel channel %s", + cmd_ctx->lsm->u.enable.channel_name); ret = kernel_create_channel(cmd_ctx->session->kernel_session, &cmd_ctx->lsm->u.channel.chan, @@ -2014,7 +2030,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx) } /* Notify kernel thread that there is a new channel */ - ret = notify_kernel_pollfd(); + ret = notify_kernel_channels_update(); if (ret < 0) { ret = LTTCOMM_FATAL; goto error; @@ -2084,6 +2100,12 @@ static int process_client_msg(struct command_ctx *cmd_ctx) ret = LTTCOMM_FATAL; goto error; } + + ret = notify_kernel_channels_update(); + if (ret < 0) { + ret = LTTCOMM_FATAL; + goto error; + } } kevent = trace_kernel_get_event_by_name(cmd_ctx->lsm->u.enable.event.name, kchan); @@ -2162,6 +2184,12 @@ static int process_client_msg(struct command_ctx *cmd_ctx) ret = LTTCOMM_FATAL; goto error; } + + ret = notify_kernel_channels_update(); + if (ret < 0) { + ret = LTTCOMM_FATAL; + goto error; + } } /* For each event in the kernel session */ @@ -2398,10 +2426,10 @@ static int process_client_msg(struct command_ctx *cmd_ctx) } /* - * Must notify the kernel thread here to update it's pollfd in order to - * remove the channel(s)' fd just destroyed. + * Must notify the kernel thread here to update it's poll setin order + * to remove the channel(s)' fd just destroyed. */ - ret = notify_kernel_pollfd(); + ret = notify_kernel_channels_update(); if (ret < 0) { ret = LTTCOMM_FATAL; goto error; @@ -2623,9 +2651,10 @@ setup_error: */ static void *thread_manage_clients(void *data) { - int sock = 0, ret; + int sock = 0, ret, i, pollfd; + uint32_t revents, nb_fd; struct command_ctx *cmd_ctx = NULL; - struct pollfd pollfd[2]; + struct lttng_poll_event events; DBG("[thread] Manage client started"); @@ -2634,14 +2663,20 @@ static void *thread_manage_clients(void *data) goto error; } - /* First fd is always the quit pipe */ - pollfd[0].fd = thread_quit_pipe[0]; - pollfd[0].events = - POLLHUP | POLLNVAL | POLLERR | POLLIN | POLLRDHUP | POLLPRI; + /* + * Pass 2 as size here for the thread quit pipe and client_sock. Nothing + * more will be added to this poll set. + */ + ret = create_thread_poll_set(&events, 2); + if (ret < 0) { + goto error; + } - /* Apps socket */ - pollfd[1].fd = client_sock; - pollfd[1].events = POLLIN; + /* Add the application registration socket */ + ret = lttng_poll_add(&events, client_sock, LPOLLIN | LPOLLPRI); + if (ret < 0) { + goto error; + } /* * Notify parent pid that we are ready to accept command for client side. @@ -2653,92 +2688,114 @@ static void *thread_manage_clients(void *data) while (1) { DBG("Accepting client command ..."); + nb_fd = LTTNG_POLL_GETNB(&events); + /* Inifinite blocking call, waiting for transmission */ - ret = poll(pollfd, 2, -1); + ret = lttng_poll_wait(&events, -1); if (ret < 0) { - perror("poll client thread"); goto error; } - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd[0].revents == POLLNVAL) { + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { + goto error; + } + + /* Event on the registration socket */ + if (pollfd == client_sock) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Client socket poll error"); + goto error; + } + } + } + + DBG("Wait for client response"); + + sock = lttcomm_accept_unix_sock(client_sock); + if (sock < 0) { goto error; } - switch (pollfd[1].revents) { - case POLLNVAL: - case POLLHUP: - case POLLERR: - ERR("Client socket poll error"); + /* Allocate context command to process the client request */ + cmd_ctx = malloc(sizeof(struct command_ctx)); + if (cmd_ctx == NULL) { + perror("malloc cmd_ctx"); goto error; - case POLLIN: - sock = lttcomm_accept_unix_sock(client_sock); - if (sock < 0) { - goto error; - } + } - /* Allocate context command to process the client request */ - cmd_ctx = malloc(sizeof(struct command_ctx)); + /* Allocate data buffer for reception */ + cmd_ctx->lsm = malloc(sizeof(struct lttcomm_session_msg)); + if (cmd_ctx->lsm == NULL) { + perror("malloc cmd_ctx->lsm"); + goto error; + } - /* Allocate data buffer for reception */ - cmd_ctx->lsm = malloc(sizeof(struct lttcomm_session_msg)); - cmd_ctx->llm = NULL; - cmd_ctx->session = NULL; + cmd_ctx->llm = NULL; + cmd_ctx->session = NULL; - /* - * Data is received from the lttng client. The struct - * lttcomm_session_msg (lsm) contains the command and data request of - * the client. - */ - DBG("Receiving data from client ..."); - ret = lttcomm_recv_unix_sock(sock, cmd_ctx->lsm, sizeof(struct lttcomm_session_msg)); - if (ret <= 0) { - continue; - } + /* + * Data is received from the lttng client. The struct + * lttcomm_session_msg (lsm) contains the command and data request of + * the client. + */ + DBG("Receiving data from client ..."); + ret = lttcomm_recv_unix_sock(sock, cmd_ctx->lsm, + sizeof(struct lttcomm_session_msg)); + if (ret <= 0) { + DBG("Nothing recv() from client... continuing"); + close(sock); + free(cmd_ctx); + continue; + } - // TODO: Validate cmd_ctx including sanity check for security purpose. + // TODO: Validate cmd_ctx including sanity check for + // security purpose. + /* + * This function dispatch the work to the kernel or userspace tracer + * libs and fill the lttcomm_lttng_msg data structure of all the needed + * informations for the client. The command context struct contains + * everything this function may needs. + */ + ret = process_client_msg(cmd_ctx); + if (ret < 0) { /* - * This function dispatch the work to the kernel or userspace tracer - * libs and fill the lttcomm_lttng_msg data structure of all the needed - * informations for the client. The command context struct contains - * everything this function may needs. + * TODO: Inform client somehow of the fatal error. At + * this point, ret < 0 means that a malloc failed + * (ENOMEM). Error detected but still accept command. */ - ret = process_client_msg(cmd_ctx); - if (ret < 0) { - /* TODO: Inform client somehow of the fatal error. At this point, - * ret < 0 means that a malloc failed (ENOMEM). */ - /* Error detected but still accept command */ - clean_command_ctx(&cmd_ctx); - continue; - } - - DBG("Sending response (size: %d, retcode: %d)", - cmd_ctx->lttng_msg_size, cmd_ctx->llm->ret_code); - ret = send_unix_sock(sock, cmd_ctx->llm, cmd_ctx->lttng_msg_size); - if (ret < 0) { - ERR("Failed to send data back to client"); - } - clean_command_ctx(&cmd_ctx); + continue; + } - /* End of transmission */ - close(sock); - break; + DBG("Sending response (size: %d, retcode: %d)", + cmd_ctx->lttng_msg_size, cmd_ctx->llm->ret_code); + ret = send_unix_sock(sock, cmd_ctx->llm, + cmd_ctx->lttng_msg_size); + if (ret < 0) { + ERR("Failed to send data back to client"); } - } -error: - DBG("Client thread dying"); - if (client_sock) { - close(client_sock); - } - if (sock) { + clean_command_ctx(&cmd_ctx); + + /* End of transmission */ close(sock); } +error: + DBG("Client thread dying"); unlink(client_unix_sock_path); + close(client_sock); + close(sock); + lttng_poll_clean(&events); clean_command_ctx(&cmd_ctx); return NULL; } @@ -3283,6 +3340,9 @@ int main(int argc, char **argv) */ session_list_ptr = get_session_list(); + /* Set up max poll set size */ + lttng_poll_set_max_size(); + /* Create thread to manage the client socket */ ret = pthread_create(&client_thread, NULL, thread_manage_clients, (void *) NULL);