Cleanup: Mixed enums used for return code in send_sessiond_channel()
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 0955e66fa4e384fdde2f70703a96e626b8ebfc8c..38cdf70b467a752de700430bff2c76574139ee3e 100644 (file)
@@ -17,6 +17,7 @@
  */
 
 #define _GNU_SOURCE
+#define _LGPL_SOURCE
 #include <assert.h>
 #include <lttng/ust-ctl.h>
 #include <poll.h>
@@ -37,6 +38,7 @@
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/relayd/relayd.h>
 #include <common/compat/fcntl.h>
+#include <common/compat/endian.h>
 #include <common/consumer-metadata-cache.h>
 #include <common/consumer-stream.h>
 #include <common/consumer-timer.h>
@@ -432,7 +434,7 @@ static int send_sessiond_channel(int sock,
                                if (relayd_error) {
                                        *relayd_error = 1;
                                }
-                               ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
+                               ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
                        }
                        if (net_seq_idx == -1ULL) {
                                net_seq_idx = stream->net_seq_idx;
@@ -1145,7 +1147,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                relayd = consumer_find_relayd(index);
                if (relayd == NULL) {
                        DBG("Unable to find relayd %" PRIu64, index);
-                       ret_code = LTTNG_ERR_NO_CONSUMER;
+                       ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
                }
 
                /*
@@ -1326,7 +1328,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                channel = consumer_find_channel(key);
                if (!channel) {
                        ERR("UST consumer get channel key %" PRIu64 " not found", key);
-                       ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
                        goto end_msg_sessiond;
                }
 
@@ -1482,7 +1484,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                        ctx);
                        if (ret < 0) {
                                ERR("Snapshot metadata failed");
-                               ret_code = LTTNG_ERR_UST_META_FAIL;
+                               ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
                        }
                } else {
                        ret = snapshot_channel(msg.u.snapshot_channel.key,
@@ -1492,7 +1494,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                        ctx);
                        if (ret < 0) {
                                ERR("Snapshot channel failed");
-                               ret_code = LTTNG_ERR_UST_CHAN_FAIL;
+                               ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
                        }
                }
 
@@ -1853,6 +1855,57 @@ end:
        return ret;
 }
 
+/*
+ * Return 0 on success else a negative value.
+ */
+static int notify_if_more_data(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret;
+       struct ustctl_consumer_stream *ustream;
+
+       assert(stream);
+       assert(ctx);
+
+       ustream = stream->ustream;
+
+       /*
+        * First, we are going to check if there is a new subbuffer available
+        * before reading the stream wait_fd.
+        */
+       /* Get the next subbuffer */
+       ret = ustctl_get_next_subbuf(ustream);
+       if (ret) {
+               /* No more data found, flag the stream. */
+               stream->has_data = 0;
+               ret = 0;
+               goto end;
+       }
+
+       ret = ustctl_put_subbuf(ustream);
+       assert(!ret);
+
+       /* This stream still has data. Flag it and wake up the data thread. */
+       stream->has_data = 1;
+
+       if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) {
+               ssize_t writelen;
+
+               writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1);
+               if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
+                       ret = writelen;
+                       goto end;
+               }
+
+               /* The wake up pipe has been notified. */
+               ctx->has_wakeup = 1;
+       }
+       ret = 0;
+
+end:
+       return ret;
+}
+
 /*
  * Read subbuffer from the given stream.
  *
@@ -1866,7 +1919,6 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        unsigned long len, subbuf_size, padding;
        int err, write_index = 1;
        long ret = 0;
-       char dummy;
        struct ustctl_consumer_stream *ustream;
        struct ctf_packet_index index;
 
@@ -1881,11 +1933,17 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        ustream = stream->ustream;
 
        /*
-        * We can consume the 1 byte written into the wait_fd by UST.
-        * Don't trigger error if we cannot read this one byte (read
-        * returns 0), or if the error is EAGAIN or EWOULDBLOCK.
+        * We can consume the 1 byte written into the wait_fd by UST. Don't trigger
+        * error if we cannot read this one byte (read returns 0), or if the error
+        * is EAGAIN or EWOULDBLOCK.
+        *
+        * This is only done when the stream is monitored by a thread, before the
+        * flush is done after a hangup and if the stream is not flagged with data
+        * since there might be nothing to consume in the wait fd but still have
+        * data available flagged by the consumer wake up pipe.
         */
-       if (stream->monitor && !stream->hangup_flush_done) {
+       if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) {
+               char dummy;
                ssize_t readlen;
 
                readlen = lttng_read(stream->wait_fd, &dummy, 1);
@@ -1971,6 +2029,17 @@ retry:
        err = ustctl_put_next_subbuf(ustream);
        assert(err == 0);
 
+       /*
+        * This will consumer the byte on the wait_fd if and only if there is not
+        * next subbuffer to be acquired.
+        */
+       if (!stream->metadata_flag) {
+               ret = notify_if_more_data(stream, ctx);
+               if (ret < 0) {
+                       goto end;
+               }
+       }
+
        /* Write index if needed. */
        if (!write_index) {
                goto end;
@@ -2076,7 +2145,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
                 */
                DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64,
                                contiguous, pushed);
-               assert(((int64_t) contiguous - pushed) >= 0);
+               assert(((int64_t) (contiguous - pushed)) >= 0);
                if ((contiguous != pushed) ||
                                (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
                        ret = 1;        /* Data is pending */
@@ -2315,3 +2384,15 @@ end:
        pthread_mutex_unlock(&ctx->metadata_socket_lock);
        return ret;
 }
+
+/*
+ * Return the ustctl call for the get stream id.
+ */
+int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream,
+               uint64_t *stream_id)
+{
+       assert(stream);
+       assert(stream_id);
+
+       return ustctl_get_stream_id(stream->ustream, stream_id);
+}
This page took 0.025565 seconds and 4 git commands to generate.