rcu_read_lock();
session = session_find_by_id(ua_chan->session->tracing_id);
- if (!session) {
- ERR("Missing LTT session to get discarded events");
- goto end;
- }
- if (!session->ust_session) {
- ERR("Missing UST session to get discarded events");
+ if (!session || !session->ust_session) {
+ /*
+ * Not finding the session is not an error because there are
+ * multiple ways the channels can be torn down.
+ *
+ * 1) The session daemon can initiate the destruction of the
+ * ust app session after receiving a destroy command or
+ * during its shutdown/teardown.
+ * 2) The application, since we are in per-pid tracing, is
+ * unregistering and tearing down its ust app session.
+ *
+ * Both paths are protected by the session list lock which
+ * ensures that the accounting of lost packets and discarded
+ * events is done exactly once. The session is then unpublished
+ * from the session list, resulting in this condition.
+ */
goto end;
}
char *metadata_str = NULL;
size_t len, offset, new_metadata_len_sent;
ssize_t ret_val;
- uint64_t metadata_key;
+ uint64_t metadata_key, metadata_version;
assert(registry);
assert(socket);
return 0;
}
- /*
- * On a push metadata error either the consumer is dead or the
- * metadata channel has been destroyed because its endpoint
- * might have died (e.g: relayd), or because the application has
- * exited. If so, the metadata closed flag is set to 1 so we
- * deny pushing metadata again which is not valid anymore on the
- * consumer side.
- */
- if (registry->metadata_closed) {
- return -EPIPE;
- }
-
offset = registry->metadata_len_sent;
len = registry->metadata_len - registry->metadata_len_sent;
new_metadata_len_sent = registry->metadata_len;
+ metadata_version = registry->metadata_version;
if (len == 0) {
DBG3("No metadata to push for metadata key %" PRIu64,
registry->metadata_key);
* different bidirectionnal communication sockets.
*/
ret = consumer_push_metadata(socket, metadata_key,
- metadata_str, len, offset);
+ metadata_str, len, offset, metadata_version);
pthread_mutex_lock(®istry->lock);
if (ret < 0) {
/*
ua_sess->deleted = true;
registry = get_session_registry(ua_sess);
+ /* Registry can be null on error path during initialization. */
if (registry) {
/* Push metadata for application before freeing the application. */
(void) push_metadata(registry, ua_sess->consumer);
if (ua_sess->buffer_type == LTTNG_BUFFER_PER_PID) {
struct buffer_reg_pid *reg_pid = buffer_reg_pid_find(ua_sess->id);
if (reg_pid) {
+ /*
+ * Registry can be null on error path during
+ * initialization.
+ */
buffer_reg_pid_remove(reg_pid);
buffer_reg_pid_destroy(reg_pid);
}
/*
* Ask the consumer to create a channel and get it if successful.
*
+ * Called with UST app session lock held.
+ *
* Return 0 on success or else a negative value.
*/
static int do_consumer_create_channel(struct ltt_ust_session *usess,
(void) release_ust_app_stream(-1, &stream, app);
if (ret == -EPIPE || ret == -LTTNG_UST_ERR_EXITING) {
ret = -ENOTCONN; /* Caused by app exiting. */
- goto error_stream_unlock;
- } else if (ret < 0) {
- goto error_stream_unlock;
}
goto error_stream_unlock;
}
/*
* Create and send to the application the created buffers with per PID buffers.
*
+ * Called with UST app session lock held.
+ *
* Return 0 on success else a negative value.
*/
static int create_channel_per_pid(struct ust_app *app,
rcu_read_lock();
registry = get_session_registry(ua_sess);
+ /* The UST app session lock is held, registry shall not be null. */
assert(registry);
/* Create and add a new channel registry to session. */
* need and send it to the application. This MUST be called with a RCU read
* side lock acquired.
*
+ * Called with UST app session lock held.
+ *
* Return 0 on success or else a negative value. Returns -ENOTCONN if
* the application exited concurrently.
*/
assert(consumer);
registry = get_session_registry(ua_sess);
+ /* The UST app session is held registry shall not be null. */
assert(registry);
pthread_mutex_lock(®istry->lock);
/*
* Start tracing for a specific UST session and app.
+ *
+ * Called with UST app session lock held.
+ *
*/
static
int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app)
health_code_update();
registry = get_session_registry(ua_sess);
+
+ /* The UST app session is held registry shall not be null. */
assert(registry);
/* Push metadata for application before freeing the application. */
return ret;
}
+static
+int ust_app_clear_quiescent_app_session(struct ust_app *app,
+ struct ust_app_session *ua_sess)
+{
+ int ret = 0;
+ struct lttng_ht_iter iter;
+ struct ust_app_channel *ua_chan;
+ struct consumer_socket *socket;
+
+ DBG("Clearing stream quiescent state for ust app pid %d", app->pid);
+
+ rcu_read_lock();
+
+ if (!app->compatible) {
+ goto end_not_compatible;
+ }
+
+ pthread_mutex_lock(&ua_sess->lock);
+
+ if (ua_sess->deleted) {
+ goto end_unlock;
+ }
+
+ health_code_update();
+
+ socket = consumer_find_socket_by_bitness(app->bits_per_long,
+ ua_sess->consumer);
+ if (!socket) {
+ ERR("Failed to find consumer (%" PRIu32 ") socket",
+ app->bits_per_long);
+ ret = -1;
+ goto end_unlock;
+ }
+
+ /* Clear quiescent state. */
+ switch (ua_sess->buffer_type) {
+ case LTTNG_BUFFER_PER_PID:
+ cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter,
+ ua_chan, node.node) {
+ health_code_update();
+ ret = consumer_clear_quiescent_channel(socket,
+ ua_chan->key);
+ if (ret) {
+ ERR("Error clearing quiescent state for consumer channel");
+ ret = -1;
+ continue;
+ }
+ }
+ break;
+ case LTTNG_BUFFER_PER_UID:
+ default:
+ assert(0);
+ ret = -1;
+ break;
+ }
+
+ health_code_update();
+
+end_unlock:
+ pthread_mutex_unlock(&ua_sess->lock);
+
+end_not_compatible:
+ rcu_read_unlock();
+ health_code_update();
+ return ret;
+}
+
+/*
+ * Clear quiescent state in each stream for all applications for a
+ * specific UST session.
+ * Called with UST session lock held.
+ */
+static
+int ust_app_clear_quiescent_session(struct ltt_ust_session *usess)
+
+{
+ int ret = 0;
+
+ DBG("Clearing stream quiescent state for all ust apps");
+
+ rcu_read_lock();
+
+ switch (usess->buffer_type) {
+ case LTTNG_BUFFER_PER_UID:
+ {
+ struct lttng_ht_iter iter;
+ struct buffer_reg_uid *reg;
+
+ /*
+ * Clear quiescent for all per UID buffers associated to
+ * that session.
+ */
+ cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
+ struct consumer_socket *socket;
+ struct buffer_reg_channel *reg_chan;
+
+ /* Get associated consumer socket.*/
+ socket = consumer_find_socket_by_bitness(
+ reg->bits_per_long, usess->consumer);
+ if (!socket) {
+ /*
+ * Ignore request if no consumer is found for
+ * the session.
+ */
+ continue;
+ }
+
+ cds_lfht_for_each_entry(reg->registry->channels->ht,
+ &iter.iter, reg_chan, node.node) {
+ /*
+ * The following call will print error values so
+ * the return code is of little importance
+ * because whatever happens, we have to try them
+ * all.
+ */
+ (void) consumer_clear_quiescent_channel(socket,
+ reg_chan->consumer_key);
+ }
+ }
+ break;
+ }
+ case LTTNG_BUFFER_PER_PID:
+ {
+ struct ust_app_session *ua_sess;
+ struct lttng_ht_iter iter;
+ struct ust_app *app;
+
+ cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app,
+ pid_n.node) {
+ ua_sess = lookup_session_by_app(usess, app);
+ if (ua_sess == NULL) {
+ continue;
+ }
+ (void) ust_app_clear_quiescent_app_session(app,
+ ua_sess);
+ }
+ break;
+ }
+ default:
+ ret = -1;
+ assert(0);
+ break;
+ }
+
+ rcu_read_unlock();
+ health_code_update();
+ return ret;
+}
+
/*
* Destroy a specific UST session in apps.
*/
rcu_read_lock();
+ /*
+ * In a start-stop-start use-case, we need to clear the quiescent state
+ * of each channel set by the prior stop command, thus ensuring that a
+ * following stop or destroy is sure to grab a timestamp_end near those
+ * operations, even if the packet is empty.
+ */
+ (void) ust_app_clear_quiescent_session(usess);
+
cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
ret = ust_app_start_trace(usess, app);
if (ret < 0) {
/* Lookup application. If not found, there is a code flow error. */
app = find_app_by_notify_sock(sock);
if (!app) {
- DBG("Application socket %d is being teardown. Abort event notify",
+ DBG("Application socket %d is being torn down. Abort event notify",
sock);
ret = 0;
- free(fields);
goto error_rcu_unlock;
}
/* Lookup channel by UST object descriptor. */
ua_chan = find_channel_by_objd(app, cobjd);
if (!ua_chan) {
- DBG("Application channel is being teardown. Abort event notify");
+ DBG("Application channel is being torn down. Abort event notify");
ret = 0;
- free(fields);
goto error_rcu_unlock;
}
/* Get right session registry depending on the session buffer type. */
registry = get_session_registry(ua_sess);
- assert(registry);
+ if (!registry) {
+ DBG("Application session is being torn down. Abort event notify");
+ ret = 0;
+ goto error_rcu_unlock;
+ };
/* Depending on the buffer type, a different channel key is used. */
if (ua_sess->buffer_type == LTTNG_BUFFER_PER_UID) {
chan_reg->nr_ctx_fields = nr_fields;
chan_reg->ctx_fields = fields;
+ fields = NULL;
chan_reg->header_type = type;
} else {
/* Get current already assigned values. */
type = chan_reg->header_type;
- free(fields);
- /* Set to NULL so the error path does not do a double free. */
- fields = NULL;
}
/* Channel id is set during the object creation. */
chan_id = chan_reg->chan_id;
pthread_mutex_unlock(®istry->lock);
error_rcu_unlock:
rcu_read_unlock();
- if (ret) {
- free(fields);
- }
+ free(fields);
return ret;
}
/* Lookup application. If not found, there is a code flow error. */
app = find_app_by_notify_sock(sock);
if (!app) {
- DBG("Application socket %d is being teardown. Abort event notify",
+ DBG("Application socket %d is being torn down. Abort event notify",
sock);
ret = 0;
- free(sig);
- free(fields);
- free(model_emf_uri);
goto error_rcu_unlock;
}
/* Lookup channel by UST object descriptor. */
ua_chan = find_channel_by_objd(app, cobjd);
if (!ua_chan) {
- DBG("Application channel is being teardown. Abort event notify");
+ DBG("Application channel is being torn down. Abort event notify");
ret = 0;
- free(sig);
- free(fields);
- free(model_emf_uri);
goto error_rcu_unlock;
}
ua_sess = ua_chan->session;
registry = get_session_registry(ua_sess);
- assert(registry);
+ if (!registry) {
+ DBG("Application session is being torn down. Abort event notify");
+ ret = 0;
+ goto error_rcu_unlock;
+ }
if (ua_sess->buffer_type == LTTNG_BUFFER_PER_UID) {
chan_reg_key = ua_chan->tracing_channel_id;
sobjd, cobjd, name, sig, nr_fields, fields,
loglevel_value, model_emf_uri, ua_sess->buffer_type,
&event_id, app);
+ sig = NULL;
+ fields = NULL;
+ model_emf_uri = NULL;
/*
* The return value is returned to ustctl so in case of an error, the
pthread_mutex_unlock(®istry->lock);
error_rcu_unlock:
rcu_read_unlock();
+ free(sig);
+ free(fields);
+ free(model_emf_uri);
return ret;
}
ua_sess = find_session_by_objd(app, sobjd);
if (!ua_sess) {
/* Return an error since this is not an error */
- DBG("Application session is being torn down. Aborting enum registration.");
+ DBG("Application session is being torn down (session not found). Aborting enum registration.");
free(entries);
goto error_rcu_unlock;
}
registry = get_session_registry(ua_sess);
- assert(registry);
+ if (!registry) {
+ DBG("Application session is being torn down (registry not found). Aborting enum registration.");
+ free(entries);
+ goto error_rcu_unlock;
+ }
pthread_mutex_lock(®istry->lock);
uint64_t nb_packets_per_stream)
{
int ret = 0;
- unsigned int snapshot_done = 0;
struct lttng_ht_iter iter;
struct ust_app *app;
char pathname[PATH_MAX];
if (ret < 0) {
goto error;
}
- snapshot_done = 1;
}
break;
}
}
registry = get_session_registry(ua_sess);
- assert(registry);
+ if (!registry) {
+ DBG("Application session is being torn down. Abort snapshot record.");
+ ret = -1;
+ goto error;
+ }
ret = consumer_snapshot_channel(socket, registry->metadata_key, output,
1, ua_sess->euid, ua_sess->egid, pathname, wait, 0);
if (ret < 0) {
goto error;
}
- snapshot_done = 1;
}
break;
}
break;
}
- if (!snapshot_done) {
- /*
- * If no snapshot was made and we are not in the error path, this means
- * that there are no buffers thus no (prior) application to snapshot
- * data from so we have simply NO data.
- */
- ret = -ENODATA;
- }
-
error:
rcu_read_unlock();
return ret;
if (overwrite) {
ret = consumer_get_lost_packets(ust_session_id,
consumer_chan_key, consumer, lost);
+ *discarded = 0;
} else {
ret = consumer_get_discarded_events(ust_session_id,
consumer_chan_key, consumer, discarded);
+ *lost = 0;
}
end:
if (overwrite) {
ret = consumer_get_lost_packets(usess->id, ua_chan->key,
consumer, lost);
+ *discarded = 0;
goto end;
} else {
ret = consumer_get_discarded_events(usess->id,
ua_chan->key, consumer, discarded);
+ *lost = 0;
goto end;
}
}