X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer-stream.c;h=a62cef272294d360953e5d685fca745d7509bcff;hp=422dd0daa55929a7058e163977bcc05d7bba371e;hb=890d8fe47755c3bad936389cf48ffa141cff41c9;hpb=50adc26400482c07210afcda8ef1d3322f75871d diff --git a/src/common/consumer-stream.c b/src/common/consumer-stream.c index 422dd0daa..a62cef272 100644 --- a/src/common/consumer-stream.c +++ b/src/common/consumer-stream.c @@ -17,7 +17,7 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include @@ -28,6 +28,7 @@ #include #include #include +#include #include "consumer-stream.h" @@ -119,10 +120,35 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) } stream->wait_fd = -1; } + if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) { + utils_close_pipe(stream->splice_pipe); + } break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: + { + /* + * Special case for the metadata since the wait fd is an internal pipe + * polled in the metadata thread. + */ + if (stream->metadata_flag && stream->chan->monitor) { + int rpipe = stream->ust_metadata_poll_pipe[0]; + + /* + * This will stop the channel timer if one and close the write side + * of the metadata poll pipe. + */ + lttng_ustconsumer_close_metadata(stream->chan); + if (rpipe >= 0) { + ret = close(rpipe); + if (ret < 0) { + PERROR("closing metadata pipe read side"); + } + stream->ust_metadata_poll_pipe[0] = -1; + } + } break; + } default: ERR("Unknown consumer_data type"); assert(0); @@ -195,9 +221,11 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream, rcu_read_unlock(); - /* Decrement the stream count of the global consumer data. */ - assert(consumer_data.stream_count > 0); - consumer_data.stream_count--; + if (!stream->metadata_flag) { + /* Decrement the stream count of the global consumer data. */ + assert(consumer_data.stream_count > 0); + consumer_data.stream_count--; + } } /* @@ -342,8 +370,10 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, rcu_read_lock(); relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_send_index(&relayd->control_sock, index, stream->relayd_stream_id, stream->next_net_seq_num - 1); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } else { ssize_t size_ret; @@ -365,45 +395,20 @@ error: } /* - * Synchronize the metadata using a given session ID. A successful acquisition - * of a metadata stream will trigger a request to the session daemon and a - * snapshot so the metadata thread can consume it. - * - * This function call is a rendez-vous point between the metadata thread and - * the data thread. + * Actually do the metadata sync using the given metadata stream. * - * Return 0 on success or else a negative value. + * Return 0 on success else a negative value. ENODATA can be returned also + * indicating that there is no metadata available for that stream. */ -int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, - uint64_t session_id) +static int do_sync_metadata(struct lttng_consumer_stream *metadata, + struct lttng_consumer_local_data *ctx) { int ret; - struct lttng_consumer_stream *metadata = NULL, *stream = NULL; - struct lttng_ht_iter iter; - struct lttng_ht *ht; + assert(metadata); + assert(metadata->metadata_flag); assert(ctx); - /* Ease our life a bit. */ - ht = consumer_data.stream_list_ht; - - rcu_read_lock(); - - /* Search the metadata associated with the session id of the given stream. */ - - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct, - &session_id, &iter.iter, stream, node_session_id.node) { - if (stream->metadata_flag) { - metadata = stream; - break; - } - } - if (!metadata) { - ret = 0; - goto end_unlock_rcu; - } - /* * In UST, since we have to write the metadata from the cache packet * by packet, we might need to start this procedure multiple times @@ -489,12 +494,61 @@ int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, pthread_mutex_unlock(&metadata->metadata_rdv_lock); } while (ret == EAGAIN); - ret = 0; - goto end_unlock_rcu; + /* Success */ + return 0; end_unlock_mutex: pthread_mutex_unlock(&metadata->lock); -end_unlock_rcu: + return ret; +} + +/* + * Synchronize the metadata using a given session ID. A successful acquisition + * of a metadata stream will trigger a request to the session daemon and a + * snapshot so the metadata thread can consume it. + * + * This function call is a rendez-vous point between the metadata thread and + * the data thread. + * + * Return 0 on success or else a negative value. + */ +int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, + uint64_t session_id) +{ + int ret; + struct lttng_consumer_stream *stream = NULL; + struct lttng_ht_iter iter; + struct lttng_ht *ht; + + assert(ctx); + + /* Ease our life a bit. */ + ht = consumer_data.stream_list_ht; + + rcu_read_lock(); + + /* Search the metadata associated with the session id of the given stream. */ + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct, + &session_id, &iter.iter, stream, node_session_id.node) { + if (!stream->metadata_flag) { + continue; + } + + ret = do_sync_metadata(stream, ctx); + if (ret < 0) { + goto end; + } + } + + /* + * Force return code to 0 (success) since ret might be ENODATA for instance + * which is not an error but rather that we should come back. + */ + ret = 0; + +end: rcu_read_unlock(); return ret; }