X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=999e400059fbe763264698a75c67b3fb80d6c717;hb=c6aa2d41cdd9daddc133e43cf6e1f2daae818cdd;hp=821a04e3d73d80025e76ad99f1cb0558725c7188;hpb=b5a6470f372799b28d3d20603c1c0c8e5871dd63;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 821a04e3d..999e40005 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -1091,12 +1092,15 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, */ (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe); (*pollfd)[i].events = POLLIN | POLLPRI; + + (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe); + (*pollfd)[i + 1].events = POLLIN | POLLPRI; return i; } /* - * Poll on the should_quit pipe and the command socket return -1 on error and - * should exit, 0 if data is available on the command socket + * Poll on the should_quit pipe and the command socket return -1 on + * error, 1 if should exit, 0 if data is available on the command socket */ int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll) { @@ -1112,16 +1116,13 @@ restart: goto restart; } PERROR("Poll error"); - goto exit; + return -1; } if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) { DBG("consumer_should_quit wake up"); - goto exit; + return 1; } return 0; - -exit: - return -1; } /* @@ -1290,6 +1291,11 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_poll_pipe; } + ctx->consumer_wakeup_pipe = lttng_pipe_open(0); + if (!ctx->consumer_wakeup_pipe) { + goto error_wakeup_pipe; + } + ret = pipe(ctx->consumer_should_quit); if (ret < 0) { PERROR("Error creating recv pipe"); @@ -1329,6 +1335,8 @@ error_channel_pipe: error_thread_pipe: utils_close_pipe(ctx->consumer_should_quit); error_quit_pipe: + lttng_pipe_destroy(ctx->consumer_wakeup_pipe); +error_wakeup_pipe: lttng_pipe_destroy(ctx->consumer_data_pipe); error_poll_pipe: free(ctx); @@ -1396,6 +1404,10 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) DBG("Consumer destroying it. Closing everything."); + if (!ctx) { + return; + } + destroy_data_stream_ht(data_ht); destroy_metadata_stream_ht(metadata_ht); @@ -1411,6 +1423,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) utils_close_pipe(ctx->consumer_channel_pipe); lttng_pipe_destroy(ctx->consumer_data_pipe); lttng_pipe_destroy(ctx->consumer_metadata_pipe); + lttng_pipe_destroy(ctx->consumer_wakeup_pipe); utils_close_pipe(ctx->consumer_should_quit); utils_close_pipe(ctx->consumer_splice_metadata_pipe); @@ -2405,16 +2418,18 @@ void *consumer_thread_data_poll(void *data) free(local_stream); local_stream = NULL; - /* allocate for all fds + 1 for the consumer_data_pipe */ - pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); + /* + * Allocate for all fds +1 for the consumer_data_pipe and +1 for + * wake up pipe. + */ + pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd)); if (pollfd == NULL) { PERROR("pollfd malloc"); pthread_mutex_unlock(&consumer_data.lock); goto end; } - /* allocate for all fds + 1 for the consumer_data_pipe */ - local_stream = zmalloc((consumer_data.stream_count + 1) * + local_stream = zmalloc((consumer_data.stream_count + 2) * sizeof(struct lttng_consumer_stream *)); if (local_stream == NULL) { PERROR("local_stream malloc"); @@ -2441,9 +2456,9 @@ void *consumer_thread_data_poll(void *data) } /* poll on the array of fds */ restart: - DBG("polling on %d fd", nb_fd + 1); + DBG("polling on %d fd", nb_fd + 2); health_poll_entry(); - num_rdy = poll(pollfd, nb_fd + 1, -1); + num_rdy = poll(pollfd, nb_fd + 2, -1); health_poll_exit(); DBG("poll num_rdy : %d", num_rdy); if (num_rdy == -1) { @@ -2492,6 +2507,20 @@ void *consumer_thread_data_poll(void *data) continue; } + /* Handle wakeup pipe. */ + if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) { + char dummy; + ssize_t pipe_readlen; + + pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy, + sizeof(dummy)); + if (pipe_readlen < 0) { + PERROR("Consumer data wakeup pipe"); + } + /* We've been awakened to handle stream(s). */ + ctx->has_wakeup = 0; + } + /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { health_code_update(); @@ -2530,7 +2559,8 @@ void *consumer_thread_data_poll(void *data) continue; } if ((pollfd[i].revents & POLLIN) || - local_stream[i]->hangup_flush_done) { + local_stream[i]->hangup_flush_done || + local_stream[i]->has_data) { DBG("Normal read on fd %d", pollfd[i].fd); len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */ @@ -2944,8 +2974,8 @@ static int set_metadata_socket(struct lttng_consumer_local_data *ctx, assert(ctx); assert(sockpoll); - if (lttng_consumer_poll_socket(sockpoll) < 0) { - ret = -1; + ret = lttng_consumer_poll_socket(sockpoll); + if (ret) { goto error; } DBG("Metadata connection on client_socket"); @@ -3014,7 +3044,12 @@ void *consumer_thread_sessiond_poll(void *data) consumer_sockpoll[1].fd = client_socket; consumer_sockpoll[1].events = POLLIN | POLLPRI; - if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + ret = lttng_consumer_poll_socket(consumer_sockpoll); + if (ret) { + if (ret > 0) { + /* should exit */ + err = 0; + } goto end; } DBG("Connection on client_socket"); @@ -3031,7 +3066,11 @@ void *consumer_thread_sessiond_poll(void *data) * command unix socket. */ ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket); - if (ret < 0) { + if (ret) { + if (ret > 0) { + /* should exit */ + err = 0; + } goto end; } @@ -3052,15 +3091,15 @@ void *consumer_thread_sessiond_poll(void *data) health_poll_entry(); ret = lttng_consumer_poll_socket(consumer_sockpoll); health_poll_exit(); - if (ret < 0) { + if (ret) { + if (ret > 0) { + /* should exit */ + err = 0; + } goto end; } DBG("Incoming command on sock"); ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll); - if (ret == -ENOENT) { - DBG("Received STOP command"); - goto end; - } if (ret <= 0) { /* * This could simply be a session daemon quitting. Don't output @@ -3278,7 +3317,9 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, } /* Poll on consumer socket. */ - if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + ret = lttng_consumer_poll_socket(consumer_sockpoll); + if (ret) { + /* Needing to exit in the middle of a command: error. */ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); ret = -EINTR; goto error_nosignal; @@ -3613,6 +3654,7 @@ int consumer_send_status_msg(int sock, int ret_code) { struct lttcomm_consumer_status_msg msg; + memset(&msg, 0, sizeof(msg)); msg.ret_code = ret_code; return lttcomm_send_unix_sock(sock, &msg, sizeof(msg)); @@ -3630,6 +3672,7 @@ int consumer_send_status_channel(int sock, assert(sock >= 0); + memset(&msg, 0, sizeof(msg)); if (!channel) { msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL; } else {