Implement consumer ring buffer position sampling
[lttng-tools.git] / src / bin / lttng-sessiond / ust-app.c
index 285467fbbacac990864a0871d18f200a0eb7f67b..d292779d156dca47d2ce1d861ac35d216811d731 100644 (file)
@@ -41,6 +41,8 @@
 #include "ust-ctl.h"
 #include "utils.h"
 #include "session.h"
+#include "lttng-sessiond.h"
+#include "notification-thread-commands.h"
 
 static
 int ust_app_flush_app_session(struct ust_app *app, struct ust_app_session *ua_sess);
@@ -376,6 +378,8 @@ void delete_ust_app_channel_rcu(struct rcu_head *head)
  * Extract the lost packet or discarded events counter when the channel is
  * being deleted and store the value in the parent channel so we can
  * access it from lttng list and at stop/destroy.
+ *
+ * The session list lock must be held by the caller.
  */
 static
 void save_per_pid_lost_discarded_counters(struct ust_app_channel *ua_chan)
@@ -390,12 +394,22 @@ void save_per_pid_lost_discarded_counters(struct ust_app_channel *ua_chan)
 
        rcu_read_lock();
        session = session_find_by_id(ua_chan->session->tracing_id);
-       if (!session) {
-               ERR("Missing LTT session to get discarded events");
-               goto end;
-       }
-       if (!session->ust_session) {
-               ERR("Missing UST session to get discarded events");
+       if (!session || !session->ust_session) {
+               /*
+                * Not finding the session is not an error because there are
+                * multiple ways the channels can be torn down.
+                *
+                * 1) The session daemon can initiate the destruction of the
+                *    ust app session after receiving a destroy command or
+                *    during its shutdown/teardown.
+                * 2) The application, since we are in per-pid tracing, is
+                *    unregistering and tearing down its ust app session.
+                *
+                * Both paths are protected by the session list lock which
+                * ensures that the accounting of lost packets and discarded
+                * events is done exactly once. The session is then unpublished
+                * from the session list, resulting in this condition.
+                */
                goto end;
        }
 
@@ -426,6 +440,8 @@ end:
 /*
  * Delete ust app channel safely. RCU read lock must be held before calling
  * this function.
+ *
+ * The session list lock must be held by the caller.
  */
 static
 void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan,
@@ -468,7 +484,8 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan,
                /* Wipe and free registry from session registry. */
                registry = get_session_registry(ua_chan->session);
                if (registry) {
-                       ust_registry_channel_del_free(registry, ua_chan->key);
+                       ust_registry_channel_del_free(registry, ua_chan->key,
+                               true);
                }
                save_per_pid_lost_discarded_counters(ua_chan);
        }
@@ -537,7 +554,7 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
        char *metadata_str = NULL;
        size_t len, offset, new_metadata_len_sent;
        ssize_t ret_val;
-       uint64_t metadata_key;
+       uint64_t metadata_key, metadata_version;
 
        assert(registry);
        assert(socket);
@@ -552,21 +569,10 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
                return 0;
        }
 
-       /*
-        * On a push metadata error either the consumer is dead or the
-        * metadata channel has been destroyed because its endpoint
-        * might have died (e.g: relayd), or because the application has
-        * exited. If so, the metadata closed flag is set to 1 so we
-        * deny pushing metadata again which is not valid anymore on the
-        * consumer side.
-        */
-       if (registry->metadata_closed) {
-               return -EPIPE;
-       }
-
        offset = registry->metadata_len_sent;
        len = registry->metadata_len - registry->metadata_len_sent;
        new_metadata_len_sent = registry->metadata_len;
+       metadata_version = registry->metadata_version;
        if (len == 0) {
                DBG3("No metadata to push for metadata key %" PRIu64,
                                registry->metadata_key);
@@ -603,7 +609,7 @@ push_data:
         * different bidirectionnal communication sockets.
         */
        ret = consumer_push_metadata(socket, metadata_key,
-                       metadata_str, len, offset);
+                       metadata_str, len, offset, metadata_version);
        pthread_mutex_lock(&registry->lock);
        if (ret < 0) {
                /*
@@ -787,6 +793,8 @@ void delete_ust_app_session_rcu(struct rcu_head *head)
 /*
  * Delete ust app session safely. RCU read lock must be held before calling
  * this function.
+ *
+ * The session list lock must be held by the caller.
  */
 static
 void delete_ust_app_session(int sock, struct ust_app_session *ua_sess,
@@ -870,6 +878,11 @@ void delete_ust_app(struct ust_app *app)
        int ret, sock;
        struct ust_app_session *ua_sess, *tmp_ua_sess;
 
+       /*
+        * The session list lock must be held during this function to guarantee
+        * the existence of ua_sess.
+        */
+       session_lock_list();
        /* Delete ust app sessions info */
        sock = app->sock;
        app->sock = -1;
@@ -908,6 +921,7 @@ void delete_ust_app(struct ust_app *app)
 
        DBG2("UST app pid %d deleted", app->pid);
        free(app);
+       session_unlock_list();
 }
 
 /*
@@ -928,6 +942,8 @@ void delete_ust_app_rcu(struct rcu_head *head)
 /*
  * Delete the session from the application ht and delete the data structure by
  * freeing every object inside and releasing them.
+ *
+ * The session list lock must be held by the caller.
  */
 static void destroy_app_session(struct ust_app *app,
                struct ust_app_session *ua_sess)
@@ -1780,6 +1796,7 @@ static void shadow_copy_channel(struct ust_app_channel *ua_chan,
        ua_chan->attr.overwrite = uchan->attr.overwrite;
        ua_chan->attr.switch_timer_interval = uchan->attr.switch_timer_interval;
        ua_chan->attr.read_timer_interval = uchan->attr.read_timer_interval;
+       ua_chan->monitor_timer_interval = uchan->monitor_timer_interval;
        ua_chan->attr.output = uchan->attr.output;
        /*
         * Note that the attribute channel type is not set since the channel on the
@@ -2796,9 +2813,6 @@ static int send_channel_uid_to_ust(struct buffer_reg_channel *reg_chan,
                        (void) release_ust_app_stream(-1, &stream, app);
                        if (ret == -EPIPE || ret == -LTTNG_UST_ERR_EXITING) {
                                ret = -ENOTCONN; /* Caused by app exiting. */
-                               goto error_stream_unlock;
-                       } else if (ret < 0) {
-                               goto error_stream_unlock;
                        }
                        goto error_stream_unlock;
                }
@@ -2829,6 +2843,7 @@ static int create_channel_per_uid(struct ust_app *app,
        int ret;
        struct buffer_reg_uid *reg_uid;
        struct buffer_reg_channel *reg_chan;
+       bool created = false;
 
        assert(app);
        assert(usess);
@@ -2872,7 +2887,7 @@ static int create_channel_per_uid(struct ust_app *app,
                         * it's not visible anymore in the session registry.
                         */
                        ust_registry_channel_del_free(reg_uid->registry->reg.ust,
-                                       ua_chan->tracing_channel_id);
+                                       ua_chan->tracing_channel_id, false);
                        buffer_reg_channel_remove(reg_uid->registry, reg_chan);
                        buffer_reg_channel_destroy(reg_chan, LTTNG_DOMAIN_UST);
                        goto error;
@@ -2888,7 +2903,7 @@ static int create_channel_per_uid(struct ust_app *app,
                                ua_chan->name);
                        goto error;
                }
-
+               created = true;
        }
 
        /* Send buffers to the application. */
@@ -2900,6 +2915,41 @@ static int create_channel_per_uid(struct ust_app *app,
                goto error;
        }
 
+       if (created) {
+               enum lttng_error_code cmd_ret;
+               struct ltt_session *session;
+               uint64_t chan_reg_key;
+               struct ust_registry_channel *chan_reg;
+
+               rcu_read_lock();
+               chan_reg_key = ua_chan->tracing_channel_id;
+
+               pthread_mutex_lock(&reg_uid->registry->reg.ust->lock);
+               chan_reg = ust_registry_channel_find(reg_uid->registry->reg.ust,
+                               chan_reg_key);
+               assert(chan_reg);
+               chan_reg->consumer_key = ua_chan->key;
+               chan_reg = NULL;
+               pthread_mutex_unlock(&reg_uid->registry->reg.ust->lock);
+
+               session = session_find_by_id(ua_sess->tracing_id);
+               assert(session);
+
+               cmd_ret = notification_thread_command_add_channel(
+                               notification_thread_handle, session->name,
+                               ua_sess->euid, ua_sess->egid,
+                               ua_chan->name,
+                               ua_chan->key,
+                               LTTNG_DOMAIN_UST,
+                               ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf);
+               rcu_read_unlock();
+               if (cmd_ret != LTTNG_OK) {
+                       ret = - (int) cmd_ret;
+                       ERR("Failed to add channel to notification thread");
+                       goto error;
+               }
+       }
+
 error:
        return ret;
 }
@@ -2915,6 +2965,10 @@ static int create_channel_per_pid(struct ust_app *app,
 {
        int ret;
        struct ust_registry_session *registry;
+       enum lttng_error_code cmd_ret;
+       struct ltt_session *session;
+       uint64_t chan_reg_key;
+       struct ust_registry_channel *chan_reg;
 
        assert(app);
        assert(usess);
@@ -2953,6 +3007,29 @@ static int create_channel_per_pid(struct ust_app *app,
                goto error;
        }
 
+       session = session_find_by_id(ua_sess->tracing_id);
+       assert(session);
+
+       chan_reg_key = ua_chan->key;
+       pthread_mutex_lock(&registry->lock);
+       chan_reg = ust_registry_channel_find(registry, chan_reg_key);
+       assert(chan_reg);
+       chan_reg->consumer_key = ua_chan->key;
+       pthread_mutex_unlock(&registry->lock);
+
+       cmd_ret = notification_thread_command_add_channel(
+                       notification_thread_handle, session->name,
+                       ua_sess->euid, ua_sess->egid,
+                       ua_chan->name,
+                       ua_chan->key,
+                       LTTNG_DOMAIN_UST,
+                       ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf);
+       if (cmd_ret != LTTNG_OK) {
+               ret = - (int) cmd_ret;
+               ERR("Failed to add channel to notification thread");
+               goto error;
+       }
+
 error:
        rcu_read_unlock();
        return ret;
@@ -3065,7 +3142,6 @@ static int create_ust_app_channel(struct ust_app_session *ua_sess,
 
        /* Only add the channel if successful on the tracer side. */
        lttng_ht_add_unique_str(ua_sess->channels, &ua_chan->node);
-
 end:
        if (ua_chanp) {
                *ua_chanp = ua_chan;
@@ -4625,6 +4701,155 @@ int ust_app_flush_session(struct ltt_ust_session *usess)
        return ret;
 }
 
+static
+int ust_app_clear_quiescent_app_session(struct ust_app *app,
+               struct ust_app_session *ua_sess)
+{
+       int ret = 0;
+       struct lttng_ht_iter iter;
+       struct ust_app_channel *ua_chan;
+       struct consumer_socket *socket;
+
+       DBG("Clearing stream quiescent state for ust app pid %d", app->pid);
+
+       rcu_read_lock();
+
+       if (!app->compatible) {
+               goto end_not_compatible;
+       }
+
+       pthread_mutex_lock(&ua_sess->lock);
+
+       if (ua_sess->deleted) {
+               goto end_unlock;
+       }
+
+       health_code_update();
+
+       socket = consumer_find_socket_by_bitness(app->bits_per_long,
+                       ua_sess->consumer);
+       if (!socket) {
+               ERR("Failed to find consumer (%" PRIu32 ") socket",
+                               app->bits_per_long);
+               ret = -1;
+               goto end_unlock;
+       }
+
+       /* Clear quiescent state. */
+       switch (ua_sess->buffer_type) {
+       case LTTNG_BUFFER_PER_PID:
+               cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter,
+                               ua_chan, node.node) {
+                       health_code_update();
+                       ret = consumer_clear_quiescent_channel(socket,
+                                       ua_chan->key);
+                       if (ret) {
+                               ERR("Error clearing quiescent state for consumer channel");
+                               ret = -1;
+                               continue;
+                       }
+               }
+               break;
+       case LTTNG_BUFFER_PER_UID:
+       default:
+               assert(0);
+               ret = -1;
+               break;
+       }
+
+       health_code_update();
+
+end_unlock:
+       pthread_mutex_unlock(&ua_sess->lock);
+
+end_not_compatible:
+       rcu_read_unlock();
+       health_code_update();
+       return ret;
+}
+
+/*
+ * Clear quiescent state in each stream for all applications for a
+ * specific UST session.
+ * Called with UST session lock held.
+ */
+static
+int ust_app_clear_quiescent_session(struct ltt_ust_session *usess)
+
+{
+       int ret = 0;
+
+       DBG("Clearing stream quiescent state for all ust apps");
+
+       rcu_read_lock();
+
+       switch (usess->buffer_type) {
+       case LTTNG_BUFFER_PER_UID:
+       {
+               struct lttng_ht_iter iter;
+               struct buffer_reg_uid *reg;
+
+               /*
+                * Clear quiescent for all per UID buffers associated to
+                * that session.
+                */
+               cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
+                       struct consumer_socket *socket;
+                       struct buffer_reg_channel *reg_chan;
+
+                       /* Get associated consumer socket.*/
+                       socket = consumer_find_socket_by_bitness(
+                                       reg->bits_per_long, usess->consumer);
+                       if (!socket) {
+                               /*
+                                * Ignore request if no consumer is found for
+                                * the session.
+                                */
+                               continue;
+                       }
+
+                       cds_lfht_for_each_entry(reg->registry->channels->ht,
+                                       &iter.iter, reg_chan, node.node) {
+                               /*
+                                * The following call will print error values so
+                                * the return code is of little importance
+                                * because whatever happens, we have to try them
+                                * all.
+                                */
+                               (void) consumer_clear_quiescent_channel(socket,
+                                               reg_chan->consumer_key);
+                       }
+               }
+               break;
+       }
+       case LTTNG_BUFFER_PER_PID:
+       {
+               struct ust_app_session *ua_sess;
+               struct lttng_ht_iter iter;
+               struct ust_app *app;
+
+               cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app,
+                               pid_n.node) {
+                       ua_sess = lookup_session_by_app(usess, app);
+                       if (ua_sess == NULL) {
+                               continue;
+                       }
+                       (void) ust_app_clear_quiescent_app_session(app,
+                                       ua_sess);
+               }
+               break;
+       }
+       default:
+               ret = -1;
+               assert(0);
+               break;
+       }
+
+       rcu_read_unlock();
+       health_code_update();
+       return ret;
+}
+
 /*
  * Destroy a specific UST session in apps.
  */
@@ -4683,6 +4908,14 @@ int ust_app_start_trace_all(struct ltt_ust_session *usess)
 
        rcu_read_lock();
 
+       /*
+        * In a start-stop-start use-case, we need to clear the quiescent state
+        * of each channel set by the prior stop command, thus ensuring that a
+        * following stop or destroy is sure to grab a timestamp_end near those
+        * operations, even if the packet is empty.
+        */
+       (void) ust_app_clear_quiescent_session(usess);
+
        cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
                ret = ust_app_start_trace(usess, app);
                if (ret < 0) {
@@ -5024,54 +5257,6 @@ end:
        return ret;
 }
 
-/*
- * Calibrate registered applications.
- */
-int ust_app_calibrate_glb(struct lttng_ust_calibrate *calibrate)
-{
-       int ret = 0;
-       struct lttng_ht_iter iter;
-       struct ust_app *app;
-
-       rcu_read_lock();
-
-       cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
-               if (!app->compatible) {
-                       /*
-                        * TODO: In time, we should notice the caller of this error by
-                        * telling him that this is a version error.
-                        */
-                       continue;
-               }
-
-               health_code_update();
-
-               pthread_mutex_lock(&app->sock_lock);
-               ret = ustctl_calibrate(app->sock, calibrate);
-               pthread_mutex_unlock(&app->sock_lock);
-               if (ret < 0) {
-                       switch (ret) {
-                       case -ENOSYS:
-                               /* Means that it's not implemented on the tracer side. */
-                               ret = 0;
-                               break;
-                       default:
-                               DBG2("Calibrate app PID %d returned with error %d",
-                                               app->pid, ret);
-                               break;
-                       }
-               }
-       }
-
-       DBG("UST app global domain calibration finished");
-
-       rcu_read_unlock();
-
-       health_code_update();
-
-       return ret;
-}
-
 /*
  * Receive registration and populate the given msg structure.
  *
@@ -5701,7 +5886,6 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                uint64_t nb_packets_per_stream)
 {
        int ret = 0;
-       unsigned int snapshot_done = 0;
        struct lttng_ht_iter iter;
        struct ust_app *app;
        char pathname[PATH_MAX];
@@ -5753,7 +5937,6 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                        if (ret < 0) {
                                goto error;
                        }
-                       snapshot_done = 1;
                }
                break;
        }
@@ -5806,7 +5989,6 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                        if (ret < 0) {
                                goto error;
                        }
-                       snapshot_done = 1;
                }
                break;
        }
@@ -5815,15 +5997,6 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                break;
        }
 
-       if (!snapshot_done) {
-               /*
-                * If no snapshot was made and we are not in the error path, this means
-                * that there are no buffers thus no (prior) application to snapshot
-                * data from so we have simply NO data.
-                */
-               ret = -ENODATA;
-       }
-
 error:
        rcu_read_unlock();
        return ret;
@@ -5920,9 +6093,11 @@ int ust_app_uid_get_channel_runtime_stats(uint64_t ust_session_id,
        if (overwrite) {
                ret = consumer_get_lost_packets(ust_session_id,
                                consumer_chan_key, consumer, lost);
+               *discarded = 0;
        } else {
                ret = consumer_get_discarded_events(ust_session_id,
                                consumer_chan_key, consumer, discarded);
+               *lost = 0;
        }
 
 end:
@@ -5965,16 +6140,83 @@ int ust_app_pid_get_channel_runtime_stats(struct ltt_ust_session *usess,
                if (overwrite) {
                        ret = consumer_get_lost_packets(usess->id, ua_chan->key,
                                        consumer, lost);
+                       *discarded = 0;
                        goto end;
                } else {
                        ret = consumer_get_discarded_events(usess->id,
                                        ua_chan->key, consumer, discarded);
+                       *lost = 0;
                        goto end;
                }
+       }
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
+static
+int ust_app_regenerate_statedump(struct ltt_ust_session *usess,
+               struct ust_app *app)
+{
+       int ret = 0;
+       struct ust_app_session *ua_sess;
+
+       DBG("Regenerating the metadata for ust app pid %d", app->pid);
+
+       rcu_read_lock();
+
+       ua_sess = lookup_session_by_app(usess, app);
+       if (ua_sess == NULL) {
+               /* The session is in teardown process. Ignore and continue. */
                goto end;
        }
 
+       pthread_mutex_lock(&ua_sess->lock);
+
+       if (ua_sess->deleted) {
+               goto end_unlock;
+       }
+
+       pthread_mutex_lock(&app->sock_lock);
+       ret = ustctl_regenerate_statedump(app->sock, ua_sess->handle);
+       pthread_mutex_unlock(&app->sock_lock);
+
+end_unlock:
+       pthread_mutex_unlock(&ua_sess->lock);
+
 end:
        rcu_read_unlock();
+       health_code_update();
        return ret;
 }
+
+/*
+ * Regenerate the statedump for each app in the session.
+ */
+int ust_app_regenerate_statedump_all(struct ltt_ust_session *usess)
+{
+       int ret = 0;
+       struct lttng_ht_iter iter;
+       struct ust_app *app;
+
+       DBG("Regenerating the metadata for all UST apps");
+
+       rcu_read_lock();
+
+       cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
+               if (!app->compatible) {
+                       continue;
+               }
+
+               ret = ust_app_regenerate_statedump(usess, app);
+               if (ret < 0) {
+                       /* Continue to the next app even on error */
+                       continue;
+               }
+       }
+
+       rcu_read_unlock();
+
+       return 0;
+}
This page took 0.030032 seconds and 4 git commands to generate.