X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=liblttkconsumerd%2Fliblttkconsumerd.c;h=ffd2faf2eddbec9e9795cbac66441fd121e47d0f;hp=742fddebe47304ab9def8c339f94c76b11e4d4e9;hb=0237248ca64ab78f9d57d3705c35b254644c2cf3;hpb=1ce86c9ab52be743e3348fd0c90153d9a9f63c49;ds=sidebyside diff --git a/liblttkconsumerd/liblttkconsumerd.c b/liblttkconsumerd/liblttkconsumerd.c index 742fddebe..ffd2faf2e 100644 --- a/liblttkconsumerd/liblttkconsumerd.c +++ b/liblttkconsumerd/liblttkconsumerd.c @@ -85,6 +85,30 @@ void kconsumerd_set_command_socket_path(char *sock) kconsumerd_command_sock_path = sock; } +/* + * kconsumerd_find_session_fd + * + * Find a session fd in the global list. + * + * Return 1 if found else 0 + */ +static int kconsumerd_find_session_fd(int fd) +{ + struct kconsumerd_fd *iter; + + pthread_mutex_lock(&kconsumerd_lock_fds); + cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { + if (iter->sessiond_fd == fd) { + DBG("Duplicate session fd %d", fd); + pthread_mutex_unlock(&kconsumerd_lock_fds); + return 1; + } + } + pthread_mutex_unlock(&kconsumerd_lock_fds); + + return 0; +} + /* * kconsumerd_del_fd * @@ -113,8 +137,14 @@ static void kconsumerd_del_fd(struct kconsumerd_fd *lcf) */ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd) { - struct kconsumerd_fd *tmp_fd; int ret; + struct kconsumerd_fd *tmp_fd; + + /* Check if already exist */ + ret = kconsumerd_find_session_fd(buf->fd); + if (ret == 1) { + goto end; + } tmp_fd = malloc(sizeof(struct kconsumerd_fd)); tmp_fd->sessiond_fd = buf->fd; @@ -155,12 +185,15 @@ static void kconsumerd_change_fd_state(int sessiond_fd, enum kconsumerd_fd_state state) { struct kconsumerd_fd *iter; + + pthread_mutex_lock(&kconsumerd_lock_fds); cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { if (iter->sessiond_fd == sessiond_fd) { iter->state = state; break; } } + pthread_mutex_unlock(&kconsumerd_lock_fds); } /* @@ -178,7 +211,6 @@ static int kconsumerd_update_poll_array(struct pollfd **pollfd, int i = 0; DBG("Updating poll fd array"); - pthread_mutex_lock(&kconsumerd_lock_fds); cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) { DBG("Inside for each"); @@ -199,7 +231,6 @@ static int kconsumerd_update_poll_array(struct pollfd **pollfd, (*pollfd)[i].events = POLLIN; kconsumerd_update_fd_array = 0; - pthread_mutex_unlock(&kconsumerd_lock_fds); return i; } @@ -576,7 +607,6 @@ static int kconsumerd_consumerd_recv_fd(int sfd, int size, } end: - DBG("kconsumerd_consumerd_recv_fd thread exiting"); return ret; } @@ -622,12 +652,16 @@ void *kconsumerd_thread_poll_fds(void *data) free(local_kconsumerd_fd); local_kconsumerd_fd = NULL; } + + /* Lock mutex for fds count */ + pthread_mutex_lock(&kconsumerd_lock_fds); /* allocate for all fds + 1 for the kconsumerd_poll_pipe */ pollfd = malloc((kconsumerd_fds_count + 1) * sizeof(struct pollfd)); if (pollfd == NULL) { perror("pollfd malloc"); goto end; } + /* allocate for all fds + 1 for the kconsumerd_poll_pipe */ local_kconsumerd_fd = malloc((kconsumerd_fds_count + 1) * sizeof(struct kconsumerd_fd)); @@ -641,6 +675,9 @@ void *kconsumerd_thread_poll_fds(void *data) kconsumerd_send_error(KCONSUMERD_POLL_ERROR); goto end; } + /* Unlock mutex for fds count */ + pthread_mutex_unlock(&kconsumerd_lock_fds); + nb_fd = ret; } @@ -729,6 +766,7 @@ void *kconsumerd_thread_poll_fds(void *data) } } end: + pthread_mutex_unlock(&kconsumerd_lock_fds); DBG("polling thread exiting"); if (pollfd != NULL) { free(pollfd);