X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=9f2e739a36fb59ba5bbba2df574dfcecbab2327d;hp=0955e66fa4e384fdde2f70703a96e626b8ebfc8c;hb=02b3d1769d5f8a33e4109b1e681141c9295dfda6;hpb=84382d49b1e8a17037d0415d84f1c9e250163494 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 0955e66fa..9f2e739a3 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1853,6 +1853,57 @@ end: return ret; } +/* + * Return 0 on success else a negative value. + */ +static int notify_if_more_data(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + int ret; + struct ustctl_consumer_stream *ustream; + + assert(stream); + assert(ctx); + + ustream = stream->ustream; + + /* + * First, we are going to check if there is a new subbuffer available + * before reading the stream wait_fd. + */ + /* Get the next subbuffer */ + ret = ustctl_get_next_subbuf(ustream); + if (ret) { + /* No more data found, flag the stream. */ + stream->has_data = 0; + ret = 0; + goto end; + } + + ret = ustctl_put_next_subbuf(ustream); + assert(!ret); + + /* This stream still has data. Flag it and wake up the data thread. */ + stream->has_data = 1; + + if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) { + ssize_t writelen; + + writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1); + if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + ret = writelen; + goto end; + } + + /* The wake up pipe has been notified. */ + ctx->has_wakeup = 1; + } + ret = 0; + +end: + return ret; +} + /* * Read subbuffer from the given stream. * @@ -1866,7 +1917,6 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, unsigned long len, subbuf_size, padding; int err, write_index = 1; long ret = 0; - char dummy; struct ustctl_consumer_stream *ustream; struct ctf_packet_index index; @@ -1881,11 +1931,17 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, ustream = stream->ustream; /* - * We can consume the 1 byte written into the wait_fd by UST. - * Don't trigger error if we cannot read this one byte (read - * returns 0), or if the error is EAGAIN or EWOULDBLOCK. + * We can consume the 1 byte written into the wait_fd by UST. Don't trigger + * error if we cannot read this one byte (read returns 0), or if the error + * is EAGAIN or EWOULDBLOCK. + * + * This is only done when the stream is monitored by a thread, before the + * flush is done after a hangup and if the stream is not flagged with data + * since there might be nothing to consume in the wait fd but still have + * data available flagged by the consumer wake up pipe. */ - if (stream->monitor && !stream->hangup_flush_done) { + if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) { + char dummy; ssize_t readlen; readlen = lttng_read(stream->wait_fd, &dummy, 1); @@ -1971,6 +2027,17 @@ retry: err = ustctl_put_next_subbuf(ustream); assert(err == 0); + /* + * This will consumer the byte on the wait_fd if and only if there is not + * next subbuffer to be acquired. + */ + if (!stream->metadata_flag) { + ret = notify_if_more_data(stream, ctx); + if (ret < 0) { + goto end; + } + } + /* Write index if needed. */ if (!write_index) { goto end;