X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=liblttkconsumerd%2Fliblttkconsumerd.c;h=9ad380c8dab5e8c1be679a0881890e929f355c57;hp=ffd2faf2eddbec9e9795cbac66441fd121e47d0f;hb=ca3c5ac0cf100d80352a1a81936b5adc47942f35;hpb=0237248ca64ab78f9d57d3705c35b254644c2cf3 diff --git a/liblttkconsumerd/liblttkconsumerd.c b/liblttkconsumerd/liblttkconsumerd.c index ffd2faf2e..9ad380c8d 100644 --- a/liblttkconsumerd/liblttkconsumerd.c +++ b/liblttkconsumerd/liblttkconsumerd.c @@ -4,8 +4,8 @@ * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. + * as published by the Free Software Foundation; only version 2 + * of the License. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -28,31 +28,51 @@ #include #include #include +#include #include "libkernelctl.h" #include "liblttkconsumerd.h" #include "lttngerr.h" -/* Init the list of FDs */ -static struct kconsumerd_fd_list kconsumerd_fd_list = { - .head = CDS_LIST_HEAD_INIT(kconsumerd_fd_list.head), +static +struct kconsumerd_global_data { + /* + * kconsumerd_data.lock protects kconsumerd_data.fd_list, + * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It + * ensures the count matches the number of items in the fd_list. + * It ensures the list updates *always* trigger an fd_array + * update (therefore need to make list update vs + * kconsumerd_data.need_update flag update atomic, and also flag + * read, fd array and flag clear atomic). + */ + pthread_mutex_t lock; + /* + * Number of element for the list below. Protected by + * kconsumerd_data.lock. + */ + unsigned int fds_count; + /* + * List of FDs. Protected by kconsumerd_data.lock. + */ + struct kconsumerd_fd_list fd_list; + /* + * Flag specifying if the local array of FDs needs update in the + * poll function. Protected by kconsumerd_data.lock. + */ + unsigned int need_update; +} kconsumerd_data = { + .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head), }; -/* Number of element for the list below. */ -static unsigned int kconsumerd_fds_count; - -/* If the local array of FDs needs update in the poll function */ -static unsigned int kconsumerd_update_fd_array = 1; - -/* lock the fd array and structures */ -static pthread_mutex_t kconsumerd_lock_fds; - /* communication with splice */ static int kconsumerd_thread_pipe[2]; /* pipe to wake the poll thread when necessary */ static int kconsumerd_poll_pipe[2]; +/* to let the signal handler wake up the fd receiver thread */ +static int kconsumerd_should_quit[2]; + /* timeout parameter, to control the polling thread grace period */ static int kconsumerd_poll_timeout = -1; @@ -62,8 +82,13 @@ static int kconsumerd_error_socket; /* socket to exchange commands with sessiond */ static char *kconsumerd_command_sock_path; -/* flag to inform the polling thread to kconsumerd_quit when all fd hung up */ -static int kconsumerd_quit = 0; +/* + * flag to inform the polling thread to quit when all fd hung up. + * Updated by the kconsumerd_thread_receive_fds when it notices that all + * fds has hung up. Also updated by the signal handler + * (kconsumerd_should_exit()). Read by the polling threads. + */ +static volatile int kconsumerd_quit = 0; /* * kconsumerd_set_error_socket @@ -89,6 +114,7 @@ void kconsumerd_set_command_socket_path(char *sock) * kconsumerd_find_session_fd * * Find a session fd in the global list. + * The kconsumerd_data.lock must be locked during this call * * Return 1 if found else 0 */ @@ -96,15 +122,12 @@ static int kconsumerd_find_session_fd(int fd) { struct kconsumerd_fd *iter; - pthread_mutex_lock(&kconsumerd_lock_fds); - cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { + cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { if (iter->sessiond_fd == fd) { DBG("Duplicate session fd %d", fd); - pthread_mutex_unlock(&kconsumerd_lock_fds); return 1; } } - pthread_mutex_unlock(&kconsumerd_lock_fds); return 0; } @@ -116,10 +139,10 @@ static int kconsumerd_find_session_fd(int fd) */ static void kconsumerd_del_fd(struct kconsumerd_fd *lcf) { - pthread_mutex_lock(&kconsumerd_lock_fds); + pthread_mutex_lock(&kconsumerd_data.lock); cds_list_del(&lcf->list); - if (kconsumerd_fds_count > 0) { - kconsumerd_fds_count--; + if (kconsumerd_data.fds_count > 0) { + kconsumerd_data.fds_count--; if (lcf != NULL) { close(lcf->out_fd); close(lcf->consumerd_fd); @@ -127,7 +150,8 @@ static void kconsumerd_del_fd(struct kconsumerd_fd *lcf) lcf = NULL; } } - pthread_mutex_unlock(&kconsumerd_lock_fds); + kconsumerd_data.need_update = 1; + pthread_mutex_unlock(&kconsumerd_data.lock); } /* @@ -140,6 +164,7 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_f int ret; struct kconsumerd_fd *tmp_fd; + pthread_mutex_lock(&kconsumerd_data.lock); /* Check if already exist */ ret = kconsumerd_find_session_fd(buf->fd); if (ret == 1) { @@ -152,6 +177,7 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_f tmp_fd->state = buf->state; tmp_fd->max_sb_size = buf->max_sb_size; strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX); + tmp_fd->path_name[PATH_MAX - 1] = '\0'; /* Opening the tracefile in write mode */ ret = open(tmp_fd->path_name, @@ -167,12 +193,11 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_f DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name, tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd); - pthread_mutex_lock(&kconsumerd_lock_fds); - cds_list_add(&tmp_fd->list, &kconsumerd_fd_list.head); - kconsumerd_fds_count++; - pthread_mutex_unlock(&kconsumerd_lock_fds); - + cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head); + kconsumerd_data.fds_count++; + kconsumerd_data.need_update = 1; end: + pthread_mutex_unlock(&kconsumerd_data.lock); return ret; } @@ -186,14 +211,15 @@ static void kconsumerd_change_fd_state(int sessiond_fd, { struct kconsumerd_fd *iter; - pthread_mutex_lock(&kconsumerd_lock_fds); - cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { + pthread_mutex_lock(&kconsumerd_data.lock); + cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { if (iter->sessiond_fd == sessiond_fd) { iter->state = state; break; } } - pthread_mutex_unlock(&kconsumerd_lock_fds); + kconsumerd_data.need_update = 1; + pthread_mutex_unlock(&kconsumerd_data.lock); } /* @@ -203,6 +229,7 @@ static void kconsumerd_change_fd_state(int sessiond_fd, * to avoid doing a lookup in the linked list and concurrency issues * when writing is needed. * Returns the number of fds in the structures + * Called with kconsumerd_data.lock held. */ static int kconsumerd_update_poll_array(struct pollfd **pollfd, struct kconsumerd_fd **local_kconsumerd_fd) @@ -212,7 +239,7 @@ static int kconsumerd_update_poll_array(struct pollfd **pollfd, DBG("Updating poll fd array"); - cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { + cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { DBG("Inside for each"); if (iter->state == ACTIVE_FD) { DBG("Active FD %d", iter->consumerd_fd); @@ -229,8 +256,6 @@ static int kconsumerd_update_poll_array(struct pollfd **pollfd, */ (*pollfd)[i].fd = kconsumerd_poll_pipe[0]; (*pollfd)[i].events = POLLIN; - - kconsumerd_update_fd_array = 0; return i; } @@ -467,7 +492,7 @@ static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd) } switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) { - case LTTNG_KERNEL_SPLICE: + case LTTNG_EVENT_SPLICE: /* read the whole subbuffer */ err = kernctl_get_padded_subbuf_size(infd, &len); if (err != 0) { @@ -486,7 +511,7 @@ static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd) ERR("Error splicing to tracefile"); } break; - case LTTNG_KERNEL_MMAP: + case LTTNG_EVENT_MMAP: /* read the used subbuffer size */ err = kernctl_get_subbuf_size(infd, &len); if (err != 0) { @@ -525,6 +550,32 @@ end: return ret; } +/* + * kconsumerd_poll_socket + * + * Poll on the should_quit pipe and the command socket + * return -1 on error and should exit, 0 if data is + * available on the command socket + */ +int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll) +{ + int num_rdy; + + num_rdy = poll(kconsumerd_sockpoll, 2, -1); + if (num_rdy == -1) { + perror("Poll error"); + goto exit; + } + if (kconsumerd_sockpoll[0].revents == POLLIN) { + DBG("kconsumerd_should_quit wake up"); + goto exit; + } + return 0; + +exit: + return -1; +} + /* * kconsumerd_consumerd_recv_fd * @@ -532,10 +583,10 @@ end: * structures describing each fd (path name). * Returns the size of received data */ -static int kconsumerd_consumerd_recv_fd(int sfd, int size, +static int kconsumerd_consumerd_recv_fd(int sfd, + struct pollfd *kconsumerd_sockpoll, int size, enum kconsumerd_command cmd_type) { - struct msghdr msg; struct iovec iov[1]; int ret = 0, i, tmp2; struct cmsghdr *cmsg; @@ -546,8 +597,11 @@ static int kconsumerd_consumerd_recv_fd(int sfd, int size, /* the number of fds we are about to receive */ nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg); + /* + * nb_fd is the number of fds we receive. One fd per recvmsg. + */ for (i = 0; i < nb_fd; i++) { - memset(&msg, 0, sizeof(msg)); + struct msghdr msg = { 0 }; /* Prepare to receive the structures */ iov[0].iov_base = &lkm; @@ -559,6 +613,10 @@ static int kconsumerd_consumerd_recv_fd(int sfd, int size, msg.msg_controllen = sizeof(recv_fd); DBG("Waiting to receive fd"); + if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { + goto end; + } + if ((ret = recvmsg(sfd, &msg, 0)) < 0) { perror("recvmsg"); continue; @@ -577,12 +635,13 @@ static int kconsumerd_consumerd_recv_fd(int sfd, int size, kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD); goto end; } + /* if we received fds */ if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { switch (cmd_type) { case ADD_STREAM: - DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, (CMSG_DATA(cmsg)[0])); - ret = kconsumerd_add_fd(&lkm, (CMSG_DATA(cmsg)[0])); + DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, ((int *) CMSG_DATA(cmsg))[0]); + ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]); if (ret < 0) { kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR); goto end; @@ -594,10 +653,11 @@ static int kconsumerd_consumerd_recv_fd(int sfd, int size, default: break; } - /* flag to tell the polling thread to update its fd array */ - kconsumerd_update_fd_array = 1; /* signal the poll thread */ tmp2 = write(kconsumerd_poll_pipe[1], "4", 1); + if (tmp2 < 0) { + perror("write kconsumerd poll"); + } } else { ERR("Didn't received any fd"); kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD); @@ -622,7 +682,7 @@ void *kconsumerd_thread_poll_fds(void *data) struct pollfd *pollfd = NULL; /* local view of the fds */ struct kconsumerd_fd **local_kconsumerd_fd = NULL; - /* local view of kconsumerd_fds_count */ + /* local view of kconsumerd_data.fds_count */ int nb_fd = 0; char tmp; int tmp2; @@ -643,7 +703,8 @@ void *kconsumerd_thread_poll_fds(void *data) * the ltt_fd_list has been updated, we need to update our * local array as well */ - if (kconsumerd_update_fd_array == 1) { + pthread_mutex_lock(&kconsumerd_data.lock); + if (kconsumerd_data.need_update) { if (pollfd != NULL) { free(pollfd); pollfd = NULL; @@ -653,33 +714,33 @@ void *kconsumerd_thread_poll_fds(void *data) local_kconsumerd_fd = NULL; } - /* Lock mutex for fds count */ - pthread_mutex_lock(&kconsumerd_lock_fds); /* allocate for all fds + 1 for the kconsumerd_poll_pipe */ - pollfd = malloc((kconsumerd_fds_count + 1) * sizeof(struct pollfd)); + pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd)); if (pollfd == NULL) { perror("pollfd malloc"); + pthread_mutex_unlock(&kconsumerd_data.lock); goto end; } /* allocate for all fds + 1 for the kconsumerd_poll_pipe */ - local_kconsumerd_fd = malloc((kconsumerd_fds_count + 1) * + local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct kconsumerd_fd)); if (local_kconsumerd_fd == NULL) { perror("local_kconsumerd_fd malloc"); + pthread_mutex_unlock(&kconsumerd_data.lock); goto end; } ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); kconsumerd_send_error(KCONSUMERD_POLL_ERROR); + pthread_mutex_unlock(&kconsumerd_data.lock); goto end; } - /* Unlock mutex for fds count */ - pthread_mutex_unlock(&kconsumerd_lock_fds); - nb_fd = ret; + kconsumerd_data.need_update = 0; } + pthread_mutex_unlock(&kconsumerd_data.lock); /* poll on the array of fds */ DBG("polling on %d fd", nb_fd + 1); @@ -700,12 +761,17 @@ void *kconsumerd_thread_poll_fds(void *data) } /* - * if only the kconsumerd_poll_pipe triggered poll to return just - * return to the beginning of the loop to update the array + * If the kconsumerd_poll_pipe triggered poll go + * directly to the beginning of the loop to update the + * array. We want to prioritize array update over + * low-priority reads. */ - if (num_rdy == 1 && pollfd[nb_fd].revents == POLLIN) { + if (pollfd[nb_fd].revents == POLLIN) { DBG("kconsumerd_poll_pipe wake up"); tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1); + if (tmp2 < 0) { + perror("read kconsumerd poll"); + } continue; } @@ -715,19 +781,16 @@ void *kconsumerd_thread_poll_fds(void *data) case POLLERR: ERR("Error returned in polling fd %d.", pollfd[i].fd); kconsumerd_del_fd(local_kconsumerd_fd[i]); - kconsumerd_update_fd_array = 1; num_hup++; break; case POLLHUP: DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); kconsumerd_del_fd(local_kconsumerd_fd[i]); - kconsumerd_update_fd_array = 1; num_hup++; break; case POLLNVAL: ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); kconsumerd_del_fd(local_kconsumerd_fd[i]); - kconsumerd_update_fd_array = 1; num_hup++; break; case POLLPRI: @@ -766,7 +829,6 @@ void *kconsumerd_thread_poll_fds(void *data) } } end: - pthread_mutex_unlock(&kconsumerd_lock_fds); DBG("polling thread exiting"); if (pollfd != NULL) { free(pollfd); @@ -776,18 +838,38 @@ end: free(local_kconsumerd_fd); local_kconsumerd_fd = NULL; } - kconsumerd_cleanup(); return NULL; } /* - * kconsumerd_create_poll_pipe + * kconsumerd_init(void) * - * create the pipe to wake to polling thread when needed + * initialise the necessary environnement : + * - inform the polling thread to update the polling array + * - create the poll_pipe + * - create the should_quit pipe (for signal handler) */ -int kconsumerd_create_poll_pipe() +int kconsumerd_init(void) { - return pipe(kconsumerd_poll_pipe); + int ret; + + /* need to update the polling array at init time */ + kconsumerd_data.need_update = 1; + + ret = pipe(kconsumerd_poll_pipe); + if (ret < 0) { + perror("Error creating poll pipe"); + goto end; + } + + ret = pipe(kconsumerd_should_quit); + if (ret < 0) { + perror("Error creating recv pipe"); + goto end; + } + +end: + return ret; } /* @@ -800,6 +882,12 @@ void *kconsumerd_thread_receive_fds(void *data) { int sock, client_socket, ret; struct lttcomm_kconsumerd_header tmp; + /* + * structure to poll for incoming data on communication socket + * avoids making blocking sockets + */ + struct pollfd kconsumerd_sockpoll[2]; + DBG("Creating command socket %s", kconsumerd_command_sock_path); unlink(kconsumerd_command_sock_path); @@ -821,13 +909,45 @@ void *kconsumerd_thread_receive_fds(void *data) goto end; } + ret = fcntl(client_socket, F_SETFL, O_NONBLOCK); + if (ret < 0) { + perror("fcntl O_NONBLOCK"); + goto end; + } + + /* prepare the FDs to poll : to client socket and the should_quit pipe */ + kconsumerd_sockpoll[0].fd = kconsumerd_should_quit[0]; + kconsumerd_sockpoll[0].events = POLLIN | POLLPRI; + kconsumerd_sockpoll[1].fd = client_socket; + kconsumerd_sockpoll[1].events = POLLIN | POLLPRI; + + if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { + goto end; + } + DBG("Connection on client_socket"); + /* Blocking call, waiting for transmission */ sock = lttcomm_accept_unix_sock(client_socket); if (sock <= 0) { WARN("On accept"); goto end; } + ret = fcntl(sock, F_SETFL, O_NONBLOCK); + if (ret < 0) { + perror("fcntl O_NONBLOCK"); + goto end; + } + + /* update the polling structure to poll on the established socket */ + kconsumerd_sockpoll[1].fd = sock; + kconsumerd_sockpoll[1].events = POLLIN | POLLPRI; + while (1) { + if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { + goto end; + } + DBG("Incoming fds on sock"); + /* We first get the number of fd we are about to receive */ ret = lttcomm_recv_unix_sock(sock, &tmp, sizeof(struct lttcomm_kconsumerd_header)); @@ -839,12 +959,19 @@ void *kconsumerd_thread_receive_fds(void *data) DBG("Received STOP command"); goto end; } + if (kconsumerd_quit) { + DBG("kconsumerd_thread_receive_fds received quit from signal"); + goto end; + } + /* we received a command to add or update fds */ - ret = kconsumerd_consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type); + ret = kconsumerd_consumerd_recv_fd(sock, kconsumerd_sockpoll, + tmp.payload_size, tmp.cmd_type); if (ret <= 0) { ERR("Receiving the FD, exiting"); goto end; } + DBG("received fds on sock"); } end: @@ -876,19 +1003,38 @@ end: * * Cleanup the daemon's socket on exit */ -void kconsumerd_cleanup() +void kconsumerd_cleanup(void) { - struct kconsumerd_fd *iter; + struct kconsumerd_fd *iter, *tmp; /* remove the socket file */ unlink(kconsumerd_command_sock_path); - /* close all outfd */ - cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { + /* + * close all outfd. Called when there are no more threads + * running (after joining on the threads), no need to protect + * list iteration with mutex. + */ + cds_list_for_each_entry_safe(iter, tmp, &kconsumerd_data.fd_list.head, list) { kconsumerd_del_fd(iter); } } +/* + * kconsumerd_should_exit + * + * Called from signal handler. + */ +void kconsumerd_should_exit(void) +{ + int ret; + kconsumerd_quit = 1; + ret = write(kconsumerd_should_quit[1], "4", 1); + if (ret < 0) { + perror("write kconsumerd quit"); + } +} + /* * kconsumerd_send_error *