This patch fixes the (normal) case where we handle more than one reading
fd. Previous versions were only tested with one FD, as of now we can
consume multiple fd (metadata and data for example).
Signed-off-by: Julien Desfossez <julien.desfossez@polymtl.ca>
*/
static void del_fd(struct ltt_kconsumerd_fd *lcf)
{
*/
static void del_fd(struct ltt_kconsumerd_fd *lcf)
{
+ DBG("Removing %d", lcf->consumerd_fd);
pthread_mutex_lock(&kconsumerd_lock_fds);
cds_list_del(&lcf->list);
if (fds_count > 0) {
pthread_mutex_lock(&kconsumerd_lock_fds);
cds_list_del(&lcf->list);
if (fds_count > 0) {
{
struct ltt_kconsumerd_fd *iter;
{
struct ltt_kconsumerd_fd *iter;
/* remove the socket file */
unlink(command_sock_path);
/* remove the socket file */
unlink(command_sock_path);
*
* send return code to ltt-sessiond
*/
*
* send return code to ltt-sessiond
*/
long ret = 0;
int infd = kconsumerd_fd->consumerd_fd;
long ret = 0;
int infd = kconsumerd_fd->consumerd_fd;
- DBG("In read_subbuffer");
+ DBG("In read_subbuffer (infd : %d)", infd);
/* Get the next subbuffer */
err = kernctl_get_next_subbuf(infd);
if (err != 0) {
/* Get the next subbuffer */
err = kernctl_get_next_subbuf(infd);
if (err != 0) {
goto error;
}
while (1) {
goto error;
}
while (1) {
/* We first get the number of fd we are about to receive */
ret = lttcomm_recv_unix_sock(sock, &tmp,
sizeof(struct lttcomm_kconsumerd_header));
/* We first get the number of fd we are about to receive */
ret = lttcomm_recv_unix_sock(sock, &tmp,
sizeof(struct lttcomm_kconsumerd_header));
ERR("Receiving the lttcomm_kconsumerd_header, exiting");
goto error;
}
ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
ERR("Receiving the lttcomm_kconsumerd_header, exiting");
goto error;
}
ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
ERR("Receiving the FD, exiting");
goto error;
}
}
error:
ERR("Receiving the FD, exiting");
goto error;
}
}
error:
int num_rdy, num_hup, high_prio, ret, i;
struct pollfd *pollfd = NULL;
/* local view of the fds */
int num_rdy, num_hup, high_prio, ret, i;
struct pollfd *pollfd = NULL;
/* local view of the fds */
- struct ltt_kconsumerd_fd *local_kconsumerd_fd = NULL;
+ struct ltt_kconsumerd_fd **local_kconsumerd_fd = NULL;
/* local view of fds_count */
int nb_fd = 0;
/* local view of fds_count */
int nb_fd = 0;
+ local_kconsumerd_fd = malloc(sizeof(struct ltt_kconsumerd_fd));
+
while (1) {
high_prio = 0;
num_hup = 0;
while (1) {
high_prio = 0;
num_hup = 0;
* local array as well
*/
if (update_fd_array) {
* local array as well
*/
if (update_fd_array) {
- ret = update_poll_array(&pollfd, &local_kconsumerd_fd);
+ ret = update_poll_array(&pollfd, local_kconsumerd_fd);
if (ret < 0) {
ERR("Error in allocating pollfd or local_outfds");
send_error(KCONSUMERD_POLL_ERROR);
if (ret < 0) {
ERR("Error in allocating pollfd or local_outfds");
send_error(KCONSUMERD_POLL_ERROR);
case POLLPRI:
DBG("Urgent read on fd %d", pollfd[i].fd);
high_prio = 1;
case POLLPRI:
DBG("Urgent read on fd %d", pollfd[i].fd);
high_prio = 1;
- ret = read_subbuffer(&local_kconsumerd_fd[i]);
+ ret = read_subbuffer(local_kconsumerd_fd[i]);
/* it's ok to have an unavailable sub-buffer (FIXME : is it ?) */
if (ret == EAGAIN) {
ret = 0;
/* it's ok to have an unavailable sub-buffer (FIXME : is it ?) */
if (ret == EAGAIN) {
ret = 0;
if (nb_fd > 0 && num_hup == nb_fd) {
DBG("every buffer FD has hung up\n");
send_error(KCONSUMERD_POLL_HUP);
if (nb_fd > 0 && num_hup == nb_fd) {
DBG("every buffer FD has hung up\n");
send_error(KCONSUMERD_POLL_HUP);
}
/* Take care of low priority channels. */
}
/* Take care of low priority channels. */
switch(pollfd[i].revents) {
case POLLIN:
DBG("Normal read on fd %d", pollfd[i].fd);
switch(pollfd[i].revents) {
case POLLIN:
DBG("Normal read on fd %d", pollfd[i].fd);
- ret = read_subbuffer(&local_kconsumerd_fd[i]);
+ ret = read_subbuffer(local_kconsumerd_fd[i]);
/* it's ok to have an unavailable subbuffer (FIXME : is it ?) */
if (ret == EAGAIN) {
ret = 0;
/* it's ok to have an unavailable subbuffer (FIXME : is it ?) */
if (ret == EAGAIN) {
ret = 0;