X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=38cdf70b467a752de700430bff2c76574139ee3e;hp=b48a3c821d386fbf3cbfaceb8b01d9bbcc4f724d;hb=725d28b2d246b597f9040d8a10c975eb083e9e4d;hpb=212d67a218a0e805950f85bd95143a996e957322 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index b48a3c821..38cdf70b4 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -17,6 +17,7 @@ */ #define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include @@ -37,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -432,7 +434,7 @@ static int send_sessiond_channel(int sock, if (relayd_error) { *relayd_error = 1; } - ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL; + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } if (net_seq_idx == -1ULL) { net_seq_idx = stream->net_seq_idx; @@ -1116,17 +1118,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); - if (msg.cmd_type == LTTNG_CONSUMER_STOP) { - /* - * Notify the session daemon that the command is completed. - * - * On transport layer error, the function call will print an error - * message so handling the returned code is a bit useless since we - * return an error code anyway. - */ - (void) consumer_send_status_msg(sock, ret_code); - return -ENOENT; - } + /* deprecated */ + assert(msg.cmd_type != LTTNG_CONSUMER_STOP); health_code_update(); @@ -1154,7 +1147,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, relayd = consumer_find_relayd(index); if (relayd == NULL) { DBG("Unable to find relayd %" PRIu64, index); - ret_code = LTTNG_ERR_NO_CONSUMER; + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } /* @@ -1335,7 +1328,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, channel = consumer_find_channel(key); if (!channel) { ERR("UST consumer get channel key %" PRIu64 " not found", key); - ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND; + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; goto end_msg_sessiond; } @@ -1456,7 +1449,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_poll_entry(); ret = lttng_consumer_poll_socket(consumer_sockpoll); health_poll_exit(); - if (ret < 0) { + if (ret) { goto error_fatal; } @@ -1491,7 +1484,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ctx); if (ret < 0) { ERR("Snapshot metadata failed"); - ret_code = LTTNG_ERR_UST_META_FAIL; + ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; } } else { ret = snapshot_channel(msg.u.snapshot_channel.key, @@ -1501,7 +1494,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ctx); if (ret < 0) { ERR("Snapshot channel failed"); - ret_code = LTTNG_ERR_UST_CHAN_FAIL; + ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL; } } @@ -1862,6 +1855,57 @@ end: return ret; } +/* + * Return 0 on success else a negative value. + */ +static int notify_if_more_data(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + int ret; + struct ustctl_consumer_stream *ustream; + + assert(stream); + assert(ctx); + + ustream = stream->ustream; + + /* + * First, we are going to check if there is a new subbuffer available + * before reading the stream wait_fd. + */ + /* Get the next subbuffer */ + ret = ustctl_get_next_subbuf(ustream); + if (ret) { + /* No more data found, flag the stream. */ + stream->has_data = 0; + ret = 0; + goto end; + } + + ret = ustctl_put_subbuf(ustream); + assert(!ret); + + /* This stream still has data. Flag it and wake up the data thread. */ + stream->has_data = 1; + + if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) { + ssize_t writelen; + + writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1); + if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + ret = writelen; + goto end; + } + + /* The wake up pipe has been notified. */ + ctx->has_wakeup = 1; + } + ret = 0; + +end: + return ret; +} + /* * Read subbuffer from the given stream. * @@ -1875,7 +1919,6 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, unsigned long len, subbuf_size, padding; int err, write_index = 1; long ret = 0; - char dummy; struct ustctl_consumer_stream *ustream; struct ctf_packet_index index; @@ -1890,11 +1933,17 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, ustream = stream->ustream; /* - * We can consume the 1 byte written into the wait_fd by UST. - * Don't trigger error if we cannot read this one byte (read - * returns 0), or if the error is EAGAIN or EWOULDBLOCK. + * We can consume the 1 byte written into the wait_fd by UST. Don't trigger + * error if we cannot read this one byte (read returns 0), or if the error + * is EAGAIN or EWOULDBLOCK. + * + * This is only done when the stream is monitored by a thread, before the + * flush is done after a hangup and if the stream is not flagged with data + * since there might be nothing to consume in the wait fd but still have + * data available flagged by the consumer wake up pipe. */ - if (stream->monitor && !stream->hangup_flush_done) { + if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) { + char dummy; ssize_t readlen; readlen = lttng_read(stream->wait_fd, &dummy, 1); @@ -1980,6 +2029,17 @@ retry: err = ustctl_put_next_subbuf(ustream); assert(err == 0); + /* + * This will consumer the byte on the wait_fd if and only if there is not + * next subbuffer to be acquired. + */ + if (!stream->metadata_flag) { + ret = notify_if_more_data(stream, ctx); + if (ret < 0) { + goto end; + } + } + /* Write index if needed. */ if (!write_index) { goto end; @@ -2085,7 +2145,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) */ DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64, contiguous, pushed); - assert(((int64_t) contiguous - pushed) >= 0); + assert(((int64_t) (contiguous - pushed)) >= 0); if ((contiguous != pushed) || (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) { ret = 1; /* Data is pending */ @@ -2212,6 +2272,8 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, assert(channel); assert(channel->metadata_cache); + memset(&request, 0, sizeof(request)); + /* send the metadata request to sessiond */ switch (consumer_data.type) { case LTTNG_CONSUMER64_UST: @@ -2322,3 +2384,15 @@ end: pthread_mutex_unlock(&ctx->metadata_socket_lock); return ret; } + +/* + * Return the ustctl call for the get stream id. + */ +int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream, + uint64_t *stream_id) +{ + assert(stream); + assert(stream_id); + + return ustctl_get_stream_id(stream->ustream, stream_id); +}