X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=5bcca4e17a3d94622f423d8a30e410e05473d741;hp=b2434eb5a78948da66555e4ce48951c1b776fede;hb=b8086166dbf40d53a4a1e3f0cddbf291fac97bd3;hpb=5c786dedd0156b93984f89ba47ec841277ed7dae diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index b2434eb5a..5bcca4e17 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -230,8 +230,18 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, */ while ((ustream = ustctl_create_stream(channel->uchan, cpu))) { int wait_fd; + int ust_metadata_pipe[2]; - wait_fd = ustctl_stream_get_wait_fd(ustream); + if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && channel->monitor) { + ret = utils_create_pipe_cloexec_nonblock(ust_metadata_pipe); + if (ret < 0) { + ERR("Create ust metadata poll pipe"); + goto error; + } + wait_fd = ust_metadata_pipe[0]; + } else { + wait_fd = ustctl_stream_get_wait_fd(ustream); + } /* Allocate consumer stream object. */ stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret); @@ -285,6 +295,8 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, /* Keep stream reference when creating metadata. */ if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) { channel->metadata_stream = stream; + stream->ust_metadata_poll_pipe[0] = ust_metadata_pipe[0]; + stream->ust_metadata_poll_pipe[1] = ust_metadata_pipe[1]; } } @@ -343,7 +355,7 @@ static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream) assert(stream); assert(sock >= 0); - DBG2("UST consumer sending stream %" PRIu64 " to sessiond", stream->key); + DBG("UST consumer sending stream %" PRIu64 " to sessiond", stream->key); /* Send stream to session daemon. */ ret = ustctl_send_stream_to_sessiond(sock, stream->ustream); @@ -538,43 +550,6 @@ error: return ret; } -/* - * Write metadata to the given channel using ustctl to convert the string to - * the ringbuffer. - * Called only from consumer_metadata_cache_write. - * The metadata cache lock MUST be acquired to write in the cache. - * - * Return 0 on success else a negative value. - */ -int lttng_ustconsumer_push_metadata(struct lttng_consumer_channel *metadata, - const char *metadata_str, uint64_t target_offset, uint64_t len) -{ - int ret; - - assert(metadata); - assert(metadata_str); - - DBG("UST consumer writing metadata to channel %s", metadata->name); - - if (!metadata->metadata_stream) { - ret = 0; - goto error; - } - - assert(target_offset <= metadata->metadata_cache->max_offset); - ret = ustctl_write_metadata_to_channel(metadata->uchan, - metadata_str + target_offset, len); - if (ret < 0) { - ERR("ustctl write metadata fail with ret %d, len %" PRIu64, ret, len); - goto error; - } - - ustctl_flush_buffer(metadata->metadata_stream->ustream, 1); - -error: - return ret; -} - /* * Flush channel's streams using the given key to retrieve the channel. * @@ -604,7 +579,7 @@ static int flush_channel(uint64_t chan_key) cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, &channel->key, &iter.iter, stream, node_channel_id.node) { - ustctl_flush_buffer(stream->ustream, 1); + ustctl_flush_buffer(stream->ustream, 1); } error: rcu_read_unlock(); @@ -638,6 +613,7 @@ static int close_metadata(uint64_t chan_key) } pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&channel->lock); if (cds_lfht_is_node_deleted(&channel->node.node)) { goto error_unlock; @@ -655,9 +631,17 @@ static int close_metadata(uint64_t chan_key) ret = LTTCOMM_CONSUMERD_ERROR_METADATA; goto error_unlock; } + if (channel->monitor) { + /* close the read-side in consumer_del_metadata_stream */ + ret = close(channel->metadata_stream->ust_metadata_poll_pipe[1]); + if (ret < 0) { + PERROR("Close UST metadata write-side poll pipe"); + } + } } error_unlock: + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); error: return ret; @@ -750,8 +734,6 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, struct lttng_consumer_local_data *ctx) { int ret = 0; - ssize_t write_len; - uint64_t total_len = 0; struct lttng_consumer_channel *metadata_channel; struct lttng_consumer_stream *metadata_stream; @@ -775,7 +757,7 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, * Ask the sessiond if we have new metadata waiting and update the * consumer metadata cache. */ - ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel); + ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0); if (ret < 0) { goto error; } @@ -811,30 +793,13 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, } pthread_mutex_lock(&metadata_channel->metadata_cache->lock); - while (total_len < metadata_channel->metadata_cache->total_bytes_written) { - /* - * Write at most one packet of metadata into the channel - * to avoid blocking here. - */ - write_len = ustctl_write_one_packet_to_channel(metadata_channel->uchan, - metadata_channel->metadata_cache->data, - metadata_channel->metadata_cache->total_bytes_written); - if (write_len < 0) { - ERR("UST consumer snapshot writing metadata packet"); - ret = -1; - goto error_unlock; - } - total_len += write_len; - DBG("Written %" PRIu64 " bytes to metadata (left: %" PRIu64 ")", - write_len, - metadata_channel->metadata_cache->total_bytes_written - write_len); - ustctl_flush_buffer(metadata_stream->ustream, 1); + do { ret = lttng_consumer_read_subbuffer(metadata_stream, ctx); if (ret < 0) { goto error_unlock; } - } + } while (ret > 0); error_unlock: pthread_mutex_unlock(&metadata_channel->metadata_cache->lock); @@ -1015,7 +980,8 @@ error: * Receive the metadata updates from the sessiond. */ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, - uint64_t len, struct lttng_consumer_channel *channel) + uint64_t len, struct lttng_consumer_channel *channel, + int timer) { int ret, ret_code = LTTNG_OK; char *metadata_str; @@ -1037,17 +1003,6 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, goto end_free; } - /* - * XXX: The consumer data lock is acquired before calling metadata cache - * write which calls push metadata that MUST be protected by the consumer - * lock in order to be able to check the validity of the metadata stream of - * the channel. - * - * Note that this will be subject to change to better fine grained locking - * and ultimately try to get rid of this global consumer data lock. - */ - pthread_mutex_lock(&consumer_data.lock); - pthread_mutex_lock(&channel->metadata_cache->lock); ret = consumer_metadata_cache_write(channel, offset, len, metadata_str); if (ret < 0) { @@ -1059,13 +1014,11 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, * waiting for the metadata cache to be flushed. */ pthread_mutex_unlock(&channel->metadata_cache->lock); - pthread_mutex_unlock(&consumer_data.lock); goto end_free; } pthread_mutex_unlock(&channel->metadata_cache->lock); - pthread_mutex_unlock(&consumer_data.lock); - while (consumer_metadata_cache_flushed(channel, offset + len)) { + while (consumer_metadata_cache_flushed(channel, offset + len, timer)) { DBG("Waiting for metadata to be flushed"); usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME); } @@ -1400,7 +1353,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } ret = lttng_ustconsumer_recv_metadata(sock, key, offset, - len, channel); + len, channel, 0); if (ret < 0) { /* error receiving from sessiond */ goto error_fatal; @@ -1612,28 +1565,57 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, assert(stream->ustream); assert(ctx); - DBG2("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd, + DBG("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd, stream->name); /* Ease our life for what's next. */ ustream = stream->ustream; /* We can consume the 1 byte written into the wait_fd by UST */ - if (!stream->hangup_flush_done) { + if (stream->monitor && !stream->hangup_flush_done) { ssize_t readlen; do { readlen = read(stream->wait_fd, &dummy, 1); } while (readlen == -1 && errno == EINTR); - if (readlen == -1) { + if (readlen == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { ret = readlen; goto end; } } +retry: /* Get the next subbuffer */ err = ustctl_get_next_subbuf(ustream); if (err != 0) { + /* + * Populate metadata info if the existing info has + * already been read. + */ + if (stream->metadata_flag) { + ssize_t write_len; + + if (stream->chan->metadata_cache->contiguous + == stream->ust_metadata_pushed) { + ret = 0; + goto end; + } + + write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan, + &stream->chan->metadata_cache->data[stream->ust_metadata_pushed], + stream->chan->metadata_cache->contiguous + - stream->ust_metadata_pushed); + assert(write_len != 0); + if (write_len < 0) { + ERR("Writing one metadata packet"); + ret = -1; + goto end; + } + stream->ust_metadata_pushed += write_len; + ustctl_flush_buffer(stream->ustream, 1); + goto retry; + } + ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */ /* * This is a debug message even for single-threaded consumer, @@ -1730,15 +1712,44 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) DBG("UST consumer checking data pending"); - ret = ustctl_get_next_subbuf(stream->ustream); - if (ret == 0) { - /* There is still data so let's put back this subbuffer. */ - ret = ustctl_put_subbuf(stream->ustream); - assert(ret == 0); - ret = 1; /* Data is pending */ + if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) { + ret = 0; goto end; } + if (stream->chan->type == CONSUMER_CHANNEL_TYPE_METADATA) { + /* + * We can simply check whether all contiguously available data + * has been pushed to the ring buffer, since the push operation + * is performed within get_next_subbuf(), and because both + * get_next_subbuf() and put_next_subbuf() are issued atomically + * thanks to the stream lock within + * lttng_ustconsumer_read_subbuffer(). This basically means that + * whetnever ust_metadata_pushed is incremented, the associated + * metadata has been consumed from the metadata stream. + */ + DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64, + stream->chan->metadata_cache->contiguous, + stream->ust_metadata_pushed); + if (stream->chan->metadata_cache->contiguous + != stream->ust_metadata_pushed) { + ret = 1; /* Data is pending */ + goto end; + } + } else { + ret = ustctl_get_next_subbuf(stream->ustream); + if (ret == 0) { + /* + * There is still data so let's put back this + * subbuffer. + */ + ret = ustctl_put_subbuf(stream->ustream); + assert(ret == 0); + ret = 1; /* Data is pending */ + goto end; + } + } + /* Data is NOT pending so ready to be read. */ ret = 0; @@ -1794,8 +1805,14 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) } } +/* + * Please refer to consumer-timer.c before adding any lock within this + * function or any of its callees. Timers have a very strict locking + * semantic with respect to teardown. Failure to respect this semantic + * introduces deadlocks. + */ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_channel *channel) + struct lttng_consumer_channel *channel, int timer) { struct lttcomm_metadata_request_msg request; struct lttcomm_consumer_msg msg; @@ -1828,6 +1845,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, channel->session_id, channel->session_id_per_pid); + pthread_mutex_lock(&ctx->metadata_socket_lock); ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request, sizeof(request)); if (ret < 0) { @@ -1882,7 +1900,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, } ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket, - key, offset, len, channel); + key, offset, len, channel, timer); if (ret_code >= 0) { /* * Only send the status msg if the sessiond is alive meaning a positive @@ -1893,5 +1911,6 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, ret = 0; end: + pthread_mutex_unlock(&ctx->metadata_socket_lock); return ret; }