X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=01fca9b7fcdd8efd82f56c4fcc455aa1dc756d21;hp=5a09ff51fb751069a3ad2fdbed17e698a0630dd0;hb=b31398bb2b3fa91a53dea3b36fd693da4b50e0d3;hpb=d8ef542d25837bdfb960e5df2a91c5d18f5ef401 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 5a09ff51f..01fca9b7f 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -30,11 +30,15 @@ #include #include #include +#include #include #include #include #include +#include +#include +#include #include "ust-consumer.h" @@ -109,13 +113,14 @@ error: */ static struct lttng_consumer_channel *allocate_channel(uint64_t session_id, const char *pathname, const char *name, uid_t uid, gid_t gid, - int relayd_id, uint64_t key, enum lttng_event_output output) + int relayd_id, uint64_t key, enum lttng_event_output output, + uint64_t tracefile_size, uint64_t tracefile_count) { assert(pathname); assert(name); return consumer_allocate_channel(key, session_id, pathname, name, uid, gid, - relayd_id, output); + relayd_id, output, tracefile_size, tracefile_count); } /* @@ -218,7 +223,9 @@ static int send_stream_to_relayd(struct lttng_consumer_stream *stream) pthread_mutex_lock(&relayd->ctrl_sock_mutex); /* Add stream on the relayd */ ret = relayd_add_stream(&relayd->control_sock, stream->name, - stream->chan->pathname, &stream->relayd_stream_id); + stream->chan->pathname, &stream->relayd_stream_id, + stream->chan->tracefile_size, + stream->chan->tracefile_count); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { goto error; @@ -271,6 +278,12 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, */ stream->wait_fd = wait_fd; + /* + * Increment channel refcount since the channel reference has now been + * assigned in the allocation process above. + */ + uatomic_inc(&stream->chan->refcount); + /* * Order is important this is why a list is used. On error, the caller * should clean this list. @@ -530,10 +543,12 @@ error: /* * Write metadata to the given channel using ustctl to convert the string to * the ringbuffer. + * Called only from consumer_metadata_cache_write. + * The metadata cache lock MUST be acquired to write in the cache. * * Return 0 on success else a negative value. */ -static int push_metadata(struct lttng_consumer_channel *metadata, +int lttng_ustconsumer_push_metadata(struct lttng_consumer_channel *metadata, const char *metadata_str, uint64_t target_offset, uint64_t len) { int ret; @@ -543,13 +558,13 @@ static int push_metadata(struct lttng_consumer_channel *metadata, DBG("UST consumer writing metadata to channel %s", metadata->name); - assert(target_offset == metadata->contig_metadata_written); - ret = ustctl_write_metadata_to_channel(metadata->uchan, metadata_str, len); + assert(target_offset <= metadata->metadata_cache->max_offset); + ret = ustctl_write_metadata_to_channel(metadata->uchan, + metadata_str + target_offset, len); if (ret < 0) { - ERR("ustctl write metadata fail with ret %d, len %ld", ret, len); + ERR("ustctl write metadata fail with ret %d, len %" PRIu64, ret, len); goto error; } - metadata->contig_metadata_written += len; ustctl_flush_buffer(metadata->metadata_stream->ustream, 1); @@ -557,32 +572,78 @@ error: return ret; } +/* + * Flush channel's streams using the given key to retrieve the channel. + * + * Return 0 on success else an LTTng error code. + */ +static int flush_channel(uint64_t chan_key) +{ + int ret = 0; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + struct lttng_ht *ht; + struct lttng_ht_iter iter; + + DBG("UST consumer flush channel key %" PRIu64, chan_key); + + rcu_read_lock(); + channel = consumer_find_channel(chan_key); + if (!channel) { + ERR("UST consumer flush channel %" PRIu64 " not found", chan_key); + ret = LTTNG_ERR_UST_CHAN_NOT_FOUND; + goto error; + } + + ht = consumer_data.stream_per_chan_id_ht; + + /* For each stream of the channel id, flush it. */ + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, + &channel->key, &iter.iter, stream, node_channel_id.node) { + ustctl_flush_buffer(stream->ustream, 1); + } +error: + rcu_read_unlock(); + return ret; +} + /* * Close metadata stream wakeup_fd using the given key to retrieve the channel. + * RCU read side lock MUST be acquired before calling this function. * * Return 0 on success else an LTTng error code. */ static int close_metadata(uint64_t chan_key) { - int ret; + int ret = 0; struct lttng_consumer_channel *channel; - DBG("UST consumer close metadata key %lu", chan_key); + DBG("UST consumer close metadata key %" PRIu64, chan_key); channel = consumer_find_channel(chan_key); if (!channel) { - ERR("UST consumer close metadata %lu not found", chan_key); + ERR("UST consumer close metadata %" PRIu64 " not found", chan_key); ret = LTTNG_ERR_UST_CHAN_NOT_FOUND; goto error; } - ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream); - if (ret < 0) { - ERR("UST consumer unable to close fd of metadata (ret: %d)", ret); - ret = LTTCOMM_CONSUMERD_ERROR_METADATA; - goto error; + pthread_mutex_lock(&consumer_data.lock); + if (!cds_lfht_is_node_deleted(&channel->node.node)) { + if (channel->switch_timer_enabled == 1) { + DBG("Deleting timer on metadata channel"); + consumer_timer_switch_stop(channel); + } + ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream); + if (ret < 0) { + ERR("UST consumer unable to close fd of metadata (ret: %d)", ret); + ret = LTTCOMM_CONSUMERD_ERROR_METADATA; + goto error_unlock; + } } +error_unlock: + pthread_mutex_unlock(&consumer_data.lock); error: return ret; } @@ -597,7 +658,7 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key) int ret; struct lttng_consumer_channel *metadata; - DBG("UST consumer setup metadata key %lu", key); + DBG("UST consumer setup metadata key %" PRIu64, key); metadata = consumer_find_channel(key); if (!metadata) { @@ -641,6 +702,51 @@ error: return ret; } +/* + * Receive the metadata updates from the sessiond. + */ +int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, + uint64_t len, struct lttng_consumer_channel *channel) +{ + int ret, ret_code = LTTNG_OK; + char *metadata_str; + + DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len); + + metadata_str = zmalloc(len * sizeof(char)); + if (!metadata_str) { + PERROR("zmalloc metadata string"); + ret_code = LTTCOMM_CONSUMERD_ENOMEM; + goto end; + } + + /* Receive metadata string. */ + ret = lttcomm_recv_unix_sock(sock, metadata_str, len); + if (ret < 0) { + /* Session daemon is dead so return gracefully. */ + ret_code = ret; + goto end_free; + } + + pthread_mutex_lock(&channel->metadata_cache->lock); + ret = consumer_metadata_cache_write(channel, offset, len, metadata_str); + if (ret < 0) { + /* Unable to handle metadata. Notify session daemon. */ + ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; + } + pthread_mutex_unlock(&channel->metadata_cache->lock); + + while (consumer_metadata_cache_flushed(channel, offset + len)) { + DBG("Waiting for metadata to be flushed"); + usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME); + } + +end_free: + free(metadata_str); +end: + return ret_code; +} + /* * Receive command from session daemon and process it. * @@ -756,7 +862,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.ask_channel.pathname, msg.u.ask_channel.name, msg.u.ask_channel.uid, msg.u.ask_channel.gid, msg.u.ask_channel.relayd_id, msg.u.ask_channel.key, - (enum lttng_event_output) msg.u.ask_channel.output); + (enum lttng_event_output) msg.u.ask_channel.output, + msg.u.ask_channel.tracefile_size, + msg.u.ask_channel.tracefile_count); if (!channel) { goto end_channel_error; } @@ -767,6 +875,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, attr.overwrite = msg.u.ask_channel.overwrite; attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval; attr.read_timer_interval = msg.u.ask_channel.read_timer_interval; + attr.chan_id = msg.u.ask_channel.chan_id; memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid)); /* Translate the event output type to UST. */ @@ -784,6 +893,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_UST_CHAN_PER_CPU: channel->type = CONSUMER_CHANNEL_TYPE_DATA; attr.type = LTTNG_UST_CHAN_PER_CPU; + /* + * Set refcount to 1 for owner. Below, we will + * pass ownership to the + * consumer_thread_channel_poll() thread. + */ + channel->refcount = 1; break; case LTTNG_UST_CHAN_METADATA: channel->type = CONSUMER_CHANNEL_TYPE_METADATA; @@ -799,13 +914,31 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_channel_error; } + if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) { + ret = consumer_metadata_cache_allocate(channel); + if (ret < 0) { + ERR("Allocating metadata cache"); + goto end_channel_error; + } + consumer_timer_switch_start(channel, attr.switch_timer_interval); + attr.switch_timer_interval = 0; + } + /* * Add the channel to the internal state AFTER all streams were created * and successfully sent to session daemon. This way, all streams must * be ready before this channel is visible to the threads. + * If add_channel succeeds, ownership of the channel is + * passed to consumer_thread_channel_poll(). */ ret = add_channel(channel, ctx); if (ret < 0) { + if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) { + if (channel->switch_timer_enabled == 1) { + consumer_timer_switch_stop(channel); + } + consumer_metadata_cache_destroy(channel); + } goto end_channel_error; } @@ -833,7 +966,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, channel = consumer_find_channel(key); if (!channel) { - ERR("UST consumer get channel key %lu not found", key); + ERR("UST consumer get channel key %" PRIu64 " not found", key); ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND; goto end_msg_sessiond; } @@ -880,17 +1013,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_DESTROY_CHANNEL: { uint64_t key = msg.u.destroy_channel.key; - struct lttng_consumer_channel *channel; - - channel = consumer_find_channel(key); - if (!channel) { - ERR("UST consumer get channel key %lu not found", key); - ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND; - goto end_msg_sessiond; - } - - destroy_channel(channel); + /* + * Only called if streams have not been sent to stream + * manager thread. However, channel has been sent to + * channel manager thread. + */ + notify_thread_del_channel(ctx, key); goto end_msg_sessiond; } case LTTNG_CONSUMER_CLOSE_METADATA: @@ -904,27 +1033,32 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_msg_sessiond; } + case LTTNG_CONSUMER_FLUSH_CHANNEL: + { + int ret; + + ret = flush_channel(msg.u.flush_channel.key); + if (ret != 0) { + ret_code = ret; + } + + goto end_msg_sessiond; + } case LTTNG_CONSUMER_PUSH_METADATA: { int ret; uint64_t len = msg.u.push_metadata.len; - uint64_t target_offset = msg.u.push_metadata.target_offset; uint64_t key = msg.u.push_metadata.key; + uint64_t offset = msg.u.push_metadata.target_offset; struct lttng_consumer_channel *channel; - char *metadata_str; - DBG("UST consumer push metadata key %lu of len %lu", key, len); + DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, + len); channel = consumer_find_channel(key); if (!channel) { - ERR("UST consumer push metadata %lu not found", key); + ERR("UST consumer push metadata %" PRIu64 " not found", key); ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND; - } - - metadata_str = zmalloc(len * sizeof(char)); - if (!metadata_str) { - PERROR("zmalloc metadata string"); - ret_code = LTTCOMM_CONSUMERD_ENOMEM; goto end_msg_sessiond; } @@ -940,22 +1074,15 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } - /* Receive metadata string. */ - ret = lttcomm_recv_unix_sock(sock, metadata_str, len); + ret = lttng_ustconsumer_recv_metadata(sock, key, offset, + len, channel); if (ret < 0) { - /* Session daemon is dead so return gracefully. */ + /* error receiving from sessiond */ goto end_nosignal; - } - - ret = push_metadata(channel, metadata_str, target_offset, len); - free(metadata_str); - if (ret < 0) { - /* Unable to handle metadata. Notify session daemon. */ - ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; + } else { + ret_code = ret; goto end_msg_sessiond; } - - goto end_msg_sessiond; } case LTTNG_CONSUMER_SETUP_METADATA: { @@ -1081,6 +1208,10 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) assert(chan); assert(chan->uchan); + if (chan->switch_timer_enabled == 1) { + consumer_timer_switch_stop(chan); + } + consumer_metadata_cache_destroy(chan); ustctl_destroy_channel(chan->uchan); } @@ -1089,6 +1220,9 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) assert(stream); assert(stream->ustream); + if (stream->chan->switch_timer_enabled == 1) { + consumer_timer_switch_stop(stream->chan); + } ustctl_destroy_stream(stream->ustream); } @@ -1168,46 +1302,37 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, * happen and it is OK with the code flow. */ DBG("Error writing to tracefile " - "(ret: %zd != len: %lu != subbuf_size: %lu)", + "(ret: %ld != len: %lu != subbuf_size: %lu)", ret, len, subbuf_size); } err = ustctl_put_next_subbuf(ustream); assert(err == 0); + end: return ret; } /* * Called when a stream is created. + * + * Return 0 on success or else a negative value. */ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) { int ret; - char full_path[PATH_MAX]; - - /* Opening the tracefile in write mode */ - if (stream->net_seq_idx != (uint64_t) -1ULL) { - goto end; - } - - ret = snprintf(full_path, sizeof(full_path), "%s/%s", - stream->chan->pathname, stream->name); - if (ret < 0) { - PERROR("snprintf on_recv_stream"); - goto error; - } - ret = run_as_open(full_path, O_WRONLY | O_CREAT | O_TRUNC, - S_IRWXU | S_IRWXG | S_IRWXO, stream->uid, stream->gid); - if (ret < 0) { - PERROR("open stream path %s", full_path); - goto error; + /* Don't create anything if this is set for streaming. */ + if (stream->net_seq_idx == (uint64_t) -1ULL) { + ret = utils_create_stream_file(stream->chan->pathname, stream->name, + stream->chan->tracefile_size, stream->tracefile_count_current, + stream->uid, stream->gid); + if (ret < 0) { + goto error; + } + stream->out_fd = ret; + stream->tracefile_size_current = 0; } - stream->out_fd = ret; - -end: - /* we return 0 to let the library handle the FD internally */ - return 0; + ret = 0; error: return ret; @@ -1293,3 +1418,96 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) ERR("Unable to close wakeup fd"); } } + +int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_channel *channel) +{ + struct lttcomm_metadata_request_msg request; + struct lttcomm_consumer_msg msg; + enum lttng_error_code ret_code = LTTNG_OK; + uint64_t len, key, offset; + int ret; + + assert(channel); + assert(channel->metadata_cache); + + /* send the metadata request to sessiond */ + switch (consumer_data.type) { + case LTTNG_CONSUMER64_UST: + request.bits_per_long = 64; + break; + case LTTNG_CONSUMER32_UST: + request.bits_per_long = 32; + break; + default: + request.bits_per_long = 0; + break; + } + + request.session_id = channel->session_id; + request.uid = channel->uid; + request.key = channel->key; + DBG("Sending metadata request to sessiond, session %" PRIu64, + channel->session_id); + + ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request, + sizeof(request)); + if (ret < 0) { + ERR("Asking metadata to sessiond"); + goto end; + } + + /* Receive the metadata from sessiond */ + ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg, + sizeof(msg)); + if (ret != sizeof(msg)) { + DBG("Consumer received unexpected message size %d (expects %zu)", + ret, sizeof(msg)); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); + /* + * The ret value might 0 meaning an orderly shutdown but this is ok + * since the caller handles this. + */ + goto end; + } + + if (msg.cmd_type == LTTNG_ERR_UND) { + /* No registry found */ + (void) consumer_send_status_msg(ctx->consumer_metadata_socket, + ret_code); + ret = 0; + goto end; + } else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) { + ERR("Unexpected cmd_type received %d", msg.cmd_type); + ret = -1; + goto end; + } + + len = msg.u.push_metadata.len; + key = msg.u.push_metadata.key; + offset = msg.u.push_metadata.target_offset; + + assert(key == channel->key); + if (len == 0) { + DBG("No new metadata to receive for key %" PRIu64, key); + } + + /* Tell session daemon we are ready to receive the metadata. */ + ret = consumer_send_status_msg(ctx->consumer_metadata_socket, + LTTNG_OK); + if (ret < 0 || len == 0) { + /* + * Somehow, the session daemon is not responding anymore or there is + * nothing to receive. + */ + goto end; + } + + ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket, + key, offset, len, channel); + (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret_code); + ret = 0; + +end: + return ret; +}