static struct lttng_ht *data_ht;
/*
- * Notify a thread pipe to poll back again. This usually means that some global
- * state has changed so we just send back the thread in a poll wait call.
+ * Notify a thread lttng pipe to poll back again. This usually means that some
+ * global state has changed so we just send back the thread in a poll wait
+ * call.
*/
-static void notify_thread_pipe(int wpipe)
+static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
{
- int ret;
+ struct lttng_consumer_stream *null_stream = NULL;
- do {
- struct lttng_consumer_stream *null_stream = NULL;
+ assert(pipe);
- ret = write(wpipe, &null_stream, sizeof(null_stream));
- } while (ret < 0 && errno == EINTR);
+ (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
}
static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
* read of this status which happens AFTER receiving this notify.
*/
if (ctx) {
- notify_thread_pipe(ctx->consumer_data_pipe[1]);
- notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
+ notify_thread_lttng_pipe(ctx->consumer_data_pipe);
+ notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
}
}
uatomic_inc(&relayd->refcount);
}
- /* Update channel refcount once added without error(s). */
- uatomic_inc(&stream->chan->refcount);
-
/*
* When nb_init_stream_left reaches 0, we don't need to trigger any action
* in terms of destroying the associated channel, because the action that
* Insert the consumer_data_pipe at the end of the array and don't
* increment i so nb_fd is the number of real FD.
*/
- (*pollfd)[i].fd = ctx->consumer_data_pipe[0];
+ (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
(*pollfd)[i].events = POLLIN | POLLPRI;
return i;
}
ctx->on_recv_stream = recv_stream;
ctx->on_update_stream = update_stream;
- ret = pipe(ctx->consumer_data_pipe);
- if (ret < 0) {
- PERROR("Error creating poll pipe");
+ ctx->consumer_data_pipe = lttng_pipe_open(0);
+ if (!ctx->consumer_data_pipe) {
goto error_poll_pipe;
}
- /* set read end of the pipe to non-blocking */
- ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK);
- if (ret < 0) {
- PERROR("fcntl O_NONBLOCK");
- goto error_poll_fcntl;
- }
-
- /* set write end of the pipe to non-blocking */
- ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK);
- if (ret < 0) {
- PERROR("fcntl O_NONBLOCK");
- goto error_poll_fcntl;
- }
-
ret = pipe(ctx->consumer_should_quit);
if (ret < 0) {
PERROR("Error creating recv pipe");
goto error_channel_pipe;
}
- ret = utils_create_pipe(ctx->consumer_metadata_pipe);
- if (ret < 0) {
+ ctx->consumer_metadata_pipe = lttng_pipe_open(0);
+ if (!ctx->consumer_metadata_pipe) {
goto error_metadata_pipe;
}
return ctx;
error_splice_pipe:
- utils_close_pipe(ctx->consumer_metadata_pipe);
+ lttng_pipe_destroy(ctx->consumer_metadata_pipe);
error_metadata_pipe:
utils_close_pipe(ctx->consumer_channel_pipe);
error_channel_pipe:
utils_close_pipe(ctx->consumer_thread_pipe);
error_thread_pipe:
utils_close_pipe(ctx->consumer_should_quit);
-error_poll_fcntl:
error_quit_pipe:
- utils_close_pipe(ctx->consumer_data_pipe);
+ lttng_pipe_destroy(ctx->consumer_data_pipe);
error_poll_pipe:
free(ctx);
error:
}
utils_close_pipe(ctx->consumer_thread_pipe);
utils_close_pipe(ctx->consumer_channel_pipe);
- utils_close_pipe(ctx->consumer_data_pipe);
+ lttng_pipe_destroy(ctx->consumer_data_pipe);
+ lttng_pipe_destroy(ctx->consumer_metadata_pipe);
utils_close_pipe(ctx->consumer_should_quit);
utils_close_pipe(ctx->consumer_splice_metadata_pipe);
goto end_poll;
}
- ret = lttng_poll_add(&events, ctx->consumer_metadata_pipe[0], LPOLLIN);
+ ret = lttng_poll_add(&events,
+ lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
if (ret < 0) {
goto end;
}
continue;
}
- if (pollfd == ctx->consumer_metadata_pipe[0]) {
+ if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
if (revents & (LPOLLERR | LPOLLHUP )) {
DBG("Metadata thread pipe hung up");
/*
* Remove the pipe from the poll set and continue the loop
* since their might be data to consume.
*/
- lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
- ret = close(ctx->consumer_metadata_pipe[0]);
- if (ret < 0) {
- PERROR("close metadata pipe");
- }
+ lttng_poll_del(&events,
+ lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
+ lttng_pipe_read_close(ctx->consumer_metadata_pipe);
continue;
} else if (revents & LPOLLIN) {
- do {
- /* Get the stream pointer received */
- ret = read(pollfd, &stream, sizeof(stream));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 ||
- ret < sizeof(struct lttng_consumer_stream *)) {
- PERROR("read metadata stream");
+ ssize_t pipe_len;
+
+ pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
+ &stream, sizeof(stream));
+ if (pipe_len < 0) {
+ ERR("read metadata stream, ret: %ld", pipe_len);
/*
- * Let's continue here and hope we can still work
- * without stopping the consumer. XXX: Should we?
+ * Continue here to handle the rest of the streams.
*/
continue;
}
ssize_t pipe_readlen;
DBG("consumer_data_pipe wake up");
- /* Consume 1 byte of pipe data */
- do {
- pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
- sizeof(new_stream));
- } while (pipe_readlen == -1 && errno == EINTR);
+ pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
+ &new_stream, sizeof(new_stream));
if (pipe_readlen < 0) {
- PERROR("read consumer data pipe");
+ ERR("Consumer data pipe ret %ld", pipe_readlen);
/* Continue so we can at least handle the current stream(s). */
continue;
}
* only tracked fd in the poll set. The thread will take care of closing
* the read side.
*/
- ret = close(ctx->consumer_metadata_pipe[1]);
- if (ret < 0) {
- PERROR("close data pipe");
- }
+ (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
destroy_data_stream_ht(data_ht);
* Notify the data poll thread to poll back again and test the
* consumer_quit state that we just set so to quit gracefully.
*/
- notify_thread_pipe(ctx->consumer_data_pipe[1]);
+ notify_thread_lttng_pipe(ctx->consumer_data_pipe);
notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);