X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=fb0d7d05afb182992c97c789dcdd54544bb2fcea;hp=35df86dd81c922217a6dbf628af93fd79d5b3432;hb=50f8ae690312d8f824fb9c9875b0a07f4a2547b6;hpb=58b1f4255ea457f2965f31b84205cb0eec21e71f diff --git a/src/common/consumer.c b/src/common/consumer.c index 35df86dd8..fb0d7d05a 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -281,16 +281,11 @@ void consumer_del_stream(struct lttng_consumer_stream *stream, iter.iter.node = &stream->node.node; ret = lttng_ht_del(ht, &iter); assert(!ret); - rcu_read_unlock(); - if (consumer_data.stream_count <= 0) { - goto end; - } + assert(consumer_data.stream_count > 0); consumer_data.stream_count--; - if (!stream) { - goto end; - } + if (stream->out_fd >= 0) { ret = close(stream->out_fd); if (ret) { @@ -820,10 +815,10 @@ static int consumer_update_poll_array( rcu_read_unlock(); /* - * Insert the consumer_poll_pipe at the end of the array and don't + * Insert the consumer_data_pipe at the end of the array and don't * increment i so nb_fd is the number of real FD. */ - (*pollfd)[i].fd = ctx->consumer_poll_pipe[0]; + (*pollfd)[i].fd = ctx->consumer_data_pipe[0]; (*pollfd)[i].events = POLLIN | POLLPRI; return i; } @@ -1020,21 +1015,21 @@ struct lttng_consumer_local_data *lttng_consumer_create( ctx->on_recv_stream = recv_stream; ctx->on_update_stream = update_stream; - ret = pipe(ctx->consumer_poll_pipe); + ret = pipe(ctx->consumer_data_pipe); if (ret < 0) { PERROR("Error creating poll pipe"); goto error_poll_pipe; } /* set read end of the pipe to non-blocking */ - ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK); + ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK); if (ret < 0) { PERROR("fcntl O_NONBLOCK"); goto error_poll_fcntl; } /* set write end of the pipe to non-blocking */ - ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK); + ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK); if (ret < 0) { PERROR("fcntl O_NONBLOCK"); goto error_poll_fcntl; @@ -1082,7 +1077,7 @@ error_quit_pipe: for (i = 0; i < 2; i++) { int err; - err = close(ctx->consumer_poll_pipe[i]); + err = close(ctx->consumer_data_pipe[i]); if (err) { PERROR("close"); } @@ -1112,11 +1107,11 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) if (ret) { PERROR("close"); } - ret = close(ctx->consumer_poll_pipe[0]); + ret = close(ctx->consumer_data_pipe[0]); if (ret) { PERROR("close"); } - ret = close(ctx->consumer_poll_pipe[1]); + ret = close(ctx->consumer_data_pipe[1]); if (ret) { PERROR("close"); } @@ -1981,7 +1976,7 @@ void *consumer_thread_data_poll(void *data) local_stream = NULL; } - /* allocate for all fds + 1 for the consumer_poll_pipe */ + /* allocate for all fds + 1 for the consumer_data_pipe */ pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); if (pollfd == NULL) { PERROR("pollfd malloc"); @@ -1989,7 +1984,7 @@ void *consumer_thread_data_poll(void *data) goto end; } - /* allocate for all fds + 1 for the consumer_poll_pipe */ + /* allocate for all fds + 1 for the consumer_data_pipe */ local_stream = zmalloc((consumer_data.stream_count + 1) * sizeof(struct lttng_consumer_stream)); if (local_stream == NULL) { @@ -2035,17 +2030,17 @@ void *consumer_thread_data_poll(void *data) } /* - * If the consumer_poll_pipe triggered poll go directly to the + * If the consumer_data_pipe triggered poll go directly to the * beginning of the loop to update the array. We want to prioritize * array update over low-priority reads. */ if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) { size_t pipe_readlen; - DBG("consumer_poll_pipe wake up"); + DBG("consumer_data_pipe wake up"); /* Consume 1 byte of pipe data */ do { - pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream, + pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream, sizeof(new_stream)); } while (pipe_readlen == -1 && errno == EINTR); @@ -2296,7 +2291,7 @@ end: do { struct lttng_consumer_stream *null_stream = NULL; - ret = write(ctx->consumer_poll_pipe[1], &null_stream, + ret = write(ctx->consumer_data_pipe[1], &null_stream, sizeof(null_stream)); } while (ret < 0 && errno == EINTR);