X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer-stream.c;h=a62cef272294d360953e5d685fca745d7509bcff;hp=24f1b8a42d14c580c013f3f38bac9b9576f3c89a;hb=890d8fe47755c3bad936389cf48ffa141cff41c9;hpb=51230d709a394904ee9c449c26d645e737c4af94 diff --git a/src/common/consumer-stream.c b/src/common/consumer-stream.c index 24f1b8a42..a62cef272 100644 --- a/src/common/consumer-stream.c +++ b/src/common/consumer-stream.c @@ -17,14 +17,18 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE +#define _LGPL_SOURCE #include +#include #include #include #include +#include +#include #include #include +#include #include "consumer-stream.h" @@ -57,8 +61,10 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, assert(stream); assert(relayd); - uatomic_dec(&relayd->refcount); - assert(uatomic_read(&relayd->refcount) >= 0); + if (stream->sent_to_relayd) { + uatomic_dec(&relayd->refcount); + assert(uatomic_read(&relayd->refcount) >= 0); + } /* Closing streams requires to lock the control socket. */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); @@ -80,6 +86,8 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, uatomic_read(&relayd->destroy_flag)) { consumer_destroy_relayd(relayd); } + stream->net_seq_idx = (uint64_t) -1ULL; + stream->sent_to_relayd = 0; } /* @@ -110,12 +118,37 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) if (ret) { PERROR("close"); } + stream->wait_fd = -1; + } + if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) { + utils_close_pipe(stream->splice_pipe); } break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - lttng_ustconsumer_del_stream(stream); + { + /* + * Special case for the metadata since the wait fd is an internal pipe + * polled in the metadata thread. + */ + if (stream->metadata_flag && stream->chan->monitor) { + int rpipe = stream->ust_metadata_poll_pipe[0]; + + /* + * This will stop the channel timer if one and close the write side + * of the metadata poll pipe. + */ + lttng_ustconsumer_close_metadata(stream->chan); + if (rpipe >= 0) { + ret = close(rpipe); + if (ret < 0) { + PERROR("closing metadata pipe read side"); + } + stream->ust_metadata_poll_pipe[0] = -1; + } + } break; + } default: ERR("Unknown consumer_data type"); assert(0); @@ -127,6 +160,15 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) if (ret) { PERROR("close"); } + stream->out_fd = -1; + } + + if (stream->index_fd >= 0) { + ret = close(stream->index_fd); + if (ret) { + PERROR("close stream index_fd"); + } + stream->index_fd = -1; } /* Check and cleanup relayd if needed. */ @@ -151,6 +193,8 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream, struct lttng_ht_iter iter; assert(stream); + /* Should NEVER be called not in monitor mode. */ + assert(stream->chan->monitor); rcu_read_lock(); @@ -177,9 +221,11 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream, rcu_read_unlock(); - /* Decrement the stream count of the global consumer data. */ - assert(consumer_data.stream_count > 0); - consumer_data.stream_count--; + if (!stream->metadata_flag) { + /* Decrement the stream count of the global consumer data. */ + assert(consumer_data.stream_count > 0); + consumer_data.stream_count--; + } } /* @@ -193,29 +239,51 @@ void consumer_stream_free(struct lttng_consumer_stream *stream) } /* - * Destroy a stream completely. This will delete, close and free the stream. - * Once return, the stream is NO longer usable. Its channel may get destroyed - * if conditions are met. - * - * This MUST be called WITHOUT the consumer data and stream lock acquired. + * Destroy the stream's buffers of the tracer. */ -void consumer_stream_destroy(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream) { - struct lttng_consumer_channel *free_chan = NULL; - assert(stream); - DBG("Consumer stream destroy - wait_fd: %d", stream->wait_fd); + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + lttng_ustconsumer_del_stream(stream); + break; + default: + ERR("Unknown consumer_data type"); + assert(0); + } +} - pthread_mutex_lock(&consumer_data.lock); - pthread_mutex_lock(&stream->lock); +/* + * Destroy and close a already created stream. + */ +static void destroy_close_stream(struct lttng_consumer_stream *stream) +{ + assert(stream); - /* Remove every reference of the stream in the consumer. */ - consumer_stream_delete(stream, ht); + DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key); + /* Destroy tracer buffers of the stream. */ + consumer_stream_destroy_buffers(stream); /* Close down everything including the relayd if one. */ consumer_stream_close(stream); +} + +/* + * Decrement the stream's channel refcount and if down to 0, return the channel + * pointer so it can be destroyed by the caller or NULL if not. + */ +static struct lttng_consumer_channel *unref_channel( + struct lttng_consumer_stream *stream) +{ + struct lttng_consumer_channel *free_chan = NULL; + + assert(stream); + assert(stream->chan); /* Update refcount of channel and see if we need to destroy it. */ if (!uatomic_sub_return(&stream->chan->refcount, 1) @@ -223,16 +291,264 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, free_chan = stream->chan; } - /* Indicates that the consumer data state MUST be updated after this. */ - consumer_data.need_update = 1; + return free_chan; +} + +/* + * Destroy a stream completely. This will delete, close and free the stream. + * Once return, the stream is NO longer usable. Its channel may get destroyed + * if conditions are met for a monitored stream. + * + * This MUST be called WITHOUT the consumer data and stream lock acquired if + * the stream is in _monitor_ mode else it does not matter. + */ +void consumer_stream_destroy(struct lttng_consumer_stream *stream, + struct lttng_ht *ht) +{ + assert(stream); + + /* Stream is in monitor mode. */ + if (stream->monitor) { + struct lttng_consumer_channel *free_chan = NULL; - pthread_mutex_unlock(&stream->lock); - pthread_mutex_unlock(&consumer_data.lock); + /* + * This means that the stream was successfully removed from the streams + * list of the channel and sent to the right thread managing this + * stream thus being globally visible. + */ + if (stream->globally_visible) { + pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); + pthread_mutex_lock(&stream->lock); + /* Remove every reference of the stream in the consumer. */ + consumer_stream_delete(stream, ht); + + destroy_close_stream(stream); + + /* Update channel's refcount of the stream. */ + free_chan = unref_channel(stream); + + /* Indicates that the consumer data state MUST be updated after this. */ + consumer_data.need_update = 1; + + pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); + pthread_mutex_unlock(&consumer_data.lock); + } else { + /* + * If the stream is not visible globally, this needs to be done + * outside of the consumer data lock section. + */ + free_chan = unref_channel(stream); + } - if (free_chan) { - consumer_del_channel(free_chan); + if (free_chan) { + consumer_del_channel(free_chan); + } + } else { + destroy_close_stream(stream); } /* Free stream within a RCU call. */ consumer_stream_free(stream); } + +/* + * Write index of a specific stream either on the relayd or local disk. + * + * Return 0 on success or else a negative value. + */ +int consumer_stream_write_index(struct lttng_consumer_stream *stream, + struct ctf_packet_index *index) +{ + int ret; + struct consumer_relayd_sock_pair *relayd; + + assert(stream); + assert(index); + + rcu_read_lock(); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_send_index(&relayd->control_sock, index, + stream->relayd_stream_id, stream->next_net_seq_num - 1); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } else { + ssize_t size_ret; + + size_ret = index_write(stream->index_fd, index, + sizeof(struct ctf_packet_index)); + if (size_ret < sizeof(struct ctf_packet_index)) { + ret = -1; + } else { + ret = 0; + } + } + if (ret < 0) { + goto error; + } + +error: + rcu_read_unlock(); + return ret; +} + +/* + * Actually do the metadata sync using the given metadata stream. + * + * Return 0 on success else a negative value. ENODATA can be returned also + * indicating that there is no metadata available for that stream. + */ +static int do_sync_metadata(struct lttng_consumer_stream *metadata, + struct lttng_consumer_local_data *ctx) +{ + int ret; + + assert(metadata); + assert(metadata->metadata_flag); + assert(ctx); + + /* + * In UST, since we have to write the metadata from the cache packet + * by packet, we might need to start this procedure multiple times + * until all the metadata from the cache has been extracted. + */ + do { + /* + * Steps : + * - Lock the metadata stream + * - Check if metadata stream node was deleted before locking. + * - if yes, release and return success + * - Check if new metadata is ready (flush + snapshot pos) + * - If nothing : release and return. + * - Lock the metadata_rdv_lock + * - Unlock the metadata stream + * - cond_wait on metadata_rdv to wait the wakeup from the + * metadata thread + * - Unlock the metadata_rdv_lock + */ + pthread_mutex_lock(&metadata->lock); + + /* + * There is a possibility that we were able to acquire a reference on the + * stream from the RCU hash table but between then and now, the node might + * have been deleted just before the lock is acquired. Thus, after locking, + * we make sure the metadata node has not been deleted which means that the + * buffers are closed. + * + * In that case, there is no need to sync the metadata hence returning a + * success return code. + */ + ret = cds_lfht_is_node_deleted(&metadata->node.node); + if (ret) { + ret = 0; + goto end_unlock_mutex; + } + + switch (ctx->type) { + case LTTNG_CONSUMER_KERNEL: + /* + * Empty the metadata cache and flush the current stream. + */ + ret = lttng_kconsumer_sync_metadata(metadata); + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * Ask the sessiond if we have new metadata waiting and update the + * consumer metadata cache. + */ + ret = lttng_ustconsumer_sync_metadata(ctx, metadata); + break; + default: + assert(0); + ret = -1; + break; + } + /* + * Error or no new metadata, we exit here. + */ + if (ret <= 0 || ret == ENODATA) { + goto end_unlock_mutex; + } + + /* + * At this point, new metadata have been flushed, so we wait on the + * rendez-vous point for the metadata thread to wake us up when it + * finishes consuming the metadata and continue execution. + */ + + pthread_mutex_lock(&metadata->metadata_rdv_lock); + + /* + * Release metadata stream lock so the metadata thread can process it. + */ + pthread_mutex_unlock(&metadata->lock); + + /* + * Wait on the rendez-vous point. Once woken up, it means the metadata was + * consumed and thus synchronization is achieved. + */ + pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock); + pthread_mutex_unlock(&metadata->metadata_rdv_lock); + } while (ret == EAGAIN); + + /* Success */ + return 0; + +end_unlock_mutex: + pthread_mutex_unlock(&metadata->lock); + return ret; +} + +/* + * Synchronize the metadata using a given session ID. A successful acquisition + * of a metadata stream will trigger a request to the session daemon and a + * snapshot so the metadata thread can consume it. + * + * This function call is a rendez-vous point between the metadata thread and + * the data thread. + * + * Return 0 on success or else a negative value. + */ +int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, + uint64_t session_id) +{ + int ret; + struct lttng_consumer_stream *stream = NULL; + struct lttng_ht_iter iter; + struct lttng_ht *ht; + + assert(ctx); + + /* Ease our life a bit. */ + ht = consumer_data.stream_list_ht; + + rcu_read_lock(); + + /* Search the metadata associated with the session id of the given stream. */ + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct, + &session_id, &iter.iter, stream, node_session_id.node) { + if (!stream->metadata_flag) { + continue; + } + + ret = do_sync_metadata(stream, ctx); + if (ret < 0) { + goto end; + } + } + + /* + * Force return code to 0 (success) since ret might be ENODATA for instance + * which is not an error but rather that we should come back. + */ + ret = 0; + +end: + rcu_read_unlock(); + return ret; +}