Fix: race with the viewer and readiness of streams
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 1a7b2cd4b5a01edd85cf83569d69109d454b6698..af3aca0a5dfad074ba163a875770244af13e475b 100644 (file)
@@ -407,8 +407,9 @@ static int send_sessiond_channel(int sock,
                struct lttng_consumer_channel *channel,
                struct lttng_consumer_local_data *ctx, int *relayd_error)
 {
-       int ret, ret_code = LTTNG_OK;
+       int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttng_consumer_stream *stream;
+       uint64_t net_seq_idx = -1ULL;
 
        assert(channel);
        assert(ctx);
@@ -433,12 +434,26 @@ static int send_sessiond_channel(int sock,
                                }
                                ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
                        }
+                       if (net_seq_idx == -1ULL) {
+                               net_seq_idx = stream->net_seq_idx;
+                       }
+               }
+               ret = consumer_send_relayd_streams_sent(net_seq_idx);
+               if (ret < 0) {
+                       /*
+                        * Flag that the relayd was the problem here probably due to a
+                        * communicaton error on the socket.
+                        */
+                       if (relayd_error) {
+                               *relayd_error = 1;
+                       }
+                       ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
                }
        }
 
        /* Inform sessiond that we are about to send channel and streams. */
        ret = consumer_send_status_msg(sock, ret_code);
-       if (ret < 0 || ret_code != LTTNG_OK) {
+       if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
                /*
                 * Either the session daemon is not responding or the relayd died so we
                 * stop now.
@@ -480,7 +495,7 @@ static int send_sessiond_channel(int sock,
        return 0;
 
 error:
-       if (ret_code != LTTNG_OK) {
+       if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
                ret = -1;
        }
        return ret;
@@ -635,7 +650,7 @@ error:
  */
 static int _close_metadata(struct lttng_consumer_channel *channel)
 {
-       int ret = LTTNG_OK;
+       int ret = LTTCOMM_CONSUMERD_SUCCESS;
 
        assert(channel);
        assert(channel->type == CONSUMER_CHANNEL_TYPE_METADATA);
@@ -939,6 +954,12 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                        DBG("UST consumer snapshot stream %s/%s (%" PRIu64 ")", path,
                                        stream->name, stream->key);
                }
+               if (relayd_id != -1ULL) {
+                       ret = consumer_send_relayd_streams_sent(relayd_id);
+                       if (ret < 0) {
+                               goto error_unlock;
+                       }
+               }
 
                ustctl_flush_buffer(stream->ustream, 1);
 
@@ -1050,7 +1071,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                uint64_t len, struct lttng_consumer_channel *channel,
                int timer, int wait)
 {
-       int ret, ret_code = LTTNG_OK;
+       int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        char *metadata_str;
 
        DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
@@ -1115,7 +1136,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll)
 {
        ssize_t ret;
-       enum lttng_error_code ret_code = LTTNG_OK;
+       enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttcomm_consumer_msg msg;
        struct lttng_consumer_channel *channel = NULL;
 
@@ -1257,9 +1278,16 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
                attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
                attr.chan_id = msg.u.ask_channel.chan_id;
-               attr.output = msg.u.ask_channel.output;
                memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
 
+               /* Match channel buffer type to the UST abi. */
+               switch (msg.u.ask_channel.output) {
+               case LTTNG_EVENT_MMAP:
+               default:
+                       attr.output = LTTNG_UST_MMAP;
+                       break;
+               }
+
                /* Translate and save channel type. */
                switch (msg.u.ask_channel.type) {
                case LTTNG_UST_CHAN_PER_CPU:
@@ -1444,15 +1472,22 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                channel = consumer_find_channel(key);
                if (!channel) {
-                       ERR("UST consumer push metadata %" PRIu64 " not found", key);
-                       ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+                       /*
+                        * This is possible if the metadata creation on the consumer side
+                        * is in flight vis-a-vis a concurrent push metadata from the
+                        * session daemon.  Simply return that the channel failed and the
+                        * session daemon will handle that message correctly considering
+                        * that this race is acceptable thus the DBG() statement here.
+                        */
+                       DBG("UST consumer push metadata %" PRIu64 " not found", key);
+                       ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
                        goto end_msg_sessiond;
                }
 
                health_code_update();
 
                /* Tell session daemon we are ready to receive the metadata. */
-               ret = consumer_send_status_msg(sock, LTTNG_OK);
+               ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
                        goto error_fatal;
@@ -1704,7 +1739,7 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
  *
  * Return 0 on success or else a negative value.
  */
-static int get_index_values(struct lttng_packet_index *index,
+static int get_index_values(struct ctf_packet_index *index,
                struct ustctl_consumer_stream *ustream)
 {
        int ret;
@@ -1869,7 +1904,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        long ret = 0;
        char dummy;
        struct ustctl_consumer_stream *ustream;
-       struct lttng_packet_index index;
+       struct ctf_packet_index index;
 
        assert(stream);
        assert(stream->ustream);
@@ -2164,7 +2199,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
 {
        struct lttcomm_metadata_request_msg request;
        struct lttcomm_consumer_msg msg;
-       enum lttng_error_code ret_code = LTTNG_OK;
+       enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        uint64_t len, key, offset;
        int ret;
 
@@ -2253,7 +2288,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
 
        /* Tell session daemon we are ready to receive the metadata. */
        ret = consumer_send_status_msg(ctx->consumer_metadata_socket,
-                       LTTNG_OK);
+                       LTTCOMM_CONSUMERD_SUCCESS);
        if (ret < 0 || len == 0) {
                /*
                 * Somehow, the session daemon is not responding anymore or there is
This page took 0.025264 seconds and 4 git commands to generate.