Fix: deadlock between UST registry lock and consumer lock
[lttng-tools.git] / src / bin / lttng-sessiond / ust-app.c
index 0c4045c3b26aebf4d675c4b8916b7c5fc84701ee..2366fd320f46046f1db7b45d8013c23f9460794d 100644 (file)
@@ -426,8 +426,9 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan,
 /*
  * Push metadata to consumer socket.
  *
- * The socket lock MUST be acquired.
- * The ust app session lock MUST be acquired.
+ * RCU read-side lock must be held to guarantee existence of socket.
+ * Must be called with the ust app session lock held.
+ * Must be called with the registry lock held.
  *
  * On success, return the len of metadata pushed or else a negative value.
  */
@@ -442,25 +443,22 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
        assert(registry);
        assert(socket);
 
-       pthread_mutex_lock(&registry->lock);
-
        /*
-        * Means that no metadata was assigned to the session. This can happens if
-        * no start has been done previously.
+        * Means that no metadata was assigned to the session. This can
+        * happen if no start has been done previously.
         */
        if (!registry->metadata_key) {
-               pthread_mutex_unlock(&registry->lock);
                return 0;
        }
 
        /*
-        * On a push metadata error either the consumer is dead or the metadata
-        * channel has been destroyed because its endpoint might have died (e.g:
-        * relayd). If so, the metadata closed flag is set to 1 so we deny pushing
-        * metadata again which is not valid anymore on the consumer side.
+        * On a push metadata error either the consumer is dead or the
+        * metadata channel has been destroyed because its endpoint
+        * might have died (e.g: relayd). If so, the metadata closed
+        * flag is set to 1 so we deny pushing metadata again which is
+        * not valid anymore on the consumer side.
         */
        if (registry->metadata_closed) {
-               pthread_mutex_unlock(&registry->lock);
                return -EPIPE;
        }
 
@@ -489,29 +487,32 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
        registry->metadata_len_sent += len;
 
 push_data:
-       pthread_mutex_unlock(&registry->lock);
        ret = consumer_push_metadata(socket, registry->metadata_key,
                        metadata_str, len, offset);
        if (ret < 0) {
                /*
-                * There is an acceptable race here between the registry metadata key
-                * assignment and the creation on the consumer. The session daemon can
-                * concurrently push metadata for this registry while being created on
-                * the consumer since the metadata key of the registry is assigned
-                * *before* it is setup to avoid the consumer to ask for metadata that
-                * could possibly be not found in the session daemon.
+                * There is an acceptable race here between the registry
+                * metadata key assignment and the creation on the
+                * consumer. The session daemon can concurrently push
+                * metadata for this registry while being created on the
+                * consumer since the metadata key of the registry is
+                * assigned *before* it is setup to avoid the consumer
+                * to ask for metadata that could possibly be not found
+                * in the session daemon.
                 *
-                * The metadata will get pushed either by the session being stopped or
-                * the consumer requesting metadata if that race is triggered.
+                * The metadata will get pushed either by the session
+                * being stopped or the consumer requesting metadata if
+                * that race is triggered.
                 */
                if (ret == -LTTCOMM_CONSUMERD_CHANNEL_FAIL) {
                        ret = 0;
                }
 
-               /* Update back the actual metadata len sent since it failed here. */
-               pthread_mutex_lock(&registry->lock);
+               /*
+                * Update back the actual metadata len sent since it
+                * failed here.
+                */
                registry->metadata_len_sent -= len;
-               pthread_mutex_unlock(&registry->lock);
                ret_val = ret;
                goto error_push;
        }
@@ -523,13 +524,14 @@ end:
 error:
        if (ret_val) {
                /*
-                * On error, flag the registry that the metadata is closed. We were unable
-                * to push anything and this means that either the consumer is not
-                * responding or the metadata cache has been destroyed on the consumer.
+                * On error, flag the registry that the metadata is
+                * closed. We were unable to push anything and this
+                * means that either the consumer is not responding or
+                * the metadata cache has been destroyed on the
+                * consumer.
                 */
                registry->metadata_closed = 1;
        }
-       pthread_mutex_unlock(&registry->lock);
 error_push:
        free(metadata_str);
        return ret_val;
@@ -541,7 +543,8 @@ error_push:
  * socket to send the metadata is retrieved from consumer, if sock
  * is not NULL we use it to send the metadata.
  * RCU read-side lock must be held while calling this function,
- * therefore ensuring existance of registry.
+ * therefore ensuring existence of registry. It also ensures existence
+ * of socket throughout this function.
  *
  * Return 0 on success else a negative error.
  */
@@ -556,50 +559,37 @@ static int push_metadata(struct ust_registry_session *registry,
        assert(consumer);
 
        pthread_mutex_lock(&registry->lock);
-
        if (registry->metadata_closed) {
-               pthread_mutex_unlock(&registry->lock);
-               return -EPIPE;
+               ret_val = -EPIPE;
+               goto error;
        }
 
        /* Get consumer socket to use to push the metadata.*/
        socket = consumer_find_socket_by_bitness(registry->bits_per_long,
                        consumer);
-       pthread_mutex_unlock(&registry->lock);
        if (!socket) {
                ret_val = -1;
                goto error;
        }
 
-       /*
-        * TODO: Currently, we hold the socket lock around sampling of the next
-        * metadata segment to ensure we send metadata over the consumer socket in
-        * the correct order. This makes the registry lock nest inside the socket
-        * lock.
-        *
-        * Please note that this is a temporary measure: we should move this lock
-        * back into ust_consumer_push_metadata() when the consumer gets the
-        * ability to reorder the metadata it receives.
-        */
-       pthread_mutex_lock(socket->lock);
        ret = ust_app_push_metadata(registry, socket, 0);
-       pthread_mutex_unlock(socket->lock);
        if (ret < 0) {
                ret_val = ret;
                goto error;
        }
-
+       pthread_mutex_unlock(&registry->lock);
        return 0;
 
 error:
 end:
+       pthread_mutex_unlock(&registry->lock);
        return ret_val;
 }
 
 /*
  * Send to the consumer a close metadata command for the given session. Once
  * done, the metadata channel is deleted and the session metadata pointer is
- * nullified. The session lock MUST be acquired here unless the application is
+ * nullified. The session lock MUST be held unless the application is
  * in the destroy path.
  *
  * Return 0 on success else a negative value.
@@ -2395,6 +2385,7 @@ static int create_buffer_reg_channel(struct buffer_reg_session *reg_sess,
        assert(reg_chan);
        reg_chan->consumer_key = ua_chan->key;
        reg_chan->subbuf_size = ua_chan->attr.subbuf_size;
+       reg_chan->num_subbuf = ua_chan->attr.num_subbuf;
 
        /* Create and add a channel registry to session. */
        ret = ust_registry_channel_add(reg_sess->reg.ust,
@@ -4131,7 +4122,7 @@ static
 int ust_app_flush_session(struct ltt_ust_session *usess)
 
 {
-       int ret;
+       int ret = 0;
 
        DBG("Flushing session buffers for all ust apps");
 
@@ -4172,7 +4163,6 @@ int ust_app_flush_session(struct ltt_ust_session *usess)
                        /* Push metadata. */
                        (void) push_metadata(ust_session_reg, usess->consumer);
                }
-               ret = 0;
                break;
        }
        case LTTNG_BUFFER_PER_PID:
@@ -4191,6 +4181,7 @@ int ust_app_flush_session(struct ltt_ust_session *usess)
                break;
        }
        default:
+               ret = -1;
                assert(0);
                break;
        }
@@ -5083,7 +5074,8 @@ void ust_app_destroy(struct ust_app *app)
  * Return 0 on success or else a negative value.
  */
 int ust_app_snapshot_record(struct ltt_ust_session *usess,
-               struct snapshot_output *output, int wait, uint64_t max_stream_size)
+               struct snapshot_output *output, int wait,
+               uint64_t nb_packets_per_stream)
 {
        int ret = 0;
        unsigned int snapshot_done = 0;
@@ -5127,14 +5119,14 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                                        reg_chan, node.node) {
                                ret = consumer_snapshot_channel(socket, reg_chan->consumer_key,
                                                output, 0, usess->uid, usess->gid, pathname, wait,
-                                               max_stream_size);
+                                               nb_packets_per_stream);
                                if (ret < 0) {
                                        goto error;
                                }
                        }
                        ret = consumer_snapshot_channel(socket,
                                        reg->registry->reg.ust->metadata_key, output, 1,
-                                       usess->uid, usess->gid, pathname, wait, max_stream_size);
+                                       usess->uid, usess->gid, pathname, wait, 0);
                        if (ret < 0) {
                                goto error;
                        }
@@ -5178,7 +5170,7 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                                        ua_chan, node.node) {
                                ret = consumer_snapshot_channel(socket, ua_chan->key, output,
                                                0, ua_sess->euid, ua_sess->egid, pathname, wait,
-                                               max_stream_size);
+                                               nb_packets_per_stream);
                                if (ret < 0) {
                                        goto error;
                                }
@@ -5187,8 +5179,7 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                        registry = get_session_registry(ua_sess);
                        assert(registry);
                        ret = consumer_snapshot_channel(socket, registry->metadata_key, output,
-                                       1, ua_sess->euid, ua_sess->egid, pathname, wait,
-                                       max_stream_size);
+                                       1, ua_sess->euid, ua_sess->egid, pathname, wait, 0);
                        if (ret < 0) {
                                goto error;
                        }
@@ -5216,11 +5207,12 @@ error:
 }
 
 /*
- * Return the number of streams for a UST session.
+ * Return the size taken by one more packet per stream.
  */
-unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
+uint64_t ust_app_get_size_one_more_packet_per_stream(struct ltt_ust_session *usess,
+               uint64_t cur_nr_packets)
 {
-       unsigned int ret = 0;
+       uint64_t tot_size = 0;
        struct ust_app *app;
        struct lttng_ht_iter iter;
 
@@ -5237,7 +5229,14 @@ unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
                        rcu_read_lock();
                        cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
                                        reg_chan, node.node) {
-                               ret += reg_chan->stream_count;
+                               if (cur_nr_packets >= reg_chan->num_subbuf) {
+                                       /*
+                                        * Don't take channel into account if we
+                                        * have already grabbed all its packets.
+                                        */
+                                       continue;
+                               }
+                               tot_size += reg_chan->subbuf_size * reg_chan->stream_count;
                        }
                        rcu_read_unlock();
                }
@@ -5259,7 +5258,14 @@ unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
 
                        cds_lfht_for_each_entry(ua_sess->channels->ht, &chan_iter.iter,
                                        ua_chan, node.node) {
-                               ret += ua_chan->streams.count;
+                               if (cur_nr_packets >= ua_chan->attr.num_subbuf) {
+                                       /*
+                                        * Don't take channel into account if we
+                                        * have already grabbed all its packets.
+                                        */
+                                       continue;
+                               }
+                               tot_size += ua_chan->attr.subbuf_size * ua_chan->streams.count;
                        }
                }
                rcu_read_unlock();
@@ -5270,5 +5276,5 @@ unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
                break;
        }
 
-       return ret;
+       return tot_size;
 }
This page took 0.02706 seconds and 4 git commands to generate.