X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=38cdf70b467a752de700430bff2c76574139ee3e;hp=0955e66fa4e384fdde2f70703a96e626b8ebfc8c;hb=725d28b2d246b597f9040d8a10c975eb083e9e4d;hpb=84382d49b1e8a17037d0415d84f1c9e250163494 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 0955e66fa..38cdf70b4 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -17,6 +17,7 @@ */ #define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include @@ -37,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -432,7 +434,7 @@ static int send_sessiond_channel(int sock, if (relayd_error) { *relayd_error = 1; } - ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL; + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } if (net_seq_idx == -1ULL) { net_seq_idx = stream->net_seq_idx; @@ -1145,7 +1147,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, relayd = consumer_find_relayd(index); if (relayd == NULL) { DBG("Unable to find relayd %" PRIu64, index); - ret_code = LTTNG_ERR_NO_CONSUMER; + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } /* @@ -1326,7 +1328,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, channel = consumer_find_channel(key); if (!channel) { ERR("UST consumer get channel key %" PRIu64 " not found", key); - ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND; + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; goto end_msg_sessiond; } @@ -1482,7 +1484,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ctx); if (ret < 0) { ERR("Snapshot metadata failed"); - ret_code = LTTNG_ERR_UST_META_FAIL; + ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; } } else { ret = snapshot_channel(msg.u.snapshot_channel.key, @@ -1492,7 +1494,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ctx); if (ret < 0) { ERR("Snapshot channel failed"); - ret_code = LTTNG_ERR_UST_CHAN_FAIL; + ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL; } } @@ -1853,6 +1855,57 @@ end: return ret; } +/* + * Return 0 on success else a negative value. + */ +static int notify_if_more_data(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + int ret; + struct ustctl_consumer_stream *ustream; + + assert(stream); + assert(ctx); + + ustream = stream->ustream; + + /* + * First, we are going to check if there is a new subbuffer available + * before reading the stream wait_fd. + */ + /* Get the next subbuffer */ + ret = ustctl_get_next_subbuf(ustream); + if (ret) { + /* No more data found, flag the stream. */ + stream->has_data = 0; + ret = 0; + goto end; + } + + ret = ustctl_put_subbuf(ustream); + assert(!ret); + + /* This stream still has data. Flag it and wake up the data thread. */ + stream->has_data = 1; + + if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) { + ssize_t writelen; + + writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1); + if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + ret = writelen; + goto end; + } + + /* The wake up pipe has been notified. */ + ctx->has_wakeup = 1; + } + ret = 0; + +end: + return ret; +} + /* * Read subbuffer from the given stream. * @@ -1866,7 +1919,6 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, unsigned long len, subbuf_size, padding; int err, write_index = 1; long ret = 0; - char dummy; struct ustctl_consumer_stream *ustream; struct ctf_packet_index index; @@ -1881,11 +1933,17 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, ustream = stream->ustream; /* - * We can consume the 1 byte written into the wait_fd by UST. - * Don't trigger error if we cannot read this one byte (read - * returns 0), or if the error is EAGAIN or EWOULDBLOCK. + * We can consume the 1 byte written into the wait_fd by UST. Don't trigger + * error if we cannot read this one byte (read returns 0), or if the error + * is EAGAIN or EWOULDBLOCK. + * + * This is only done when the stream is monitored by a thread, before the + * flush is done after a hangup and if the stream is not flagged with data + * since there might be nothing to consume in the wait fd but still have + * data available flagged by the consumer wake up pipe. */ - if (stream->monitor && !stream->hangup_flush_done) { + if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) { + char dummy; ssize_t readlen; readlen = lttng_read(stream->wait_fd, &dummy, 1); @@ -1971,6 +2029,17 @@ retry: err = ustctl_put_next_subbuf(ustream); assert(err == 0); + /* + * This will consumer the byte on the wait_fd if and only if there is not + * next subbuffer to be acquired. + */ + if (!stream->metadata_flag) { + ret = notify_if_more_data(stream, ctx); + if (ret < 0) { + goto end; + } + } + /* Write index if needed. */ if (!write_index) { goto end; @@ -2076,7 +2145,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) */ DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64, contiguous, pushed); - assert(((int64_t) contiguous - pushed) >= 0); + assert(((int64_t) (contiguous - pushed)) >= 0); if ((contiguous != pushed) || (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) { ret = 1; /* Data is pending */ @@ -2315,3 +2384,15 @@ end: pthread_mutex_unlock(&ctx->metadata_socket_lock); return ret; } + +/* + * Return the ustctl call for the get stream id. + */ +int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream, + uint64_t *stream_id) +{ + assert(stream); + assert(stream_id); + + return ustctl_get_stream_id(stream->ustream, stream_id); +}