X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=1a7de1cfd59cf0a095f80794e0ff96d462386a93;hb=683d081a7f3734fcb5c8dd4424b0aa102117d1a0;hp=6d6690a32161a4cbc24e05157f31407fe9c1df74;hpb=f5ba75b4f0c0b44092c76bc931b25b24a2e62718;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 6d6690a32..1a7de1cfd 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -78,6 +78,11 @@ static void destroy_channel(struct lttng_consumer_channel *channel) lttng_ustconsumer_del_channel(channel); lttng_ustconsumer_free_channel(channel); } + + if (channel->trace_chunk) { + lttng_trace_chunk_put(channel->trace_chunk); + } + free(channel); } @@ -404,7 +409,8 @@ static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu, } return run_as_open(shm_path, O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, - session_credentials->uid, session_credentials->gid); + lttng_credentials_get_uid(session_credentials), + lttng_credentials_get_gid(session_credentials)); error_shm_path: return -1; @@ -482,8 +488,10 @@ error_open: ERR("Cannot get stream shm path"); } closeret = run_as_unlink(shm_path, - channel->buffer_credentials.value.uid, - channel->buffer_credentials.value.gid); + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials))); if (closeret) { PERROR("unlink %s", shm_path); } @@ -492,8 +500,10 @@ error_open: /* Try to rmdir all directories under shm_path root. */ if (channel->root_shm_path[0]) { (void) run_as_rmdir_recursive(channel->root_shm_path, - channel->buffer_credentials.value.uid, - channel->buffer_credentials.value.gid, + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials)), LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(stream_fds); @@ -1456,8 +1466,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, struct ustctl_consumer_channel_attr attr; const uint64_t chunk_id = msg.u.ask_channel.chunk_id.value; const struct lttng_credentials buffer_credentials = { - .uid = msg.u.ask_channel.buffer_credentials.uid, - .gid = msg.u.ask_channel.buffer_credentials.gid, + .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.ask_channel.buffer_credentials.uid), + .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.ask_channel.buffer_credentials.gid), }; /* Create a plain object and reserve a channel key. */ @@ -2067,8 +2077,8 @@ end_rotate_channel_nosignal: case LTTNG_CONSUMER_CREATE_TRACE_CHUNK: { const struct lttng_credentials credentials = { - .uid = msg.u.create_trace_chunk.credentials.value.uid, - .gid = msg.u.create_trace_chunk.credentials.value.gid, + .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid), + .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid), }; const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set; @@ -2176,6 +2186,28 @@ end_rotate_channel_nosignal: msg.u.trace_chunk_exists.chunk_id); goto end_msg_sessiond; } + case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS: + { + const uint64_t key = msg.u.open_channel_packets.key; + struct lttng_consumer_channel *channel = + consumer_find_channel(key); + + if (channel) { + pthread_mutex_lock(&channel->lock); + ret_code = lttng_consumer_open_channel_packets(channel); + pthread_mutex_unlock(&channel->lock); + } else { + /* + * The channel could have disappeared in per-pid + * buffering mode. + */ + DBG("Channel %" PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + goto end_msg_sessiond; + } default: break; } @@ -2375,8 +2407,10 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) ERR("Cannot get stream shm path"); } ret = run_as_unlink(shm_path, - chan->buffer_credentials.value.uid, - chan->buffer_credentials.value.gid); + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + chan->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + chan->buffer_credentials))); if (ret) { PERROR("unlink %s", shm_path); } @@ -2395,8 +2429,10 @@ void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan) /* Try to rmdir all directories under shm_path root. */ if (chan->root_shm_path[0]) { (void) run_as_rmdir_recursive(chan->root_shm_path, - chan->buffer_credentials.value.uid, - chan->buffer_credentials.value.gid, + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + chan->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + chan->buffer_credentials)), LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(chan->stream_fds); @@ -2430,20 +2466,19 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream) } static -void metadata_stream_reset_cache(struct lttng_consumer_stream *stream) +void metadata_stream_reset_cache_consumed_position( + struct lttng_consumer_stream *stream) { DBG("Reset metadata cache of session %" PRIu64, stream->chan->session_id); stream->ust_metadata_pushed = 0; - stream->metadata_version = stream->chan->metadata_cache->version; - stream->reset_metadata_flag = 1; } /* * Write up to one packet from the metadata cache to the channel. * - * Returns the number of bytes pushed in the cache, or a negative value - * on error. + * Returns the number of bytes pushed from the cache into the ring buffer, or a + * negative value on error. */ static int commit_one_metadata_packet(struct lttng_consumer_stream *stream) @@ -2452,10 +2487,41 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) int ret; pthread_mutex_lock(&stream->chan->metadata_cache->lock); - if (stream->chan->metadata_cache->max_offset - == stream->ust_metadata_pushed) { - ret = 0; - goto end; + if (stream->chan->metadata_cache->max_offset == + stream->ust_metadata_pushed) { + /* + * In the context of a user space metadata channel, a + * change in version can be detected in two ways: + * 1) During the pre-consume of the `read_subbuffer` loop, + * 2) When populating the metadata ring buffer (i.e. here). + * + * This function is invoked when there is no metadata + * available in the ring-buffer. If all data was consumed + * up to the size of the metadata cache, there is no metadata + * to insert in the ring-buffer. + * + * However, the metadata version could still have changed (a + * regeneration without any new data will yield the same cache + * size). + * + * The cache's version is checked for a version change and the + * consumed position is reset if one occurred. + * + * This check is only necessary for the user space domain as + * it has to manage the cache explicitly. If this reset was not + * performed, no metadata would be consumed (and no reset would + * occur as part of the pre-consume) until the metadata size + * exceeded the cache size. + */ + if (stream->metadata_version != + stream->chan->metadata_cache->version) { + metadata_stream_reset_cache_consumed_position(stream); + consumer_stream_metadata_set_version(stream, + stream->chan->metadata_cache->version); + } else { + ret = 0; + goto end; + } } write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan, @@ -2496,15 +2562,13 @@ end: * awaiting on metadata to be pushed out. * * The RCU read side lock must be held by the caller. - * - * Return 0 if new metadatda is available, EAGAIN if the metadata stream - * is empty or a negative value on error. */ -int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, +enum sync_metadata_status lttng_ustconsumer_sync_metadata( + struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *metadata_stream) { int ret; - int retry = 0; + enum sync_metadata_status status; struct lttng_consumer_channel *metadata_channel; assert(ctx); @@ -2519,6 +2583,7 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0); pthread_mutex_lock(&metadata_stream->lock); if (ret < 0) { + status = SYNC_METADATA_STATUS_ERROR; goto end; } @@ -2536,38 +2601,30 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, if (consumer_stream_is_deleted(metadata_stream)) { DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization", metadata_stream->key); - ret = 0; + status = SYNC_METADATA_STATUS_NO_DATA; goto end; } ret = commit_one_metadata_packet(metadata_stream); - if (ret <= 0) { + if (ret < 0) { + status = SYNC_METADATA_STATUS_ERROR; goto end; } else if (ret > 0) { - retry = 1; + status = SYNC_METADATA_STATUS_NEW_DATA; + } else /* ret == 0 */ { + status = SYNC_METADATA_STATUS_NO_DATA; + goto end; } ret = ustctl_snapshot(metadata_stream->ustream); if (ret < 0) { - if (errno != EAGAIN) { - ERR("Sync metadata, taking UST snapshot"); - goto end; - } - DBG("No new metadata when syncing them."); - /* No new metadata, exit. */ - ret = ENODATA; + ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d", ret); + status = SYNC_METADATA_STATUS_ERROR; goto end; } - /* - * After this flush, we still need to extract metadata. - */ - if (retry) { - ret = EAGAIN; - } - end: - return ret; + return status; } /* @@ -2680,7 +2737,7 @@ static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, goto end; } - subbuf->info.metadata.version = stream->chan->metadata_cache->version; + subbuf->info.metadata.version = stream->metadata_version; end: return ret; @@ -2924,7 +2981,7 @@ static int lttng_ustconsumer_set_stream_ops( stream->read_subbuffer_ops.extract_subbuffer_info = extract_metadata_subbuffer_info; stream->read_subbuffer_ops.reset_metadata = - metadata_stream_reset_cache; + metadata_stream_reset_cache_consumed_position; if (stream->chan->is_live) { stream->read_subbuffer_ops.on_sleep = signal_metadata; ret = consumer_stream_enable_metadata_bucketization(