Implement snapshot commands in lttng-sessiond
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index e0280f1489e2e8defa25e1311023f4f518e4d666..8ade3b35f272bfd6a0d77eb42094f8e521cbb026 100644 (file)
@@ -394,7 +394,7 @@ static int send_sessiond_channel(int sock,
                struct lttng_consumer_channel *channel,
                struct lttng_consumer_local_data *ctx, int *relayd_error)
 {
-       int ret;
+       int ret, ret_code = LTTNG_OK;
        struct lttng_consumer_stream *stream;
 
        assert(channel);
@@ -403,18 +403,6 @@ static int send_sessiond_channel(int sock,
 
        DBG("UST consumer sending channel %s to sessiond", channel->name);
 
-       /* Send channel to sessiond. */
-       ret = ustctl_send_channel_to_sessiond(sock, channel->uchan);
-       if (ret < 0) {
-               goto error;
-       }
-
-       ret = ustctl_channel_close_wakeup_fd(channel->uchan);
-       if (ret < 0) {
-               goto error;
-       }
-
-       /* The channel was sent successfully to the sessiond at this point. */
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
                /* Try to send the stream to the relayd if one is available. */
                ret = send_stream_to_relayd(stream);
@@ -426,9 +414,33 @@ static int send_sessiond_channel(int sock,
                        if (relayd_error) {
                                *relayd_error = 1;
                        }
-                       goto error;
+                       ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
                }
+       }
+
+       /* Inform sessiond that we are about to send channel and streams. */
+       ret = consumer_send_status_msg(sock, ret_code);
+       if (ret < 0 || ret_code != LTTNG_OK) {
+               /*
+                * Either the session daemon is not responding or the relayd died so we
+                * stop now.
+                */
+               goto error;
+       }
 
+       /* Send channel to sessiond. */
+       ret = ustctl_send_channel_to_sessiond(sock, channel->uchan);
+       if (ret < 0) {
+               goto error;
+       }
+
+       ret = ustctl_channel_close_wakeup_fd(channel->uchan);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* The channel was sent successfully to the sessiond at this point. */
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
                /* Send stream to session daemon. */
                ret = send_sessiond_stream(sock, stream);
                if (ret < 0) {
@@ -447,6 +459,9 @@ static int send_sessiond_channel(int sock,
        return 0;
 
 error:
+       if (ret_code != LTTNG_OK) {
+               ret = -1;
+       }
        return ret;
 }
 
@@ -491,10 +506,6 @@ static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
 
        channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);
 
-       if (ret < 0) {
-               goto error;
-       }
-
        /* Open all streams for this channel. */
        ret = create_ust_streams(channel, ctx);
        if (ret < 0) {
@@ -558,6 +569,11 @@ int lttng_ustconsumer_push_metadata(struct lttng_consumer_channel *metadata,
 
        DBG("UST consumer writing metadata to channel %s", metadata->name);
 
+       if (!metadata->metadata_stream) {
+               ret = 0;
+               goto error;
+       }
+
        assert(target_offset <= metadata->metadata_cache->max_offset);
        ret = ustctl_write_metadata_to_channel(metadata->uchan,
                        metadata_str + target_offset, len);
@@ -623,17 +639,29 @@ static int close_metadata(uint64_t chan_key)
 
        channel = consumer_find_channel(chan_key);
        if (!channel) {
-               ERR("UST consumer close metadata %" PRIu64 " not found", chan_key);
+               /*
+                * This is possible if the metadata thread has issue a delete because
+                * the endpoint point of the stream hung up. There is no way the
+                * session daemon can know about it thus use a DBG instead of an actual
+                * error.
+                */
+               DBG("UST consumer close metadata %" PRIu64 " not found", chan_key);
                ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
                goto error;
        }
 
        pthread_mutex_lock(&consumer_data.lock);
-       if (!cds_lfht_is_node_deleted(&channel->node.node)) {
-               if (channel->switch_timer_enabled == 1) {
-                       DBG("Deleting timer on metadata channel");
-                       consumer_timer_switch_stop(channel);
-               }
+
+       if (cds_lfht_is_node_deleted(&channel->node.node)) {
+               goto error_unlock;
+       }
+
+       if (channel->switch_timer_enabled == 1) {
+               DBG("Deleting timer on metadata channel");
+               consumer_timer_switch_stop(channel);
+       }
+
+       if (channel->metadata_stream) {
                ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
                if (ret < 0) {
                        ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
@@ -664,7 +692,7 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
        if (!metadata) {
                ERR("UST consumer push metadata %" PRIu64 " not found", key);
                ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
-               goto error;
+               goto error_find;
        }
 
        /*
@@ -696,9 +724,17 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
        /* List MUST be empty after or else it could be reused. */
        assert(cds_list_empty(&metadata->streams.head));
 
-       ret = 0;
+       return 0;
 
 error:
+       /*
+        * Delete metadata channel on error. At this point, the metadata stream can
+        * NOT be monitored by the metadata thread thus having the guarantee that
+        * the stream is still in the local stream list of the channel. This call
+        * will make sure to clean that list.
+        */
+       consumer_del_channel(metadata);
+error_find:
        return ret;
 }
 
@@ -728,13 +764,33 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                goto end_free;
        }
 
+       /*
+        * XXX: The consumer data lock is acquired before calling metadata cache
+        * write which calls push metadata that MUST be protected by the consumer
+        * lock in order to be able to check the validity of the metadata stream of
+        * the channel.
+        *
+        * Note that this will be subject to change to better fine grained locking
+        * and ultimately try to get rid of this global consumer data lock.
+        */
+       pthread_mutex_lock(&consumer_data.lock);
+
        pthread_mutex_lock(&channel->metadata_cache->lock);
        ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
        if (ret < 0) {
                /* Unable to handle metadata. Notify session daemon. */
                ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+               /*
+                * Skip metadata flush on write error since the offset and len might
+                * not have been updated which could create an infinite loop below when
+                * waiting for the metadata cache to be flushed.
+                */
+               pthread_mutex_unlock(&channel->metadata_cache->lock);
+               pthread_mutex_unlock(&consumer_data.lock);
+               goto end_free;
        }
        pthread_mutex_unlock(&channel->metadata_cache->lock);
+       pthread_mutex_unlock(&consumer_data.lock);
 
        while (consumer_metadata_cache_flushed(channel, offset + len)) {
                DBG("Waiting for metadata to be flushed");
@@ -769,6 +825,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 * The ret value might 0 meaning an orderly shutdown but this is ok
                 * since the caller handles this.
                 */
+               if (ret > 0) {
+                       ret = -1;
+               }
                return ret;
        }
        if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
@@ -844,6 +903,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                sizeof(is_data_pending));
                if (ret < 0) {
                        DBG("Error when sending the data pending ret code: %d", ret);
+                       goto error_fatal;
                }
 
                /*
@@ -950,10 +1010,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                ret = consumer_send_status_channel(sock, channel);
                if (ret < 0) {
                        /*
-                        * There is probably a problem on the socket so the poll will get
-                        * it and clean everything up.
+                        * There is probably a problem on the socket.
                         */
-                       goto end_nosignal;
+                       goto error_fatal;
                }
 
                break;
@@ -971,13 +1030,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_msg_sessiond;
                }
 
-               /* Inform sessiond that we are about to send channel and streams. */
-               ret = consumer_send_status_msg(sock, LTTNG_OK);
-               if (ret < 0) {
-                       /* Somehow, the session daemon is not responding anymore. */
-                       goto end_nosignal;
-               }
-
                /* Send everything to sessiond. */
                ret = send_sessiond_channel(sock, channel, ctx, &relayd_err);
                if (ret < 0) {
@@ -985,10 +1037,10 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                /*
                                 * We were unable to send to the relayd the stream so avoid
                                 * sending back a fatal error to the thread since this is OK
-                                * and the consumer can continue its work.
+                                * and the consumer can continue its work. The above call
+                                * has sent the error status message to the sessiond.
                                 */
-                               ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
-                               goto end_msg_sessiond;
+                               goto end_nosignal;
                        }
                        /*
                         * The communicaton was broken hence there is a bad state between
@@ -1071,14 +1123,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* Wait for more data. */
                if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
-                       goto end_nosignal;
+                       goto error_fatal;
                }
 
                ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
                                len, channel);
                if (ret < 0) {
                        /* error receiving from sessiond */
-                       goto end_nosignal;
+                       goto error_fatal;
                } else {
                        ret_code = ret;
                        goto end_msg_sessiond;
@@ -1094,6 +1146,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                goto end_msg_sessiond;
        }
+       case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
+       {
+               ret = consumer_send_status_msg(sock, ret_code);
+               if (ret < 0) {
+                       /* Somehow, the session daemon is not responding anymore. */
+                       goto end_nosignal;
+               }
+               break;
+       }
        default:
                break;
        }
@@ -1113,7 +1174,10 @@ end_msg_sessiond:
         * the caller because the session daemon socket management is done
         * elsewhere. Returning a negative code or 0 will shutdown the consumer.
         */
-       (void) consumer_send_status_msg(sock, ret_code);
+       ret = consumer_send_status_msg(sock, ret_code);
+       if (ret < 0) {
+               goto error_fatal;
+       }
        rcu_read_unlock();
        return 1;
 end_channel_error:
@@ -1505,7 +1569,13 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
 
        ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
                        key, offset, len, channel);
-       (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret_code);
+       if (ret_code >= 0) {
+               /*
+                * Only send the status msg if the sessiond is alive meaning a positive
+                * ret code.
+                */
+               (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret_code);
+       }
        ret = 0;
 
 end:
This page took 0.028101 seconds and 4 git commands to generate.