X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fust-app.c;h=d292779d156dca47d2ce1d861ac35d216811d731;hp=285467fbbacac990864a0871d18f200a0eb7f67b;hb=e9404c27e7cc9d841785e6c4292c1add19fbc1cc;hpb=ee02239944bbb8b019bf5505bfc849fb0936e2f6 diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index 285467fbb..d292779d1 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -41,6 +41,8 @@ #include "ust-ctl.h" #include "utils.h" #include "session.h" +#include "lttng-sessiond.h" +#include "notification-thread-commands.h" static int ust_app_flush_app_session(struct ust_app *app, struct ust_app_session *ua_sess); @@ -376,6 +378,8 @@ void delete_ust_app_channel_rcu(struct rcu_head *head) * Extract the lost packet or discarded events counter when the channel is * being deleted and store the value in the parent channel so we can * access it from lttng list and at stop/destroy. + * + * The session list lock must be held by the caller. */ static void save_per_pid_lost_discarded_counters(struct ust_app_channel *ua_chan) @@ -390,12 +394,22 @@ void save_per_pid_lost_discarded_counters(struct ust_app_channel *ua_chan) rcu_read_lock(); session = session_find_by_id(ua_chan->session->tracing_id); - if (!session) { - ERR("Missing LTT session to get discarded events"); - goto end; - } - if (!session->ust_session) { - ERR("Missing UST session to get discarded events"); + if (!session || !session->ust_session) { + /* + * Not finding the session is not an error because there are + * multiple ways the channels can be torn down. + * + * 1) The session daemon can initiate the destruction of the + * ust app session after receiving a destroy command or + * during its shutdown/teardown. + * 2) The application, since we are in per-pid tracing, is + * unregistering and tearing down its ust app session. + * + * Both paths are protected by the session list lock which + * ensures that the accounting of lost packets and discarded + * events is done exactly once. The session is then unpublished + * from the session list, resulting in this condition. + */ goto end; } @@ -426,6 +440,8 @@ end: /* * Delete ust app channel safely. RCU read lock must be held before calling * this function. + * + * The session list lock must be held by the caller. */ static void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan, @@ -468,7 +484,8 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan, /* Wipe and free registry from session registry. */ registry = get_session_registry(ua_chan->session); if (registry) { - ust_registry_channel_del_free(registry, ua_chan->key); + ust_registry_channel_del_free(registry, ua_chan->key, + true); } save_per_pid_lost_discarded_counters(ua_chan); } @@ -537,7 +554,7 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry, char *metadata_str = NULL; size_t len, offset, new_metadata_len_sent; ssize_t ret_val; - uint64_t metadata_key; + uint64_t metadata_key, metadata_version; assert(registry); assert(socket); @@ -552,21 +569,10 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry, return 0; } - /* - * On a push metadata error either the consumer is dead or the - * metadata channel has been destroyed because its endpoint - * might have died (e.g: relayd), or because the application has - * exited. If so, the metadata closed flag is set to 1 so we - * deny pushing metadata again which is not valid anymore on the - * consumer side. - */ - if (registry->metadata_closed) { - return -EPIPE; - } - offset = registry->metadata_len_sent; len = registry->metadata_len - registry->metadata_len_sent; new_metadata_len_sent = registry->metadata_len; + metadata_version = registry->metadata_version; if (len == 0) { DBG3("No metadata to push for metadata key %" PRIu64, registry->metadata_key); @@ -603,7 +609,7 @@ push_data: * different bidirectionnal communication sockets. */ ret = consumer_push_metadata(socket, metadata_key, - metadata_str, len, offset); + metadata_str, len, offset, metadata_version); pthread_mutex_lock(®istry->lock); if (ret < 0) { /* @@ -787,6 +793,8 @@ void delete_ust_app_session_rcu(struct rcu_head *head) /* * Delete ust app session safely. RCU read lock must be held before calling * this function. + * + * The session list lock must be held by the caller. */ static void delete_ust_app_session(int sock, struct ust_app_session *ua_sess, @@ -870,6 +878,11 @@ void delete_ust_app(struct ust_app *app) int ret, sock; struct ust_app_session *ua_sess, *tmp_ua_sess; + /* + * The session list lock must be held during this function to guarantee + * the existence of ua_sess. + */ + session_lock_list(); /* Delete ust app sessions info */ sock = app->sock; app->sock = -1; @@ -908,6 +921,7 @@ void delete_ust_app(struct ust_app *app) DBG2("UST app pid %d deleted", app->pid); free(app); + session_unlock_list(); } /* @@ -928,6 +942,8 @@ void delete_ust_app_rcu(struct rcu_head *head) /* * Delete the session from the application ht and delete the data structure by * freeing every object inside and releasing them. + * + * The session list lock must be held by the caller. */ static void destroy_app_session(struct ust_app *app, struct ust_app_session *ua_sess) @@ -1780,6 +1796,7 @@ static void shadow_copy_channel(struct ust_app_channel *ua_chan, ua_chan->attr.overwrite = uchan->attr.overwrite; ua_chan->attr.switch_timer_interval = uchan->attr.switch_timer_interval; ua_chan->attr.read_timer_interval = uchan->attr.read_timer_interval; + ua_chan->monitor_timer_interval = uchan->monitor_timer_interval; ua_chan->attr.output = uchan->attr.output; /* * Note that the attribute channel type is not set since the channel on the @@ -2796,9 +2813,6 @@ static int send_channel_uid_to_ust(struct buffer_reg_channel *reg_chan, (void) release_ust_app_stream(-1, &stream, app); if (ret == -EPIPE || ret == -LTTNG_UST_ERR_EXITING) { ret = -ENOTCONN; /* Caused by app exiting. */ - goto error_stream_unlock; - } else if (ret < 0) { - goto error_stream_unlock; } goto error_stream_unlock; } @@ -2829,6 +2843,7 @@ static int create_channel_per_uid(struct ust_app *app, int ret; struct buffer_reg_uid *reg_uid; struct buffer_reg_channel *reg_chan; + bool created = false; assert(app); assert(usess); @@ -2872,7 +2887,7 @@ static int create_channel_per_uid(struct ust_app *app, * it's not visible anymore in the session registry. */ ust_registry_channel_del_free(reg_uid->registry->reg.ust, - ua_chan->tracing_channel_id); + ua_chan->tracing_channel_id, false); buffer_reg_channel_remove(reg_uid->registry, reg_chan); buffer_reg_channel_destroy(reg_chan, LTTNG_DOMAIN_UST); goto error; @@ -2888,7 +2903,7 @@ static int create_channel_per_uid(struct ust_app *app, ua_chan->name); goto error; } - + created = true; } /* Send buffers to the application. */ @@ -2900,6 +2915,41 @@ static int create_channel_per_uid(struct ust_app *app, goto error; } + if (created) { + enum lttng_error_code cmd_ret; + struct ltt_session *session; + uint64_t chan_reg_key; + struct ust_registry_channel *chan_reg; + + rcu_read_lock(); + chan_reg_key = ua_chan->tracing_channel_id; + + pthread_mutex_lock(®_uid->registry->reg.ust->lock); + chan_reg = ust_registry_channel_find(reg_uid->registry->reg.ust, + chan_reg_key); + assert(chan_reg); + chan_reg->consumer_key = ua_chan->key; + chan_reg = NULL; + pthread_mutex_unlock(®_uid->registry->reg.ust->lock); + + session = session_find_by_id(ua_sess->tracing_id); + assert(session); + + cmd_ret = notification_thread_command_add_channel( + notification_thread_handle, session->name, + ua_sess->euid, ua_sess->egid, + ua_chan->name, + ua_chan->key, + LTTNG_DOMAIN_UST, + ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf); + rcu_read_unlock(); + if (cmd_ret != LTTNG_OK) { + ret = - (int) cmd_ret; + ERR("Failed to add channel to notification thread"); + goto error; + } + } + error: return ret; } @@ -2915,6 +2965,10 @@ static int create_channel_per_pid(struct ust_app *app, { int ret; struct ust_registry_session *registry; + enum lttng_error_code cmd_ret; + struct ltt_session *session; + uint64_t chan_reg_key; + struct ust_registry_channel *chan_reg; assert(app); assert(usess); @@ -2953,6 +3007,29 @@ static int create_channel_per_pid(struct ust_app *app, goto error; } + session = session_find_by_id(ua_sess->tracing_id); + assert(session); + + chan_reg_key = ua_chan->key; + pthread_mutex_lock(®istry->lock); + chan_reg = ust_registry_channel_find(registry, chan_reg_key); + assert(chan_reg); + chan_reg->consumer_key = ua_chan->key; + pthread_mutex_unlock(®istry->lock); + + cmd_ret = notification_thread_command_add_channel( + notification_thread_handle, session->name, + ua_sess->euid, ua_sess->egid, + ua_chan->name, + ua_chan->key, + LTTNG_DOMAIN_UST, + ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf); + if (cmd_ret != LTTNG_OK) { + ret = - (int) cmd_ret; + ERR("Failed to add channel to notification thread"); + goto error; + } + error: rcu_read_unlock(); return ret; @@ -3065,7 +3142,6 @@ static int create_ust_app_channel(struct ust_app_session *ua_sess, /* Only add the channel if successful on the tracer side. */ lttng_ht_add_unique_str(ua_sess->channels, &ua_chan->node); - end: if (ua_chanp) { *ua_chanp = ua_chan; @@ -4625,6 +4701,155 @@ int ust_app_flush_session(struct ltt_ust_session *usess) return ret; } +static +int ust_app_clear_quiescent_app_session(struct ust_app *app, + struct ust_app_session *ua_sess) +{ + int ret = 0; + struct lttng_ht_iter iter; + struct ust_app_channel *ua_chan; + struct consumer_socket *socket; + + DBG("Clearing stream quiescent state for ust app pid %d", app->pid); + + rcu_read_lock(); + + if (!app->compatible) { + goto end_not_compatible; + } + + pthread_mutex_lock(&ua_sess->lock); + + if (ua_sess->deleted) { + goto end_unlock; + } + + health_code_update(); + + socket = consumer_find_socket_by_bitness(app->bits_per_long, + ua_sess->consumer); + if (!socket) { + ERR("Failed to find consumer (%" PRIu32 ") socket", + app->bits_per_long); + ret = -1; + goto end_unlock; + } + + /* Clear quiescent state. */ + switch (ua_sess->buffer_type) { + case LTTNG_BUFFER_PER_PID: + cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter, + ua_chan, node.node) { + health_code_update(); + ret = consumer_clear_quiescent_channel(socket, + ua_chan->key); + if (ret) { + ERR("Error clearing quiescent state for consumer channel"); + ret = -1; + continue; + } + } + break; + case LTTNG_BUFFER_PER_UID: + default: + assert(0); + ret = -1; + break; + } + + health_code_update(); + +end_unlock: + pthread_mutex_unlock(&ua_sess->lock); + +end_not_compatible: + rcu_read_unlock(); + health_code_update(); + return ret; +} + +/* + * Clear quiescent state in each stream for all applications for a + * specific UST session. + * Called with UST session lock held. + */ +static +int ust_app_clear_quiescent_session(struct ltt_ust_session *usess) + +{ + int ret = 0; + + DBG("Clearing stream quiescent state for all ust apps"); + + rcu_read_lock(); + + switch (usess->buffer_type) { + case LTTNG_BUFFER_PER_UID: + { + struct lttng_ht_iter iter; + struct buffer_reg_uid *reg; + + /* + * Clear quiescent for all per UID buffers associated to + * that session. + */ + cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) { + struct consumer_socket *socket; + struct buffer_reg_channel *reg_chan; + + /* Get associated consumer socket.*/ + socket = consumer_find_socket_by_bitness( + reg->bits_per_long, usess->consumer); + if (!socket) { + /* + * Ignore request if no consumer is found for + * the session. + */ + continue; + } + + cds_lfht_for_each_entry(reg->registry->channels->ht, + &iter.iter, reg_chan, node.node) { + /* + * The following call will print error values so + * the return code is of little importance + * because whatever happens, we have to try them + * all. + */ + (void) consumer_clear_quiescent_channel(socket, + reg_chan->consumer_key); + } + } + break; + } + case LTTNG_BUFFER_PER_PID: + { + struct ust_app_session *ua_sess; + struct lttng_ht_iter iter; + struct ust_app *app; + + cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, + pid_n.node) { + ua_sess = lookup_session_by_app(usess, app); + if (ua_sess == NULL) { + continue; + } + (void) ust_app_clear_quiescent_app_session(app, + ua_sess); + } + break; + } + default: + ret = -1; + assert(0); + break; + } + + rcu_read_unlock(); + health_code_update(); + return ret; +} + /* * Destroy a specific UST session in apps. */ @@ -4683,6 +4908,14 @@ int ust_app_start_trace_all(struct ltt_ust_session *usess) rcu_read_lock(); + /* + * In a start-stop-start use-case, we need to clear the quiescent state + * of each channel set by the prior stop command, thus ensuring that a + * following stop or destroy is sure to grab a timestamp_end near those + * operations, even if the packet is empty. + */ + (void) ust_app_clear_quiescent_session(usess); + cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) { ret = ust_app_start_trace(usess, app); if (ret < 0) { @@ -5024,54 +5257,6 @@ end: return ret; } -/* - * Calibrate registered applications. - */ -int ust_app_calibrate_glb(struct lttng_ust_calibrate *calibrate) -{ - int ret = 0; - struct lttng_ht_iter iter; - struct ust_app *app; - - rcu_read_lock(); - - cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) { - if (!app->compatible) { - /* - * TODO: In time, we should notice the caller of this error by - * telling him that this is a version error. - */ - continue; - } - - health_code_update(); - - pthread_mutex_lock(&app->sock_lock); - ret = ustctl_calibrate(app->sock, calibrate); - pthread_mutex_unlock(&app->sock_lock); - if (ret < 0) { - switch (ret) { - case -ENOSYS: - /* Means that it's not implemented on the tracer side. */ - ret = 0; - break; - default: - DBG2("Calibrate app PID %d returned with error %d", - app->pid, ret); - break; - } - } - } - - DBG("UST app global domain calibration finished"); - - rcu_read_unlock(); - - health_code_update(); - - return ret; -} - /* * Receive registration and populate the given msg structure. * @@ -5701,7 +5886,6 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess, uint64_t nb_packets_per_stream) { int ret = 0; - unsigned int snapshot_done = 0; struct lttng_ht_iter iter; struct ust_app *app; char pathname[PATH_MAX]; @@ -5753,7 +5937,6 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess, if (ret < 0) { goto error; } - snapshot_done = 1; } break; } @@ -5806,7 +5989,6 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess, if (ret < 0) { goto error; } - snapshot_done = 1; } break; } @@ -5815,15 +5997,6 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess, break; } - if (!snapshot_done) { - /* - * If no snapshot was made and we are not in the error path, this means - * that there are no buffers thus no (prior) application to snapshot - * data from so we have simply NO data. - */ - ret = -ENODATA; - } - error: rcu_read_unlock(); return ret; @@ -5920,9 +6093,11 @@ int ust_app_uid_get_channel_runtime_stats(uint64_t ust_session_id, if (overwrite) { ret = consumer_get_lost_packets(ust_session_id, consumer_chan_key, consumer, lost); + *discarded = 0; } else { ret = consumer_get_discarded_events(ust_session_id, consumer_chan_key, consumer, discarded); + *lost = 0; } end: @@ -5965,16 +6140,83 @@ int ust_app_pid_get_channel_runtime_stats(struct ltt_ust_session *usess, if (overwrite) { ret = consumer_get_lost_packets(usess->id, ua_chan->key, consumer, lost); + *discarded = 0; goto end; } else { ret = consumer_get_discarded_events(usess->id, ua_chan->key, consumer, discarded); + *lost = 0; goto end; } + } + +end: + rcu_read_unlock(); + return ret; +} + +static +int ust_app_regenerate_statedump(struct ltt_ust_session *usess, + struct ust_app *app) +{ + int ret = 0; + struct ust_app_session *ua_sess; + + DBG("Regenerating the metadata for ust app pid %d", app->pid); + + rcu_read_lock(); + + ua_sess = lookup_session_by_app(usess, app); + if (ua_sess == NULL) { + /* The session is in teardown process. Ignore and continue. */ goto end; } + pthread_mutex_lock(&ua_sess->lock); + + if (ua_sess->deleted) { + goto end_unlock; + } + + pthread_mutex_lock(&app->sock_lock); + ret = ustctl_regenerate_statedump(app->sock, ua_sess->handle); + pthread_mutex_unlock(&app->sock_lock); + +end_unlock: + pthread_mutex_unlock(&ua_sess->lock); + end: rcu_read_unlock(); + health_code_update(); return ret; } + +/* + * Regenerate the statedump for each app in the session. + */ +int ust_app_regenerate_statedump_all(struct ltt_ust_session *usess) +{ + int ret = 0; + struct lttng_ht_iter iter; + struct ust_app *app; + + DBG("Regenerating the metadata for all UST apps"); + + rcu_read_lock(); + + cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) { + if (!app->compatible) { + continue; + } + + ret = ust_app_regenerate_statedump(usess, app); + if (ret < 0) { + /* Continue to the next app even on error */ + continue; + } + } + + rcu_read_unlock(); + + return 0; +}