We now wait for the STOP command to exit the polling thread.
When we receive this command and all FD has hung up (all data is
consumed) then we can exit cleanly.
We also close every fd as soon as they report an error.
Signed-off-by: Julien Desfossez <julien.desfossez@polymtl.ca>
/* Commands for kconsumerd */
enum kconsumerd_command {
ADD_STREAM,
/* Commands for kconsumerd */
enum kconsumerd_command {
ADD_STREAM,
- UPDATE_STREAM, /* pause, delete, start depending on fd state */
- STOP, /* delete all */
+ UPDATE_STREAM, /* pause, delete, active depending on fd state */
+ STOP, /* inform the kconsumerd to quit when all fd has hang up */
};
/* State of each fd in consumerd */
};
/* State of each fd in consumerd */
/* to count the number of time the user pressed ctrl+c */
static int sigintcount = 0;
/* to count the number of time the user pressed ctrl+c */
static int sigintcount = 0;
+/* flag to inform the polling thread to quit when all fd hung up */
+static int quit = 0;
+
/* Argument variables */
int opt_quiet;
int opt_verbose;
/* Argument variables */
int opt_quiet;
int opt_verbose;
+ DBG("consumerd_recv_fd thread exiting");
if (buf != NULL) {
free(buf);
buf = NULL;
if (buf != NULL) {
free(buf);
buf = NULL;
client_socket = lttcomm_create_unix_sock(command_sock_path);
if (client_socket < 0) {
ERR("Cannot create command socket");
client_socket = lttcomm_create_unix_sock(command_sock_path);
if (client_socket < 0) {
ERR("Cannot create command socket");
}
ret = lttcomm_listen_unix_sock(client_socket);
if (ret < 0) {
}
ret = lttcomm_listen_unix_sock(client_socket);
if (ret < 0) {
}
DBG("Sending ready command to ltt-sessiond");
ret = send_error(KCONSUMERD_COMMAND_SOCK_READY);
if (ret < 0) {
ERR("Error sending ready command to ltt-sessiond");
}
DBG("Sending ready command to ltt-sessiond");
ret = send_error(KCONSUMERD_COMMAND_SOCK_READY);
if (ret < 0) {
ERR("Error sending ready command to ltt-sessiond");
}
/* Blocking call, waiting for transmission */
sock = lttcomm_accept_unix_sock(client_socket);
if (sock <= 0) {
WARN("On accept");
}
/* Blocking call, waiting for transmission */
sock = lttcomm_accept_unix_sock(client_socket);
if (sock <= 0) {
WARN("On accept");
}
while (1) {
/* We first get the number of fd we are about to receive */
ret = lttcomm_recv_unix_sock(sock, &tmp,
sizeof(struct lttcomm_kconsumerd_header));
if (ret <= 0) {
}
while (1) {
/* We first get the number of fd we are about to receive */
ret = lttcomm_recv_unix_sock(sock, &tmp,
sizeof(struct lttcomm_kconsumerd_header));
if (ret <= 0) {
- ERR("Receiving the lttcomm_kconsumerd_header, exiting");
- goto error;
+ ERR("Communication interrupted on command socket");
+ goto end;
+ if (tmp.cmd_type == STOP) {
+ DBG("Received STOP command");
+ quit = 1;
+ goto end;
+ }
+ /* we received a command to add or update fds */
ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
if (ret <= 0) {
ERR("Receiving the FD, exiting");
ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
if (ret <= 0) {
ERR("Receiving the FD, exiting");
+end:
+ DBG("thread_receive_fds exiting");
(*pollfd)[i].events = POLLIN | POLLPRI;
local_kconsumerd_fd[i] = iter;
i++;
(*pollfd)[i].events = POLLIN | POLLPRI;
local_kconsumerd_fd[i] = iter;
i++;
- } else if (iter->state == DELETE_FD) {
- del_fd(iter);
switch(pollfd[i].revents) {
case POLLERR:
ERR("Error returned in polling fd %d.", pollfd[i].fd);
switch(pollfd[i].revents) {
case POLLERR:
ERR("Error returned in polling fd %d.", pollfd[i].fd);
+ del_fd(local_kconsumerd_fd[i]);
+ update_fd_array = 1;
- send_error(KCONSUMERD_POLL_ERROR);
break;
case POLLHUP:
ERR("Polling fd %d tells it has hung up.", pollfd[i].fd);
break;
case POLLHUP:
ERR("Polling fd %d tells it has hung up.", pollfd[i].fd);
+ del_fd(local_kconsumerd_fd[i]);
+ update_fd_array = 1;
num_hup++;
break;
case POLLNVAL:
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
num_hup++;
break;
case POLLNVAL:
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
- send_error(KCONSUMERD_POLL_NVAL);
+ del_fd(local_kconsumerd_fd[i]);
+ update_fd_array = 1;
num_hup++;
break;
case POLLPRI:
num_hup++;
break;
case POLLPRI:
/* If every buffer FD has hung up, we end the read loop here */
if (nb_fd > 0 && num_hup == nb_fd) {
DBG("every buffer FD has hung up\n");
/* If every buffer FD has hung up, we end the read loop here */
if (nb_fd > 0 && num_hup == nb_fd) {
DBG("every buffer FD has hung up\n");
- send_error(KCONSUMERD_POLL_HUP);
- goto end;
+ if (quit == 1) {
+ goto end;
+ }
+ continue;
}
/* Take care of low priority channels. */
}
/* Take care of low priority channels. */
+ DBG("polling thread exiting");
if (pollfd != NULL) {
free(pollfd);
pollfd = NULL;
if (pollfd != NULL) {
free(pollfd);
pollfd = NULL;
*/
struct lttcomm_kconsumerd_header {
u32 payload_size;
*/
struct lttcomm_kconsumerd_header {
u32 payload_size;
- u32 cmd_type; /* enum lttcomm_consumerd_command */
- u32 ret_code; /* enum lttcomm_return_code */
+ u32 cmd_type; /* enum kconsumerd_command */
};
/* lttcomm_kconsumerd_msg represents a file descriptor to consume the
};
/* lttcomm_kconsumerd_msg represents a file descriptor to consume the