X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.cpp;h=5339553cb5fd7fb7b54d61f8b74814f21e1faf6c;hb=cd9adb8b829564212158943a0d279bb35322ab30;hp=fb40f844ed34370ce262594af15115ed8c9a8d76;hpb=947bd0978b3d17b34eb1158cc6439eb1d4e5b6c3;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.cpp b/src/common/ust-consumer/ust-consumer.cpp index fb40f844e..5339553cb 100644 --- a/src/common/ust-consumer/ust-consumer.cpp +++ b/src/common/ust-consumer/ust-consumer.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2011 Julien Desfossez + * Copyright (C) 2011 EfficiOS Inc. * Copyright (C) 2011 Mathieu Desnoyers * Copyright (C) 2017 Jérémie Galarneau * @@ -8,101 +8,62 @@ */ #define _LGPL_SOURCE +#include "ust-consumer.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + #include #include + +#include +#include #include #include +#include +#include +#include #include #include #include #include #include #include -#include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "ust-consumer.h" - -#define INT_MAX_STR_LEN 12 /* includes \0 */ +#define INT_MAX_STR_LEN 12 /* includes \0 */ extern struct lttng_consumer_global_data the_consumer_data; extern int consumer_poll_timeout; LTTNG_EXPORT DEFINE_LTTNG_UST_SIGBUS_STATE(); -/* - * Free channel object and all streams associated with it. This MUST be used - * only and only if the channel has _NEVER_ been added to the global channel - * hash table. - */ -static void destroy_channel(struct lttng_consumer_channel *channel) -{ - struct lttng_consumer_stream *stream, *stmp; - - LTTNG_ASSERT(channel); - - DBG("UST consumer cleaning stream list"); - - cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, - send_node) { - - health_code_update(); - - cds_list_del(&stream->send_node); - lttng_ust_ctl_destroy_stream(stream->ustream); - lttng_trace_chunk_put(stream->trace_chunk); - free(stream); - } - - /* - * If a channel is available meaning that was created before the streams - * were, delete it. - */ - if (channel->uchan) { - lttng_ustconsumer_del_channel(channel); - lttng_ustconsumer_free_channel(channel); - } - - if (channel->trace_chunk) { - lttng_trace_chunk_put(channel->trace_chunk); - } - - free(channel); -} - /* * Add channel to internal consumer state. * * Returns 0 on success or else a negative value. */ static int add_channel(struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx) { int ret = 0; LTTNG_ASSERT(channel); LTTNG_ASSERT(ctx); - if (ctx->on_recv_channel != NULL) { + if (ctx->on_recv_channel != nullptr) { ret = ctx->on_recv_channel(channel); if (ret == 0) { ret = consumer_add_channel(channel, ctx); @@ -127,29 +88,30 @@ error: * * Return NULL on error else the newly allocated stream object. */ -static struct lttng_consumer_stream *allocate_stream(int cpu, int key, - struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx, int *_alloc_ret) +static struct lttng_consumer_stream *allocate_stream(int cpu, + int key, + struct lttng_consumer_channel *channel, + struct lttng_consumer_local_data *ctx, + int *_alloc_ret) { int alloc_ret; - struct lttng_consumer_stream *stream = NULL; + struct lttng_consumer_stream *stream = nullptr; LTTNG_ASSERT(channel); LTTNG_ASSERT(ctx); - stream = consumer_stream_create( - channel, - channel->key, - key, - channel->name, - channel->relayd_id, - channel->session_id, - channel->trace_chunk, - cpu, - &alloc_ret, - channel->type, - channel->monitor); - if (stream == NULL) { + stream = consumer_stream_create(channel, + channel->key, + key, + channel->name, + channel->relayd_id, + channel->session_id, + channel->trace_chunk, + cpu, + &alloc_ret, + channel->type, + channel->monitor); + if (stream == nullptr) { switch (alloc_ret) { case -ENOENT: /* @@ -182,7 +144,7 @@ error: * Returns 0 on success else a negative value. */ static int send_stream_to_thread(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx) { int ret; struct lttng_pipe *stream_pipe; @@ -203,13 +165,13 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream, * global. */ stream->globally_visible = 1; - cds_list_del(&stream->send_node); + cds_list_del_init(&stream->send_node); ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream)); if (ret < 0) { ERR("Consumer write %s stream to pipe %d", - stream->metadata_flag ? "metadata" : "data", - lttng_pipe_get_writefd(stream_pipe)); + stream->metadata_flag ? "metadata" : "data", + lttng_pipe_get_writefd(stream_pipe)); if (stream->metadata_flag) { consumer_del_stream_for_metadata(stream); } else { @@ -222,10 +184,9 @@ error: return ret; } -static -int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu) +static int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu) { - char cpu_nr[INT_MAX_STR_LEN]; /* int max len */ + char cpu_nr[INT_MAX_STR_LEN]; /* int max len */ int ret; strncpy(stream_shm_path, shm_path, PATH_MAX); @@ -235,8 +196,7 @@ int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu) PERROR("snprintf"); goto end; } - strncat(stream_shm_path, cpu_nr, - PATH_MAX - strlen(stream_shm_path) - 1); + strncat(stream_shm_path, cpu_nr, PATH_MAX - strlen(stream_shm_path) - 1); ret = 0; end: return ret; @@ -249,12 +209,12 @@ end: * Return 0 on success else a negative value. */ static int create_ust_streams(struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx) { int ret, cpu = 0; struct lttng_ust_ctl_consumer_stream *ustream; struct lttng_consumer_stream *stream; - pthread_mutex_t *current_stream_lock = NULL; + pthread_mutex_t *current_stream_lock = nullptr; LTTNG_ASSERT(channel); LTTNG_ASSERT(ctx); @@ -309,11 +269,9 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, */ cds_list_add_tail(&stream->send_node, &channel->streams.head); - ret = lttng_ust_ctl_get_max_subbuf_size(stream->ustream, - &stream->max_sb_size); + ret = lttng_ust_ctl_get_max_subbuf_size(stream->ustream, &stream->max_sb_size); if (ret < 0) { - ERR("lttng_ust_ctl_get_max_subbuf_size failed for stream %s", - stream->name); + ERR("lttng_ust_ctl_get_max_subbuf_size failed for stream %s", stream->name); goto error; } @@ -326,7 +284,9 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, } DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64, - stream->name, stream->key, stream->relayd_stream_id); + stream->name, + stream->key, + stream->relayd_stream_id); /* Set next CPU stream. */ channel->streams.count = ++cpu; @@ -337,12 +297,12 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, if (channel->monitor) { /* Set metadata poll pipe if we created one */ memcpy(stream->ust_metadata_poll_pipe, - ust_metadata_pipe, - sizeof(ust_metadata_pipe)); + ust_metadata_pipe, + sizeof(ust_metadata_pipe)); } } pthread_mutex_unlock(&stream->lock); - current_stream_lock = NULL; + current_stream_lock = nullptr; } return 0; @@ -355,8 +315,9 @@ error_alloc: return ret; } -static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu, - const struct lttng_credentials *session_credentials) +static int open_ust_stream_fd(struct lttng_consumer_channel *channel, + int cpu, + const struct lttng_credentials *session_credentials) { char shm_path[PATH_MAX]; int ret; @@ -369,9 +330,10 @@ static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu, goto error_shm_path; } return run_as_open(shm_path, - O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, - lttng_credentials_get_uid(session_credentials), - lttng_credentials_get_gid(session_credentials)); + O_RDWR | O_CREAT | O_EXCL, + S_IRUSR | S_IWUSR, + lttng_credentials_get_uid(session_credentials), + lttng_credentials_get_gid(session_credentials)); error_shm_path: return -1; @@ -384,8 +346,8 @@ error_shm_path: * Return 0 on success or else a negative value. */ static int create_ust_channel(struct lttng_consumer_channel *channel, - struct lttng_ust_ctl_consumer_channel_attr *attr, - struct lttng_ust_ctl_consumer_channel **ust_chanp) + struct lttng_ust_ctl_consumer_channel_attr *attr, + struct lttng_ust_ctl_consumer_channel **ust_chanp) { int ret, nr_stream_fds, i, j; int *stream_fds; @@ -397,24 +359,28 @@ static int create_ust_channel(struct lttng_consumer_channel *channel, LTTNG_ASSERT(channel->buffer_credentials.is_set); DBG3("Creating channel to ustctl with attr: [overwrite: %d, " - "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", " - "switch_timer_interval: %u, read_timer_interval: %u, " - "output: %d, type: %d", attr->overwrite, attr->subbuf_size, - attr->num_subbuf, attr->switch_timer_interval, - attr->read_timer_interval, attr->output, attr->type); + "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", " + "switch_timer_interval: %u, read_timer_interval: %u, " + "output: %d, type: %d", + attr->overwrite, + attr->subbuf_size, + attr->num_subbuf, + attr->switch_timer_interval, + attr->read_timer_interval, + attr->output, + attr->type); if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) nr_stream_fds = 1; else nr_stream_fds = lttng_ust_ctl_get_nr_stream_per_channel(); - stream_fds = (int *) zmalloc(nr_stream_fds * sizeof(*stream_fds)); + stream_fds = calloc(nr_stream_fds); if (!stream_fds) { ret = -1; goto error_alloc; } for (i = 0; i < nr_stream_fds; i++) { - stream_fds[i] = open_ust_stream_fd(channel, i, - &channel->buffer_credentials.value); + stream_fds[i] = open_ust_stream_fd(channel, i, &channel->buffer_credentials.value); if (stream_fds[i] < 0) { ret = -1; goto error_open; @@ -443,16 +409,15 @@ error_open: if (channel->shm_path[0]) { char shm_path[PATH_MAX]; - closeret = get_stream_shm_path(shm_path, - channel->shm_path, j); + closeret = get_stream_shm_path(shm_path, channel->shm_path, j); if (closeret) { ERR("Cannot get stream shm path"); } closeret = run_as_unlink(shm_path, - lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( - channel->buffer_credentials)), - lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( - channel->buffer_credentials))); + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials))); if (closeret) { PERROR("unlink %s", shm_path); } @@ -461,11 +426,11 @@ error_open: /* Try to rmdir all directories under shm_path root. */ if (channel->root_shm_path[0]) { (void) run_as_rmdir_recursive(channel->root_shm_path, - lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( - channel->buffer_credentials)), - lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( - channel->buffer_credentials)), - LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials)), + LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(stream_fds); error_alloc: @@ -502,8 +467,9 @@ error: * Return 0 on success or else a negative value. */ static int send_channel_to_sessiond_and_relayd(int sock, - struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx, int *relayd_error) + struct lttng_consumer_channel *channel, + struct lttng_consumer_local_data *ctx, + int *relayd_error) { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttng_consumer_stream *stream; @@ -516,13 +482,13 @@ static int send_channel_to_sessiond_and_relayd(int sock, DBG("UST consumer sending channel %s to sessiond", channel->name); if (channel->relayd_id != (uint64_t) -1ULL) { - cds_list_for_each_entry(stream, &channel->streams.head, send_node) { - + cds_list_for_each_entry (stream, &channel->streams.head, send_node) { health_code_update(); /* Try to send the stream to the relayd if one is available. */ DBG("Sending stream %" PRIu64 " of channel \"%s\" to relayd", - stream->key, channel->name); + stream->key, + channel->name); ret = consumer_send_relayd_stream(stream, stream->chan->pathname); if (ret < 0) { /* @@ -562,8 +528,7 @@ static int send_channel_to_sessiond_and_relayd(int sock, } /* The channel was sent successfully to the sessiond at this point. */ - cds_list_for_each_entry(stream, &channel->streams.head, send_node) { - + cds_list_for_each_entry (stream, &channel->streams.head, send_node) { health_code_update(); /* Send stream to session daemon. */ @@ -574,7 +539,7 @@ static int send_channel_to_sessiond_and_relayd(int sock, } /* Tell sessiond there is no more stream. */ - ret = lttng_ust_ctl_send_stream_to_sessiond(sock, NULL); + ret = lttng_ust_ctl_send_stream_to_sessiond(sock, nullptr); if (ret < 0) { goto error; } @@ -599,8 +564,8 @@ error: * MUST be destroyed by consumer_del_channel(). */ static int ask_channel(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_channel *channel, - struct lttng_ust_ctl_consumer_channel_attr *attr) + struct lttng_consumer_channel *channel, + struct lttng_ust_ctl_consumer_channel_attr *attr) { int ret; @@ -658,7 +623,7 @@ end: * On error, return a negative value else 0 on success. */ static int send_streams_to_thread(struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx) { int ret = 0; struct lttng_consumer_stream *stream, *stmp; @@ -667,9 +632,7 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel, LTTNG_ASSERT(ctx); /* Send streams to the corresponding thread. */ - cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, - send_node) { - + cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) { health_code_update(); /* Sending the stream to the thread. */ @@ -714,9 +677,13 @@ static int flush_channel(uint64_t chan_key) /* For each stream of the channel id, flush it. */ cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, - &channel->key, &iter.iter, stream, node_channel_id.node) { - + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, + &channel->key, + &iter.iter, + stream, + node_channel_id.node) + { health_code_update(); pthread_mutex_lock(&stream->lock); @@ -731,17 +698,27 @@ static int flush_channel(uint64_t chan_key) if (!stream->quiescent) { ret = lttng_ust_ctl_flush_buffer(stream->ustream, 0); if (ret) { - ERR("Failed to flush buffer while flushing channel: channel key = %" PRIu64 ", channel name = '%s'", - chan_key, channel->name); + ERR("Failed to flush buffer while flushing channel: channel key = %" PRIu64 + ", channel name = '%s'", + chan_key, + channel->name); ret = LTTNG_ERR_BUFFER_FLUSH_FAILED; pthread_mutex_unlock(&stream->lock); goto error; } stream->quiescent = true; } -next: + next: pthread_mutex_unlock(&stream->lock); } + + /* + * Send one last buffer statistics update to the session daemon. This + * ensures that the session daemon gets at least one statistics update + * per channel even in the case of short-lived channels, such as when a + * short-lived app is traced in per-pid mode. + */ + sample_and_send_channel_buffer_stats(channel); error: rcu_read_unlock(); return ret; @@ -775,9 +752,13 @@ static int clear_quiescent_channel(uint64_t chan_key) /* For each stream of the channel id, clear quiescent state. */ cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, - &channel->key, &iter.iter, stream, node_channel_id.node) { - + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, + &channel->key, + &iter.iter, + stream, + node_channel_id.node) + { health_code_update(); pthread_mutex_lock(&stream->lock); @@ -876,6 +857,8 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key) int ret; struct lttng_consumer_channel *metadata; + ASSERT_RCU_READ_LOCKED(); + DBG("UST consumer setup metadata key %" PRIu64, key); metadata = consumer_find_channel(key); @@ -907,14 +890,12 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key) /* Send metadata stream to relayd if needed. */ if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) { - ret = consumer_send_relayd_stream(metadata->metadata_stream, - metadata->pathname); + ret = consumer_send_relayd_stream(metadata->metadata_stream, metadata->pathname); if (ret < 0) { ret = LTTCOMM_CONSUMERD_ERROR_METADATA; goto error; } - ret = consumer_send_relayd_streams_sent( - metadata->metadata_stream->net_seq_idx); + ret = consumer_send_relayd_streams_sent(metadata->metadata_stream->net_seq_idx); if (ret < 0) { ret = LTTCOMM_CONSUMERD_RELAYD_FAIL; goto error; @@ -947,9 +928,8 @@ error: * the stream is still in the local stream list of the channel. This call * will make sure to clean that list. */ - consumer_stream_destroy(metadata->metadata_stream, NULL); - cds_list_del(&metadata->metadata_stream->send_node); - metadata->metadata_stream = NULL; + consumer_stream_destroy(metadata->metadata_stream, nullptr); + metadata->metadata_stream = nullptr; send_streams_error: error_no_stream: end: @@ -963,17 +943,19 @@ end: * Returns 0 on success, < 0 on error */ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, - uint64_t key, char *path, uint64_t relayd_id, - struct lttng_consumer_local_data *ctx) + uint64_t key, + char *path, + uint64_t relayd_id, + struct lttng_consumer_local_data *ctx) { int ret = 0; struct lttng_consumer_stream *metadata_stream; LTTNG_ASSERT(path); LTTNG_ASSERT(ctx); + ASSERT_RCU_READ_LOCKED(); - DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", - key, path); + DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path); rcu_read_lock(); @@ -1009,8 +991,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, metadata_stream->net_seq_idx = relayd_id; ret = consumer_send_relayd_stream(metadata_stream, path); } else { - ret = consumer_stream_create_output_files(metadata_stream, - false); + ret = consumer_stream_create_output_files(metadata_stream, false); } if (ret < 0) { goto error_stream; @@ -1030,18 +1011,15 @@ error_stream: * Clean up the stream completely because the next snapshot will use a * new metadata stream. */ - consumer_stream_destroy(metadata_stream, NULL); - cds_list_del(&metadata_stream->send_node); - metadata_channel->metadata_stream = NULL; + consumer_stream_destroy(metadata_stream, nullptr); + metadata_channel->metadata_stream = nullptr; error: rcu_read_unlock(); return ret; } -static -int get_current_subbuf_addr(struct lttng_consumer_stream *stream, - const char **addr) +static int get_current_subbuf_addr(struct lttng_consumer_stream *stream, const char **addr) { int ret; unsigned long mmap_offset; @@ -1049,8 +1027,7 @@ int get_current_subbuf_addr(struct lttng_consumer_stream *stream, mmap_base = (const char *) lttng_ust_ctl_get_mmap_base(stream->ustream); if (!mmap_base) { - ERR("Failed to get mmap base for stream `%s`", - stream->name); + ERR("Failed to get mmap base for stream `%s`", stream->name); ret = -EPERM; goto error; } @@ -1065,7 +1042,6 @@ int get_current_subbuf_addr(struct lttng_consumer_stream *stream, *addr = mmap_base + mmap_offset; error: return ret; - } /* @@ -1075,9 +1051,11 @@ error: * Returns 0 on success, < 0 on error */ static int snapshot_channel(struct lttng_consumer_channel *channel, - uint64_t key, char *path, uint64_t relayd_id, - uint64_t nb_packets_per_stream, - struct lttng_consumer_local_data *ctx) + uint64_t key, + char *path, + uint64_t relayd_id, + uint64_t nb_packets_per_stream, + struct lttng_consumer_local_data *ctx) { int ret; unsigned use_relayd = 0; @@ -1086,6 +1064,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, LTTNG_ASSERT(path); LTTNG_ASSERT(ctx); + ASSERT_RCU_READ_LOCKED(); rcu_read_lock(); @@ -1096,7 +1075,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, LTTNG_ASSERT(!channel->monitor); DBG("UST consumer snapshot channel %" PRIu64, key); - cds_list_for_each_entry(stream, &channel->streams.head, send_node) { + cds_list_for_each_entry (stream, &channel->streams.head, send_node) { health_code_update(); /* Lock stream because we are about to change its state. */ @@ -1119,16 +1098,14 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, if (use_relayd) { ret = consumer_send_relayd_stream(stream, path); if (ret < 0) { - goto error_unlock; + goto error_close_stream; } } else { - ret = consumer_stream_create_output_files(stream, - false); + ret = consumer_stream_create_output_files(stream, false); if (ret < 0) { - goto error_unlock; + goto error_close_stream; } - DBG("UST consumer snapshot stream (%" PRIu64 ")", - stream->key); + DBG("UST consumer snapshot stream (%" PRIu64 ")", stream->key); } /* @@ -1138,8 +1115,10 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, if (!stream->quiescent) { ret = lttng_ust_ctl_flush_buffer(stream->ustream, 0); if (ret < 0) { - ERR("Failed to flush buffer during snapshot of channel: channel key = %" PRIu64 ", channel name = '%s'", - channel->key, channel->name); + ERR("Failed to flush buffer during snapshot of channel: channel key = %" PRIu64 + ", channel name = '%s'", + channel->key, + channel->name); goto error_unlock; } } @@ -1147,19 +1126,19 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, ret = lttng_ustconsumer_take_snapshot(stream); if (ret < 0) { ERR("Taking UST snapshot"); - goto error_unlock; + goto error_close_stream; } ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos); if (ret < 0) { ERR("Produced UST snapshot position"); - goto error_unlock; + goto error_close_stream; } ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos); if (ret < 0) { ERR("Consumerd UST snapshot position"); - goto error_unlock; + goto error_close_stream; } /* @@ -1168,9 +1147,8 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, * daemon should never send a maximum stream size that is lower than * subbuffer size. */ - consumed_pos = consumer_get_consume_start_pos(consumed_pos, - produced_pos, nb_packets_per_stream, - stream->max_sb_size); + consumed_pos = consumer_get_consume_start_pos( + consumed_pos, produced_pos, nb_packets_per_stream, stream->max_sb_size); while ((long) (consumed_pos - produced_pos) < 0) { ssize_t read_len; @@ -1211,10 +1189,9 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, goto error_put_subbuf; } - subbuf_view = lttng_buffer_view_init( - subbuf_addr, 0, padded_len); + subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len); read_len = lttng_consumer_on_read_subbuffer_mmap( - stream, &subbuf_view, padded_len - len); + stream, &subbuf_view, padded_len - len); if (use_relayd) { if (read_len != len) { ret = -EPERM; @@ -1236,7 +1213,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, } /* Simply close the stream so we can use it on the next snapshot. */ - consumer_stream_close(stream); + consumer_stream_close_output(stream); pthread_mutex_unlock(&stream->lock); } @@ -1248,21 +1225,18 @@ error_put_subbuf: ERR("Snapshot lttng_ust_ctl_put_subbuf"); } error_close_stream: - consumer_stream_close(stream); + consumer_stream_close_output(stream); error_unlock: pthread_mutex_unlock(&stream->lock); rcu_read_unlock(); return ret; } -static -void metadata_stream_reset_cache_consumed_position( - struct lttng_consumer_stream *stream) +static void metadata_stream_reset_cache_consumed_position(struct lttng_consumer_stream *stream) { ASSERT_LOCKED(stream->lock); - DBG("Reset metadata cache of session %" PRIu64, - stream->chan->session_id); + DBG("Reset metadata cache of session %" PRIu64, stream->chan->session_id); stream->ust_metadata_pushed = 0; } @@ -1274,9 +1248,14 @@ void metadata_stream_reset_cache_consumed_position( * the metadata cache flush to concurrently progress in order to * complete. */ -int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, - uint64_t len, uint64_t version, - struct lttng_consumer_channel *channel, int timer, int wait) +int lttng_ustconsumer_recv_metadata(int sock, + uint64_t key, + uint64_t offset, + uint64_t len, + uint64_t version, + struct lttng_consumer_channel *channel, + int timer, + int wait) { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; char *metadata_str; @@ -1284,7 +1263,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len); - metadata_str = (char *) zmalloc(len * sizeof(char)); + metadata_str = calloc(len); if (!metadata_str) { PERROR("zmalloc metadata string"); ret_code = LTTCOMM_CONSUMERD_ENOMEM; @@ -1305,8 +1284,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, pthread_mutex_lock(&channel->metadata_cache->lock); cache_write_status = consumer_metadata_cache_write( - channel->metadata_cache, offset, len, version, - metadata_str); + channel->metadata_cache, offset, len, version, metadata_str); pthread_mutex_unlock(&channel->metadata_cache->lock); switch (cache_write_status) { case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE: @@ -1328,10 +1306,9 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, * channel is under a snapshot session type. No need to update * the stream position in that scenario. */ - if (channel->metadata_stream != NULL) { + if (channel->metadata_stream != nullptr) { pthread_mutex_lock(&channel->metadata_stream->lock); - metadata_stream_reset_cache_consumed_position( - channel->metadata_stream); + metadata_stream_reset_cache_consumed_position(channel->metadata_stream); pthread_mutex_unlock(&channel->metadata_stream->lock); } else { /* Validate we are in snapshot mode. */ @@ -1385,12 +1362,13 @@ end: * Return 1 on success else a negative value or 0. */ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, - int sock, struct pollfd *consumer_sockpoll) + int sock, + struct pollfd *consumer_sockpoll) { int ret_func; enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttcomm_consumer_msg msg; - struct lttng_consumer_channel *channel = NULL; + struct lttng_consumer_channel *channel = nullptr; health_code_update(); @@ -1400,14 +1378,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret_recv != sizeof(msg)) { DBG("Consumer received unexpected message size %zd (expects %zu)", - ret_recv, sizeof(msg)); + ret_recv, + sizeof(msg)); /* * The ret value might 0 meaning an orderly shutdown but this is ok * since the caller handles this. */ if (ret_recv > 0) { - lttng_consumer_send_error(ctx, - LTTCOMM_CONSUMERD_ERROR_RECV_CMD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); ret_recv = -1; } return ret_recv; @@ -1427,11 +1405,22 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { + uint32_t major = msg.u.relayd_sock.major; + uint32_t minor = msg.u.relayd_sock.minor; + enum lttcomm_sock_proto protocol = + (enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol; + /* Session daemon status message are handled in the following call. */ consumer_add_relayd_socket(msg.u.relayd_sock.net_index, - msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, - &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id, - msg.u.relayd_sock.relayd_session_id); + msg.u.relayd_sock.type, + ctx, + sock, + consumer_sockpoll, + msg.u.relayd_sock.session_id, + msg.u.relayd_sock.relayd_session_id, + major, + minor, + protocol); goto end_nosignal; } case LTTNG_CONSUMER_DESTROY_RELAYD: @@ -1443,7 +1432,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get relayd reference if exists. */ relayd = consumer_find_relayd(index); - if (relayd == NULL) { + if (relayd == nullptr) { DBG("Unable to find relayd %" PRIu64, index); ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } @@ -1480,11 +1469,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, is_data_pending = consumer_data_pending(id); /* Send back returned value to session daemon */ - ret_send = lttcomm_send_unix_sock(sock, &is_data_pending, - sizeof(is_data_pending)); + ret_send = lttcomm_send_unix_sock(sock, &is_data_pending, sizeof(is_data_pending)); if (ret_send < 0) { - DBG("Error when sending the data pending ret code: %zd", - ret_send); + DBG("Error when sending the data pending ret code: %zd", ret_send); goto error_fatal; } @@ -1506,28 +1493,26 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Create a plain object and reserve a channel key. */ channel = consumer_allocate_channel( - msg.u.ask_channel.key, - msg.u.ask_channel.session_id, - msg.u.ask_channel.chunk_id.is_set ? - &chunk_id : NULL, - msg.u.ask_channel.pathname, - msg.u.ask_channel.name, - msg.u.ask_channel.relayd_id, - (enum lttng_event_output) msg.u.ask_channel.output, - msg.u.ask_channel.tracefile_size, - msg.u.ask_channel.tracefile_count, - msg.u.ask_channel.session_id_per_pid, - msg.u.ask_channel.monitor, - msg.u.ask_channel.live_timer_interval, - msg.u.ask_channel.is_live, - msg.u.ask_channel.root_shm_path, - msg.u.ask_channel.shm_path); + msg.u.ask_channel.key, + msg.u.ask_channel.session_id, + msg.u.ask_channel.chunk_id.is_set ? &chunk_id : nullptr, + msg.u.ask_channel.pathname, + msg.u.ask_channel.name, + msg.u.ask_channel.relayd_id, + (enum lttng_event_output) msg.u.ask_channel.output, + msg.u.ask_channel.tracefile_size, + msg.u.ask_channel.tracefile_count, + msg.u.ask_channel.session_id_per_pid, + msg.u.ask_channel.monitor, + msg.u.ask_channel.live_timer_interval, + msg.u.ask_channel.is_live, + msg.u.ask_channel.root_shm_path, + msg.u.ask_channel.shm_path); if (!channel) { goto end_channel_error; } - LTTNG_OPTIONAL_SET(&channel->buffer_credentials, - buffer_credentials); + LTTNG_OPTIONAL_SET(&channel->buffer_credentials, buffer_credentials); /* * Assign UST application UID to the channel. This value is ignored for @@ -1544,7 +1529,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, attr.read_timer_interval = msg.u.ask_channel.read_timer_interval; attr.chan_id = msg.u.ask_channel.chan_id; memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid)); - attr.blocking_timeout= msg.u.ask_channel.blocking_timeout; + attr.blocking_timeout = msg.u.ask_channel.blocking_timeout; /* Match channel buffer type to the UST abi. */ switch (msg.u.ask_channel.output) { @@ -1585,8 +1570,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (msg.u.ask_channel.type == LTTNG_UST_ABI_CHAN_METADATA) { int ret_allocate; - ret_allocate = consumer_metadata_cache_allocate( - channel); + ret_allocate = consumer_metadata_cache_allocate(channel); if (ret_allocate < 0) { ERR("Allocating metadata cache"); goto end_channel_error; @@ -1596,11 +1580,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { int monitor_start_ret; - consumer_timer_live_start(channel, - msg.u.ask_channel.live_timer_interval); + consumer_timer_live_start(channel, msg.u.ask_channel.live_timer_interval); monitor_start_ret = consumer_timer_monitor_start( - channel, - msg.u.ask_channel.monitor_timer_interval); + channel, msg.u.ask_channel.monitor_timer_interval); if (monitor_start_ret < 0) { ERR("Starting channel monitoring timer failed"); goto end_channel_error; @@ -1666,8 +1648,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); /* Send the channel to sessiond (and relayd, if applicable). */ - ret = send_channel_to_sessiond_and_relayd( - sock, found_channel, ctx, &relayd_err); + ret = send_channel_to_sessiond_and_relayd(sock, found_channel, ctx, &relayd_err); if (ret < 0) { if (relayd_err) { /* @@ -1705,11 +1686,11 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } /* List MUST be empty after or else it could be reused. */ LTTNG_ASSERT(cds_list_empty(&found_channel->streams.head)); -end_get_channel: + end_get_channel: goto end_msg_sessiond; -error_get_channel_fatal: + error_get_channel_fatal: goto error_fatal; -end_get_channel_nosignal: + end_get_channel_nosignal: goto end_nosignal; } case LTTNG_CONSUMER_DESTROY_CHANNEL: @@ -1750,8 +1731,7 @@ end_get_channel_nosignal: { int ret; - ret = clear_quiescent_channel( - msg.u.clear_quiescent_channel.key); + ret = clear_quiescent_channel(msg.u.clear_quiescent_channel.key); if (ret != 0) { ret_code = (lttcomm_return_code) ret; } @@ -1767,8 +1747,7 @@ end_get_channel_nosignal: uint64_t version = msg.u.push_metadata.version; struct lttng_consumer_channel *found_channel; - DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, - len); + DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len); found_channel = consumer_find_channel(key); if (!found_channel) { @@ -1814,8 +1793,8 @@ end_get_channel_nosignal: health_code_update(); - ret = lttng_ustconsumer_recv_metadata(sock, key, offset, len, - version, found_channel, 0, 1); + ret = lttng_ustconsumer_recv_metadata( + sock, key, offset, len, version, found_channel, 0, 1); if (ret < 0) { /* error receiving from sessiond */ goto error_push_metadata_fatal; @@ -1823,9 +1802,9 @@ end_get_channel_nosignal: ret_code = (lttcomm_return_code) ret; goto end_push_metadata_msg_sessiond; } -end_push_metadata_msg_sessiond: + end_push_metadata_msg_sessiond: goto end_msg_sessiond; -error_push_metadata_fatal: + error_push_metadata_fatal: goto error_fatal; } case LTTNG_CONSUMER_SETUP_METADATA: @@ -1853,10 +1832,10 @@ error_push_metadata_fatal: int ret_snapshot; ret_snapshot = snapshot_metadata(found_channel, - key, - msg.u.snapshot_channel.pathname, - msg.u.snapshot_channel.relayd_id, - ctx); + key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, + ctx); if (ret_snapshot < 0) { ERR("Snapshot metadata failed"); ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED; @@ -1864,13 +1843,13 @@ error_push_metadata_fatal: } else { int ret_snapshot; - ret_snapshot = snapshot_channel(found_channel, - key, - msg.u.snapshot_channel.pathname, - msg.u.snapshot_channel.relayd_id, - msg.u.snapshot_channel - .nb_packets_per_stream, - ctx); + ret_snapshot = snapshot_channel( + found_channel, + key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, + msg.u.snapshot_channel.nb_packets_per_stream, + ctx); if (ret_snapshot < 0) { ERR("Snapshot channel failed"); ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED; @@ -1896,8 +1875,7 @@ error_push_metadata_fatal: uint64_t id = msg.u.discarded_events.session_id; uint64_t key = msg.u.discarded_events.channel_key; - DBG("UST consumer discarded events command for session id %" - PRIu64, id); + DBG("UST consumer discarded events command for session id %" PRIu64, id); rcu_read_lock(); pthread_mutex_lock(&the_consumer_data.lock); @@ -1912,9 +1890,13 @@ error_push_metadata_fatal: */ discarded_events = 0; cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&id, lttng_ht_seed), - ht->match_fct, &id, - &iter.iter, stream, node_session_id.node) { + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct, + &id, + &iter.iter, + stream, + node_session_id.node) + { if (stream->chan->key == key) { discarded_events = stream->chan->discarded_events; break; @@ -1923,8 +1905,10 @@ error_push_metadata_fatal: pthread_mutex_unlock(&the_consumer_data.lock); rcu_read_unlock(); - DBG("UST consumer discarded events command for session id %" - PRIu64 ", channel key %" PRIu64, id, key); + DBG("UST consumer discarded events command for session id %" PRIu64 + ", channel key %" PRIu64, + id, + key); health_code_update(); @@ -1939,7 +1923,7 @@ error_push_metadata_fatal: } case LTTNG_CONSUMER_LOST_PACKETS: { - int ret; + int ret; uint64_t lost_packets; struct lttng_ht_iter iter; struct lttng_ht *ht; @@ -1947,8 +1931,7 @@ error_push_metadata_fatal: uint64_t id = msg.u.lost_packets.session_id; uint64_t key = msg.u.lost_packets.channel_key; - DBG("UST consumer lost packets command for session id %" - PRIu64, id); + DBG("UST consumer lost packets command for session id %" PRIu64, id); rcu_read_lock(); pthread_mutex_lock(&the_consumer_data.lock); @@ -1960,27 +1943,32 @@ error_push_metadata_fatal: * to extract the information we need, we default to 0 if not * found (no packets lost if the channel is not yet in use). */ - lost_packets = 0; + lost_packets = 0; cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&id, lttng_ht_seed), - ht->match_fct, &id, - &iter.iter, stream, node_session_id.node) { + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct, + &id, + &iter.iter, + stream, + node_session_id.node) + { if (stream->chan->key == key) { - lost_packets = stream->chan->lost_packets; + lost_packets = stream->chan->lost_packets; break; } } pthread_mutex_unlock(&the_consumer_data.lock); rcu_read_unlock(); - DBG("UST consumer lost packets command for session id %" - PRIu64 ", channel key %" PRIu64, id, key); + DBG("UST consumer lost packets command for session id %" PRIu64 + ", channel key %" PRIu64, + id, + key); health_code_update(); /* Send back returned value to session daemon */ - ret = lttcomm_send_unix_sock(sock, &lost_packets, - sizeof(lost_packets)); + ret = lttcomm_send_unix_sock(sock, &lost_packets, sizeof(lost_packets)); if (ret < 0) { PERROR("send lost packets"); goto error_fatal; @@ -1990,8 +1978,7 @@ error_push_metadata_fatal: } case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE: { - int channel_monitor_pipe, ret_send, - ret_set_channel_monitor_pipe; + int channel_monitor_pipe, ret_send, ret_set_channel_monitor_pipe; ssize_t ret_recv; ret_code = LTTCOMM_CONSUMERD_SUCCESS; @@ -2001,8 +1988,7 @@ error_push_metadata_fatal: goto error_fatal; } - ret_recv = lttcomm_recv_fds_unix_sock( - sock, &channel_monitor_pipe, 1); + ret_recv = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe, 1); if (ret_recv != sizeof(channel_monitor_pipe)) { ERR("Failed to receive channel monitor pipe"); goto error_fatal; @@ -2010,8 +1996,7 @@ error_push_metadata_fatal: DBG("Received channel monitor pipe (%d)", channel_monitor_pipe); ret_set_channel_monitor_pipe = - consumer_timer_thread_set_channel_monitor_pipe( - channel_monitor_pipe); + consumer_timer_thread_set_channel_monitor_pipe(channel_monitor_pipe); if (!ret_set_channel_monitor_pipe) { int flags; int ret_fcntl; @@ -2025,8 +2010,7 @@ error_push_metadata_fatal: } flags = ret_fcntl; - ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL, - flags | O_NONBLOCK); + ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL, flags | O_NONBLOCK); if (ret_fcntl == -1) { PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe"); goto error_fatal; @@ -2055,9 +2039,7 @@ error_push_metadata_fatal: * this channel. */ rotate_channel = lttng_consumer_rotate_channel( - found_channel, key, - msg.u.rotate_channel.relayd_id, - msg.u.rotate_channel.metadata, ctx); + found_channel, key, msg.u.rotate_channel.relayd_id); if (rotate_channel < 0) { ERR("Rotate channel failed"); ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL; @@ -2083,15 +2065,13 @@ error_push_metadata_fatal: int ret_rotate_read_streams; ret_rotate_read_streams = - lttng_consumer_rotate_ready_streams( - found_channel, key, - ctx); + lttng_consumer_rotate_ready_streams(found_channel, key); if (ret_rotate_read_streams < 0) { ERR("Rotate channel failed"); } } break; -end_rotate_channel_nosignal: + end_rotate_channel_nosignal: goto end_nosignal; } case LTTNG_CONSUMER_CLEAR_CHANNEL: @@ -2107,8 +2087,7 @@ end_rotate_channel_nosignal: } else { int ret_clear_channel; - ret_clear_channel = lttng_consumer_clear_channel( - found_channel); + ret_clear_channel = lttng_consumer_clear_channel(found_channel); if (ret_clear_channel) { ERR("Clear channel failed key %" PRIu64, key); ret_code = (lttcomm_return_code) ret_clear_channel; @@ -2126,9 +2105,12 @@ end_rotate_channel_nosignal: case LTTNG_CONSUMER_INIT: { int ret_send_status; + lttng_uuid sessiond_uuid; - ret_code = lttng_consumer_init_command(ctx, - msg.u.init.sessiond_uuid); + std::copy(std::begin(msg.u.init.sessiond_uuid), + std::end(msg.u.init.sessiond_uuid), + sessiond_uuid.begin()); + ret_code = lttng_consumer_init_command(ctx, sessiond_uuid); health_code_update(); ret_send_status = consumer_send_status_msg(sock, ret_code); if (ret_send_status < 0) { @@ -2140,18 +2122,17 @@ end_rotate_channel_nosignal: case LTTNG_CONSUMER_CREATE_TRACE_CHUNK: { const struct lttng_credentials credentials = { - .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid), - .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid), + .uid = LTTNG_OPTIONAL_INIT_VALUE( + msg.u.create_trace_chunk.credentials.value.uid), + .gid = LTTNG_OPTIONAL_INIT_VALUE( + msg.u.create_trace_chunk.credentials.value.gid), }; - const bool is_local_trace = - !msg.u.create_trace_chunk.relayd_id.is_set; - const uint64_t relayd_id = - msg.u.create_trace_chunk.relayd_id.value; - const char *chunk_override_name = - *msg.u.create_trace_chunk.override_name ? - msg.u.create_trace_chunk.override_name : - NULL; - struct lttng_directory_handle *chunk_directory_handle = NULL; + const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set; + const uint64_t relayd_id = msg.u.create_trace_chunk.relayd_id.value; + const char *chunk_override_name = *msg.u.create_trace_chunk.override_name ? + msg.u.create_trace_chunk.override_name : + nullptr; + struct lttng_directory_handle *chunk_directory_handle = nullptr; /* * The session daemon will only provide a chunk directory file @@ -2163,8 +2144,7 @@ end_rotate_channel_nosignal: ssize_t ret_recv; /* Acnowledge the reception of the command. */ - ret_send_status = consumer_send_status_msg( - sock, LTTCOMM_CONSUMERD_SUCCESS); + ret_send_status = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS); if (ret_send_status < 0) { /* Somehow, the session daemon is not responding anymore. */ goto end_nosignal; @@ -2173,17 +2153,15 @@ end_rotate_channel_nosignal: /* * Receive trace chunk domain dirfd. */ - ret_recv = lttcomm_recv_fds_unix_sock( - sock, &chunk_dirfd, 1); + ret_recv = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1); if (ret_recv != sizeof(chunk_dirfd)) { ERR("Failed to receive trace chunk domain directory file descriptor"); goto error_fatal; } - DBG("Received trace chunk domain directory fd (%d)", - chunk_dirfd); - chunk_directory_handle = lttng_directory_handle_create_from_dirfd( - chunk_dirfd); + DBG("Received trace chunk domain directory fd (%d)", chunk_dirfd); + chunk_directory_handle = + lttng_directory_handle_create_from_dirfd(chunk_dirfd); if (!chunk_directory_handle) { ERR("Failed to initialize chunk domain directory handle from directory file descriptor"); if (close(chunk_dirfd)) { @@ -2194,48 +2172,39 @@ end_rotate_channel_nosignal: } ret_code = lttng_consumer_create_trace_chunk( - !is_local_trace ? &relayd_id : NULL, - msg.u.create_trace_chunk.session_id, - msg.u.create_trace_chunk.chunk_id, - (time_t) msg.u.create_trace_chunk - .creation_timestamp, - chunk_override_name, - msg.u.create_trace_chunk.credentials.is_set ? - &credentials : - NULL, - chunk_directory_handle); + !is_local_trace ? &relayd_id : nullptr, + msg.u.create_trace_chunk.session_id, + msg.u.create_trace_chunk.chunk_id, + (time_t) msg.u.create_trace_chunk.creation_timestamp, + chunk_override_name, + msg.u.create_trace_chunk.credentials.is_set ? &credentials : nullptr, + chunk_directory_handle); lttng_directory_handle_put(chunk_directory_handle); goto end_msg_sessiond; } case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK: { enum lttng_trace_chunk_command_type close_command = - (lttng_trace_chunk_command_type) - msg.u.close_trace_chunk.close_command.value; - const uint64_t relayd_id = - msg.u.close_trace_chunk.relayd_id.value; + (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value; + const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value; struct lttcomm_consumer_close_trace_chunk_reply reply; char closed_trace_chunk_path[LTTNG_PATH_MAX] = {}; int ret; ret_code = lttng_consumer_close_trace_chunk( - msg.u.close_trace_chunk.relayd_id.is_set ? - &relayd_id : - NULL, - msg.u.close_trace_chunk.session_id, - msg.u.close_trace_chunk.chunk_id, - (time_t) msg.u.close_trace_chunk.close_timestamp, - msg.u.close_trace_chunk.close_command.is_set ? - &close_command : - NULL, closed_trace_chunk_path); + msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : nullptr, + msg.u.close_trace_chunk.session_id, + msg.u.close_trace_chunk.chunk_id, + (time_t) msg.u.close_trace_chunk.close_timestamp, + msg.u.close_trace_chunk.close_command.is_set ? &close_command : nullptr, + closed_trace_chunk_path); reply.ret_code = ret_code; reply.path_length = strlen(closed_trace_chunk_path) + 1; ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply)); if (ret != sizeof(reply)) { goto error_fatal; } - ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path, - reply.path_length); + ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path, reply.path_length); if (ret != reply.path_length) { goto error_fatal; } @@ -2243,26 +2212,22 @@ end_rotate_channel_nosignal: } case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: { - const uint64_t relayd_id = - msg.u.trace_chunk_exists.relayd_id.value; + const uint64_t relayd_id = msg.u.trace_chunk_exists.relayd_id.value; ret_code = lttng_consumer_trace_chunk_exists( - msg.u.trace_chunk_exists.relayd_id.is_set ? - &relayd_id : NULL, - msg.u.trace_chunk_exists.session_id, - msg.u.trace_chunk_exists.chunk_id); + msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : nullptr, + msg.u.trace_chunk_exists.session_id, + msg.u.trace_chunk_exists.chunk_id); goto end_msg_sessiond; } case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS: { const uint64_t key = msg.u.open_channel_packets.key; - struct lttng_consumer_channel *found_channel = - consumer_find_channel(key); + struct lttng_consumer_channel *found_channel = consumer_find_channel(key); if (found_channel) { pthread_mutex_lock(&found_channel->lock); - ret_code = lttng_consumer_open_channel_packets( - found_channel); + ret_code = lttng_consumer_open_channel_packets(found_channel); pthread_mutex_unlock(&found_channel->lock); } else { /* @@ -2308,17 +2273,13 @@ end_msg_sessiond: end_channel_error: if (channel) { - /* - * Free channel here since no one has a reference to it. We don't - * free after that because a stream can store this pointer. - */ - destroy_channel(channel); + consumer_del_channel(channel); } /* We have to send a status channel message indicating an error. */ { int ret_send_status; - ret_send_status = consumer_send_status_channel(sock, NULL); + ret_send_status = consumer_send_status_channel(sock, nullptr); if (ret_send_status < 0) { /* Stop everything if session daemon can not be notified. */ goto error_fatal; @@ -2339,8 +2300,7 @@ end: return ret_func; } -int lttng_ust_flush_buffer(struct lttng_consumer_stream *stream, - int producer_active) +int lttng_ust_flush_buffer(struct lttng_consumer_stream *stream, int producer_active) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2366,8 +2326,7 @@ int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream) * * Returns 0 on success, < 0 on error. */ -int lttng_ustconsumer_sample_snapshot_positions( - struct lttng_consumer_stream *stream) +int lttng_ustconsumer_sample_snapshot_positions(struct lttng_consumer_stream *stream) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2380,8 +2339,8 @@ int lttng_ustconsumer_sample_snapshot_positions( * * Returns 0 on success, < 0 on error */ -int lttng_ustconsumer_get_produced_snapshot( - struct lttng_consumer_stream *stream, unsigned long *pos) +int lttng_ustconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2395,8 +2354,8 @@ int lttng_ustconsumer_get_produced_snapshot( * * Returns 0 on success, < 0 on error */ -int lttng_ustconsumer_get_consumed_snapshot( - struct lttng_consumer_stream *stream, unsigned long *pos) +int lttng_ustconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2405,8 +2364,7 @@ int lttng_ustconsumer_get_consumed_snapshot( return lttng_ust_ctl_snapshot_get_consumed(stream->ustream, pos); } -int lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream, - int producer) +int lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream, int producer) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2422,8 +2380,7 @@ int lttng_ustconsumer_clear_buffer(struct lttng_consumer_stream *stream) return lttng_ust_ctl_clear_buffer(stream->ustream); } -int lttng_ustconsumer_get_current_timestamp( - struct lttng_consumer_stream *stream, uint64_t *ts) +int lttng_ustconsumer_get_current_timestamp(struct lttng_consumer_stream *stream, uint64_t *ts) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2432,8 +2389,7 @@ int lttng_ustconsumer_get_current_timestamp( return lttng_ust_ctl_get_current_timestamp(stream->ustream, ts); } -int lttng_ustconsumer_get_sequence_number( - struct lttng_consumer_stream *stream, uint64_t *seq) +int lttng_ustconsumer_get_sequence_number(struct lttng_consumer_stream *stream, uint64_t *seq) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2458,8 +2414,9 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream) stream->quiescent = true; } } - pthread_mutex_unlock(&stream->lock); + stream->hangup_flush_done = 1; + pthread_mutex_unlock(&stream->lock); } void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) @@ -2488,10 +2445,10 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) ERR("Cannot get stream shm path"); } ret = run_as_unlink(shm_path, - lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( - chan->buffer_credentials)), - lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( - chan->buffer_credentials))); + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + chan->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + chan->buffer_credentials))); if (ret) { PERROR("unlink %s", shm_path); } @@ -2509,12 +2466,11 @@ void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan) lttng_ust_ctl_destroy_channel(chan->uchan); /* Try to rmdir all directories under shm_path root. */ if (chan->root_shm_path[0]) { - (void) run_as_rmdir_recursive(chan->root_shm_path, - lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( - chan->buffer_credentials)), - lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( - chan->buffer_credentials)), - LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); + (void) run_as_rmdir_recursive( + chan->root_shm_path, + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(chan->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(chan->buffer_credentials)), + LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(chan->stream_fds); } @@ -2552,15 +2508,13 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream) * Returns the number of bytes pushed from the cache into the ring buffer, or a * negative value on error. */ -static -int commit_one_metadata_packet(struct lttng_consumer_stream *stream) +static int commit_one_metadata_packet(struct lttng_consumer_stream *stream) { ssize_t write_len; int ret; pthread_mutex_lock(&stream->chan->metadata_cache->lock); - if (stream->chan->metadata_cache->contents.size == - stream->ust_metadata_pushed) { + if (stream->chan->metadata_cache->contents.size == stream->ust_metadata_pushed) { /* * In the context of a user space metadata channel, a * change in version can be detected in two ways: @@ -2585,21 +2539,20 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) * occur as part of the pre-consume) until the metadata size * exceeded the cache size. */ - if (stream->metadata_version != - stream->chan->metadata_cache->version) { + if (stream->metadata_version != stream->chan->metadata_cache->version) { metadata_stream_reset_cache_consumed_position(stream); consumer_stream_metadata_set_version(stream, - stream->chan->metadata_cache->version); + stream->chan->metadata_cache->version); } else { ret = 0; goto end; } } - write_len = lttng_ust_ctl_write_one_packet_to_channel(stream->chan->uchan, - &stream->chan->metadata_cache->contents.data[stream->ust_metadata_pushed], - stream->chan->metadata_cache->contents.size - - stream->ust_metadata_pushed); + write_len = lttng_ust_ctl_write_one_packet_to_channel( + stream->chan->uchan, + &stream->chan->metadata_cache->contents.data[stream->ust_metadata_pushed], + stream->chan->metadata_cache->contents.size - stream->ust_metadata_pushed); LTTNG_ASSERT(write_len != 0); if (write_len < 0) { ERR("Writing one metadata packet"); @@ -2608,8 +2561,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) } stream->ust_metadata_pushed += write_len; - LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= - stream->ust_metadata_pushed); + LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed); ret = write_len; /* @@ -2628,7 +2580,6 @@ end: return ret; } - /* * Sync metadata meaning request them to the session daemon and snapshot to the * metadata thread can consumer them. @@ -2639,9 +2590,9 @@ end: * * The RCU read side lock must be held by the caller. */ -enum sync_metadata_status lttng_ustconsumer_sync_metadata( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *metadata_stream) +enum sync_metadata_status +lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *metadata_stream) { int ret; enum sync_metadata_status status; @@ -2649,6 +2600,7 @@ enum sync_metadata_status lttng_ustconsumer_sync_metadata( LTTNG_ASSERT(ctx); LTTNG_ASSERT(metadata_stream); + ASSERT_RCU_READ_LOCKED(); metadata_channel = metadata_stream->chan; pthread_mutex_unlock(&metadata_stream->lock); @@ -2676,7 +2628,7 @@ enum sync_metadata_status lttng_ustconsumer_sync_metadata( */ if (consumer_stream_is_deleted(metadata_stream)) { DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization", - metadata_stream->key); + metadata_stream->key); status = SYNC_METADATA_STATUS_NO_DATA; goto end; } @@ -2694,7 +2646,8 @@ enum sync_metadata_status lttng_ustconsumer_sync_metadata( ret = lttng_ust_ctl_snapshot(metadata_stream->ustream); if (ret < 0) { - ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d", ret); + ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d", + ret); status = SYNC_METADATA_STATUS_ERROR; goto end; } @@ -2707,7 +2660,7 @@ end: * Return 0 on success else a negative value. */ static int notify_if_more_data(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx) { int ret; struct lttng_ust_ctl_consumer_stream *ustream; @@ -2783,18 +2736,17 @@ static int consumer_stream_ust_on_wake_up(struct lttng_consumer_stream *stream) } static int extract_common_subbuffer_info(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuf) + struct stream_subbuffer *subbuf) { int ret; - ret = lttng_ust_ctl_get_subbuf_size( - stream->ustream, &subbuf->info.data.subbuf_size); + ret = lttng_ust_ctl_get_subbuf_size(stream->ustream, &subbuf->info.data.subbuf_size); if (ret) { goto end; } - ret = lttng_ust_ctl_get_padded_subbuf_size( - stream->ustream, &subbuf->info.data.padded_subbuf_size); + ret = lttng_ust_ctl_get_padded_subbuf_size(stream->ustream, + &subbuf->info.data.padded_subbuf_size); if (ret) { goto end; } @@ -2804,7 +2756,7 @@ end: } static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuf) + struct stream_subbuffer *subbuf) { int ret; @@ -2820,7 +2772,7 @@ end: } static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuf) + struct stream_subbuffer *subbuf) { int ret; @@ -2829,43 +2781,40 @@ static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, goto end; } - ret = lttng_ust_ctl_get_packet_size( - stream->ustream, &subbuf->info.data.packet_size); + ret = lttng_ust_ctl_get_packet_size(stream->ustream, &subbuf->info.data.packet_size); if (ret < 0) { PERROR("Failed to get sub-buffer packet size"); goto end; } - ret = lttng_ust_ctl_get_content_size( - stream->ustream, &subbuf->info.data.content_size); + ret = lttng_ust_ctl_get_content_size(stream->ustream, &subbuf->info.data.content_size); if (ret < 0) { PERROR("Failed to get sub-buffer content size"); goto end; } - ret = lttng_ust_ctl_get_timestamp_begin( - stream->ustream, &subbuf->info.data.timestamp_begin); + ret = lttng_ust_ctl_get_timestamp_begin(stream->ustream, + &subbuf->info.data.timestamp_begin); if (ret < 0) { PERROR("Failed to get sub-buffer begin timestamp"); goto end; } - ret = lttng_ust_ctl_get_timestamp_end( - stream->ustream, &subbuf->info.data.timestamp_end); + ret = lttng_ust_ctl_get_timestamp_end(stream->ustream, &subbuf->info.data.timestamp_end); if (ret < 0) { PERROR("Failed to get sub-buffer end timestamp"); goto end; } - ret = lttng_ust_ctl_get_events_discarded( - stream->ustream, &subbuf->info.data.events_discarded); + ret = lttng_ust_ctl_get_events_discarded(stream->ustream, + &subbuf->info.data.events_discarded); if (ret) { PERROR("Failed to get sub-buffer events discarded count"); goto end; } ret = lttng_ust_ctl_get_sequence_number(stream->ustream, - &subbuf->info.data.sequence_number.value); + &subbuf->info.data.sequence_number.value); if (ret) { /* May not be supported by older LTTng-modules. */ if (ret != -ENOTTY) { @@ -2876,15 +2825,14 @@ static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, subbuf->info.data.sequence_number.is_set = true; } - ret = lttng_ust_ctl_get_stream_id( - stream->ustream, &subbuf->info.data.stream_id); + ret = lttng_ust_ctl_get_stream_id(stream->ustream, &subbuf->info.data.stream_id); if (ret < 0) { PERROR("Failed to get stream id"); goto end; } ret = lttng_ust_ctl_get_instance_id(stream->ustream, - &subbuf->info.data.stream_instance_id.value); + &subbuf->info.data.stream_instance_id.value); if (ret) { /* May not be supported by older LTTng-modules. */ if (ret != -ENOTTY) { @@ -2899,13 +2847,12 @@ end: } static int get_next_subbuffer_common(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer) + struct stream_subbuffer *subbuffer) { int ret; const char *addr; - ret = stream->read_subbuffer_ops.extract_subbuffer_info( - stream, subbuffer); + ret = stream->read_subbuffer_ops.extract_subbuffer_info(stream, subbuffer); if (ret) { goto end; } @@ -2915,16 +2862,15 @@ static int get_next_subbuffer_common(struct lttng_consumer_stream *stream, goto end; } - subbuffer->buffer.buffer = lttng_buffer_view_init( - addr, 0, subbuffer->info.data.padded_subbuf_size); - LTTNG_ASSERT(subbuffer->buffer.buffer.data != NULL); + subbuffer->buffer.buffer = + lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size); + LTTNG_ASSERT(subbuffer->buffer.buffer.data != nullptr); end: return ret; } -static enum get_next_subbuffer_status get_next_subbuffer( - struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer) +static enum get_next_subbuffer_status get_next_subbuffer(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) { int ret; enum get_next_subbuffer_status status; @@ -2935,7 +2881,7 @@ static enum get_next_subbuffer_status get_next_subbuffer( status = GET_NEXT_SUBBUFFER_STATUS_OK; break; case -ENODATA: - case -EAGAIN: + case -EAGAIN: /* * The caller only expects -ENODATA when there is no data to * read, but the kernel tracer returns -EAGAIN when there is @@ -2959,9 +2905,9 @@ end: return status; } -static enum get_next_subbuffer_status get_next_subbuffer_metadata( - struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer) +static enum get_next_subbuffer_status +get_next_subbuffer_metadata(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) { int ret; bool cache_empty; @@ -3004,7 +2950,7 @@ static enum get_next_subbuffer_status get_next_subbuffer_metadata( } else { pthread_mutex_lock(&stream->chan->metadata_cache->lock); cache_empty = stream->chan->metadata_cache->contents.size == - stream->ust_metadata_pushed; + stream->ust_metadata_pushed; pthread_mutex_unlock(&stream->chan->metadata_cache->lock); } } while (!got_subbuffer); @@ -3059,7 +3005,7 @@ end: } static int put_next_subbuffer(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer) + struct stream_subbuffer *subbuffer __attribute__((unused))) { const int ret = lttng_ust_ctl_put_next_subbuf(stream->ustream); @@ -3068,42 +3014,35 @@ static int put_next_subbuffer(struct lttng_consumer_stream *stream, } static int signal_metadata(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx __attribute__((unused))) { ASSERT_LOCKED(stream->metadata_rdv_lock); return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0; } -static int lttng_ustconsumer_set_stream_ops( - struct lttng_consumer_stream *stream) +static int lttng_ustconsumer_set_stream_ops(struct lttng_consumer_stream *stream) { int ret = 0; stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up; if (stream->metadata_flag) { - stream->read_subbuffer_ops.get_next_subbuffer = - get_next_subbuffer_metadata; - stream->read_subbuffer_ops.extract_subbuffer_info = - extract_metadata_subbuffer_info; + stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer_metadata; + stream->read_subbuffer_ops.extract_subbuffer_info = extract_metadata_subbuffer_info; stream->read_subbuffer_ops.reset_metadata = - metadata_stream_reset_cache_consumed_position; + metadata_stream_reset_cache_consumed_position; if (stream->chan->is_live) { stream->read_subbuffer_ops.on_sleep = signal_metadata; - ret = consumer_stream_enable_metadata_bucketization( - stream); + ret = consumer_stream_enable_metadata_bucketization(stream); if (ret) { goto end; } } } else { - stream->read_subbuffer_ops.get_next_subbuffer = - get_next_subbuffer; - stream->read_subbuffer_ops.extract_subbuffer_info = - extract_data_subbuffer_info; + stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer; + stream->read_subbuffer_ops.extract_subbuffer_info = extract_data_subbuffer_info; stream->read_subbuffer_ops.on_sleep = notify_if_more_data; if (stream->chan->is_live) { - stream->read_subbuffer_ops.send_live_beacon = - consumer_flush_ust_index; + stream->read_subbuffer_ops.send_live_beacon = consumer_flush_ust_index; } } @@ -3128,7 +3067,7 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) * no current trace chunk on the parent channel. */ if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor && - stream->chan->trace_chunk) { + stream->chan->trace_chunk) { ret = consumer_stream_create_output_files(stream, true); if (ret) { goto error; @@ -3184,12 +3123,14 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) * whetnever ust_metadata_pushed is incremented, the associated * metadata has been consumed from the metadata stream. */ - DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64, - contiguous, pushed); + DBG("UST consumer metadata pending check: contiguous %" PRIu64 + " vs pushed %" PRIu64, + contiguous, + pushed); LTTNG_ASSERT(((int64_t) (contiguous - pushed)) >= 0); if ((contiguous != pushed) || - (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) { - ret = 1; /* Data is pending */ + (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) { + ret = 1; /* Data is pending */ goto end; } } else { @@ -3201,7 +3142,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) */ ret = lttng_ust_ctl_put_subbuf(stream->ustream); LTTNG_ASSERT(ret == 0); - ret = 1; /* Data is pending */ + ret = 1; /* Data is pending */ goto end; } } @@ -3272,15 +3213,12 @@ void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht) DBG("UST consumer closing all metadata streams"); rcu_read_lock(); - cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, - node.node) { - + cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { health_code_update(); pthread_mutex_lock(&stream->chan->lock); lttng_ustconsumer_close_metadata(stream->chan); pthread_mutex_unlock(&stream->chan->lock); - } rcu_read_unlock(); } @@ -3306,7 +3244,9 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) * pushed out due to concurrent interaction with the session daemon. */ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_channel *channel, int timer, int wait) + struct lttng_consumer_channel *channel, + int timer, + int wait) { struct lttcomm_metadata_request_msg request; struct lttcomm_consumer_msg msg; @@ -3342,17 +3282,18 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, request.uid = channel->ust_app_uid; request.key = channel->key; - DBG("Sending metadata request to sessiond, session id %" PRIu64 - ", per-pid %" PRIu64 ", app UID %u and channel key %" PRIu64, - request.session_id, request.session_id_per_pid, request.uid, - request.key); + DBG("Sending metadata request to sessiond, session id %" PRIu64 ", per-pid %" PRIu64 + ", app UID %u and channel key %" PRIu64, + request.session_id, + request.session_id_per_pid, + request.uid, + request.key); pthread_mutex_lock(&ctx->metadata_socket_lock); health_code_update(); - ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request, - sizeof(request)); + ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request, sizeof(request)); if (ret < 0) { ERR("Asking metadata to sessiond"); goto end; @@ -3361,11 +3302,9 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, health_code_update(); /* Receive the metadata from sessiond */ - ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg, - sizeof(msg)); + ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - DBG("Consumer received unexpected message size %d (expects %zu)", - ret, sizeof(msg)); + DBG("Consumer received unexpected message size %d (expects %zu)", ret, sizeof(msg)); lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); /* * The ret value might 0 meaning an orderly shutdown but this is ok @@ -3378,8 +3317,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, if (msg.cmd_type == LTTNG_ERR_UND) { /* No registry found */ - (void) consumer_send_status_msg(ctx->consumer_metadata_socket, - ret_code); + (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret_code); ret = 0; goto end; } else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) { @@ -3401,8 +3339,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, health_code_update(); /* Tell session daemon we are ready to receive the metadata. */ - ret = consumer_send_status_msg(ctx->consumer_metadata_socket, - LTTCOMM_CONSUMERD_SUCCESS); + ret = consumer_send_status_msg(ctx->consumer_metadata_socket, LTTCOMM_CONSUMERD_SUCCESS); if (ret < 0 || len == 0) { /* * Somehow, the session daemon is not responding anymore or there is @@ -3413,8 +3350,8 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, health_code_update(); - ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket, - key, offset, len, version, channel, timer, wait); + ret = lttng_ustconsumer_recv_metadata( + ctx->consumer_metadata_socket, key, offset, len, version, channel, timer, wait); if (ret >= 0) { /* * Only send the status msg if the sessiond is alive meaning a positive @@ -3434,8 +3371,7 @@ end: /* * Return the ustctl call for the get stream id. */ -int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream, - uint64_t *stream_id) +int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream, uint64_t *stream_id) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream_id);