From 0dd01979d6f26886199ef746377640b57260421c Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Wed, 18 May 2016 14:04:11 -0400 Subject: [PATCH] Fix: UST should not generate packet at destroy after stop MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit In the following scenario: - create, enable events (ust), - start - ... - stop (await for data_pending to complete) - destroy - rm the trace directory We would expect that the "rm" operation would not conflict with the consumer daemon trying to output data into the trace files, since the "stop" operation ensured that there was no data_pending. However, the "destroy" operation currently generates an extra packet after the data_pending check (the "on_stream_hangup"). This causes the consumer daemon to try to perform trace file rotation concurrently with the trace directory removal in the scenario above, which triggers errors. The main reason why this empty packet is generated by "destroy" is to deal with trace start/stop scenario which would otherwise generate a completely empty stream. Therefore, introduce the concept of a "quiescent stream". It is initialized at false on stream creation (first packet is empty). When tracing is started, it is set to false (for cases of start/stop/start). When tracing is stopped, if the stream is not quiescent, perform a "final" flush (which will generate an empty packet if the current packet was empty), and set quiescent to true. On "destroy" stream and on application hangup: if the stream is not quiescent, perform a "final" flush, and set the quiescent state to true. Signed-off-by: Mathieu Desnoyers Signed-off-by: Jérémie Galarneau --- src/bin/lttng-sessiond/consumer.c | 32 +++++ src/bin/lttng-sessiond/consumer.h | 1 + src/bin/lttng-sessiond/ust-app.c | 157 +++++++++++++++++++++++ src/common/consumer/consumer.h | 20 +++ src/common/sessiond-comm/sessiond-comm.h | 3 + src/common/ust-consumer/ust-consumer.c | 70 +++++++++- 6 files changed, 280 insertions(+), 3 deletions(-) diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 934af39bd..6ee397579 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -1191,6 +1191,38 @@ end: return ret; } +/* + * Send a clear quiescent command to consumer using the given channel key. + * + * Return 0 on success else a negative value. + */ +int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(socket); + + DBG2("Consumer clear quiescent channel key %" PRIu64, key); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL; + msg.u.clear_quiescent_channel.key = key; + + pthread_mutex_lock(socket->lock); + health_code_update(); + + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto end; + } + +end: + health_code_update(); + pthread_mutex_unlock(socket->lock); + return ret; +} + /* * Send a close metadata command to consumer using the given channel key. * Called with registry lock held. diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 75a40f8ae..08b57eb73 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -284,6 +284,7 @@ int consumer_push_metadata(struct consumer_socket *socket, uint64_t metadata_key, char *metadata_str, size_t len, size_t target_offset, uint64_t version); int consumer_flush_channel(struct consumer_socket *socket, uint64_t key); +int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key); int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key, struct consumer_output *consumer, uint64_t *discarded); int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index 17e33a108..f30df2097 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -4647,6 +4647,155 @@ int ust_app_flush_session(struct ltt_ust_session *usess) return ret; } +static +int ust_app_clear_quiescent_app_session(struct ust_app *app, + struct ust_app_session *ua_sess) +{ + int ret = 0; + struct lttng_ht_iter iter; + struct ust_app_channel *ua_chan; + struct consumer_socket *socket; + + DBG("Clearing stream quiescent state for ust app pid %d", app->pid); + + rcu_read_lock(); + + if (!app->compatible) { + goto end_not_compatible; + } + + pthread_mutex_lock(&ua_sess->lock); + + if (ua_sess->deleted) { + goto end_unlock; + } + + health_code_update(); + + socket = consumer_find_socket_by_bitness(app->bits_per_long, + ua_sess->consumer); + if (!socket) { + ERR("Failed to find consumer (%" PRIu32 ") socket", + app->bits_per_long); + ret = -1; + goto end_unlock; + } + + /* Clear quiescent state. */ + switch (ua_sess->buffer_type) { + case LTTNG_BUFFER_PER_PID: + cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter, + ua_chan, node.node) { + health_code_update(); + ret = consumer_clear_quiescent_channel(socket, + ua_chan->key); + if (ret) { + ERR("Error clearing quiescent state for consumer channel"); + ret = -1; + continue; + } + } + break; + case LTTNG_BUFFER_PER_UID: + default: + assert(0); + ret = -1; + break; + } + + health_code_update(); + +end_unlock: + pthread_mutex_unlock(&ua_sess->lock); + +end_not_compatible: + rcu_read_unlock(); + health_code_update(); + return ret; +} + +/* + * Clear quiescent state in each stream for all applications for a + * specific UST session. + * Called with UST session lock held. + */ +static +int ust_app_clear_quiescent_session(struct ltt_ust_session *usess) + +{ + int ret = 0; + + DBG("Clearing stream quiescent state for all ust apps"); + + rcu_read_lock(); + + switch (usess->buffer_type) { + case LTTNG_BUFFER_PER_UID: + { + struct lttng_ht_iter iter; + struct buffer_reg_uid *reg; + + /* + * Clear quiescent for all per UID buffers associated to + * that session. + */ + cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) { + struct consumer_socket *socket; + struct buffer_reg_channel *reg_chan; + + /* Get associated consumer socket.*/ + socket = consumer_find_socket_by_bitness( + reg->bits_per_long, usess->consumer); + if (!socket) { + /* + * Ignore request if no consumer is found for + * the session. + */ + continue; + } + + cds_lfht_for_each_entry(reg->registry->channels->ht, + &iter.iter, reg_chan, node.node) { + /* + * The following call will print error values so + * the return code is of little importance + * because whatever happens, we have to try them + * all. + */ + (void) consumer_clear_quiescent_channel(socket, + reg_chan->consumer_key); + } + } + break; + } + case LTTNG_BUFFER_PER_PID: + { + struct ust_app_session *ua_sess; + struct lttng_ht_iter iter; + struct ust_app *app; + + cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, + pid_n.node) { + ua_sess = lookup_session_by_app(usess, app); + if (ua_sess == NULL) { + continue; + } + (void) ust_app_clear_quiescent_app_session(app, + ua_sess); + } + break; + } + default: + ret = -1; + assert(0); + break; + } + + rcu_read_unlock(); + health_code_update(); + return ret; +} + /* * Destroy a specific UST session in apps. */ @@ -4705,6 +4854,14 @@ int ust_app_start_trace_all(struct ltt_ust_session *usess) rcu_read_lock(); + /* + * In a start-stop-start use-case, we need to clear the quiescent state + * of each channel set by the prior stop command, thus ensuring that a + * following stop or destroy is sure to grab a timestamp_end near those + * operations, even if the packet is empty. + */ + (void) ust_app_clear_quiescent_session(usess); + cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) { ret = ust_app_start_trace(usess, app); if (ret < 0) { diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 59764e105..db5cbf6a0 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -60,6 +60,7 @@ enum lttng_consumer_command { LTTNG_CONSUMER_STREAMS_SENT, LTTNG_CONSUMER_DISCARDED_EVENTS, LTTNG_CONSUMER_LOST_PACKETS, + LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL, }; /* State of each fd in consumer */ @@ -249,6 +250,25 @@ struct lttng_consumer_stream { int data_read; int hangup_flush_done; + /* + * Whether the stream is in a "complete" state (e.g. it does not have a + * partially written sub-buffer. + * + * Initialized to "false" on stream creation (first packet is empty). + * + * The various transitions of the quiescent state are: + * - On "start" tracing: set to false, since the stream is not + * "complete". + * - On "stop" tracing: if !quiescent -> flush FINAL (update + * timestamp_end), and set to true; the stream has entered a + * complete/quiescent state. + * - On "destroy" or stream/application hang-up: if !quiescent -> + * flush FINAL, and set to true. + * + * NOTE: Update and read are protected by the stream lock. + */ + bool quiescent; + /* * metadata_timer_lock protects flags waiting_on_metadata and * missed_metadata_flush. diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index e7fd69ddd..09838655b 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -500,6 +500,9 @@ struct lttcomm_consumer_msg { struct { uint64_t key; /* Channel key. */ } LTTNG_PACKED flush_channel; + struct { + uint64_t key; /* Channel key. */ + } LTTNG_PACKED clear_quiescent_channel; struct { char pathname[PATH_MAX]; /* Indicate if the snapshot goes on the relayd or locally. */ diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 4d1e7f155..a66f305cd 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -767,7 +767,54 @@ static int flush_channel(uint64_t chan_key) health_code_update(); - ustctl_flush_buffer(stream->ustream, 1); + pthread_mutex_lock(&stream->lock); + if (!stream->quiescent) { + ustctl_flush_buffer(stream->ustream, 0); + stream->quiescent = true; + } + pthread_mutex_unlock(&stream->lock); + } +error: + rcu_read_unlock(); + return ret; +} + +/* + * Clear quiescent state from channel's streams using the given key to + * retrieve the channel. + * + * Return 0 on success else an LTTng error code. + */ +static int clear_quiescent_channel(uint64_t chan_key) +{ + int ret = 0; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + struct lttng_ht *ht; + struct lttng_ht_iter iter; + + DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key); + + rcu_read_lock(); + channel = consumer_find_channel(chan_key); + if (!channel) { + ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key); + ret = LTTNG_ERR_UST_CHAN_NOT_FOUND; + goto error; + } + + ht = consumer_data.stream_per_chan_id_ht; + + /* For each stream of the channel id, clear quiescent state. */ + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, + &channel->key, &iter.iter, stream, node_channel_id.node) { + + health_code_update(); + + pthread_mutex_lock(&stream->lock); + stream->quiescent = false; + pthread_mutex_unlock(&stream->lock); } error: rcu_read_unlock(); @@ -1582,6 +1629,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_msg_sessiond; } + case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL: + { + int ret; + + ret = clear_quiescent_channel( + msg.u.clear_quiescent_channel.key); + if (ret != 0) { + ret_code = ret; + } + + goto end_msg_sessiond; + } case LTTNG_CONSUMER_PUSH_METADATA: { int ret; @@ -1945,14 +2004,19 @@ int lttng_ustconsumer_get_sequence_number( } /* - * Called when the stream signal the consumer that it has hang up. + * Called when the stream signals the consumer that it has hung up. */ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream) { assert(stream); assert(stream->ustream); - ustctl_flush_buffer(stream->ustream, 0); + pthread_mutex_lock(&stream->lock); + if (!stream->quiescent) { + ustctl_flush_buffer(stream->ustream, 0); + stream->quiescent = true; + } + pthread_mutex_unlock(&stream->lock); stream->hangup_flush_done = 1; } -- 2.34.1