X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=431b94626736d042d92db89db0774e7e941cc3e7;hp=06b59c58442e4a833b3b0947aa3d34244b28a305;hb=331744e34f56a5aec69b05d356d6901e67926acc;hpb=31fa4745f181bd1bdbceb89fbe27e130f5b4e2b9 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 06b59c584..431b94626 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -30,11 +30,14 @@ #include #include #include +#include #include #include #include #include +#include +#include #include "ust-consumer.h" @@ -530,10 +533,12 @@ error: /* * Write metadata to the given channel using ustctl to convert the string to * the ringbuffer. + * Called only from consumer_metadata_cache_write. + * The metadata cache lock MUST be acquired to write in the cache. * * Return 0 on success else a negative value. */ -static int push_metadata(struct lttng_consumer_channel *metadata, +int lttng_ustconsumer_push_metadata(struct lttng_consumer_channel *metadata, const char *metadata_str, uint64_t target_offset, uint64_t len) { int ret; @@ -543,13 +548,13 @@ static int push_metadata(struct lttng_consumer_channel *metadata, DBG("UST consumer writing metadata to channel %s", metadata->name); - assert(target_offset == metadata->contig_metadata_written); - ret = ustctl_write_metadata_to_channel(metadata->uchan, metadata_str, len); + assert(target_offset <= metadata->metadata_cache->max_offset); + ret = ustctl_write_metadata_to_channel(metadata->uchan, + metadata_str + target_offset, len); if (ret < 0) { ERR("ustctl write metadata fail with ret %d, len %ld", ret, len); goto error; } - metadata->contig_metadata_written += len; ustctl_flush_buffer(metadata->metadata_stream->ustream, 1); @@ -619,6 +624,11 @@ static int close_metadata(uint64_t chan_key) ret = LTTCOMM_CONSUMERD_ERROR_METADATA; goto error; } + if (channel->switch_timer_enabled == 1) { + DBG("Deleting timer on metadata channel"); + consumer_timer_switch_stop(channel); + } + consumer_metadata_cache_destroy(channel); error: return ret; @@ -678,6 +688,51 @@ error: return ret; } +/* + * Receive the metadata updates from the sessiond. + */ +int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, + uint64_t len, struct lttng_consumer_channel *channel) +{ + int ret, ret_code = LTTNG_OK; + char *metadata_str; + + DBG("UST consumer push metadata key %lu of len %lu", key, len); + + metadata_str = zmalloc(len * sizeof(char)); + if (!metadata_str) { + PERROR("zmalloc metadata string"); + ret_code = LTTCOMM_CONSUMERD_ENOMEM; + goto end; + } + + /* Receive metadata string. */ + ret = lttcomm_recv_unix_sock(sock, metadata_str, len); + if (ret < 0) { + /* Session daemon is dead so return gracefully. */ + ret_code = ret; + goto end_free; + } + + pthread_mutex_lock(&channel->metadata_cache->lock); + ret = consumer_metadata_cache_write(channel, offset, len, metadata_str); + if (ret < 0) { + /* Unable to handle metadata. Notify session daemon. */ + ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; + } + pthread_mutex_unlock(&channel->metadata_cache->lock); + + while (consumer_metadata_cache_flushed(channel, offset + len)) { + DBG("Waiting for metadata to be flushed"); + usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME); + } + +end_free: + free(metadata_str); +end: + return ret_code; +} + /* * Receive command from session daemon and process it. * @@ -847,6 +902,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_channel_error; } + /* * Channel and streams are now created. Inform the session daemon that * everything went well and should wait to receive the channel and @@ -861,6 +917,16 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } + if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) { + ret = consumer_metadata_cache_allocate(channel); + if (ret < 0) { + ERR("Allocating metadata cache"); + goto end_channel_error; + } + consumer_timer_switch_start(channel, attr.switch_timer_interval); + attr.switch_timer_interval = 0; + } + break; } case LTTNG_CONSUMER_GET_CHANNEL: @@ -957,10 +1023,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, { int ret; uint64_t len = msg.u.push_metadata.len; - uint64_t target_offset = msg.u.push_metadata.target_offset; uint64_t key = msg.u.push_metadata.key; + uint64_t offset = msg.u.push_metadata.target_offset; struct lttng_consumer_channel *channel; - char *metadata_str; DBG("UST consumer push metadata key %lu of len %lu", key, len); @@ -968,14 +1033,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (!channel) { ERR("UST consumer push metadata %lu not found", key); ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND; - goto end_msg_sessiond; - } - - metadata_str = zmalloc(len * sizeof(char)); - if (!metadata_str) { - PERROR("zmalloc metadata string"); - ret_code = LTTCOMM_CONSUMERD_ENOMEM; - goto end_msg_sessiond; } /* Tell session daemon we are ready to receive the metadata. */ @@ -990,22 +1047,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } - /* Receive metadata string. */ - ret = lttcomm_recv_unix_sock(sock, metadata_str, len); + ret = lttng_ustconsumer_recv_metadata(sock, key, offset, + len, channel); if (ret < 0) { - /* Session daemon is dead so return gracefully. */ + /* error receiving from sessiond */ goto end_nosignal; - } - - ret = push_metadata(channel, metadata_str, target_offset, len); - free(metadata_str); - if (ret < 0) { - /* Unable to handle metadata. Notify session daemon. */ - ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; + } else { + ret_code = ret; goto end_msg_sessiond; } - - goto end_msg_sessiond; } case LTTNG_CONSUMER_SETUP_METADATA: { @@ -1223,6 +1273,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } err = ustctl_put_next_subbuf(ustream); assert(err == 0); + end: return ret; } @@ -1343,3 +1394,96 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) ERR("Unable to close wakeup fd"); } } + +int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_channel *channel) +{ + struct lttcomm_metadata_request_msg request; + struct lttcomm_consumer_msg msg; + enum lttng_error_code ret_code = LTTNG_OK; + uint64_t len, key, offset; + int ret; + + assert(channel); + assert(channel->metadata_cache); + + /* send the metadata request to sessiond */ + switch (consumer_data.type) { + case LTTNG_CONSUMER64_UST: + request.bits_per_long = 64; + break; + case LTTNG_CONSUMER32_UST: + request.bits_per_long = 32; + break; + default: + request.bits_per_long = 0; + break; + } + + request.session_id = channel->session_id; + request.uid = channel->uid; + request.key = channel->key; + DBG("Sending metadata request to sessiond, session %" PRIu64, + channel->session_id); + + ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request, + sizeof(request)); + if (ret < 0) { + ERR("Asking metadata to sessiond"); + goto end; + } + + /* Receive the metadata from sessiond */ + ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg, + sizeof(msg)); + if (ret != sizeof(msg)) { + DBG("Consumer received unexpected message size %d (expects %lu)", + ret, sizeof(msg)); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); + /* + * The ret value might 0 meaning an orderly shutdown but this is ok + * since the caller handles this. + */ + goto end; + } + + if (msg.cmd_type == LTTNG_ERR_UND) { + /* No registry found */ + (void) consumer_send_status_msg(ctx->consumer_metadata_socket, + ret_code); + ret = 0; + goto end; + } else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) { + ERR("Unexpected cmd_type received %d", msg.cmd_type); + ret = -1; + goto end; + } + + len = msg.u.push_metadata.len; + key = msg.u.push_metadata.key; + offset = msg.u.push_metadata.target_offset; + + assert(key == channel->key); + if (len == 0) { + DBG("No new metadata to receive for key %" PRIu64, key); + } + + /* Tell session daemon we are ready to receive the metadata. */ + ret = consumer_send_status_msg(ctx->consumer_metadata_socket, + LTTNG_OK); + if (ret < 0 || len == 0) { + /* + * Somehow, the session daemon is not responding anymore or there is + * nothing to receive. + */ + goto end; + } + + ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket, + key, offset, len, channel); + (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret_code); + ret = 0; + +end: + return ret; +}