kconsumerd_command_sock_path = sock;
}
+/*
+ * kconsumerd_find_session_fd
+ *
+ * Find a session fd in the global list.
+ *
+ * Return 1 if found else 0
+ */
+static int kconsumerd_find_session_fd(int fd)
+{
+ struct kconsumerd_fd *iter;
+
+ pthread_mutex_lock(&kconsumerd_lock_fds);
+ cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
+ if (iter->sessiond_fd == fd) {
+ DBG("Duplicate session fd %d", fd);
+ pthread_mutex_unlock(&kconsumerd_lock_fds);
+ return 1;
+ }
+ }
+ pthread_mutex_unlock(&kconsumerd_lock_fds);
+
+ return 0;
+}
+
/*
* kconsumerd_del_fd
*
*/
static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
{
- struct kconsumerd_fd *tmp_fd;
int ret;
+ struct kconsumerd_fd *tmp_fd;
+
+ /* Check if already exist */
+ ret = kconsumerd_find_session_fd(buf->fd);
+ if (ret == 1) {
+ goto end;
+ }
tmp_fd = malloc(sizeof(struct kconsumerd_fd));
tmp_fd->sessiond_fd = buf->fd;
enum kconsumerd_fd_state state)
{
struct kconsumerd_fd *iter;
+
+ pthread_mutex_lock(&kconsumerd_lock_fds);
cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
if (iter->sessiond_fd == sessiond_fd) {
iter->state = state;
break;
}
}
+ pthread_mutex_unlock(&kconsumerd_lock_fds);
}
/*
int i = 0;
DBG("Updating poll fd array");
- pthread_mutex_lock(&kconsumerd_lock_fds);
cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
DBG("Inside for each");
(*pollfd)[i].events = POLLIN;
kconsumerd_update_fd_array = 0;
- pthread_mutex_unlock(&kconsumerd_lock_fds);
return i;
}
}
end:
- DBG("kconsumerd_consumerd_recv_fd thread exiting");
return ret;
}
free(local_kconsumerd_fd);
local_kconsumerd_fd = NULL;
}
+
+ /* Lock mutex for fds count */
+ pthread_mutex_lock(&kconsumerd_lock_fds);
/* allocate for all fds + 1 for the kconsumerd_poll_pipe */
pollfd = malloc((kconsumerd_fds_count + 1) * sizeof(struct pollfd));
if (pollfd == NULL) {
perror("pollfd malloc");
goto end;
}
+
/* allocate for all fds + 1 for the kconsumerd_poll_pipe */
local_kconsumerd_fd = malloc((kconsumerd_fds_count + 1) *
sizeof(struct kconsumerd_fd));
kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
goto end;
}
+ /* Unlock mutex for fds count */
+ pthread_mutex_unlock(&kconsumerd_lock_fds);
+
nb_fd = ret;
}
}
}
end:
+ pthread_mutex_unlock(&kconsumerd_lock_fds);
DBG("polling thread exiting");
if (pollfd != NULL) {
free(pollfd);