X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=liblttkconsumerd%2Fliblttkconsumerd.c;h=c1bad734dd4bbe1d616d777fb6c9e773e3885783;hb=242cd1874f3d8f678e795eea4d35552eff683ed8;hp=742fddebe47304ab9def8c339f94c76b11e4d4e9;hpb=1ce86c9ab52be743e3348fd0c90153d9a9f63c49;p=lttng-tools.git diff --git a/liblttkconsumerd/liblttkconsumerd.c b/liblttkconsumerd/liblttkconsumerd.c index 742fddebe..c1bad734d 100644 --- a/liblttkconsumerd/liblttkconsumerd.c +++ b/liblttkconsumerd/liblttkconsumerd.c @@ -33,20 +33,36 @@ #include "liblttkconsumerd.h" #include "lttngerr.h" -/* Init the list of FDs */ -static struct kconsumerd_fd_list kconsumerd_fd_list = { - .head = CDS_LIST_HEAD_INIT(kconsumerd_fd_list.head), +static +struct kconsumerd_global_data { + /* + * kconsumerd_data.lock protects kconsumerd_data.fd_list, + * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It + * ensures the count matches the number of items in the fd_list. + * It ensures the list updates *always* trigger an fd_array + * update (therefore need to make list update vs + * kconsumerd_data.need_update flag update atomic, and also flag + * read, fd array and flag clear atomic). + */ + pthread_mutex_t lock; + /* + * Number of element for the list below. Protected by + * kconsumerd_data.lock. + */ + unsigned int fds_count; + /* + * List of FDs. Protected by kconsumerd_data.lock. + */ + struct kconsumerd_fd_list fd_list; + /* + * Flag specifying if the local array of FDs needs update in the + * poll function. Protected by kconsumerd_data.lock. + */ + unsigned int need_update; +} kconsumerd_data = { + .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head), }; -/* Number of element for the list below. */ -static unsigned int kconsumerd_fds_count; - -/* If the local array of FDs needs update in the poll function */ -static unsigned int kconsumerd_update_fd_array = 1; - -/* lock the fd array and structures */ -static pthread_mutex_t kconsumerd_lock_fds; - /* communication with splice */ static int kconsumerd_thread_pipe[2]; @@ -85,6 +101,30 @@ void kconsumerd_set_command_socket_path(char *sock) kconsumerd_command_sock_path = sock; } +/* + * kconsumerd_find_session_fd + * + * Find a session fd in the global list. 
+ * + * Return 1 if found else 0 + */ +static int kconsumerd_find_session_fd(int fd) +{ + struct kconsumerd_fd *iter; + + pthread_mutex_lock(&kconsumerd_data.lock); + cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { + if (iter->sessiond_fd == fd) { + DBG("Duplicate session fd %d", fd); + pthread_mutex_unlock(&kconsumerd_data.lock); + return 1; + } + } + pthread_mutex_unlock(&kconsumerd_data.lock); + + return 0; +} + /* * kconsumerd_del_fd * @@ -92,10 +132,10 @@ void kconsumerd_set_command_socket_path(char *sock) */ static void kconsumerd_del_fd(struct kconsumerd_fd *lcf) { - pthread_mutex_lock(&kconsumerd_lock_fds); + pthread_mutex_lock(&kconsumerd_data.lock); cds_list_del(&lcf->list); - if (kconsumerd_fds_count > 0) { - kconsumerd_fds_count--; + if (kconsumerd_data.fds_count > 0) { + kconsumerd_data.fds_count--; if (lcf != NULL) { close(lcf->out_fd); close(lcf->consumerd_fd); @@ -103,7 +143,8 @@ static void kconsumerd_del_fd(struct kconsumerd_fd *lcf) lcf = NULL; } } - pthread_mutex_unlock(&kconsumerd_lock_fds); + kconsumerd_data.need_update = 1; + pthread_mutex_unlock(&kconsumerd_data.lock); } /* @@ -113,8 +154,15 @@ static void kconsumerd_del_fd(struct kconsumerd_fd *lcf) */ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd) { - struct kconsumerd_fd *tmp_fd; int ret; + struct kconsumerd_fd *tmp_fd; + + pthread_mutex_lock(&kconsumerd_data.lock); + /* Check if already exist */ + ret = kconsumerd_find_session_fd(buf->fd); + if (ret == 1) { + goto end; + } tmp_fd = malloc(sizeof(struct kconsumerd_fd)); tmp_fd->sessiond_fd = buf->fd; @@ -137,12 +185,11 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_f DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name, tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd); - pthread_mutex_lock(&kconsumerd_lock_fds); - cds_list_add(&tmp_fd->list, &kconsumerd_fd_list.head); - kconsumerd_fds_count++; - pthread_mutex_unlock(&kconsumerd_lock_fds); - + cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head); + kconsumerd_data.fds_count++; + kconsumerd_data.need_update = 1; end: + pthread_mutex_unlock(&kconsumerd_data.lock); return ret; } @@ -155,12 +202,16 @@ static void kconsumerd_change_fd_state(int sessiond_fd, enum kconsumerd_fd_state state) { struct kconsumerd_fd *iter; - cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { + + pthread_mutex_lock(&kconsumerd_data.lock); + cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { if (iter->sessiond_fd == sessiond_fd) { iter->state = state; break; } } + kconsumerd_data.need_update = 1; + pthread_mutex_unlock(&kconsumerd_data.lock); } /* @@ -170,6 +221,7 @@ static void kconsumerd_change_fd_state(int sessiond_fd, * to avoid doing a lookup in the linked list and concurrency issues * when writing is needed. * Returns the number of fds in the structures + * Called with kconsumerd_data.lock held. 
*/ static int kconsumerd_update_poll_array(struct pollfd **pollfd, struct kconsumerd_fd **local_kconsumerd_fd) @@ -178,9 +230,8 @@ static int kconsumerd_update_poll_array(struct pollfd **pollfd, int i = 0; DBG("Updating poll fd array"); - pthread_mutex_lock(&kconsumerd_lock_fds); - cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { + cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { DBG("Inside for each"); if (iter->state == ACTIVE_FD) { DBG("Active FD %d", iter->consumerd_fd); @@ -197,9 +248,6 @@ static int kconsumerd_update_poll_array(struct pollfd **pollfd, */ (*pollfd)[i].fd = kconsumerd_poll_pipe[0]; (*pollfd)[i].events = POLLIN; - - kconsumerd_update_fd_array = 0; - pthread_mutex_unlock(&kconsumerd_lock_fds); return i; } @@ -563,8 +611,6 @@ static int kconsumerd_consumerd_recv_fd(int sfd, int size, default: break; } - /* flag to tell the polling thread to update its fd array */ - kconsumerd_update_fd_array = 1; /* signal the poll thread */ tmp2 = write(kconsumerd_poll_pipe[1], "4", 1); } else { @@ -576,7 +622,6 @@ static int kconsumerd_consumerd_recv_fd(int sfd, int size, } end: - DBG("kconsumerd_consumerd_recv_fd thread exiting"); return ret; } @@ -592,7 +637,7 @@ void *kconsumerd_thread_poll_fds(void *data) struct pollfd *pollfd = NULL; /* local view of the fds */ struct kconsumerd_fd **local_kconsumerd_fd = NULL; - /* local view of kconsumerd_fds_count */ + /* local view of kconsumerd_data.fds_count */ int nb_fd = 0; char tmp; int tmp2; @@ -613,7 +658,8 @@ void *kconsumerd_thread_poll_fds(void *data) * the ltt_fd_list has been updated, we need to update our * local array as well */ - if (kconsumerd_update_fd_array == 1) { + pthread_mutex_lock(&kconsumerd_data.lock); + if (kconsumerd_data.need_update) { if (pollfd != NULL) { free(pollfd); pollfd = NULL; @@ -622,27 +668,34 @@ void *kconsumerd_thread_poll_fds(void *data) free(local_kconsumerd_fd); local_kconsumerd_fd = NULL; } + /* allocate for all fds + 1 for the kconsumerd_poll_pipe */ - pollfd = malloc((kconsumerd_fds_count + 1) * sizeof(struct pollfd)); + pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd)); if (pollfd == NULL) { perror("pollfd malloc"); + pthread_mutex_unlock(&kconsumerd_data.lock); goto end; } + /* allocate for all fds + 1 for the kconsumerd_poll_pipe */ - local_kconsumerd_fd = malloc((kconsumerd_fds_count + 1) * + local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct kconsumerd_fd)); if (local_kconsumerd_fd == NULL) { perror("local_kconsumerd_fd malloc"); + pthread_mutex_unlock(&kconsumerd_data.lock); goto end; } ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); kconsumerd_send_error(KCONSUMERD_POLL_ERROR); + pthread_mutex_unlock(&kconsumerd_data.lock); goto end; } nb_fd = ret; + kconsumerd_data.need_update = 0; } + pthread_mutex_unlock(&kconsumerd_data.lock); /* poll on the array of fds */ DBG("polling on %d fd", nb_fd + 1); @@ -663,10 +716,12 @@ void *kconsumerd_thread_poll_fds(void *data) } /* - * if only the kconsumerd_poll_pipe triggered poll to return just - * return to the beginning of the loop to update the array + * If the kconsumerd_poll_pipe triggered poll go + * directly to the beginning of the loop to update the + * array. We want to prioritize array update over + * low-priority reads. 
*/ - if (num_rdy == 1 && pollfd[nb_fd].revents == POLLIN) { + if (pollfd[nb_fd].revents == POLLIN) { DBG("kconsumerd_poll_pipe wake up"); tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1); continue; @@ -678,19 +733,16 @@ void *kconsumerd_thread_poll_fds(void *data) case POLLERR: ERR("Error returned in polling fd %d.", pollfd[i].fd); kconsumerd_del_fd(local_kconsumerd_fd[i]); - kconsumerd_update_fd_array = 1; num_hup++; break; case POLLHUP: DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); kconsumerd_del_fd(local_kconsumerd_fd[i]); - kconsumerd_update_fd_array = 1; num_hup++; break; case POLLNVAL: ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); kconsumerd_del_fd(local_kconsumerd_fd[i]); - kconsumerd_update_fd_array = 1; num_hup++; break; case POLLPRI: @@ -846,7 +898,7 @@ void kconsumerd_cleanup() unlink(kconsumerd_command_sock_path); /* close all outfd */ - cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { + cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { kconsumerd_del_fd(iter); } }
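
At the core of this patch, the formerly separate globals (FD list, FD count, update flag and mutex) are folded into one kconsumerd_data structure guarded by a single mutex, so that a list mutation and the need_update flag can only ever change together from the poll thread's point of view. Below is a minimal sketch of that pattern under stated assumptions: the names (tracked_fd, shared, wake_pipe, tracked_fd_add) are hypothetical, a plain singly linked list stands in for the urcu cds_list used by the real code, and the duplicate check is written as a caller-holds-the-lock helper. That last point is deliberate: in the hunks above, kconsumerd_find_session_fd takes kconsumerd_data.lock itself while kconsumerd_add_fd also takes it before calling it, which looks like it would self-deadlock on the default (non-recursive) mutex type unless the mutex is configured as recursive somewhere outside this diff.

#include <pthread.h>
#include <stdlib.h>

/* Hypothetical per-stream element; the real entries carry several fds. */
struct tracked_fd {
	int fd;
	struct tracked_fd *next;
};

/*
 * All shared state behind a single mutex, mirroring kconsumerd_data:
 * the list, its element count and the "rebuild the poll array" flag
 * can only ever change together.
 */
static struct {
	pthread_mutex_t lock;
	struct tracked_fd *head;
	unsigned int count;
	unsigned int need_update;
} shared = {
	.lock = PTHREAD_MUTEX_INITIALIZER,
	.need_update = 1,	/* force an initial build of the poll array */
};

/* Wakeup pipe; assumed to be created with pipe() before the threads start. */
static int wake_pipe[2];

/* Duplicate check; expects shared.lock to be held by the caller. */
static int tracked_fd_exists(int fd)
{
	const struct tracked_fd *it;

	for (it = shared.head; it != NULL; it = it->next) {
		if (it->fd == fd) {
			return 1;
		}
	}
	return 0;
}

/* Add an fd unless already present; list, count and flag change together. */
static int tracked_fd_add(int fd)
{
	struct tracked_fd *elem;
	int ret = 0;

	pthread_mutex_lock(&shared.lock);
	if (tracked_fd_exists(fd)) {
		goto end;	/* already tracked, nothing to do */
	}
	elem = malloc(sizeof(*elem));
	if (elem == NULL) {
		ret = -1;
		goto end;
	}
	elem->fd = fd;
	elem->next = shared.head;
	shared.head = elem;
	shared.count++;
	shared.need_update = 1;	/* poll thread rebuilds on its next iteration */
end:
	pthread_mutex_unlock(&shared.lock);
	return ret;
}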
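
On the poll-thread side, the patch turns the flag read, the local array rebuild and the flag clear into one critical section, so an update can never slip in between the check and the rebuild; kconsumerd_update_poll_array is accordingly documented as "Called with kconsumerd_data.lock held" and no longer locks or clears the flag itself. The wakeup pipe stays as the last pollfd entry precisely so that a pending rebuild takes priority over data reads. Continuing the sketch above (still hypothetical names, error handling reduced to the essentials):

#include <poll.h>
#include <unistd.h>

static void *poll_thread(void *data)
{
	struct pollfd *pollfd = NULL;
	struct tracked_fd *it;
	int nb_fd = 0;
	char c;

	(void) data;

	for (;;) {
		/* Flag check, rebuild and flag clear form one critical section. */
		pthread_mutex_lock(&shared.lock);
		if (shared.need_update) {
			free(pollfd);
			/* +1 slot for the wakeup pipe */
			pollfd = calloc(shared.count + 1, sizeof(*pollfd));
			if (pollfd == NULL) {
				pthread_mutex_unlock(&shared.lock);
				return NULL;
			}
			nb_fd = 0;
			for (it = shared.head; it != NULL; it = it->next) {
				pollfd[nb_fd].fd = it->fd;
				pollfd[nb_fd].events = POLLIN;
				nb_fd++;
			}
			pollfd[nb_fd].fd = wake_pipe[0];
			pollfd[nb_fd].events = POLLIN;
			shared.need_update = 0;	/* cleared under the same lock */
		}
		pthread_mutex_unlock(&shared.lock);

		if (poll(pollfd, nb_fd + 1, -1) < 0) {
			break;
		}
		/* Rebuild before consuming data if the wakeup pipe fired. */
		if (pollfd[nb_fd].revents & POLLIN) {
			if (read(wake_pipe[0], &c, 1) < 0) {
				break;
			}
			continue;
		}
		/* ... read from the nb_fd data descriptors here ... */
	}
	free(pollfd);
	return NULL;
}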
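
Finally, code that mutates the shared state no longer touches the update flag directly (the mutators set need_update themselves); it only has to wake the poll thread, which in the patch remains the existing one-byte write to kconsumerd_poll_pipe[1] in the receive path. In the sketch's terms, again with hypothetical names:

#include <stdio.h>

/* Add a new fd, then nudge the poll thread so it rebuilds its array. */
static int tracked_fd_add_and_wake(int fd)
{
	if (tracked_fd_add(fd) < 0) {
		return -1;
	}
	/*
	 * A single byte suffices: the pipe is only a "something changed"
	 * doorbell; the poll thread rebuilds from the shared list itself.
	 */
	if (write(wake_pipe[1], "1", 1) < 0) {
		perror("write wake_pipe");
		return -1;
	}
	return 0;
}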