Implement consumer ring buffer position sampling
src/bin/lttng-sessiond/ust-app.c
index 1bb183d5c68dfe7d4e5d56125f1dfbac26492a3a..d292779d156dca47d2ce1d861ac35d216811d731 100644
@@ -41,6 +41,8 @@
 #include "ust-ctl.h"
 #include "utils.h"
 #include "session.h"
+#include "lttng-sessiond.h"
+#include "notification-thread-commands.h"
 
 static
 int ust_app_flush_app_session(struct ust_app *app, struct ust_app_session *ua_sess);
@@ -482,7 +484,8 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan,
                /* Wipe and free registry from session registry. */
                registry = get_session_registry(ua_chan->session);
                if (registry) {
-                       ust_registry_channel_del_free(registry, ua_chan->key);
+                       ust_registry_channel_del_free(registry, ua_chan->key,
+                               true);
                }
                save_per_pid_lost_discarded_counters(ua_chan);
        }
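
Note: the registry deletion helper grows a third argument here. Given the notification-thread commands added further down, a plausible reading is a `notif` flag controlling whether the notification thread is told to stop tracking the channel: the per-PID teardown above passes true, while the per-UID error path below passes false because that channel never reached the notification thread. A minimal sketch of such a signature; the parameter name and the remove-channel call are assumptions, not part of this patch:

    /* Sketch only: assumed shape of the updated registry helper. */
    void ust_registry_channel_del_free(struct ust_registry_session *session,
                    uint64_t chan_key, bool notif)
    {
            struct ust_registry_channel *chan;

            pthread_mutex_lock(&session->lock);
            chan = ust_registry_channel_find(session, chan_key);
            if (chan && notif) {
                    /* Assumed counterpart of the add_channel commands below. */
                    (void) notification_thread_command_remove_channel(
                                    notification_thread_handle,
                                    chan->consumer_key, LTTNG_DOMAIN_UST);
            }
            /* ... existing hash-table removal and reclamation ... */
            pthread_mutex_unlock(&session->lock);
    }
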
@@ -566,18 +569,6 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
                return 0;
        }
 
-       /*
-        * On a push metadata error either the consumer is dead or the
-        * metadata channel has been destroyed because its endpoint
-        * might have died (e.g: relayd), or because the application has
-        * exited. If so, the metadata closed flag is set to 1 so we
-        * deny pushing metadata again which is not valid anymore on the
-        * consumer side.
-        */
-       if (registry->metadata_closed) {
-               return -EPIPE;
-       }
-
        offset = registry->metadata_len_sent;
        len = registry->metadata_len - registry->metadata_len_sent;
        new_metadata_len_sent = registry->metadata_len;
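
With this early return gone, the `metadata_closed` test presumably moves to the call sites, where it can be performed under the registry lock so the flag cannot change between the check and the push. A sketch of the assumed caller-side pattern, reusing the rationale from the removed comment:

    pthread_mutex_lock(&registry->lock);
    if (registry->metadata_closed) {
            /*
             * The consumer is dead or the metadata channel was destroyed
             * (e.g. relayd death or application exit): pushing metadata is
             * no longer valid on the consumer side.
             */
            ret = -EPIPE;
    } else {
            ret = ust_app_push_metadata(registry, socket, 0);
    }
    pthread_mutex_unlock(&registry->lock);
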
@@ -1805,6 +1796,7 @@ static void shadow_copy_channel(struct ust_app_channel *ua_chan,
        ua_chan->attr.overwrite = uchan->attr.overwrite;
        ua_chan->attr.switch_timer_interval = uchan->attr.switch_timer_interval;
        ua_chan->attr.read_timer_interval = uchan->attr.read_timer_interval;
+       ua_chan->monitor_timer_interval = uchan->monitor_timer_interval;
        ua_chan->attr.output = uchan->attr.output;
        /*
         * Note that the attribute channel type is not set since the channel on the
@@ -2821,9 +2813,6 @@ static int send_channel_uid_to_ust(struct buffer_reg_channel *reg_chan,
                        (void) release_ust_app_stream(-1, &stream, app);
                        if (ret == -EPIPE || ret == -LTTNG_UST_ERR_EXITING) {
                                ret = -ENOTCONN; /* Caused by app exiting. */
-                               goto error_stream_unlock;
-                       } else if (ret < 0) {
-                               goto error_stream_unlock;
                        }
                        goto error_stream_unlock;
                }
@@ -2854,6 +2843,7 @@ static int create_channel_per_uid(struct ust_app *app,
        int ret;
        struct buffer_reg_uid *reg_uid;
        struct buffer_reg_channel *reg_chan;
+       bool created = false;
 
        assert(app);
        assert(usess);
@@ -2897,7 +2887,7 @@ static int create_channel_per_uid(struct ust_app *app,
                         * it's not visible anymore in the session registry.
                         */
                        ust_registry_channel_del_free(reg_uid->registry->reg.ust,
-                                       ua_chan->tracing_channel_id);
+                                       ua_chan->tracing_channel_id, false);
                        buffer_reg_channel_remove(reg_uid->registry, reg_chan);
                        buffer_reg_channel_destroy(reg_chan, LTTNG_DOMAIN_UST);
                        goto error;
@@ -2913,7 +2903,7 @@ static int create_channel_per_uid(struct ust_app *app,
                                ua_chan->name);
                        goto error;
                }
-
+               created = true;
        }
 
        /* Send buffers to the application. */
@@ -2925,6 +2915,41 @@ static int create_channel_per_uid(struct ust_app *app,
                goto error;
        }
 
+       if (created) {
+               enum lttng_error_code cmd_ret;
+               struct ltt_session *session;
+               uint64_t chan_reg_key;
+               struct ust_registry_channel *chan_reg;
+
+               rcu_read_lock();
+               chan_reg_key = ua_chan->tracing_channel_id;
+
+               pthread_mutex_lock(&reg_uid->registry->reg.ust->lock);
+               chan_reg = ust_registry_channel_find(reg_uid->registry->reg.ust,
+                               chan_reg_key);
+               assert(chan_reg);
+               chan_reg->consumer_key = ua_chan->key;
+               chan_reg = NULL;
+               pthread_mutex_unlock(&reg_uid->registry->reg.ust->lock);
+
+               session = session_find_by_id(ua_sess->tracing_id);
+               assert(session);
+
+               cmd_ret = notification_thread_command_add_channel(
+                               notification_thread_handle, session->name,
+                               ua_sess->euid, ua_sess->egid,
+                               ua_chan->name,
+                               ua_chan->key,
+                               LTTNG_DOMAIN_UST,
+                               ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf);
+               rcu_read_unlock();
+               if (cmd_ret != LTTNG_OK) {
+                       ret = - (int) cmd_ret;
+                       ERR("Failed to add channel to notification thread");
+                       goto error;
+               }
+       }
+
 error:
        return ret;
 }
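
The capacity handed to the notification thread is `subbuf_size * num_subbuf`, i.e. the total ring-buffer size that sampled consumer and producer positions are measured against. A self-contained sketch of that arithmetic; the function and variable names are illustrative, not sessiond API:

    #include <stdint.h>
    #include <stdio.h>

    /* Turn sampled ring buffer positions into a usage ratio. */
    static double buffer_usage_ratio(uint64_t consumed_pos,
                    uint64_t produced_pos, uint64_t subbuf_size,
                    uint64_t num_subbuf)
    {
            /* Capacity as registered with the notification thread. */
            const uint64_t capacity = subbuf_size * num_subbuf;
            /* Bytes currently sitting in the ring buffer. */
            const uint64_t used = produced_pos - consumed_pos;

            return (double) used / (double) capacity;
    }

    int main(void)
    {
            /* 4 sub-buffers of 256 KiB; producer 3 sub-buffers ahead. */
            printf("usage = %.2f\n",
                            buffer_usage_ratio(0, 3 * 262144, 262144, 4));
            return 0;
    }
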
@@ -2940,6 +2965,10 @@ static int create_channel_per_pid(struct ust_app *app,
 {
        int ret;
        struct ust_registry_session *registry;
+       enum lttng_error_code cmd_ret;
+       struct ltt_session *session;
+       uint64_t chan_reg_key;
+       struct ust_registry_channel *chan_reg;
 
        assert(app);
        assert(usess);
@@ -2978,6 +3007,29 @@ static int create_channel_per_pid(struct ust_app *app,
                goto error;
        }
 
+       session = session_find_by_id(ua_sess->tracing_id);
+       assert(session);
+
+       chan_reg_key = ua_chan->key;
+       pthread_mutex_lock(&registry->lock);
+       chan_reg = ust_registry_channel_find(registry, chan_reg_key);
+       assert(chan_reg);
+       chan_reg->consumer_key = ua_chan->key;
+       pthread_mutex_unlock(&registry->lock);
+
+       cmd_ret = notification_thread_command_add_channel(
+                       notification_thread_handle, session->name,
+                       ua_sess->euid, ua_sess->egid,
+                       ua_chan->name,
+                       ua_chan->key,
+                       LTTNG_DOMAIN_UST,
+                       ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf);
+       if (cmd_ret != LTTNG_OK) {
+               ret = - (int) cmd_ret;
+               ERR("Failed to add channel to notification thread");
+               goto error;
+       }
+
 error:
        rcu_read_unlock();
        return ret;
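
The per-PID path repeats the consumer-key publication done in the per-UID path: find the channel in the registry under the registry lock, record the consumer key there, then register the channel with the notification thread. Only the registry key differs (`tracing_channel_id` per UID, the app channel key per PID). A hypothetical helper, not present in this patch, makes the shared pattern explicit:

    /* Hypothetical refactoring; both call sites inline this today. */
    static void publish_consumer_key(struct ust_registry_session *registry,
                    uint64_t chan_reg_key, uint64_t consumer_key)
    {
            struct ust_registry_channel *chan_reg;

            pthread_mutex_lock(&registry->lock);
            chan_reg = ust_registry_channel_find(registry, chan_reg_key);
            assert(chan_reg);
            chan_reg->consumer_key = consumer_key;
            pthread_mutex_unlock(&registry->lock);
    }
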
@@ -3090,7 +3142,6 @@ static int create_ust_app_channel(struct ust_app_session *ua_sess,
 
        /* Only add the channel if successful on the tracer side. */
        lttng_ht_add_unique_str(ua_sess->channels, &ua_chan->node);
-
 end:
        if (ua_chanp) {
                *ua_chanp = ua_chan;
@@ -4650,6 +4701,155 @@ int ust_app_flush_session(struct ltt_ust_session *usess)
        return ret;
 }
 
+static
+int ust_app_clear_quiescent_app_session(struct ust_app *app,
+               struct ust_app_session *ua_sess)
+{
+       int ret = 0;
+       struct lttng_ht_iter iter;
+       struct ust_app_channel *ua_chan;
+       struct consumer_socket *socket;
+
+       DBG("Clearing stream quiescent state for ust app pid %d", app->pid);
+
+       rcu_read_lock();
+
+       if (!app->compatible) {
+               goto end_not_compatible;
+       }
+
+       pthread_mutex_lock(&ua_sess->lock);
+
+       if (ua_sess->deleted) {
+               goto end_unlock;
+       }
+
+       health_code_update();
+
+       socket = consumer_find_socket_by_bitness(app->bits_per_long,
+                       ua_sess->consumer);
+       if (!socket) {
+               ERR("Failed to find consumer (%" PRIu32 ") socket",
+                               app->bits_per_long);
+               ret = -1;
+               goto end_unlock;
+       }
+
+       /* Clear quiescent state. */
+       switch (ua_sess->buffer_type) {
+       case LTTNG_BUFFER_PER_PID:
+               cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter,
+                               ua_chan, node.node) {
+                       health_code_update();
+                       ret = consumer_clear_quiescent_channel(socket,
+                                       ua_chan->key);
+                       if (ret) {
+                               ERR("Error clearing quiescent state for consumer channel");
+                               ret = -1;
+                               continue;
+                       }
+               }
+               break;
+       case LTTNG_BUFFER_PER_UID:
+       default:
+               assert(0);
+               ret = -1;
+               break;
+       }
+
+       health_code_update();
+
+end_unlock:
+       pthread_mutex_unlock(&ua_sess->lock);
+
+end_not_compatible:
+       rcu_read_unlock();
+       health_code_update();
+       return ret;
+}
+
+/*
+ * Clear the quiescent state of each stream of all applications for a
+ * specific UST session.
+ * Called with the UST session lock held.
+ */
+static
+int ust_app_clear_quiescent_session(struct ltt_ust_session *usess)
+{
+       int ret = 0;
+
+       DBG("Clearing stream quiescent state for all ust apps");
+
+       rcu_read_lock();
+
+       switch (usess->buffer_type) {
+       case LTTNG_BUFFER_PER_UID:
+       {
+               struct lttng_ht_iter iter;
+               struct buffer_reg_uid *reg;
+
+                       /*
+                        * Clear the quiescent state of all per-UID buffers
+                        * associated with this session.
+                        */
+               cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
+                       struct consumer_socket *socket;
+                       struct buffer_reg_channel *reg_chan;
+
+                       /* Get the associated consumer socket. */
+                       socket = consumer_find_socket_by_bitness(
+                                       reg->bits_per_long, usess->consumer);
+                       if (!socket) {
+                               /*
+                                * Ignore request if no consumer is found for
+                                * the session.
+                                */
+                               continue;
+                       }
+
+                       cds_lfht_for_each_entry(reg->registry->channels->ht,
+                                       &iter.iter, reg_chan, node.node) {
+                               /*
+                                * The following call prints its own error
+                                * messages, so the return code is of little
+                                * importance; whatever happens, we must try
+                                * them all.
+                                */
+                               (void) consumer_clear_quiescent_channel(socket,
+                                               reg_chan->consumer_key);
+                       }
+               }
+               break;
+       }
+       case LTTNG_BUFFER_PER_PID:
+       {
+               struct ust_app_session *ua_sess;
+               struct lttng_ht_iter iter;
+               struct ust_app *app;
+
+               cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app,
+                               pid_n.node) {
+                       ua_sess = lookup_session_by_app(usess, app);
+                       if (ua_sess == NULL) {
+                               continue;
+                       }
+                       (void) ust_app_clear_quiescent_app_session(app,
+                                       ua_sess);
+               }
+               break;
+       }
+       default:
+               ret = -1;
+               assert(0);
+               break;
+       }
+
+       rcu_read_unlock();
+       health_code_update();
+       return ret;
+}
+
 /*
  * Destroy a specific UST session in apps.
  */
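
Both quiescent-state helpers above rely on the sessiond's usual liburcu iteration idiom: walk a lock-free hash table inside an RCU read-side critical section. A self-contained sketch of that idiom with placeholder types; `struct app` here is an illustrative stand-in, not the sessiond's `struct ust_app`:

    #include <urcu.h>
    #include <urcu/rculfhash.h>

    struct app {
            int pid;
            struct cds_lfht_node node;
    };

    /* Caller's thread must be registered with RCU (rcu_register_thread). */
    static void visit_all(struct cds_lfht *ht)
    {
            struct cds_lfht_iter iter;
            struct app *a;

            rcu_read_lock();
            cds_lfht_for_each_entry(ht, &iter, a, node) {
                    /*
                     * Per-entry work goes here; entries may be removed
                     * concurrently, which RCU makes safe to traverse.
                     */
            }
            rcu_read_unlock();
    }
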
@@ -4708,6 +4908,14 @@ int ust_app_start_trace_all(struct ltt_ust_session *usess)
 
        rcu_read_lock();
 
+       /*
+        * In a start-stop-start use case, we need to clear the quiescent state
+        * of each channel set by the prior stop command, thus ensuring that a
+        * subsequent stop or destroy is guaranteed to grab a timestamp_end
+        * near those operations, even if the packet is empty.
+        */
+       (void) ust_app_clear_quiescent_session(usess);
+
        cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
                ret = ust_app_start_trace(usess, app);
                if (ret < 0) {
@@ -5049,54 +5257,6 @@ end:
        return ret;
 }
 
-/*
- * Calibrate registered applications.
- */
-int ust_app_calibrate_glb(struct lttng_ust_calibrate *calibrate)
-{
-       int ret = 0;
-       struct lttng_ht_iter iter;
-       struct ust_app *app;
-
-       rcu_read_lock();
-
-       cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
-               if (!app->compatible) {
-                       /*
-                        * TODO: In time, we should notice the caller of this error by
-                        * telling him that this is a version error.
-                        */
-                       continue;
-               }
-
-               health_code_update();
-
-               pthread_mutex_lock(&app->sock_lock);
-               ret = ustctl_calibrate(app->sock, calibrate);
-               pthread_mutex_unlock(&app->sock_lock);
-               if (ret < 0) {
-                       switch (ret) {
-                       case -ENOSYS:
-                               /* Means that it's not implemented on the tracer side. */
-                               ret = 0;
-                               break;
-                       default:
-                               DBG2("Calibrate app PID %d returned with error %d",
-                                               app->pid, ret);
-                               break;
-                       }
-               }
-       }
-
-       DBG("UST app global domain calibration finished");
-
-       rcu_read_unlock();
-
-       health_code_update();
-
-       return ret;
-}
-
 /*
  * Receive registration and populate the given msg structure.
  *
@@ -5994,3 +6154,69 @@ end:
        rcu_read_unlock();
        return ret;
 }
+
+static
+int ust_app_regenerate_statedump(struct ltt_ust_session *usess,
+               struct ust_app *app)
+{
+       int ret = 0;
+       struct ust_app_session *ua_sess;
+
+       DBG("Regenerating the statedump for ust app pid %d", app->pid);
+
+       rcu_read_lock();
+
+       ua_sess = lookup_session_by_app(usess, app);
+       if (ua_sess == NULL) {
+               /* The session is being torn down. Ignore and continue. */
+               goto end;
+       }
+
+       pthread_mutex_lock(&ua_sess->lock);
+
+       if (ua_sess->deleted) {
+               goto end_unlock;
+       }
+
+       pthread_mutex_lock(&app->sock_lock);
+       ret = ustctl_regenerate_statedump(app->sock, ua_sess->handle);
+       pthread_mutex_unlock(&app->sock_lock);
+
+end_unlock:
+       pthread_mutex_unlock(&ua_sess->lock);
+
+end:
+       rcu_read_unlock();
+       health_code_update();
+       return ret;
+}
+
+/*
+ * Regenerate the statedump for each app in the session.
+ */
+int ust_app_regenerate_statedump_all(struct ltt_ust_session *usess)
+{
+       int ret = 0;
+       struct lttng_ht_iter iter;
+       struct ust_app *app;
+
+       DBG("Regenerating the statedump for all UST apps");
+
+       rcu_read_lock();
+
+       cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
+               if (!app->compatible) {
+                       continue;
+               }
+
+               ret = ust_app_regenerate_statedump(usess, app);
+               if (ret < 0) {
+                       /* Continue to the next app even on error. */
+                       continue;
+               }
+       }
+
+       rcu_read_unlock();
+
+       return 0;
+}
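
For context, the session daemon would invoke this from whatever handles the `lttng regenerate statedump` command. A minimal sketch, assuming a hypothetical `cmd_regenerate_statedump()` handler; only `ust_app_regenerate_statedump_all()` comes from this patch:

    /* Hypothetical command handler wiring, UST domain only. */
    int cmd_regenerate_statedump(struct ltt_session *session)
    {
            int ret = 0;

            if (session->ust_session) {
                    ret = ust_app_regenerate_statedump_all(
                                    session->ust_session);
            }
            return ret;
    }
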