X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=990db9e2690eff92ec683b8dd1df1aef65542ab0;hp=e26388f8e50fab81d5df0999ab5fb73948e8faba;hb=4c95e622041958250db73b497097ed93f7715e20;hpb=b31398bb2b3fa91a53dea3b36fd693da4b50e0d3 diff --git a/src/common/consumer.c b/src/common/consumer.c index e26388f8e..990db9e26 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -77,18 +77,17 @@ static struct lttng_ht *metadata_ht; static struct lttng_ht *data_ht; /* - * Notify a thread pipe to poll back again. This usually means that some global - * state has changed so we just send back the thread in a poll wait call. + * Notify a thread lttng pipe to poll back again. This usually means that some + * global state has changed so we just send back the thread in a poll wait + * call. */ -static void notify_thread_pipe(int wpipe) +static void notify_thread_lttng_pipe(struct lttng_pipe *pipe) { - int ret; + struct lttng_consumer_stream *null_stream = NULL; - do { - struct lttng_consumer_stream *null_stream = NULL; + assert(pipe); - ret = write(wpipe, &null_stream, sizeof(null_stream)); - } while (ret < 0 && errno == EINTR); + (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); } static void notify_channel_pipe(struct lttng_consumer_local_data *ctx, @@ -408,8 +407,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, * read of this status which happens AFTER receiving this notify. */ if (ctx) { - notify_thread_pipe(ctx->consumer_data_pipe[1]); - notify_thread_pipe(ctx->consumer_metadata_pipe[1]); + notify_thread_lttng_pipe(ctx->consumer_data_pipe); + notify_thread_lttng_pipe(ctx->consumer_metadata_pipe); } } @@ -464,6 +463,13 @@ void consumer_del_stream(struct lttng_consumer_stream *stream, PERROR("munmap"); } } + + if (stream->wait_fd >= 0) { + ret = close(stream->wait_fd); + if (ret) { + PERROR("close"); + } + } break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: @@ -970,7 +976,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, * Insert the consumer_data_pipe at the end of the array and don't * increment i so nb_fd is the number of real FD. */ - (*pollfd)[i].fd = ctx->consumer_data_pipe[0]; + (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe); (*pollfd)[i].events = POLLIN | POLLPRI; return i; } @@ -1166,26 +1172,11 @@ struct lttng_consumer_local_data *lttng_consumer_create( ctx->on_recv_stream = recv_stream; ctx->on_update_stream = update_stream; - ret = pipe(ctx->consumer_data_pipe); - if (ret < 0) { - PERROR("Error creating poll pipe"); + ctx->consumer_data_pipe = lttng_pipe_open(0); + if (!ctx->consumer_data_pipe) { goto error_poll_pipe; } - /* set read end of the pipe to non-blocking */ - ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK); - if (ret < 0) { - PERROR("fcntl O_NONBLOCK"); - goto error_poll_fcntl; - } - - /* set write end of the pipe to non-blocking */ - ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK); - if (ret < 0) { - PERROR("fcntl O_NONBLOCK"); - goto error_poll_fcntl; - } - ret = pipe(ctx->consumer_should_quit); if (ret < 0) { PERROR("Error creating recv pipe"); @@ -1204,8 +1195,8 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_channel_pipe; } - ret = utils_create_pipe(ctx->consumer_metadata_pipe); - if (ret < 0) { + ctx->consumer_metadata_pipe = lttng_pipe_open(0); + if (!ctx->consumer_metadata_pipe) { goto error_metadata_pipe; } @@ -1217,16 +1208,15 @@ struct lttng_consumer_local_data *lttng_consumer_create( return ctx; error_splice_pipe: - utils_close_pipe(ctx->consumer_metadata_pipe); + lttng_pipe_destroy(ctx->consumer_metadata_pipe); error_metadata_pipe: utils_close_pipe(ctx->consumer_channel_pipe); error_channel_pipe: utils_close_pipe(ctx->consumer_thread_pipe); error_thread_pipe: utils_close_pipe(ctx->consumer_should_quit); -error_poll_fcntl: error_quit_pipe: - utils_close_pipe(ctx->consumer_data_pipe); + lttng_pipe_destroy(ctx->consumer_data_pipe); error_poll_pipe: free(ctx); error: @@ -1252,7 +1242,8 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) } utils_close_pipe(ctx->consumer_thread_pipe); utils_close_pipe(ctx->consumer_channel_pipe); - utils_close_pipe(ctx->consumer_data_pipe); + lttng_pipe_destroy(ctx->consumer_data_pipe); + lttng_pipe_destroy(ctx->consumer_metadata_pipe); utils_close_pipe(ctx->consumer_should_quit); utils_close_pipe(ctx->consumer_splice_metadata_pipe); @@ -1892,6 +1883,13 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, PERROR("munmap metadata stream"); } } + + if (stream->wait_fd >= 0) { + ret = close(stream->wait_fd); + if (ret < 0) { + PERROR("close kernel metadata wait_fd"); + } + } break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: @@ -2133,7 +2131,8 @@ void *consumer_thread_metadata_poll(void *data) goto end_poll; } - ret = lttng_poll_add(&events, ctx->consumer_metadata_pipe[0], LPOLLIN); + ret = lttng_poll_add(&events, + lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN); if (ret < 0) { goto end; } @@ -2171,30 +2170,26 @@ restart: continue; } - if (pollfd == ctx->consumer_metadata_pipe[0]) { + if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) { if (revents & (LPOLLERR | LPOLLHUP )) { DBG("Metadata thread pipe hung up"); /* * Remove the pipe from the poll set and continue the loop * since their might be data to consume. */ - lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]); - ret = close(ctx->consumer_metadata_pipe[0]); - if (ret < 0) { - PERROR("close metadata pipe"); - } + lttng_poll_del(&events, + lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)); + lttng_pipe_read_close(ctx->consumer_metadata_pipe); continue; } else if (revents & LPOLLIN) { - do { - /* Get the stream pointer received */ - ret = read(pollfd, &stream, sizeof(stream)); - } while (ret < 0 && errno == EINTR); - if (ret < 0 || - ret < sizeof(struct lttng_consumer_stream *)) { - PERROR("read metadata stream"); + ssize_t pipe_len; + + pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe, + &stream, sizeof(stream)); + if (pipe_len < 0) { + ERR("read metadata stream, ret: %ld", pipe_len); /* - * Let's continue here and hope we can still work - * without stopping the consumer. XXX: Should we? + * Continue here to handle the rest of the streams. */ continue; } @@ -2401,13 +2396,10 @@ void *consumer_thread_data_poll(void *data) ssize_t pipe_readlen; DBG("consumer_data_pipe wake up"); - /* Consume 1 byte of pipe data */ - do { - pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream, - sizeof(new_stream)); - } while (pipe_readlen == -1 && errno == EINTR); + pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe, + &new_stream, sizeof(new_stream)); if (pipe_readlen < 0) { - PERROR("read consumer data pipe"); + ERR("Consumer data pipe ret %ld", pipe_readlen); /* Continue so we can at least handle the current stream(s). */ continue; } @@ -2545,10 +2537,7 @@ end: * only tracked fd in the poll set. The thread will take care of closing * the read side. */ - ret = close(ctx->consumer_metadata_pipe[1]); - if (ret < 0) { - PERROR("close data pipe"); - } + (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe); destroy_data_stream_ht(data_ht); @@ -2967,7 +2956,7 @@ end: * Notify the data poll thread to poll back again and test the * consumer_quit state that we just set so to quit gracefully. */ - notify_thread_pipe(ctx->consumer_data_pipe[1]); + notify_thread_lttng_pipe(ctx->consumer_data_pipe); notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);