X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.cpp;h=3f40d03ed3d846cb8fa95788c94bf555be5fb28f;hb=32670d719327feb585374283a50eeb76ce36b962;hp=766c9d30c7ff27f1a7ff796043f24aaf2a3ad90a;hpb=64803277bbdbe0a943360d918298a48157d9da55;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.cpp b/src/common/ust-consumer/ust-consumer.cpp index 766c9d30c..3f40d03ed 100644 --- a/src/common/ust-consumer/ust-consumer.cpp +++ b/src/common/ust-consumer/ust-consumer.cpp @@ -8,101 +8,63 @@ */ #define _LGPL_SOURCE +#include "ust-consumer.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + #include #include + +#include +#include +#include #include #include +#include +#include +#include #include #include #include #include #include #include -#include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "ust-consumer.hpp" - -#define INT_MAX_STR_LEN 12 /* includes \0 */ +#define INT_MAX_STR_LEN 12 /* includes \0 */ extern struct lttng_consumer_global_data the_consumer_data; extern int consumer_poll_timeout; LTTNG_EXPORT DEFINE_LTTNG_UST_SIGBUS_STATE(); -/* - * Free channel object and all streams associated with it. This MUST be used - * only and only if the channel has _NEVER_ been added to the global channel - * hash table. - */ -static void destroy_channel(struct lttng_consumer_channel *channel) -{ - struct lttng_consumer_stream *stream, *stmp; - - LTTNG_ASSERT(channel); - - DBG("UST consumer cleaning stream list"); - - cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, - send_node) { - - health_code_update(); - - cds_list_del_init(&stream->send_node); - lttng_ust_ctl_destroy_stream(stream->ustream); - lttng_trace_chunk_put(stream->trace_chunk); - free(stream); - } - - /* - * If a channel is available meaning that was created before the streams - * were, delete it. - */ - if (channel->uchan) { - lttng_ustconsumer_del_channel(channel); - lttng_ustconsumer_free_channel(channel); - } - - if (channel->trace_chunk) { - lttng_trace_chunk_put(channel->trace_chunk); - } - - free(channel); -} - /* * Add channel to internal consumer state. * * Returns 0 on success or else a negative value. */ static int add_channel(struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx) { int ret = 0; LTTNG_ASSERT(channel); LTTNG_ASSERT(ctx); - if (ctx->on_recv_channel != NULL) { + if (ctx->on_recv_channel != nullptr) { ret = ctx->on_recv_channel(channel); if (ret == 0) { ret = consumer_add_channel(channel, ctx); @@ -127,29 +89,30 @@ error: * * Return NULL on error else the newly allocated stream object. */ -static struct lttng_consumer_stream *allocate_stream(int cpu, int key, - struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx, int *_alloc_ret) +static struct lttng_consumer_stream *allocate_stream(int cpu, + int key, + struct lttng_consumer_channel *channel, + struct lttng_consumer_local_data *ctx, + int *_alloc_ret) { int alloc_ret; - struct lttng_consumer_stream *stream = NULL; + struct lttng_consumer_stream *stream = nullptr; LTTNG_ASSERT(channel); LTTNG_ASSERT(ctx); - stream = consumer_stream_create( - channel, - channel->key, - key, - channel->name, - channel->relayd_id, - channel->session_id, - channel->trace_chunk, - cpu, - &alloc_ret, - channel->type, - channel->monitor); - if (stream == NULL) { + stream = consumer_stream_create(channel, + channel->key, + key, + channel->name, + channel->relayd_id, + channel->session_id, + channel->trace_chunk, + cpu, + &alloc_ret, + channel->type, + channel->monitor); + if (stream == nullptr) { switch (alloc_ret) { case -ENOENT: /* @@ -182,7 +145,7 @@ error: * Returns 0 on success else a negative value. */ static int send_stream_to_thread(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx) { int ret; struct lttng_pipe *stream_pipe; @@ -205,11 +168,12 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream, stream->globally_visible = 1; cds_list_del_init(&stream->send_node); - ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream)); + ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream)); /* NOLINT sizeof used on a + pointer. */ if (ret < 0) { ERR("Consumer write %s stream to pipe %d", - stream->metadata_flag ? "metadata" : "data", - lttng_pipe_get_writefd(stream_pipe)); + stream->metadata_flag ? "metadata" : "data", + lttng_pipe_get_writefd(stream_pipe)); if (stream->metadata_flag) { consumer_del_stream_for_metadata(stream); } else { @@ -222,10 +186,9 @@ error: return ret; } -static -int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu) +static int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu) { - char cpu_nr[INT_MAX_STR_LEN]; /* int max len */ + char cpu_nr[INT_MAX_STR_LEN]; /* int max len */ int ret; strncpy(stream_shm_path, shm_path, PATH_MAX); @@ -235,8 +198,7 @@ int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu) PERROR("snprintf"); goto end; } - strncat(stream_shm_path, cpu_nr, - PATH_MAX - strlen(stream_shm_path) - 1); + strncat(stream_shm_path, cpu_nr, PATH_MAX - strlen(stream_shm_path) - 1); ret = 0; end: return ret; @@ -249,12 +211,12 @@ end: * Return 0 on success else a negative value. */ static int create_ust_streams(struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx) { int ret, cpu = 0; struct lttng_ust_ctl_consumer_stream *ustream; struct lttng_consumer_stream *stream; - pthread_mutex_t *current_stream_lock = NULL; + pthread_mutex_t *current_stream_lock = nullptr; LTTNG_ASSERT(channel); LTTNG_ASSERT(ctx); @@ -309,11 +271,9 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, */ cds_list_add_tail(&stream->send_node, &channel->streams.head); - ret = lttng_ust_ctl_get_max_subbuf_size(stream->ustream, - &stream->max_sb_size); + ret = lttng_ust_ctl_get_max_subbuf_size(stream->ustream, &stream->max_sb_size); if (ret < 0) { - ERR("lttng_ust_ctl_get_max_subbuf_size failed for stream %s", - stream->name); + ERR("lttng_ust_ctl_get_max_subbuf_size failed for stream %s", stream->name); goto error; } @@ -326,7 +286,9 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, } DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64, - stream->name, stream->key, stream->relayd_stream_id); + stream->name, + stream->key, + stream->relayd_stream_id); /* Set next CPU stream. */ channel->streams.count = ++cpu; @@ -337,12 +299,12 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, if (channel->monitor) { /* Set metadata poll pipe if we created one */ memcpy(stream->ust_metadata_poll_pipe, - ust_metadata_pipe, - sizeof(ust_metadata_pipe)); + ust_metadata_pipe, + sizeof(ust_metadata_pipe)); } } pthread_mutex_unlock(&stream->lock); - current_stream_lock = NULL; + current_stream_lock = nullptr; } return 0; @@ -355,8 +317,9 @@ error_alloc: return ret; } -static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu, - const struct lttng_credentials *session_credentials) +static int open_ust_stream_fd(struct lttng_consumer_channel *channel, + int cpu, + const struct lttng_credentials *session_credentials) { char shm_path[PATH_MAX]; int ret; @@ -369,9 +332,10 @@ static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu, goto error_shm_path; } return run_as_open(shm_path, - O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, - lttng_credentials_get_uid(session_credentials), - lttng_credentials_get_gid(session_credentials)); + O_RDWR | O_CREAT | O_EXCL, + S_IRUSR | S_IWUSR, + lttng_credentials_get_uid(session_credentials), + lttng_credentials_get_gid(session_credentials)); error_shm_path: return -1; @@ -384,8 +348,8 @@ error_shm_path: * Return 0 on success or else a negative value. */ static int create_ust_channel(struct lttng_consumer_channel *channel, - struct lttng_ust_ctl_consumer_channel_attr *attr, - struct lttng_ust_ctl_consumer_channel **ust_chanp) + struct lttng_ust_ctl_consumer_channel_attr *attr, + struct lttng_ust_ctl_consumer_channel **ust_chanp) { int ret, nr_stream_fds, i, j; int *stream_fds; @@ -397,11 +361,16 @@ static int create_ust_channel(struct lttng_consumer_channel *channel, LTTNG_ASSERT(channel->buffer_credentials.is_set); DBG3("Creating channel to ustctl with attr: [overwrite: %d, " - "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", " - "switch_timer_interval: %u, read_timer_interval: %u, " - "output: %d, type: %d", attr->overwrite, attr->subbuf_size, - attr->num_subbuf, attr->switch_timer_interval, - attr->read_timer_interval, attr->output, attr->type); + "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", " + "switch_timer_interval: %u, read_timer_interval: %u, " + "output: %d, type: %d", + attr->overwrite, + attr->subbuf_size, + attr->num_subbuf, + attr->switch_timer_interval, + attr->read_timer_interval, + attr->output, + attr->type); if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) nr_stream_fds = 1; @@ -413,8 +382,7 @@ static int create_ust_channel(struct lttng_consumer_channel *channel, goto error_alloc; } for (i = 0; i < nr_stream_fds; i++) { - stream_fds[i] = open_ust_stream_fd(channel, i, - &channel->buffer_credentials.value); + stream_fds[i] = open_ust_stream_fd(channel, i, &channel->buffer_credentials.value); if (stream_fds[i] < 0) { ret = -1; goto error_open; @@ -443,16 +411,15 @@ error_open: if (channel->shm_path[0]) { char shm_path[PATH_MAX]; - closeret = get_stream_shm_path(shm_path, - channel->shm_path, j); + closeret = get_stream_shm_path(shm_path, channel->shm_path, j); if (closeret) { ERR("Cannot get stream shm path"); } closeret = run_as_unlink(shm_path, - lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( - channel->buffer_credentials)), - lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( - channel->buffer_credentials))); + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials))); if (closeret) { PERROR("unlink %s", shm_path); } @@ -461,11 +428,11 @@ error_open: /* Try to rmdir all directories under shm_path root. */ if (channel->root_shm_path[0]) { (void) run_as_rmdir_recursive(channel->root_shm_path, - lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( - channel->buffer_credentials)), - lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( - channel->buffer_credentials)), - LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials)), + LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(stream_fds); error_alloc: @@ -502,8 +469,9 @@ error: * Return 0 on success or else a negative value. */ static int send_channel_to_sessiond_and_relayd(int sock, - struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx, int *relayd_error) + struct lttng_consumer_channel *channel, + struct lttng_consumer_local_data *ctx, + int *relayd_error) { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttng_consumer_stream *stream; @@ -516,13 +484,13 @@ static int send_channel_to_sessiond_and_relayd(int sock, DBG("UST consumer sending channel %s to sessiond", channel->name); if (channel->relayd_id != (uint64_t) -1ULL) { - cds_list_for_each_entry(stream, &channel->streams.head, send_node) { - + cds_list_for_each_entry (stream, &channel->streams.head, send_node) { health_code_update(); /* Try to send the stream to the relayd if one is available. */ DBG("Sending stream %" PRIu64 " of channel \"%s\" to relayd", - stream->key, channel->name); + stream->key, + channel->name); ret = consumer_send_relayd_stream(stream, stream->chan->pathname); if (ret < 0) { /* @@ -562,8 +530,7 @@ static int send_channel_to_sessiond_and_relayd(int sock, } /* The channel was sent successfully to the sessiond at this point. */ - cds_list_for_each_entry(stream, &channel->streams.head, send_node) { - + cds_list_for_each_entry (stream, &channel->streams.head, send_node) { health_code_update(); /* Send stream to session daemon. */ @@ -574,7 +541,7 @@ static int send_channel_to_sessiond_and_relayd(int sock, } /* Tell sessiond there is no more stream. */ - ret = lttng_ust_ctl_send_stream_to_sessiond(sock, NULL); + ret = lttng_ust_ctl_send_stream_to_sessiond(sock, nullptr); if (ret < 0) { goto error; } @@ -599,8 +566,8 @@ error: * MUST be destroyed by consumer_del_channel(). */ static int ask_channel(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_channel *channel, - struct lttng_ust_ctl_consumer_channel_attr *attr) + struct lttng_consumer_channel *channel, + struct lttng_ust_ctl_consumer_channel_attr *attr) { int ret; @@ -658,7 +625,7 @@ end: * On error, return a negative value else 0 on success. */ static int send_streams_to_thread(struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx) { int ret = 0; struct lttng_consumer_stream *stream, *stmp; @@ -667,9 +634,7 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel, LTTNG_ASSERT(ctx); /* Send streams to the corresponding thread. */ - cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, - send_node) { - + cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) { health_code_update(); /* Sending the stream to the thread. */ @@ -702,7 +667,7 @@ static int flush_channel(uint64_t chan_key) DBG("UST consumer flush channel key %" PRIu64, chan_key); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; channel = consumer_find_channel(chan_key); if (!channel) { ERR("UST consumer flush channel %" PRIu64 " not found", chan_key); @@ -714,9 +679,13 @@ static int flush_channel(uint64_t chan_key) /* For each stream of the channel id, flush it. */ cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, - &channel->key, &iter.iter, stream, node_channel_id.node) { - + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, + &channel->key, + &iter.iter, + stream, + node_channel_id.node) + { health_code_update(); pthread_mutex_lock(&stream->lock); @@ -731,19 +700,28 @@ static int flush_channel(uint64_t chan_key) if (!stream->quiescent) { ret = lttng_ust_ctl_flush_buffer(stream->ustream, 0); if (ret) { - ERR("Failed to flush buffer while flushing channel: channel key = %" PRIu64 ", channel name = '%s'", - chan_key, channel->name); + ERR("Failed to flush buffer while flushing channel: channel key = %" PRIu64 + ", channel name = '%s'", + chan_key, + channel->name); ret = LTTNG_ERR_BUFFER_FLUSH_FAILED; pthread_mutex_unlock(&stream->lock); goto error; } stream->quiescent = true; } -next: + next: pthread_mutex_unlock(&stream->lock); } + + /* + * Send one last buffer statistics update to the session daemon. This + * ensures that the session daemon gets at least one statistics update + * per channel even in the case of short-lived channels, such as when a + * short-lived app is traced in per-pid mode. + */ + sample_and_send_channel_buffer_stats(channel); error: - rcu_read_unlock(); return ret; } @@ -763,7 +741,7 @@ static int clear_quiescent_channel(uint64_t chan_key) DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; channel = consumer_find_channel(chan_key); if (!channel) { ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key); @@ -775,9 +753,13 @@ static int clear_quiescent_channel(uint64_t chan_key) /* For each stream of the channel id, clear quiescent state. */ cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, - &channel->key, &iter.iter, stream, node_channel_id.node) { - + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, + &channel->key, + &iter.iter, + stream, + node_channel_id.node) + { health_code_update(); pthread_mutex_lock(&stream->lock); @@ -785,7 +767,6 @@ static int clear_quiescent_channel(uint64_t chan_key) pthread_mutex_unlock(&stream->lock); } error: - rcu_read_unlock(); return ret; } @@ -909,14 +890,12 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key) /* Send metadata stream to relayd if needed. */ if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) { - ret = consumer_send_relayd_stream(metadata->metadata_stream, - metadata->pathname); + ret = consumer_send_relayd_stream(metadata->metadata_stream, metadata->pathname); if (ret < 0) { ret = LTTCOMM_CONSUMERD_ERROR_METADATA; goto error; } - ret = consumer_send_relayd_streams_sent( - metadata->metadata_stream->net_seq_idx); + ret = consumer_send_relayd_streams_sent(metadata->metadata_stream->net_seq_idx); if (ret < 0) { ret = LTTCOMM_CONSUMERD_RELAYD_FAIL; goto error; @@ -949,8 +928,10 @@ error: * the stream is still in the local stream list of the channel. This call * will make sure to clean that list. */ - consumer_stream_destroy(metadata->metadata_stream, NULL); - metadata->metadata_stream = NULL; + consumer_stream_destroy(metadata->metadata_stream, nullptr); + metadata->metadata_stream = nullptr; + metadata->metadata_pushed_wait_queue.wake_all(); + send_streams_error: error_no_stream: end: @@ -964,8 +945,10 @@ end: * Returns 0 on success, < 0 on error */ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, - uint64_t key, char *path, uint64_t relayd_id, - struct lttng_consumer_local_data *ctx) + uint64_t key, + char *path, + uint64_t relayd_id, + struct lttng_consumer_local_data *ctx) { int ret = 0; struct lttng_consumer_stream *metadata_stream; @@ -974,10 +957,9 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, LTTNG_ASSERT(ctx); ASSERT_RCU_READ_LOCKED(); - DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", - key, path); + DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; LTTNG_ASSERT(!metadata_channel->monitor); @@ -987,7 +969,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, * Ask the sessiond if we have new metadata waiting and update the * consumer metadata cache. */ - ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1); + ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 1); if (ret < 0) { goto error; } @@ -1011,8 +993,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, metadata_stream->net_seq_idx = relayd_id; ret = consumer_send_relayd_stream(metadata_stream, path); } else { - ret = consumer_stream_create_output_files(metadata_stream, - false); + ret = consumer_stream_create_output_files(metadata_stream, false); } if (ret < 0) { goto error_stream; @@ -1032,17 +1013,15 @@ error_stream: * Clean up the stream completely because the next snapshot will use a * new metadata stream. */ - consumer_stream_destroy(metadata_stream, NULL); - metadata_channel->metadata_stream = NULL; + consumer_stream_destroy(metadata_stream, nullptr); + metadata_channel->metadata_stream = nullptr; + metadata_channel->metadata_pushed_wait_queue.wake_all(); error: - rcu_read_unlock(); return ret; } -static -int get_current_subbuf_addr(struct lttng_consumer_stream *stream, - const char **addr) +static int get_current_subbuf_addr(struct lttng_consumer_stream *stream, const char **addr) { int ret; unsigned long mmap_offset; @@ -1050,8 +1029,7 @@ int get_current_subbuf_addr(struct lttng_consumer_stream *stream, mmap_base = (const char *) lttng_ust_ctl_get_mmap_base(stream->ustream); if (!mmap_base) { - ERR("Failed to get mmap base for stream `%s`", - stream->name); + ERR("Failed to get mmap base for stream `%s`", stream->name); ret = -EPERM; goto error; } @@ -1066,7 +1044,6 @@ int get_current_subbuf_addr(struct lttng_consumer_stream *stream, *addr = mmap_base + mmap_offset; error: return ret; - } /* @@ -1076,9 +1053,11 @@ error: * Returns 0 on success, < 0 on error */ static int snapshot_channel(struct lttng_consumer_channel *channel, - uint64_t key, char *path, uint64_t relayd_id, - uint64_t nb_packets_per_stream, - struct lttng_consumer_local_data *ctx) + uint64_t key, + char *path, + uint64_t relayd_id, + uint64_t nb_packets_per_stream, + struct lttng_consumer_local_data *ctx) { int ret; unsigned use_relayd = 0; @@ -1089,7 +1068,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, LTTNG_ASSERT(ctx); ASSERT_RCU_READ_LOCKED(); - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; if (relayd_id != (uint64_t) -1ULL) { use_relayd = 1; @@ -1098,7 +1077,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, LTTNG_ASSERT(!channel->monitor); DBG("UST consumer snapshot channel %" PRIu64, key); - cds_list_for_each_entry(stream, &channel->streams.head, send_node) { + cds_list_for_each_entry (stream, &channel->streams.head, send_node) { health_code_update(); /* Lock stream because we are about to change its state. */ @@ -1121,16 +1100,14 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, if (use_relayd) { ret = consumer_send_relayd_stream(stream, path); if (ret < 0) { - goto error_unlock; + goto error_close_stream; } } else { - ret = consumer_stream_create_output_files(stream, - false); + ret = consumer_stream_create_output_files(stream, false); if (ret < 0) { - goto error_unlock; + goto error_close_stream; } - DBG("UST consumer snapshot stream (%" PRIu64 ")", - stream->key); + DBG("UST consumer snapshot stream (%" PRIu64 ")", stream->key); } /* @@ -1140,8 +1117,10 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, if (!stream->quiescent) { ret = lttng_ust_ctl_flush_buffer(stream->ustream, 0); if (ret < 0) { - ERR("Failed to flush buffer during snapshot of channel: channel key = %" PRIu64 ", channel name = '%s'", - channel->key, channel->name); + ERR("Failed to flush buffer during snapshot of channel: channel key = %" PRIu64 + ", channel name = '%s'", + channel->key, + channel->name); goto error_unlock; } } @@ -1149,19 +1128,19 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, ret = lttng_ustconsumer_take_snapshot(stream); if (ret < 0) { ERR("Taking UST snapshot"); - goto error_unlock; + goto error_close_stream; } ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos); if (ret < 0) { ERR("Produced UST snapshot position"); - goto error_unlock; + goto error_close_stream; } ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos); if (ret < 0) { ERR("Consumerd UST snapshot position"); - goto error_unlock; + goto error_close_stream; } /* @@ -1170,9 +1149,8 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, * daemon should never send a maximum stream size that is lower than * subbuffer size. */ - consumed_pos = consumer_get_consume_start_pos(consumed_pos, - produced_pos, nb_packets_per_stream, - stream->max_sb_size); + consumed_pos = consumer_get_consume_start_pos( + consumed_pos, produced_pos, nb_packets_per_stream, stream->max_sb_size); while ((long) (consumed_pos - produced_pos) < 0) { ssize_t read_len; @@ -1213,10 +1191,9 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, goto error_put_subbuf; } - subbuf_view = lttng_buffer_view_init( - subbuf_addr, 0, padded_len); + subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len); read_len = lttng_consumer_on_read_subbuffer_mmap( - stream, &subbuf_view, padded_len - len); + stream, &subbuf_view, padded_len - len); if (use_relayd) { if (read_len != len) { ret = -EPERM; @@ -1238,11 +1215,10 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, } /* Simply close the stream so we can use it on the next snapshot. */ - consumer_stream_close(stream); + consumer_stream_close_output(stream); pthread_mutex_unlock(&stream->lock); } - rcu_read_unlock(); return 0; error_put_subbuf: @@ -1250,21 +1226,17 @@ error_put_subbuf: ERR("Snapshot lttng_ust_ctl_put_subbuf"); } error_close_stream: - consumer_stream_close(stream); + consumer_stream_close_output(stream); error_unlock: pthread_mutex_unlock(&stream->lock); - rcu_read_unlock(); return ret; } -static -void metadata_stream_reset_cache_consumed_position( - struct lttng_consumer_stream *stream) +static void metadata_stream_reset_cache_consumed_position(struct lttng_consumer_stream *stream) { ASSERT_LOCKED(stream->lock); - DBG("Reset metadata cache of session %" PRIu64, - stream->chan->session_id); + DBG("Reset metadata cache of session %" PRIu64, stream->chan->session_id); stream->ust_metadata_pushed = 0; } @@ -1276,9 +1248,14 @@ void metadata_stream_reset_cache_consumed_position( * the metadata cache flush to concurrently progress in order to * complete. */ -int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, - uint64_t len, uint64_t version, - struct lttng_consumer_channel *channel, int timer, int wait) +int lttng_ustconsumer_recv_metadata(int sock, + uint64_t key, + uint64_t offset, + uint64_t len, + uint64_t version, + struct lttng_consumer_channel *channel, + bool invoked_by_timer, + int wait) { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; char *metadata_str; @@ -1307,8 +1284,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, pthread_mutex_lock(&channel->metadata_cache->lock); cache_write_status = consumer_metadata_cache_write( - channel->metadata_cache, offset, len, version, - metadata_str); + channel->metadata_cache, offset, len, version, metadata_str); pthread_mutex_unlock(&channel->metadata_cache->lock); switch (cache_write_status) { case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE: @@ -1330,10 +1306,9 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, * channel is under a snapshot session type. No need to update * the stream position in that scenario. */ - if (channel->metadata_stream != NULL) { + if (channel->metadata_stream != nullptr) { pthread_mutex_lock(&channel->metadata_stream->lock); - metadata_stream_reset_cache_consumed_position( - channel->metadata_stream); + metadata_stream_reset_cache_consumed_position(channel->metadata_stream); pthread_mutex_unlock(&channel->metadata_stream->lock); } else { /* Validate we are in snapshot mode. */ @@ -1367,13 +1342,8 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, if (!wait) { goto end_free; } - while (consumer_metadata_cache_flushed(channel, offset + len, timer)) { - DBG("Waiting for metadata to be flushed"); - - health_code_update(); - usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME); - } + consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer); end_free: free(metadata_str); @@ -1387,12 +1357,13 @@ end: * Return 1 on success else a negative value or 0. */ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, - int sock, struct pollfd *consumer_sockpoll) + int sock, + struct pollfd *consumer_sockpoll) { int ret_func; enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttcomm_consumer_msg msg; - struct lttng_consumer_channel *channel = NULL; + struct lttng_consumer_channel *channel = nullptr; health_code_update(); @@ -1402,14 +1373,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret_recv != sizeof(msg)) { DBG("Consumer received unexpected message size %zd (expects %zu)", - ret_recv, sizeof(msg)); + ret_recv, + sizeof(msg)); /* * The ret value might 0 meaning an orderly shutdown but this is ok * since the caller handles this. */ if (ret_recv > 0) { - lttng_consumer_send_error(ctx, - LTTCOMM_CONSUMERD_ERROR_RECV_CMD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); ret_recv = -1; } return ret_recv; @@ -1424,7 +1395,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); /* relayd needs RCU read-side lock */ - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: @@ -1432,15 +1403,19 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, uint32_t major = msg.u.relayd_sock.major; uint32_t minor = msg.u.relayd_sock.minor; enum lttcomm_sock_proto protocol = - (enum lttcomm_sock_proto) msg.u.relayd_sock - .relayd_socket_protocol; + (enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol; /* Session daemon status message are handled in the following call. */ consumer_add_relayd_socket(msg.u.relayd_sock.net_index, - msg.u.relayd_sock.type, ctx, sock, - consumer_sockpoll, msg.u.relayd_sock.session_id, - msg.u.relayd_sock.relayd_session_id, major, - minor, protocol); + msg.u.relayd_sock.type, + ctx, + sock, + consumer_sockpoll, + msg.u.relayd_sock.session_id, + msg.u.relayd_sock.relayd_session_id, + major, + minor, + protocol); goto end_nosignal; } case LTTNG_CONSUMER_DESTROY_RELAYD: @@ -1452,7 +1427,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get relayd reference if exists. */ relayd = consumer_find_relayd(index); - if (relayd == NULL) { + if (relayd == nullptr) { DBG("Unable to find relayd %" PRIu64, index); ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } @@ -1475,7 +1450,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_UPDATE_STREAM: { - rcu_read_unlock(); return -ENOSYS; } case LTTNG_CONSUMER_DATA_PENDING: @@ -1489,11 +1463,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, is_data_pending = consumer_data_pending(id); /* Send back returned value to session daemon */ - ret_send = lttcomm_send_unix_sock(sock, &is_data_pending, - sizeof(is_data_pending)); + ret_send = lttcomm_send_unix_sock(sock, &is_data_pending, sizeof(is_data_pending)); if (ret_send < 0) { - DBG("Error when sending the data pending ret code: %zd", - ret_send); + DBG("Error when sending the data pending ret code: %zd", ret_send); goto error_fatal; } @@ -1515,28 +1487,26 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Create a plain object and reserve a channel key. */ channel = consumer_allocate_channel( - msg.u.ask_channel.key, - msg.u.ask_channel.session_id, - msg.u.ask_channel.chunk_id.is_set ? - &chunk_id : NULL, - msg.u.ask_channel.pathname, - msg.u.ask_channel.name, - msg.u.ask_channel.relayd_id, - (enum lttng_event_output) msg.u.ask_channel.output, - msg.u.ask_channel.tracefile_size, - msg.u.ask_channel.tracefile_count, - msg.u.ask_channel.session_id_per_pid, - msg.u.ask_channel.monitor, - msg.u.ask_channel.live_timer_interval, - msg.u.ask_channel.is_live, - msg.u.ask_channel.root_shm_path, - msg.u.ask_channel.shm_path); + msg.u.ask_channel.key, + msg.u.ask_channel.session_id, + msg.u.ask_channel.chunk_id.is_set ? &chunk_id : nullptr, + msg.u.ask_channel.pathname, + msg.u.ask_channel.name, + msg.u.ask_channel.relayd_id, + (enum lttng_event_output) msg.u.ask_channel.output, + msg.u.ask_channel.tracefile_size, + msg.u.ask_channel.tracefile_count, + msg.u.ask_channel.session_id_per_pid, + msg.u.ask_channel.monitor, + msg.u.ask_channel.live_timer_interval, + msg.u.ask_channel.is_live, + msg.u.ask_channel.root_shm_path, + msg.u.ask_channel.shm_path); if (!channel) { goto end_channel_error; } - LTTNG_OPTIONAL_SET(&channel->buffer_credentials, - buffer_credentials); + LTTNG_OPTIONAL_SET(&channel->buffer_credentials, buffer_credentials); /* * Assign UST application UID to the channel. This value is ignored for @@ -1553,7 +1523,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, attr.read_timer_interval = msg.u.ask_channel.read_timer_interval; attr.chan_id = msg.u.ask_channel.chan_id; memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid)); - attr.blocking_timeout= msg.u.ask_channel.blocking_timeout; + attr.blocking_timeout = msg.u.ask_channel.blocking_timeout; /* Match channel buffer type to the UST abi. */ switch (msg.u.ask_channel.output) { @@ -1594,8 +1564,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (msg.u.ask_channel.type == LTTNG_UST_ABI_CHAN_METADATA) { int ret_allocate; - ret_allocate = consumer_metadata_cache_allocate( - channel); + ret_allocate = consumer_metadata_cache_allocate(channel); if (ret_allocate < 0) { ERR("Allocating metadata cache"); goto end_channel_error; @@ -1605,11 +1574,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { int monitor_start_ret; - consumer_timer_live_start(channel, - msg.u.ask_channel.live_timer_interval); + consumer_timer_live_start(channel, msg.u.ask_channel.live_timer_interval); monitor_start_ret = consumer_timer_monitor_start( - channel, - msg.u.ask_channel.monitor_timer_interval); + channel, msg.u.ask_channel.monitor_timer_interval); if (monitor_start_ret < 0) { ERR("Starting channel monitoring timer failed"); goto end_channel_error; @@ -1675,8 +1642,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); /* Send the channel to sessiond (and relayd, if applicable). */ - ret = send_channel_to_sessiond_and_relayd( - sock, found_channel, ctx, &relayd_err); + ret = send_channel_to_sessiond_and_relayd(sock, found_channel, ctx, &relayd_err); if (ret < 0) { if (relayd_err) { /* @@ -1714,11 +1680,11 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } /* List MUST be empty after or else it could be reused. */ LTTNG_ASSERT(cds_list_empty(&found_channel->streams.head)); -end_get_channel: + end_get_channel: goto end_msg_sessiond; -error_get_channel_fatal: + error_get_channel_fatal: goto error_fatal; -end_get_channel_nosignal: + end_get_channel_nosignal: goto end_nosignal; } case LTTNG_CONSUMER_DESTROY_CHANNEL: @@ -1759,8 +1725,7 @@ end_get_channel_nosignal: { int ret; - ret = clear_quiescent_channel( - msg.u.clear_quiescent_channel.key); + ret = clear_quiescent_channel(msg.u.clear_quiescent_channel.key); if (ret != 0) { ret_code = (lttcomm_return_code) ret; } @@ -1776,8 +1741,7 @@ end_get_channel_nosignal: uint64_t version = msg.u.push_metadata.version; struct lttng_consumer_channel *found_channel; - DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, - len); + DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len); found_channel = consumer_find_channel(key); if (!found_channel) { @@ -1823,8 +1787,8 @@ end_get_channel_nosignal: health_code_update(); - ret = lttng_ustconsumer_recv_metadata(sock, key, offset, len, - version, found_channel, 0, 1); + ret = lttng_ustconsumer_recv_metadata( + sock, key, offset, len, version, found_channel, false, 1); if (ret < 0) { /* error receiving from sessiond */ goto error_push_metadata_fatal; @@ -1832,9 +1796,9 @@ end_get_channel_nosignal: ret_code = (lttcomm_return_code) ret; goto end_push_metadata_msg_sessiond; } -end_push_metadata_msg_sessiond: + end_push_metadata_msg_sessiond: goto end_msg_sessiond; -error_push_metadata_fatal: + error_push_metadata_fatal: goto error_fatal; } case LTTNG_CONSUMER_SETUP_METADATA: @@ -1862,10 +1826,10 @@ error_push_metadata_fatal: int ret_snapshot; ret_snapshot = snapshot_metadata(found_channel, - key, - msg.u.snapshot_channel.pathname, - msg.u.snapshot_channel.relayd_id, - ctx); + key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, + ctx); if (ret_snapshot < 0) { ERR("Snapshot metadata failed"); ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED; @@ -1873,13 +1837,13 @@ error_push_metadata_fatal: } else { int ret_snapshot; - ret_snapshot = snapshot_channel(found_channel, - key, - msg.u.snapshot_channel.pathname, - msg.u.snapshot_channel.relayd_id, - msg.u.snapshot_channel - .nb_packets_per_stream, - ctx); + ret_snapshot = snapshot_channel( + found_channel, + key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, + msg.u.snapshot_channel.nb_packets_per_stream, + ctx); if (ret_snapshot < 0) { ERR("Snapshot channel failed"); ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED; @@ -1905,9 +1869,7 @@ error_push_metadata_fatal: uint64_t id = msg.u.discarded_events.session_id; uint64_t key = msg.u.discarded_events.channel_key; - DBG("UST consumer discarded events command for session id %" - PRIu64, id); - rcu_read_lock(); + DBG("UST consumer discarded events command for session id %" PRIu64, id); pthread_mutex_lock(&the_consumer_data.lock); ht = the_consumer_data.stream_list_ht; @@ -1921,19 +1883,24 @@ error_push_metadata_fatal: */ discarded_events = 0; cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&id, lttng_ht_seed), - ht->match_fct, &id, - &iter.iter, stream, node_session_id.node) { + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct, + &id, + &iter.iter, + stream, + node_session_id.node) + { if (stream->chan->key == key) { discarded_events = stream->chan->discarded_events; break; } } pthread_mutex_unlock(&the_consumer_data.lock); - rcu_read_unlock(); - DBG("UST consumer discarded events command for session id %" - PRIu64 ", channel key %" PRIu64, id, key); + DBG("UST consumer discarded events command for session id %" PRIu64 + ", channel key %" PRIu64, + id, + key); health_code_update(); @@ -1948,7 +1915,7 @@ error_push_metadata_fatal: } case LTTNG_CONSUMER_LOST_PACKETS: { - int ret; + int ret; uint64_t lost_packets; struct lttng_ht_iter iter; struct lttng_ht *ht; @@ -1956,9 +1923,7 @@ error_push_metadata_fatal: uint64_t id = msg.u.lost_packets.session_id; uint64_t key = msg.u.lost_packets.channel_key; - DBG("UST consumer lost packets command for session id %" - PRIu64, id); - rcu_read_lock(); + DBG("UST consumer lost packets command for session id %" PRIu64, id); pthread_mutex_lock(&the_consumer_data.lock); ht = the_consumer_data.stream_list_ht; @@ -1969,27 +1934,31 @@ error_push_metadata_fatal: * to extract the information we need, we default to 0 if not * found (no packets lost if the channel is not yet in use). */ - lost_packets = 0; + lost_packets = 0; cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&id, lttng_ht_seed), - ht->match_fct, &id, - &iter.iter, stream, node_session_id.node) { + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct, + &id, + &iter.iter, + stream, + node_session_id.node) + { if (stream->chan->key == key) { - lost_packets = stream->chan->lost_packets; + lost_packets = stream->chan->lost_packets; break; } } pthread_mutex_unlock(&the_consumer_data.lock); - rcu_read_unlock(); - DBG("UST consumer lost packets command for session id %" - PRIu64 ", channel key %" PRIu64, id, key); + DBG("UST consumer lost packets command for session id %" PRIu64 + ", channel key %" PRIu64, + id, + key); health_code_update(); /* Send back returned value to session daemon */ - ret = lttcomm_send_unix_sock(sock, &lost_packets, - sizeof(lost_packets)); + ret = lttcomm_send_unix_sock(sock, &lost_packets, sizeof(lost_packets)); if (ret < 0) { PERROR("send lost packets"); goto error_fatal; @@ -1999,8 +1968,7 @@ error_push_metadata_fatal: } case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE: { - int channel_monitor_pipe, ret_send, - ret_set_channel_monitor_pipe; + int channel_monitor_pipe, ret_send, ret_set_channel_monitor_pipe; ssize_t ret_recv; ret_code = LTTCOMM_CONSUMERD_SUCCESS; @@ -2010,8 +1978,7 @@ error_push_metadata_fatal: goto error_fatal; } - ret_recv = lttcomm_recv_fds_unix_sock( - sock, &channel_monitor_pipe, 1); + ret_recv = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe, 1); if (ret_recv != sizeof(channel_monitor_pipe)) { ERR("Failed to receive channel monitor pipe"); goto error_fatal; @@ -2019,8 +1986,7 @@ error_push_metadata_fatal: DBG("Received channel monitor pipe (%d)", channel_monitor_pipe); ret_set_channel_monitor_pipe = - consumer_timer_thread_set_channel_monitor_pipe( - channel_monitor_pipe); + consumer_timer_thread_set_channel_monitor_pipe(channel_monitor_pipe); if (!ret_set_channel_monitor_pipe) { int flags; int ret_fcntl; @@ -2034,8 +2000,7 @@ error_push_metadata_fatal: } flags = ret_fcntl; - ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL, - flags | O_NONBLOCK); + ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL, flags | O_NONBLOCK); if (ret_fcntl == -1) { PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe"); goto error_fatal; @@ -2064,8 +2029,7 @@ error_push_metadata_fatal: * this channel. */ rotate_channel = lttng_consumer_rotate_channel( - found_channel, key, - msg.u.rotate_channel.relayd_id); + found_channel, key, msg.u.rotate_channel.relayd_id); if (rotate_channel < 0) { ERR("Rotate channel failed"); ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL; @@ -2091,14 +2055,13 @@ error_push_metadata_fatal: int ret_rotate_read_streams; ret_rotate_read_streams = - lttng_consumer_rotate_ready_streams( - found_channel, key); + lttng_consumer_rotate_ready_streams(found_channel, key); if (ret_rotate_read_streams < 0) { ERR("Rotate channel failed"); } } break; -end_rotate_channel_nosignal: + end_rotate_channel_nosignal: goto end_nosignal; } case LTTNG_CONSUMER_CLEAR_CHANNEL: @@ -2114,8 +2077,7 @@ end_rotate_channel_nosignal: } else { int ret_clear_channel; - ret_clear_channel = lttng_consumer_clear_channel( - found_channel); + ret_clear_channel = lttng_consumer_clear_channel(found_channel); if (ret_clear_channel) { ERR("Clear channel failed key %" PRIu64, key); ret_code = (lttcomm_return_code) ret_clear_channel; @@ -2133,9 +2095,12 @@ end_rotate_channel_nosignal: case LTTNG_CONSUMER_INIT: { int ret_send_status; + lttng_uuid sessiond_uuid; - ret_code = lttng_consumer_init_command(ctx, - msg.u.init.sessiond_uuid); + std::copy(std::begin(msg.u.init.sessiond_uuid), + std::end(msg.u.init.sessiond_uuid), + sessiond_uuid.begin()); + ret_code = lttng_consumer_init_command(ctx, sessiond_uuid); health_code_update(); ret_send_status = consumer_send_status_msg(sock, ret_code); if (ret_send_status < 0) { @@ -2147,18 +2112,17 @@ end_rotate_channel_nosignal: case LTTNG_CONSUMER_CREATE_TRACE_CHUNK: { const struct lttng_credentials credentials = { - .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid), - .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid), + .uid = LTTNG_OPTIONAL_INIT_VALUE( + msg.u.create_trace_chunk.credentials.value.uid), + .gid = LTTNG_OPTIONAL_INIT_VALUE( + msg.u.create_trace_chunk.credentials.value.gid), }; - const bool is_local_trace = - !msg.u.create_trace_chunk.relayd_id.is_set; - const uint64_t relayd_id = - msg.u.create_trace_chunk.relayd_id.value; - const char *chunk_override_name = - *msg.u.create_trace_chunk.override_name ? - msg.u.create_trace_chunk.override_name : - NULL; - struct lttng_directory_handle *chunk_directory_handle = NULL; + const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set; + const uint64_t relayd_id = msg.u.create_trace_chunk.relayd_id.value; + const char *chunk_override_name = *msg.u.create_trace_chunk.override_name ? + msg.u.create_trace_chunk.override_name : + nullptr; + struct lttng_directory_handle *chunk_directory_handle = nullptr; /* * The session daemon will only provide a chunk directory file @@ -2170,8 +2134,7 @@ end_rotate_channel_nosignal: ssize_t ret_recv; /* Acnowledge the reception of the command. */ - ret_send_status = consumer_send_status_msg( - sock, LTTCOMM_CONSUMERD_SUCCESS); + ret_send_status = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS); if (ret_send_status < 0) { /* Somehow, the session daemon is not responding anymore. */ goto end_nosignal; @@ -2180,17 +2143,15 @@ end_rotate_channel_nosignal: /* * Receive trace chunk domain dirfd. */ - ret_recv = lttcomm_recv_fds_unix_sock( - sock, &chunk_dirfd, 1); + ret_recv = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1); if (ret_recv != sizeof(chunk_dirfd)) { ERR("Failed to receive trace chunk domain directory file descriptor"); goto error_fatal; } - DBG("Received trace chunk domain directory fd (%d)", - chunk_dirfd); - chunk_directory_handle = lttng_directory_handle_create_from_dirfd( - chunk_dirfd); + DBG("Received trace chunk domain directory fd (%d)", chunk_dirfd); + chunk_directory_handle = + lttng_directory_handle_create_from_dirfd(chunk_dirfd); if (!chunk_directory_handle) { ERR("Failed to initialize chunk domain directory handle from directory file descriptor"); if (close(chunk_dirfd)) { @@ -2201,48 +2162,39 @@ end_rotate_channel_nosignal: } ret_code = lttng_consumer_create_trace_chunk( - !is_local_trace ? &relayd_id : NULL, - msg.u.create_trace_chunk.session_id, - msg.u.create_trace_chunk.chunk_id, - (time_t) msg.u.create_trace_chunk - .creation_timestamp, - chunk_override_name, - msg.u.create_trace_chunk.credentials.is_set ? - &credentials : - NULL, - chunk_directory_handle); + !is_local_trace ? &relayd_id : nullptr, + msg.u.create_trace_chunk.session_id, + msg.u.create_trace_chunk.chunk_id, + (time_t) msg.u.create_trace_chunk.creation_timestamp, + chunk_override_name, + msg.u.create_trace_chunk.credentials.is_set ? &credentials : nullptr, + chunk_directory_handle); lttng_directory_handle_put(chunk_directory_handle); goto end_msg_sessiond; } case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK: { enum lttng_trace_chunk_command_type close_command = - (lttng_trace_chunk_command_type) - msg.u.close_trace_chunk.close_command.value; - const uint64_t relayd_id = - msg.u.close_trace_chunk.relayd_id.value; + (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value; + const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value; struct lttcomm_consumer_close_trace_chunk_reply reply; char closed_trace_chunk_path[LTTNG_PATH_MAX] = {}; int ret; ret_code = lttng_consumer_close_trace_chunk( - msg.u.close_trace_chunk.relayd_id.is_set ? - &relayd_id : - NULL, - msg.u.close_trace_chunk.session_id, - msg.u.close_trace_chunk.chunk_id, - (time_t) msg.u.close_trace_chunk.close_timestamp, - msg.u.close_trace_chunk.close_command.is_set ? - &close_command : - NULL, closed_trace_chunk_path); + msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : nullptr, + msg.u.close_trace_chunk.session_id, + msg.u.close_trace_chunk.chunk_id, + (time_t) msg.u.close_trace_chunk.close_timestamp, + msg.u.close_trace_chunk.close_command.is_set ? &close_command : nullptr, + closed_trace_chunk_path); reply.ret_code = ret_code; reply.path_length = strlen(closed_trace_chunk_path) + 1; ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply)); if (ret != sizeof(reply)) { goto error_fatal; } - ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path, - reply.path_length); + ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path, reply.path_length); if (ret != reply.path_length) { goto error_fatal; } @@ -2250,26 +2202,22 @@ end_rotate_channel_nosignal: } case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: { - const uint64_t relayd_id = - msg.u.trace_chunk_exists.relayd_id.value; + const uint64_t relayd_id = msg.u.trace_chunk_exists.relayd_id.value; ret_code = lttng_consumer_trace_chunk_exists( - msg.u.trace_chunk_exists.relayd_id.is_set ? - &relayd_id : NULL, - msg.u.trace_chunk_exists.session_id, - msg.u.trace_chunk_exists.chunk_id); + msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : nullptr, + msg.u.trace_chunk_exists.session_id, + msg.u.trace_chunk_exists.chunk_id); goto end_msg_sessiond; } case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS: { const uint64_t key = msg.u.open_channel_packets.key; - struct lttng_consumer_channel *found_channel = - consumer_find_channel(key); + struct lttng_consumer_channel *found_channel = consumer_find_channel(key); if (found_channel) { pthread_mutex_lock(&found_channel->lock); - ret_code = lttng_consumer_open_channel_packets( - found_channel); + ret_code = lttng_consumer_open_channel_packets(found_channel); pthread_mutex_unlock(&found_channel->lock); } else { /* @@ -2315,17 +2263,13 @@ end_msg_sessiond: end_channel_error: if (channel) { - /* - * Free channel here since no one has a reference to it. We don't - * free after that because a stream can store this pointer. - */ - destroy_channel(channel); + consumer_del_channel(channel); } /* We have to send a status channel message indicating an error. */ { int ret_send_status; - ret_send_status = consumer_send_status_channel(sock, NULL); + ret_send_status = consumer_send_status_channel(sock, nullptr); if (ret_send_status < 0) { /* Stop everything if session daemon can not be notified. */ goto error_fatal; @@ -2341,13 +2285,11 @@ error_fatal: goto end; end: - rcu_read_unlock(); health_code_update(); return ret_func; } -int lttng_ust_flush_buffer(struct lttng_consumer_stream *stream, - int producer_active) +int lttng_ust_flush_buffer(struct lttng_consumer_stream *stream, int producer_active) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2373,8 +2315,7 @@ int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream) * * Returns 0 on success, < 0 on error. */ -int lttng_ustconsumer_sample_snapshot_positions( - struct lttng_consumer_stream *stream) +int lttng_ustconsumer_sample_snapshot_positions(struct lttng_consumer_stream *stream) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2387,8 +2328,8 @@ int lttng_ustconsumer_sample_snapshot_positions( * * Returns 0 on success, < 0 on error */ -int lttng_ustconsumer_get_produced_snapshot( - struct lttng_consumer_stream *stream, unsigned long *pos) +int lttng_ustconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2402,8 +2343,8 @@ int lttng_ustconsumer_get_produced_snapshot( * * Returns 0 on success, < 0 on error */ -int lttng_ustconsumer_get_consumed_snapshot( - struct lttng_consumer_stream *stream, unsigned long *pos) +int lttng_ustconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2412,8 +2353,7 @@ int lttng_ustconsumer_get_consumed_snapshot( return lttng_ust_ctl_snapshot_get_consumed(stream->ustream, pos); } -int lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream, - int producer) +int lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream, int producer) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2429,8 +2369,7 @@ int lttng_ustconsumer_clear_buffer(struct lttng_consumer_stream *stream) return lttng_ust_ctl_clear_buffer(stream->ustream); } -int lttng_ustconsumer_get_current_timestamp( - struct lttng_consumer_stream *stream, uint64_t *ts) +int lttng_ustconsumer_get_current_timestamp(struct lttng_consumer_stream *stream, uint64_t *ts) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2439,8 +2378,7 @@ int lttng_ustconsumer_get_current_timestamp( return lttng_ust_ctl_get_current_timestamp(stream->ustream, ts); } -int lttng_ustconsumer_get_sequence_number( - struct lttng_consumer_stream *stream, uint64_t *seq) +int lttng_ustconsumer_get_sequence_number(struct lttng_consumer_stream *stream, uint64_t *seq) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2465,8 +2403,9 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream) stream->quiescent = true; } } - pthread_mutex_unlock(&stream->lock); + stream->hangup_flush_done = 1; + pthread_mutex_unlock(&stream->lock); } void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) @@ -2495,10 +2434,10 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) ERR("Cannot get stream shm path"); } ret = run_as_unlink(shm_path, - lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( - chan->buffer_credentials)), - lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( - chan->buffer_credentials))); + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + chan->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + chan->buffer_credentials))); if (ret) { PERROR("unlink %s", shm_path); } @@ -2516,12 +2455,11 @@ void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan) lttng_ust_ctl_destroy_channel(chan->uchan); /* Try to rmdir all directories under shm_path root. */ if (chan->root_shm_path[0]) { - (void) run_as_rmdir_recursive(chan->root_shm_path, - lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( - chan->buffer_credentials)), - lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( - chan->buffer_credentials)), - LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); + (void) run_as_rmdir_recursive( + chan->root_shm_path, + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(chan->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(chan->buffer_credentials)), + LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(chan->stream_fds); } @@ -2559,15 +2497,13 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream) * Returns the number of bytes pushed from the cache into the ring buffer, or a * negative value on error. */ -static -int commit_one_metadata_packet(struct lttng_consumer_stream *stream) +static int commit_one_metadata_packet(struct lttng_consumer_stream *stream) { ssize_t write_len; int ret; pthread_mutex_lock(&stream->chan->metadata_cache->lock); - if (stream->chan->metadata_cache->contents.size == - stream->ust_metadata_pushed) { + if (stream->chan->metadata_cache->contents.size == stream->ust_metadata_pushed) { /* * In the context of a user space metadata channel, a * change in version can be detected in two ways: @@ -2592,31 +2528,31 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) * occur as part of the pre-consume) until the metadata size * exceeded the cache size. */ - if (stream->metadata_version != - stream->chan->metadata_cache->version) { + if (stream->metadata_version != stream->chan->metadata_cache->version) { metadata_stream_reset_cache_consumed_position(stream); consumer_stream_metadata_set_version(stream, - stream->chan->metadata_cache->version); + stream->chan->metadata_cache->version); } else { ret = 0; goto end; } } - write_len = lttng_ust_ctl_write_one_packet_to_channel(stream->chan->uchan, - &stream->chan->metadata_cache->contents.data[stream->ust_metadata_pushed], - stream->chan->metadata_cache->contents.size - - stream->ust_metadata_pushed); + write_len = lttng_ust_ctl_write_one_packet_to_channel( + stream->chan->uchan, + &stream->chan->metadata_cache->contents.data[stream->ust_metadata_pushed], + stream->chan->metadata_cache->contents.size - stream->ust_metadata_pushed); LTTNG_ASSERT(write_len != 0); if (write_len < 0) { ERR("Writing one metadata packet"); ret = write_len; goto end; } + stream->ust_metadata_pushed += write_len; + stream->chan->metadata_pushed_wait_queue.wake_all(); - LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= - stream->ust_metadata_pushed); + LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed); ret = write_len; /* @@ -2635,7 +2571,6 @@ end: return ret; } - /* * Sync metadata meaning request them to the session daemon and snapshot to the * metadata thread can consumer them. @@ -2646,9 +2581,9 @@ end: * * The RCU read side lock must be held by the caller. */ -enum sync_metadata_status lttng_ustconsumer_sync_metadata( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *metadata_stream) +enum sync_metadata_status +lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *metadata_stream) { int ret; enum sync_metadata_status status; @@ -2664,7 +2599,7 @@ enum sync_metadata_status lttng_ustconsumer_sync_metadata( * Request metadata from the sessiond, but don't wait for the flush * because we locked the metadata thread. */ - ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0); + ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 0); pthread_mutex_lock(&metadata_stream->lock); if (ret < 0) { status = SYNC_METADATA_STATUS_ERROR; @@ -2684,7 +2619,7 @@ enum sync_metadata_status lttng_ustconsumer_sync_metadata( */ if (consumer_stream_is_deleted(metadata_stream)) { DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization", - metadata_stream->key); + metadata_stream->key); status = SYNC_METADATA_STATUS_NO_DATA; goto end; } @@ -2702,7 +2637,8 @@ enum sync_metadata_status lttng_ustconsumer_sync_metadata( ret = lttng_ust_ctl_snapshot(metadata_stream->ustream); if (ret < 0) { - ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d", ret); + ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d", + ret); status = SYNC_METADATA_STATUS_ERROR; goto end; } @@ -2715,7 +2651,7 @@ end: * Return 0 on success else a negative value. */ static int notify_if_more_data(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx) { int ret; struct lttng_ust_ctl_consumer_stream *ustream; @@ -2791,18 +2727,17 @@ static int consumer_stream_ust_on_wake_up(struct lttng_consumer_stream *stream) } static int extract_common_subbuffer_info(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuf) + struct stream_subbuffer *subbuf) { int ret; - ret = lttng_ust_ctl_get_subbuf_size( - stream->ustream, &subbuf->info.data.subbuf_size); + ret = lttng_ust_ctl_get_subbuf_size(stream->ustream, &subbuf->info.data.subbuf_size); if (ret) { goto end; } - ret = lttng_ust_ctl_get_padded_subbuf_size( - stream->ustream, &subbuf->info.data.padded_subbuf_size); + ret = lttng_ust_ctl_get_padded_subbuf_size(stream->ustream, + &subbuf->info.data.padded_subbuf_size); if (ret) { goto end; } @@ -2812,7 +2747,7 @@ end: } static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuf) + struct stream_subbuffer *subbuf) { int ret; @@ -2828,7 +2763,7 @@ end: } static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuf) + struct stream_subbuffer *subbuf) { int ret; @@ -2837,43 +2772,40 @@ static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, goto end; } - ret = lttng_ust_ctl_get_packet_size( - stream->ustream, &subbuf->info.data.packet_size); + ret = lttng_ust_ctl_get_packet_size(stream->ustream, &subbuf->info.data.packet_size); if (ret < 0) { PERROR("Failed to get sub-buffer packet size"); goto end; } - ret = lttng_ust_ctl_get_content_size( - stream->ustream, &subbuf->info.data.content_size); + ret = lttng_ust_ctl_get_content_size(stream->ustream, &subbuf->info.data.content_size); if (ret < 0) { PERROR("Failed to get sub-buffer content size"); goto end; } - ret = lttng_ust_ctl_get_timestamp_begin( - stream->ustream, &subbuf->info.data.timestamp_begin); + ret = lttng_ust_ctl_get_timestamp_begin(stream->ustream, + &subbuf->info.data.timestamp_begin); if (ret < 0) { PERROR("Failed to get sub-buffer begin timestamp"); goto end; } - ret = lttng_ust_ctl_get_timestamp_end( - stream->ustream, &subbuf->info.data.timestamp_end); + ret = lttng_ust_ctl_get_timestamp_end(stream->ustream, &subbuf->info.data.timestamp_end); if (ret < 0) { PERROR("Failed to get sub-buffer end timestamp"); goto end; } - ret = lttng_ust_ctl_get_events_discarded( - stream->ustream, &subbuf->info.data.events_discarded); + ret = lttng_ust_ctl_get_events_discarded(stream->ustream, + &subbuf->info.data.events_discarded); if (ret) { PERROR("Failed to get sub-buffer events discarded count"); goto end; } ret = lttng_ust_ctl_get_sequence_number(stream->ustream, - &subbuf->info.data.sequence_number.value); + &subbuf->info.data.sequence_number.value); if (ret) { /* May not be supported by older LTTng-modules. */ if (ret != -ENOTTY) { @@ -2884,15 +2816,14 @@ static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, subbuf->info.data.sequence_number.is_set = true; } - ret = lttng_ust_ctl_get_stream_id( - stream->ustream, &subbuf->info.data.stream_id); + ret = lttng_ust_ctl_get_stream_id(stream->ustream, &subbuf->info.data.stream_id); if (ret < 0) { PERROR("Failed to get stream id"); goto end; } ret = lttng_ust_ctl_get_instance_id(stream->ustream, - &subbuf->info.data.stream_instance_id.value); + &subbuf->info.data.stream_instance_id.value); if (ret) { /* May not be supported by older LTTng-modules. */ if (ret != -ENOTTY) { @@ -2907,13 +2838,12 @@ end: } static int get_next_subbuffer_common(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer) + struct stream_subbuffer *subbuffer) { int ret; const char *addr; - ret = stream->read_subbuffer_ops.extract_subbuffer_info( - stream, subbuffer); + ret = stream->read_subbuffer_ops.extract_subbuffer_info(stream, subbuffer); if (ret) { goto end; } @@ -2923,16 +2853,15 @@ static int get_next_subbuffer_common(struct lttng_consumer_stream *stream, goto end; } - subbuffer->buffer.buffer = lttng_buffer_view_init( - addr, 0, subbuffer->info.data.padded_subbuf_size); - LTTNG_ASSERT(subbuffer->buffer.buffer.data != NULL); + subbuffer->buffer.buffer = + lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size); + LTTNG_ASSERT(subbuffer->buffer.buffer.data != nullptr); end: return ret; } -static enum get_next_subbuffer_status get_next_subbuffer( - struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer) +static enum get_next_subbuffer_status get_next_subbuffer(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) { int ret; enum get_next_subbuffer_status status; @@ -2943,7 +2872,7 @@ static enum get_next_subbuffer_status get_next_subbuffer( status = GET_NEXT_SUBBUFFER_STATUS_OK; break; case -ENODATA: - case -EAGAIN: + case -EAGAIN: /* * The caller only expects -ENODATA when there is no data to * read, but the kernel tracer returns -EAGAIN when there is @@ -2967,9 +2896,9 @@ end: return status; } -static enum get_next_subbuffer_status get_next_subbuffer_metadata( - struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer) +static enum get_next_subbuffer_status +get_next_subbuffer_metadata(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) { int ret; bool cache_empty; @@ -3012,7 +2941,7 @@ static enum get_next_subbuffer_status get_next_subbuffer_metadata( } else { pthread_mutex_lock(&stream->chan->metadata_cache->lock); cache_empty = stream->chan->metadata_cache->contents.size == - stream->ust_metadata_pushed; + stream->ust_metadata_pushed; pthread_mutex_unlock(&stream->chan->metadata_cache->lock); } } while (!got_subbuffer); @@ -3067,7 +2996,7 @@ end: } static int put_next_subbuffer(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer __attribute__((unused))) + struct stream_subbuffer *subbuffer __attribute__((unused))) { const int ret = lttng_ust_ctl_put_next_subbuf(stream->ustream); @@ -3076,42 +3005,35 @@ static int put_next_subbuffer(struct lttng_consumer_stream *stream, } static int signal_metadata(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx __attribute__((unused))) + struct lttng_consumer_local_data *ctx __attribute__((unused))) { ASSERT_LOCKED(stream->metadata_rdv_lock); return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0; } -static int lttng_ustconsumer_set_stream_ops( - struct lttng_consumer_stream *stream) +static int lttng_ustconsumer_set_stream_ops(struct lttng_consumer_stream *stream) { int ret = 0; stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up; if (stream->metadata_flag) { - stream->read_subbuffer_ops.get_next_subbuffer = - get_next_subbuffer_metadata; - stream->read_subbuffer_ops.extract_subbuffer_info = - extract_metadata_subbuffer_info; + stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer_metadata; + stream->read_subbuffer_ops.extract_subbuffer_info = extract_metadata_subbuffer_info; stream->read_subbuffer_ops.reset_metadata = - metadata_stream_reset_cache_consumed_position; + metadata_stream_reset_cache_consumed_position; if (stream->chan->is_live) { stream->read_subbuffer_ops.on_sleep = signal_metadata; - ret = consumer_stream_enable_metadata_bucketization( - stream); + ret = consumer_stream_enable_metadata_bucketization(stream); if (ret) { goto end; } } } else { - stream->read_subbuffer_ops.get_next_subbuffer = - get_next_subbuffer; - stream->read_subbuffer_ops.extract_subbuffer_info = - extract_data_subbuffer_info; + stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer; + stream->read_subbuffer_ops.extract_subbuffer_info = extract_data_subbuffer_info; stream->read_subbuffer_ops.on_sleep = notify_if_more_data; if (stream->chan->is_live) { - stream->read_subbuffer_ops.send_live_beacon = - consumer_flush_ust_index; + stream->read_subbuffer_ops.send_live_beacon = consumer_flush_ust_index; } } @@ -3136,7 +3058,7 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) * no current trace chunk on the parent channel. */ if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor && - stream->chan->trace_chunk) { + stream->chan->trace_chunk) { ret = consumer_stream_create_output_files(stream, true); if (ret) { goto error; @@ -3192,12 +3114,14 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) * whetnever ust_metadata_pushed is incremented, the associated * metadata has been consumed from the metadata stream. */ - DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64, - contiguous, pushed); + DBG("UST consumer metadata pending check: contiguous %" PRIu64 + " vs pushed %" PRIu64, + contiguous, + pushed); LTTNG_ASSERT(((int64_t) (contiguous - pushed)) >= 0); if ((contiguous != pushed) || - (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) { - ret = 1; /* Data is pending */ + (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) { + ret = 1; /* Data is pending */ goto end; } } else { @@ -3209,7 +3133,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) */ ret = lttng_ust_ctl_put_subbuf(stream->ustream); LTTNG_ASSERT(ret == 0); - ret = 1; /* Data is pending */ + ret = 1; /* Data is pending */ goto end; } } @@ -3279,18 +3203,17 @@ void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht) DBG("UST consumer closing all metadata streams"); - rcu_read_lock(); - cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, - node.node) { - - health_code_update(); + { + lttng::urcu::read_lock_guard read_lock; - pthread_mutex_lock(&stream->chan->lock); - lttng_ustconsumer_close_metadata(stream->chan); - pthread_mutex_unlock(&stream->chan->lock); + cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { + health_code_update(); + pthread_mutex_lock(&stream->chan->lock); + lttng_ustconsumer_close_metadata(stream->chan); + pthread_mutex_unlock(&stream->chan->lock); + } } - rcu_read_unlock(); } void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) @@ -3314,7 +3237,9 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) * pushed out due to concurrent interaction with the session daemon. */ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_channel *channel, int timer, int wait) + struct lttng_consumer_channel *channel, + bool invoked_by_timer, + int wait) { struct lttcomm_metadata_request_msg request; struct lttcomm_consumer_msg msg; @@ -3350,17 +3275,18 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, request.uid = channel->ust_app_uid; request.key = channel->key; - DBG("Sending metadata request to sessiond, session id %" PRIu64 - ", per-pid %" PRIu64 ", app UID %u and channel key %" PRIu64, - request.session_id, request.session_id_per_pid, request.uid, - request.key); + DBG("Sending metadata request to sessiond, session id %" PRIu64 ", per-pid %" PRIu64 + ", app UID %u and channel key %" PRIu64, + request.session_id, + request.session_id_per_pid, + request.uid, + request.key); pthread_mutex_lock(&ctx->metadata_socket_lock); health_code_update(); - ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request, - sizeof(request)); + ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request, sizeof(request)); if (ret < 0) { ERR("Asking metadata to sessiond"); goto end; @@ -3369,11 +3295,9 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, health_code_update(); /* Receive the metadata from sessiond */ - ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg, - sizeof(msg)); + ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - DBG("Consumer received unexpected message size %d (expects %zu)", - ret, sizeof(msg)); + DBG("Consumer received unexpected message size %d (expects %zu)", ret, sizeof(msg)); lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); /* * The ret value might 0 meaning an orderly shutdown but this is ok @@ -3386,8 +3310,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, if (msg.cmd_type == LTTNG_ERR_UND) { /* No registry found */ - (void) consumer_send_status_msg(ctx->consumer_metadata_socket, - ret_code); + (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret_code); ret = 0; goto end; } else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) { @@ -3409,8 +3332,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, health_code_update(); /* Tell session daemon we are ready to receive the metadata. */ - ret = consumer_send_status_msg(ctx->consumer_metadata_socket, - LTTCOMM_CONSUMERD_SUCCESS); + ret = consumer_send_status_msg(ctx->consumer_metadata_socket, LTTCOMM_CONSUMERD_SUCCESS); if (ret < 0 || len == 0) { /* * Somehow, the session daemon is not responding anymore or there is @@ -3422,7 +3344,13 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, health_code_update(); ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket, - key, offset, len, version, channel, timer, wait); + key, + offset, + len, + version, + channel, + invoked_by_timer, + wait); if (ret >= 0) { /* * Only send the status msg if the sessiond is alive meaning a positive @@ -3442,8 +3370,7 @@ end: /* * Return the ustctl call for the get stream id. */ -int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream, - uint64_t *stream_id) +int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream, uint64_t *stream_id) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream_id);