#include <common/sessiond-comm/sessiond-comm.h>
#include <common/relayd/relayd.h>
#include <common/compat/fcntl.h>
+#include <common/compat/endian.h>
#include <common/consumer-metadata-cache.h>
#include <common/consumer-stream.h>
#include <common/consumer-timer.h>
health_code_update();
- if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
- /*
- * Notify the session daemon that the command is completed.
- *
- * On transport layer error, the function call will print an error
- * message so handling the returned code is a bit useless since we
- * return an error code anyway.
- */
- (void) consumer_send_status_msg(sock, ret_code);
- return -ENOENT;
- }
+ /* deprecated */
+ assert(msg.cmd_type != LTTNG_CONSUMER_STOP);
health_code_update();
relayd = consumer_find_relayd(index);
if (relayd == NULL) {
DBG("Unable to find relayd %" PRIu64, index);
- ret_code = LTTNG_ERR_NO_CONSUMER;
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
}
/*
channel = consumer_find_channel(key);
if (!channel) {
ERR("UST consumer get channel key %" PRIu64 " not found", key);
- ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
goto end_msg_sessiond;
}
health_poll_entry();
ret = lttng_consumer_poll_socket(consumer_sockpoll);
health_poll_exit();
- if (ret < 0) {
+ if (ret) {
goto error_fatal;
}
ctx);
if (ret < 0) {
ERR("Snapshot metadata failed");
- ret_code = LTTNG_ERR_UST_META_FAIL;
+ ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
}
} else {
ret = snapshot_channel(msg.u.snapshot_channel.key,
ctx);
if (ret < 0) {
ERR("Snapshot channel failed");
- ret_code = LTTNG_ERR_UST_CHAN_FAIL;
+ ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
}
}
return ret;
}
+/*
+ * Return 0 on success else a negative value.
+ */
+static int notify_if_more_data(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret;
+ struct ustctl_consumer_stream *ustream;
+
+ assert(stream);
+ assert(ctx);
+
+ ustream = stream->ustream;
+
+ /*
+ * First, we are going to check if there is a new subbuffer available
+ * before reading the stream wait_fd.
+ */
+ /* Get the next subbuffer */
+ ret = ustctl_get_next_subbuf(ustream);
+ if (ret) {
+ /* No more data found, flag the stream. */
+ stream->has_data = 0;
+ ret = 0;
+ goto end;
+ }
+
+ ret = ustctl_put_next_subbuf(ustream);
+ assert(!ret);
+
+ /* This stream still has data. Flag it and wake up the data thread. */
+ stream->has_data = 1;
+
+ if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) {
+ ssize_t writelen;
+
+ writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1);
+ if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ ret = writelen;
+ goto end;
+ }
+
+ /* The wake up pipe has been notified. */
+ ctx->has_wakeup = 1;
+ }
+ ret = 0;
+
+end:
+ return ret;
+}
+
/*
* Read subbuffer from the given stream.
*
unsigned long len, subbuf_size, padding;
int err, write_index = 1;
long ret = 0;
- char dummy;
struct ustctl_consumer_stream *ustream;
struct ctf_packet_index index;
ustream = stream->ustream;
/*
- * We can consume the 1 byte written into the wait_fd by UST.
- * Don't trigger error if we cannot read this one byte (read
- * returns 0), or if the error is EAGAIN or EWOULDBLOCK.
+ * We can consume the 1 byte written into the wait_fd by UST. Don't trigger
+ * error if we cannot read this one byte (read returns 0), or if the error
+ * is EAGAIN or EWOULDBLOCK.
+ *
+ * This is only done when the stream is monitored by a thread, before the
+ * flush is done after a hangup and if the stream is not flagged with data
+ * since there might be nothing to consume in the wait fd but still have
+ * data available flagged by the consumer wake up pipe.
*/
- if (stream->monitor && !stream->hangup_flush_done) {
+ if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) {
+ char dummy;
ssize_t readlen;
readlen = lttng_read(stream->wait_fd, &dummy, 1);
err = ustctl_put_next_subbuf(ustream);
assert(err == 0);
+ /*
+ * This will consumer the byte on the wait_fd if and only if there is not
+ * next subbuffer to be acquired.
+ */
+ if (!stream->metadata_flag) {
+ ret = notify_if_more_data(stream, ctx);
+ if (ret < 0) {
+ goto end;
+ }
+ }
+
/* Write index if needed. */
if (!write_index) {
goto end;
*/
DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64,
contiguous, pushed);
- assert(((int64_t) contiguous - pushed) >= 0);
+ assert(((int64_t) (contiguous - pushed)) >= 0);
if ((contiguous != pushed) ||
(((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
ret = 1; /* Data is pending */
assert(channel);
assert(channel->metadata_cache);
+ memset(&request, 0, sizeof(request));
+
/* send the metadata request to sessiond */
switch (consumer_data.type) {
case LTTNG_CONSUMER64_UST:
pthread_mutex_unlock(&ctx->metadata_socket_lock);
return ret;
}
+
+/*
+ * Return the ustctl call for the get stream id.
+ */
+int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream,
+ uint64_t *stream_id)
+{
+ assert(stream);
+ assert(stream_id);
+
+ return ustctl_get_stream_id(stream->ustream, stream_id);
+}