Cleanup: Remove unused "end" label in push_metadata()
diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c
index dcb6e7c00ff5129cb9ec8930a46fac39d65bcc9b..b33378f00a36e1982c20fabe6f7db689d2486378 100644
--- a/src/bin/lttng-sessiond/ust-app.c
+++ b/src/bin/lttng-sessiond/ust-app.c
@@ -426,8 +426,9 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan,
 /*
  * Push metadata to consumer socket.
  *
- * The socket lock MUST be acquired.
- * The ust app session lock MUST be acquired.
+ * RCU read-side lock must be held to guarantee the existence of the socket.
+ * Must be called with the ust app session lock held.
+ * Must be called with the registry lock held.
  *
  * On success, return the len of metadata pushed or else a negative value.
  */
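
A hedged sketch of the lock discipline the new comment implies, as seen from a caller (illustrative only; ua_sess, registry and socket are assumed to come from the surrounding session-daemon code, and push_metadata() below is the real call site):

    rcu_read_lock();                        /* guarantees the socket's existence */
    pthread_mutex_lock(&ua_sess->lock);     /* ust app session lock */
    pthread_mutex_lock(&registry->lock);    /* registry lock */
    ret = ust_app_push_metadata(registry, socket, 0);
    pthread_mutex_unlock(&registry->lock);
    pthread_mutex_unlock(&ua_sess->lock);
    rcu_read_unlock();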
@@ -443,20 +444,24 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
        assert(socket);
 
        /*
-        * On a push metadata error either the consumer is dead or the metadata
-        * channel has been destroyed because its endpoint might have died (e.g:
-        * relayd). If so, the metadata closed flag is set to 1 so we deny pushing
-        * metadata again which is not valid anymore on the consumer side.
-        *
-        * The ust app session mutex locked allows us to make this check without
-        * the registry lock.
+        * A zero metadata key means that no metadata has been assigned
+        * to the session yet. This can happen if no start command has
+        * been issued previously.
+        */
+       if (!registry->metadata_key) {
+               return 0;
+       }
+
+       /*
+        * On a push metadata error either the consumer is dead or the
+        * metadata channel has been destroyed because its endpoint
+        * might have died (e.g.: relayd). If so, the metadata closed
+        * flag is set to 1 and we refuse to push metadata again, since
+        * it is no longer valid on the consumer side.
         */
        if (registry->metadata_closed) {
                return -EPIPE;
        }
 
-       pthread_mutex_lock(&registry->lock);
-
        offset = registry->metadata_len_sent;
        len = registry->metadata_len - registry->metadata_len_sent;
        if (len == 0) {
@@ -482,29 +487,32 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
        registry->metadata_len_sent += len;
 
 push_data:
-       pthread_mutex_unlock(&registry->lock);
        ret = consumer_push_metadata(socket, registry->metadata_key,
                        metadata_str, len, offset);
        if (ret < 0) {
                /*
-                * There is an acceptable race here between the registry metadata key
-                * assignment and the creation on the consumer. The session daemon can
-                * concurrently push metadata for this registry while being created on
-                * the consumer since the metadata key of the registry is assigned
-                * *before* it is setup to avoid the consumer to ask for metadata that
-                * could possibly be not found in the session daemon.
+                * There is an acceptable race here between the registry
+                * metadata key assignment and the creation on the
+                * consumer. The session daemon can concurrently push
+                * metadata for this registry while it is being created
+                * on the consumer, since the metadata key of the
+                * registry is assigned *before* it is set up. This
+                * prevents the consumer from asking for metadata that
+                * might not be found in the session daemon.
                 *
-                * The metadata will get pushed either by the session being stopped or
-                * the consumer requesting metadata if that race is triggered.
+                * The metadata will get pushed either by the session
+                * being stopped or the consumer requesting metadata if
+                * that race is triggered.
                 */
                if (ret == -LTTCOMM_CONSUMERD_CHANNEL_FAIL) {
                        ret = 0;
                }
 
-               /* Update back the actual metadata len sent since it failed here. */
-               pthread_mutex_lock(&registry->lock);
+               /*
+                * Roll back the metadata len sent since the push
+                * failed here.
+                */
                registry->metadata_len_sent -= len;
-               pthread_mutex_unlock(&registry->lock);
                ret_val = ret;
                goto error_push;
        }
@@ -514,18 +522,29 @@ push_data:
 
 end:
 error:
-       pthread_mutex_unlock(&registry->lock);
+       if (ret_val) {
+               /*
+                * On error, flag the registry that the metadata is
+                * closed. We were unable to push anything and this
+                * means that either the consumer is not responding or
+                * the metadata cache has been destroyed on the
+                * consumer.
+                */
+               registry->metadata_closed = 1;
+       }
 error_push:
        free(metadata_str);
        return ret_val;
 }
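
Condensed, the accounting above works as follows (a sketch of the function body, not additional code): only the unsent tail of the metadata cache is pushed, and the optimistic bump of the sent counter is rolled back if the consumer call fails.

    offset = registry->metadata_len_sent;
    len = registry->metadata_len - registry->metadata_len_sent;
    registry->metadata_len_sent += len;         /* optimistic update */
    ret = consumer_push_metadata(socket, registry->metadata_key,
                    metadata_str, len, offset);
    if (ret < 0) {
            registry->metadata_len_sent -= len; /* roll back on failure */
    }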
 
 /*
- * For a given application and session, push metadata to consumer. The session
- * lock MUST be acquired here before calling this.
+ * For a given application and session, push metadata to consumer.
  * Either sock or consumer is required : if sock is NULL, the default
  * socket to send the metadata is retrieved from consumer, if sock
  * is not NULL we use it to send the metadata.
+ * The RCU read-side lock must be held while calling this function,
+ * which guarantees the existence of the registry. It also guarantees
+ * the existence of the socket throughout this function.
  *
  * Return 0 on success else a negative error.
  */
@@ -539,15 +558,10 @@ static int push_metadata(struct ust_registry_session *registry,
        assert(registry);
        assert(consumer);
 
-       rcu_read_lock();
-
-       /*
-        * Means that no metadata was assigned to the session. This can happens if
-        * no start has been done previously.
-        */
-       if (!registry->metadata_key) {
-               ret_val = 0;
-               goto end_rcu_unlock;
+       pthread_mutex_lock(&registry->lock);
+       if (registry->metadata_closed) {
+               ret_val = -EPIPE;
+               goto error;
        }
 
        /* Get consumer socket to use to push the metadata.*/
@@ -555,46 +569,26 @@ static int push_metadata(struct ust_registry_session *registry,
                        consumer);
        if (!socket) {
                ret_val = -1;
-               goto error_rcu_unlock;
+               goto error;
        }
 
-       /*
-        * TODO: Currently, we hold the socket lock around sampling of the next
-        * metadata segment to ensure we send metadata over the consumer socket in
-        * the correct order. This makes the registry lock nest inside the socket
-        * lock.
-        *
-        * Please note that this is a temporary measure: we should move this lock
-        * back into ust_consumer_push_metadata() when the consumer gets the
-        * ability to reorder the metadata it receives.
-        */
-       pthread_mutex_lock(socket->lock);
        ret = ust_app_push_metadata(registry, socket, 0);
-       pthread_mutex_unlock(socket->lock);
        if (ret < 0) {
                ret_val = ret;
-               goto error_rcu_unlock;
+               goto error;
        }
-
-       rcu_read_unlock();
+       pthread_mutex_unlock(&registry->lock);
        return 0;
 
-error_rcu_unlock:
-       /*
-        * On error, flag the registry that the metadata is closed. We were unable
-        * to push anything and this means that either the consumer is not
-        * responding or the metadata cache has been destroyed on the consumer.
-        */
-       registry->metadata_closed = 1;
-end_rcu_unlock:
-       rcu_read_unlock();
+error:
+       pthread_mutex_unlock(&registry->lock);
        return ret_val;
 }
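
Design note: the deleted TODO used to serialize metadata on the consumer socket by holding socket->lock around the push; ordering is now enforced by holding registry->lock across the closed-flag check, the socket lookup and the push itself, so concurrent pushers of the same registry are serialized regardless of which consumer socket they use. The critical section, in sketch form:

    pthread_mutex_lock(&registry->lock);
    /* check metadata_closed, look up the socket, then push */
    ret = ust_app_push_metadata(registry, socket, 0);
    pthread_mutex_unlock(&registry->lock);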
 
 /*
  * Send to the consumer a close metadata command for the given session. Once
  * done, the metadata channel is deleted and the session metadata pointer is
- * nullified. The session lock MUST be acquired here unless the application is
+ * nullified. The session lock MUST be held unless the application is
  * in the destroy path.
  *
  * Return 0 on success else a negative value.
@@ -610,6 +604,8 @@ static int close_metadata(struct ust_registry_session *registry,
 
        rcu_read_lock();
 
+       pthread_mutex_lock(&registry->lock);
+
        if (!registry->metadata_key || registry->metadata_closed) {
                ret = 0;
                goto end;
@@ -636,6 +632,7 @@ error:
         */
        registry->metadata_closed = 1;
 end:
+       pthread_mutex_unlock(&registry->lock);
        rcu_read_unlock();
        return ret;
 }
@@ -674,7 +671,7 @@ void delete_ust_app_session(int sock, struct ust_app_session *ua_sess,
        pthread_mutex_lock(&ua_sess->lock);
 
        registry = get_session_registry(ua_sess);
-       if (registry && !registry->metadata_closed) {
+       if (registry) {
                /* Push metadata for application before freeing the application. */
                (void) push_metadata(registry, ua_sess->consumer);
 
@@ -684,8 +681,7 @@ void delete_ust_app_session(int sock, struct ust_app_session *ua_sess,
                 * previous push metadata could have flag the metadata registry to
                 * close so don't send a close command if closed.
                 */
-               if (ua_sess->buffer_type != LTTNG_BUFFER_PER_UID &&
-                               !registry->metadata_closed) {
+               if (ua_sess->buffer_type != LTTNG_BUFFER_PER_UID) {
                        /* And ask to close it for this session registry. */
                        (void) close_metadata(registry, ua_sess->consumer);
                }
@@ -2388,6 +2384,7 @@ static int create_buffer_reg_channel(struct buffer_reg_session *reg_sess,
        assert(reg_chan);
        reg_chan->consumer_key = ua_chan->key;
        reg_chan->subbuf_size = ua_chan->attr.subbuf_size;
+       reg_chan->num_subbuf = ua_chan->attr.num_subbuf;
 
        /* Create and add a channel registry to session. */
        ret = ust_registry_channel_add(reg_sess->reg.ust,
@@ -2841,6 +2838,8 @@ static int create_ust_app_metadata(struct ust_app_session *ua_sess,
        registry = get_session_registry(ua_sess);
        assert(registry);
 
+       pthread_mutex_lock(&registry->lock);
+
        /* Metadata already exists for this registry or it was closed previously */
        if (registry->metadata_key || registry->metadata_closed) {
                ret = 0;
@@ -2913,6 +2912,7 @@ error_consumer:
        lttng_fd_put(LTTNG_FD_APPS, 1);
        delete_ust_app_channel(-1, metadata, app);
 error:
+       pthread_mutex_unlock(&registry->lock);
        return ret;
 }
 
@@ -3098,9 +3098,9 @@ void ust_app_unregister(int sock)
        DBG("PID %d unregistering with sock %d", lta->pid, sock);
 
        /*
-        * Perform "push metadata" and flush all application streams
-        * before removing app from hash tables, ensuring proper
-        * behavior of data_pending check.
+        * For per-PID buffers, perform "push metadata" and flush all
+        * application streams before removing app from hash tables,
+        * ensuring proper behavior of data_pending check.
         * Remove sessions so they are not visible during deletion.
         */
        cds_lfht_for_each_entry(lta->sessions->ht, &iter.iter, ua_sess,
@@ -3113,7 +3113,9 @@ void ust_app_unregister(int sock)
                        continue;
                }
 
-               (void) ust_app_flush_app_session(lta, ua_sess);
+               if (ua_sess->buffer_type == LTTNG_BUFFER_PER_PID) {
+                       (void) ust_app_flush_app_session(lta, ua_sess);
+               }
 
                /*
                 * Add session to list for teardown. This is safe since at this point we
@@ -3133,7 +3135,7 @@ void ust_app_unregister(int sock)
                 * session so the delete session will NOT push/close a second time.
                 */
                registry = get_session_registry(ua_sess);
-               if (registry && !registry->metadata_closed) {
+               if (registry) {
                        /* Push metadata for application before freeing the application. */
                        (void) push_metadata(registry, ua_sess->consumer);
 
@@ -3143,8 +3145,7 @@ void ust_app_unregister(int sock)
                         * previous push metadata could have flag the metadata registry to
                         * close so don't send a close command if closed.
                         */
-                       if (ua_sess->buffer_type != LTTNG_BUFFER_PER_UID &&
-                                       !registry->metadata_closed) {
+                       if (ua_sess->buffer_type != LTTNG_BUFFER_PER_UID) {
                                /* And ask to close it for this session registry. */
                                (void) close_metadata(registry, ua_sess->consumer);
                        }
@@ -4039,10 +4040,8 @@ int ust_app_stop_trace(struct ltt_ust_session *usess, struct ust_app *app)
        registry = get_session_registry(ua_sess);
        assert(registry);
 
-       if (!registry->metadata_closed) {
-               /* Push metadata for application before freeing the application. */
-               (void) push_metadata(registry, ua_sess->consumer);
-       }
+       /* Push metadata for application before freeing the application. */
+       (void) push_metadata(registry, ua_sess->consumer);
 
 end_unlock:
        pthread_mutex_unlock(&ua_sess->lock);
@@ -4082,21 +4081,32 @@ int ust_app_flush_app_session(struct ust_app *app,
        /* Flushing buffers */
        socket = consumer_find_socket_by_bitness(app->bits_per_long,
                        ua_sess->consumer);
-       cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter, ua_chan,
-                       node.node) {
-               health_code_update();
-               assert(ua_chan->is_sent);
-               ret = consumer_flush_channel(socket, ua_chan->key);
-               if (ret) {
-                       ERR("Error flushing consumer channel");
-                       retval = -1;
-                       continue;
+
+       /* Flush buffers; only per-PID sessions are handled here. */
+       switch (ua_sess->buffer_type) {
+       case LTTNG_BUFFER_PER_PID:
+               cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter, ua_chan,
+                               node.node) {
+                       health_code_update();
+                       assert(ua_chan->is_sent);
+                       ret = consumer_flush_channel(socket, ua_chan->key);
+                       if (ret) {
+                               ERR("Error flushing consumer channel");
+                               retval = -1;
+                               continue;
+                       }
                }
+               break;
+       case LTTNG_BUFFER_PER_UID:
+       default:
+               assert(0);
+               break;
        }
 
        health_code_update();
 
        pthread_mutex_unlock(&ua_sess->lock);
+
 end_not_compatible:
        rcu_read_unlock();
        health_code_update();
@@ -4104,25 +4114,76 @@ end_not_compatible:
 }
 
 /*
- * Flush buffers for a specific UST session and app.
+ * Flush buffers for all applications for a specific UST session.
+ * Called with UST session lock held.
  */
 static
-int ust_app_flush_session(struct ust_app *app, struct ltt_ust_session *usess)
+int ust_app_flush_session(struct ltt_ust_session *usess)
 
 {
-       int ret;
-       struct ust_app_session *ua_sess;
+       int ret = 0;
 
-       DBG("Flushing session buffers for ust app pid %d", app->pid);
+       DBG("Flushing session buffers for all ust apps");
 
        rcu_read_lock();
 
-       ua_sess = lookup_session_by_app(usess, app);
-       if (ua_sess == NULL) {
+       /* Flush buffers and push metadata. */
+       switch (usess->buffer_type) {
+       case LTTNG_BUFFER_PER_UID:
+       {
+               struct buffer_reg_uid *reg;
+               struct lttng_ht_iter iter;
+
+               /* Flush all per UID buffers associated to that session. */
+               cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
+                       struct ust_registry_session *ust_session_reg;
+                       struct buffer_reg_channel *reg_chan;
+                       struct consumer_socket *socket;
+
+                       /* Get consumer socket to use to push the metadata. */
+                       socket = consumer_find_socket_by_bitness(reg->bits_per_long,
+                                       usess->consumer);
+                       if (!socket) {
+                               /* Ignore request if no consumer is found for the session. */
+                               continue;
+                       }
+
+                       cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
+                                       reg_chan, node.node) {
+                               /*
+                                * The following call will print error values so the return
+                                * code is of little importance because whatever happens, we
+                                * have to try them all.
+                                */
+                               (void) consumer_flush_channel(socket, reg_chan->consumer_key);
+                       }
+
+                       ust_session_reg = reg->registry->reg.ust;
+                       /* Push metadata. */
+                       (void) push_metadata(ust_session_reg, usess->consumer);
+               }
+               break;
+       }
+       case LTTNG_BUFFER_PER_PID:
+       {
+               struct ust_app_session *ua_sess;
+               struct lttng_ht_iter iter;
+               struct ust_app *app;
+
+               cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
+                       ua_sess = lookup_session_by_app(usess, app);
+                       if (ua_sess == NULL) {
+                               continue;
+                       }
+                       (void) ust_app_flush_app_session(app, ua_sess);
+               }
+               break;
+       }
+       default:
                ret = -1;
-               goto end_no_session;
+               assert(0);
+               break;
        }
-       ret = ust_app_flush_app_session(app, ua_sess);
 
 end_no_session:
        rcu_read_unlock();
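
A usage sketch under the stated contract (hypothetical call site; within this diff the only caller is ust_app_stop_trace_all() below, which already runs under the UST session lock):

    /* Assumed: the UST session lock is held by the command path. */
    rcu_read_lock();
    (void) ust_app_flush_session(usess);    /* best effort; errors are logged */
    rcu_read_unlock();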
@@ -4201,6 +4262,7 @@ int ust_app_start_trace_all(struct ltt_ust_session *usess)
 
 /*
 * Stop tracing for the UST session.
+ * Called with UST session lock held.
  */
 int ust_app_stop_trace_all(struct ltt_ust_session *usess)
 {
@@ -4220,58 +4282,7 @@ int ust_app_stop_trace_all(struct ltt_ust_session *usess)
                }
        }
 
-       /* Flush buffers and push metadata (for UID buffers). */
-       switch (usess->buffer_type) {
-       case LTTNG_BUFFER_PER_UID:
-       {
-               struct buffer_reg_uid *reg;
-
-               /* Flush all per UID buffers associated to that session. */
-               cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
-                       struct ust_registry_session *ust_session_reg;
-                       struct buffer_reg_channel *reg_chan;
-                       struct consumer_socket *socket;
-
-                       /* Get consumer socket to use to push the metadata.*/
-                       socket = consumer_find_socket_by_bitness(reg->bits_per_long,
-                                       usess->consumer);
-                       if (!socket) {
-                               /* Ignore request if no consumer is found for the session. */
-                               continue;
-                       }
-
-                       cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
-                                       reg_chan, node.node) {
-                               /*
-                                * The following call will print error values so the return
-                                * code is of little importance because whatever happens, we
-                                * have to try them all.
-                                */
-                               (void) consumer_flush_channel(socket, reg_chan->consumer_key);
-                       }
-
-                       ust_session_reg = reg->registry->reg.ust;
-                       if (!ust_session_reg->metadata_closed) {
-                               /* Push metadata. */
-                               (void) push_metadata(ust_session_reg, usess->consumer);
-                       }
-               }
-
-               break;
-       }
-       case LTTNG_BUFFER_PER_PID:
-               cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
-                       ret = ust_app_flush_session(app, usess);
-                       if (ret < 0) {
-                               /* Continue to next apps even on error */
-                               continue;
-                       }
-               }
-               break;
-       default:
-               assert(0);
-               break;
-       }
+       (void) ust_app_flush_session(usess);
 
        rcu_read_unlock();
 
@@ -5062,7 +5073,8 @@ void ust_app_destroy(struct ust_app *app)
  * Return 0 on success or else a negative value.
  */
 int ust_app_snapshot_record(struct ltt_ust_session *usess,
-               struct snapshot_output *output, int wait, uint64_t max_stream_size)
+               struct snapshot_output *output, int wait,
+               uint64_t nb_packets_per_stream)
 {
        int ret = 0;
        unsigned int snapshot_done = 0;
@@ -5106,14 +5118,14 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                                        reg_chan, node.node) {
                                ret = consumer_snapshot_channel(socket, reg_chan->consumer_key,
                                                output, 0, usess->uid, usess->gid, pathname, wait,
-                                               max_stream_size);
+                                               nb_packets_per_stream);
                                if (ret < 0) {
                                        goto error;
                                }
                        }
                        ret = consumer_snapshot_channel(socket,
                                        reg->registry->reg.ust->metadata_key, output, 1,
-                                       usess->uid, usess->gid, pathname, wait, max_stream_size);
+                                       usess->uid, usess->gid, pathname, wait, 0);
                        if (ret < 0) {
                                goto error;
                        }
@@ -5157,7 +5169,7 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                                        ua_chan, node.node) {
                                ret = consumer_snapshot_channel(socket, ua_chan->key, output,
                                                0, ua_sess->euid, ua_sess->egid, pathname, wait,
-                                               max_stream_size);
+                                               nb_packets_per_stream);
                                if (ret < 0) {
                                        goto error;
                                }
@@ -5166,8 +5178,7 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                        registry = get_session_registry(ua_sess);
                        assert(registry);
                        ret = consumer_snapshot_channel(socket, registry->metadata_key, output,
-                                       1, ua_sess->euid, ua_sess->egid, pathname, wait,
-                                       max_stream_size);
+                                       1, ua_sess->euid, ua_sess->egid, pathname, wait, 0);
                        if (ret < 0) {
                                goto error;
                        }
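
Note the asymmetry introduced in both branches above: data channels receive the caller-supplied nb_packets_per_stream budget, while metadata channels are now passed 0. Presumably 0 means "no packet limit" for the consumer here: metadata must be captured in full for the snapshot to be decodable, so the size cap is applied only to data channels via the packet budget derived from ust_app_get_size_one_more_packet_per_stream() at the end of this diff.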
@@ -5195,11 +5206,12 @@ error:
 }
 
 /*
- * Return the number of streams for a UST session.
+ * Return the size taken by one more packet per stream.
  */
-unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
+uint64_t ust_app_get_size_one_more_packet_per_stream(struct ltt_ust_session *usess,
+               uint64_t cur_nr_packets)
 {
-       unsigned int ret = 0;
+       uint64_t tot_size = 0;
        struct ust_app *app;
        struct lttng_ht_iter iter;
 
@@ -5216,7 +5228,14 @@ unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
                        rcu_read_lock();
                        cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
                                        reg_chan, node.node) {
-                               ret += reg_chan->stream_count;
+                               if (cur_nr_packets >= reg_chan->num_subbuf) {
+                                       /*
+                                        * Don't take the channel into account if
+                                        * we already grabbed all of its packets.
+                                        */
+                                       continue;
+                               }
+                               tot_size += reg_chan->subbuf_size * reg_chan->stream_count;
                        }
                        rcu_read_unlock();
                }
@@ -5238,7 +5257,14 @@ unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
 
                        cds_lfht_for_each_entry(ua_sess->channels->ht, &chan_iter.iter,
                                        ua_chan, node.node) {
-                               ret += ua_chan->streams.count;
+                               if (cur_nr_packets >= ua_chan->attr.num_subbuf) {
+                                       /*
+                                        * Don't take the channel into account if
+                                        * we already grabbed all of its packets.
+                                        */
+                                       continue;
+                               }
+                               tot_size += ua_chan->attr.subbuf_size * ua_chan->streams.count;
                        }
                }
                rcu_read_unlock();
@@ -5249,5 +5275,5 @@ unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
                break;
        }
 
-       return ret;
+       return tot_size;
 }
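
A hedged sketch of how a caller could turn a byte cap into the cur_nr_packets argument (hypothetical helper; the real conversion lives in the snapshot command path, not in this file):

    static uint64_t nb_packets_for_max_size(struct ltt_ust_session *usess,
                    uint64_t max_size)
    {
            uint64_t cur_nr_packets = 0, total = 0;

            if (max_size == 0) {
                    return 0;       /* 0 is assumed to mean "unlimited". */
            }
            for (;;) {
                    uint64_t one_more;

                    /* Cost of granting one more packet to every stream. */
                    one_more = ust_app_get_size_one_more_packet_per_stream(
                                    usess, cur_nr_packets);
                    if (one_more == 0) {
                            /* Every channel is fully accounted for. */
                            break;
                    }
                    if (total + one_more > max_size) {
                            /* The next increment would exceed the cap. */
                            break;
                    }
                    total += one_more;
                    cur_nr_packets++;
            }
            return cur_nr_packets;
    }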