X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.cpp;fp=src%2Fcommon%2Fust-consumer%2Fust-consumer.cpp;h=ea3e3619106654d556bdbf543afca0ce75308bd2;hp=872bc1abf8f89c322e217b365138e88c5341d7f8;hb=28ab034a2c3582d07d3423d2d746731f87d3969f;hpb=52e345b9ac912d033c2a2c25a170a01cf209839d diff --git a/src/common/ust-consumer/ust-consumer.cpp b/src/common/ust-consumer/ust-consumer.cpp index 872bc1abf..ea3e36191 100644 --- a/src/common/ust-consumer/ust-consumer.cpp +++ b/src/common/ust-consumer/ust-consumer.cpp @@ -8,41 +8,42 @@ */ #define _LGPL_SOURCE +#include "ust-consumer.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + #include #include + +#include +#include #include #include +#include +#include +#include #include #include #include #include #include #include -#include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "ust-consumer.hpp" - -#define INT_MAX_STR_LEN 12 /* includes \0 */ +#define INT_MAX_STR_LEN 12 /* includes \0 */ extern struct lttng_consumer_global_data the_consumer_data; extern int consumer_poll_timeout; @@ -55,7 +56,7 @@ LTTNG_EXPORT DEFINE_LTTNG_UST_SIGBUS_STATE(); * Returns 0 on success or else a negative value. */ static int add_channel(struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx) { int ret = 0; @@ -87,9 +88,11 @@ error: * * Return NULL on error else the newly allocated stream object. */ -static struct lttng_consumer_stream *allocate_stream(int cpu, int key, - struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx, int *_alloc_ret) +static struct lttng_consumer_stream *allocate_stream(int cpu, + int key, + struct lttng_consumer_channel *channel, + struct lttng_consumer_local_data *ctx, + int *_alloc_ret) { int alloc_ret; struct lttng_consumer_stream *stream = NULL; @@ -97,18 +100,17 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, LTTNG_ASSERT(channel); LTTNG_ASSERT(ctx); - stream = consumer_stream_create( - channel, - channel->key, - key, - channel->name, - channel->relayd_id, - channel->session_id, - channel->trace_chunk, - cpu, - &alloc_ret, - channel->type, - channel->monitor); + stream = consumer_stream_create(channel, + channel->key, + key, + channel->name, + channel->relayd_id, + channel->session_id, + channel->trace_chunk, + cpu, + &alloc_ret, + channel->type, + channel->monitor); if (stream == NULL) { switch (alloc_ret) { case -ENOENT: @@ -142,7 +144,7 @@ error: * Returns 0 on success else a negative value. */ static int send_stream_to_thread(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx) { int ret; struct lttng_pipe *stream_pipe; @@ -168,8 +170,8 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream, ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream)); if (ret < 0) { ERR("Consumer write %s stream to pipe %d", - stream->metadata_flag ? "metadata" : "data", - lttng_pipe_get_writefd(stream_pipe)); + stream->metadata_flag ? "metadata" : "data", + lttng_pipe_get_writefd(stream_pipe)); if (stream->metadata_flag) { consumer_del_stream_for_metadata(stream); } else { @@ -182,10 +184,9 @@ error: return ret; } -static -int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu) +static int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu) { - char cpu_nr[INT_MAX_STR_LEN]; /* int max len */ + char cpu_nr[INT_MAX_STR_LEN]; /* int max len */ int ret; strncpy(stream_shm_path, shm_path, PATH_MAX); @@ -195,8 +196,7 @@ int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu) PERROR("snprintf"); goto end; } - strncat(stream_shm_path, cpu_nr, - PATH_MAX - strlen(stream_shm_path) - 1); + strncat(stream_shm_path, cpu_nr, PATH_MAX - strlen(stream_shm_path) - 1); ret = 0; end: return ret; @@ -209,7 +209,7 @@ end: * Return 0 on success else a negative value. */ static int create_ust_streams(struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx) { int ret, cpu = 0; struct lttng_ust_ctl_consumer_stream *ustream; @@ -269,11 +269,9 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, */ cds_list_add_tail(&stream->send_node, &channel->streams.head); - ret = lttng_ust_ctl_get_max_subbuf_size(stream->ustream, - &stream->max_sb_size); + ret = lttng_ust_ctl_get_max_subbuf_size(stream->ustream, &stream->max_sb_size); if (ret < 0) { - ERR("lttng_ust_ctl_get_max_subbuf_size failed for stream %s", - stream->name); + ERR("lttng_ust_ctl_get_max_subbuf_size failed for stream %s", stream->name); goto error; } @@ -286,7 +284,9 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, } DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64, - stream->name, stream->key, stream->relayd_stream_id); + stream->name, + stream->key, + stream->relayd_stream_id); /* Set next CPU stream. */ channel->streams.count = ++cpu; @@ -297,8 +297,8 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, if (channel->monitor) { /* Set metadata poll pipe if we created one */ memcpy(stream->ust_metadata_poll_pipe, - ust_metadata_pipe, - sizeof(ust_metadata_pipe)); + ust_metadata_pipe, + sizeof(ust_metadata_pipe)); } } pthread_mutex_unlock(&stream->lock); @@ -315,8 +315,9 @@ error_alloc: return ret; } -static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu, - const struct lttng_credentials *session_credentials) +static int open_ust_stream_fd(struct lttng_consumer_channel *channel, + int cpu, + const struct lttng_credentials *session_credentials) { char shm_path[PATH_MAX]; int ret; @@ -329,9 +330,10 @@ static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu, goto error_shm_path; } return run_as_open(shm_path, - O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, - lttng_credentials_get_uid(session_credentials), - lttng_credentials_get_gid(session_credentials)); + O_RDWR | O_CREAT | O_EXCL, + S_IRUSR | S_IWUSR, + lttng_credentials_get_uid(session_credentials), + lttng_credentials_get_gid(session_credentials)); error_shm_path: return -1; @@ -344,8 +346,8 @@ error_shm_path: * Return 0 on success or else a negative value. */ static int create_ust_channel(struct lttng_consumer_channel *channel, - struct lttng_ust_ctl_consumer_channel_attr *attr, - struct lttng_ust_ctl_consumer_channel **ust_chanp) + struct lttng_ust_ctl_consumer_channel_attr *attr, + struct lttng_ust_ctl_consumer_channel **ust_chanp) { int ret, nr_stream_fds, i, j; int *stream_fds; @@ -357,11 +359,16 @@ static int create_ust_channel(struct lttng_consumer_channel *channel, LTTNG_ASSERT(channel->buffer_credentials.is_set); DBG3("Creating channel to ustctl with attr: [overwrite: %d, " - "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", " - "switch_timer_interval: %u, read_timer_interval: %u, " - "output: %d, type: %d", attr->overwrite, attr->subbuf_size, - attr->num_subbuf, attr->switch_timer_interval, - attr->read_timer_interval, attr->output, attr->type); + "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", " + "switch_timer_interval: %u, read_timer_interval: %u, " + "output: %d, type: %d", + attr->overwrite, + attr->subbuf_size, + attr->num_subbuf, + attr->switch_timer_interval, + attr->read_timer_interval, + attr->output, + attr->type); if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) nr_stream_fds = 1; @@ -373,8 +380,7 @@ static int create_ust_channel(struct lttng_consumer_channel *channel, goto error_alloc; } for (i = 0; i < nr_stream_fds; i++) { - stream_fds[i] = open_ust_stream_fd(channel, i, - &channel->buffer_credentials.value); + stream_fds[i] = open_ust_stream_fd(channel, i, &channel->buffer_credentials.value); if (stream_fds[i] < 0) { ret = -1; goto error_open; @@ -403,16 +409,15 @@ error_open: if (channel->shm_path[0]) { char shm_path[PATH_MAX]; - closeret = get_stream_shm_path(shm_path, - channel->shm_path, j); + closeret = get_stream_shm_path(shm_path, channel->shm_path, j); if (closeret) { ERR("Cannot get stream shm path"); } closeret = run_as_unlink(shm_path, - lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( - channel->buffer_credentials)), - lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( - channel->buffer_credentials))); + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials))); if (closeret) { PERROR("unlink %s", shm_path); } @@ -421,11 +426,11 @@ error_open: /* Try to rmdir all directories under shm_path root. */ if (channel->root_shm_path[0]) { (void) run_as_rmdir_recursive(channel->root_shm_path, - lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( - channel->buffer_credentials)), - lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( - channel->buffer_credentials)), - LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + channel->buffer_credentials)), + LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(stream_fds); error_alloc: @@ -462,8 +467,9 @@ error: * Return 0 on success or else a negative value. */ static int send_channel_to_sessiond_and_relayd(int sock, - struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx, int *relayd_error) + struct lttng_consumer_channel *channel, + struct lttng_consumer_local_data *ctx, + int *relayd_error) { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttng_consumer_stream *stream; @@ -476,13 +482,13 @@ static int send_channel_to_sessiond_and_relayd(int sock, DBG("UST consumer sending channel %s to sessiond", channel->name); if (channel->relayd_id != (uint64_t) -1ULL) { - cds_list_for_each_entry(stream, &channel->streams.head, send_node) { - + cds_list_for_each_entry (stream, &channel->streams.head, send_node) { health_code_update(); /* Try to send the stream to the relayd if one is available. */ DBG("Sending stream %" PRIu64 " of channel \"%s\" to relayd", - stream->key, channel->name); + stream->key, + channel->name); ret = consumer_send_relayd_stream(stream, stream->chan->pathname); if (ret < 0) { /* @@ -522,8 +528,7 @@ static int send_channel_to_sessiond_and_relayd(int sock, } /* The channel was sent successfully to the sessiond at this point. */ - cds_list_for_each_entry(stream, &channel->streams.head, send_node) { - + cds_list_for_each_entry (stream, &channel->streams.head, send_node) { health_code_update(); /* Send stream to session daemon. */ @@ -559,8 +564,8 @@ error: * MUST be destroyed by consumer_del_channel(). */ static int ask_channel(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_channel *channel, - struct lttng_ust_ctl_consumer_channel_attr *attr) + struct lttng_consumer_channel *channel, + struct lttng_ust_ctl_consumer_channel_attr *attr) { int ret; @@ -618,7 +623,7 @@ end: * On error, return a negative value else 0 on success. */ static int send_streams_to_thread(struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx) { int ret = 0; struct lttng_consumer_stream *stream, *stmp; @@ -627,9 +632,7 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel, LTTNG_ASSERT(ctx); /* Send streams to the corresponding thread. */ - cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, - send_node) { - + cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) { health_code_update(); /* Sending the stream to the thread. */ @@ -674,9 +677,13 @@ static int flush_channel(uint64_t chan_key) /* For each stream of the channel id, flush it. */ cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, - &channel->key, &iter.iter, stream, node_channel_id.node) { - + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, + &channel->key, + &iter.iter, + stream, + node_channel_id.node) + { health_code_update(); pthread_mutex_lock(&stream->lock); @@ -691,15 +698,17 @@ static int flush_channel(uint64_t chan_key) if (!stream->quiescent) { ret = lttng_ust_ctl_flush_buffer(stream->ustream, 0); if (ret) { - ERR("Failed to flush buffer while flushing channel: channel key = %" PRIu64 ", channel name = '%s'", - chan_key, channel->name); + ERR("Failed to flush buffer while flushing channel: channel key = %" PRIu64 + ", channel name = '%s'", + chan_key, + channel->name); ret = LTTNG_ERR_BUFFER_FLUSH_FAILED; pthread_mutex_unlock(&stream->lock); goto error; } stream->quiescent = true; } -next: + next: pthread_mutex_unlock(&stream->lock); } @@ -743,9 +752,13 @@ static int clear_quiescent_channel(uint64_t chan_key) /* For each stream of the channel id, clear quiescent state. */ cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, - &channel->key, &iter.iter, stream, node_channel_id.node) { - + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, + &channel->key, + &iter.iter, + stream, + node_channel_id.node) + { health_code_update(); pthread_mutex_lock(&stream->lock); @@ -877,14 +890,12 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key) /* Send metadata stream to relayd if needed. */ if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) { - ret = consumer_send_relayd_stream(metadata->metadata_stream, - metadata->pathname); + ret = consumer_send_relayd_stream(metadata->metadata_stream, metadata->pathname); if (ret < 0) { ret = LTTCOMM_CONSUMERD_ERROR_METADATA; goto error; } - ret = consumer_send_relayd_streams_sent( - metadata->metadata_stream->net_seq_idx); + ret = consumer_send_relayd_streams_sent(metadata->metadata_stream->net_seq_idx); if (ret < 0) { ret = LTTCOMM_CONSUMERD_RELAYD_FAIL; goto error; @@ -932,8 +943,10 @@ end: * Returns 0 on success, < 0 on error */ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, - uint64_t key, char *path, uint64_t relayd_id, - struct lttng_consumer_local_data *ctx) + uint64_t key, + char *path, + uint64_t relayd_id, + struct lttng_consumer_local_data *ctx) { int ret = 0; struct lttng_consumer_stream *metadata_stream; @@ -942,8 +955,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, LTTNG_ASSERT(ctx); ASSERT_RCU_READ_LOCKED(); - DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", - key, path); + DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path); rcu_read_lock(); @@ -979,8 +991,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, metadata_stream->net_seq_idx = relayd_id; ret = consumer_send_relayd_stream(metadata_stream, path); } else { - ret = consumer_stream_create_output_files(metadata_stream, - false); + ret = consumer_stream_create_output_files(metadata_stream, false); } if (ret < 0) { goto error_stream; @@ -1008,9 +1019,7 @@ error: return ret; } -static -int get_current_subbuf_addr(struct lttng_consumer_stream *stream, - const char **addr) +static int get_current_subbuf_addr(struct lttng_consumer_stream *stream, const char **addr) { int ret; unsigned long mmap_offset; @@ -1018,8 +1027,7 @@ int get_current_subbuf_addr(struct lttng_consumer_stream *stream, mmap_base = (const char *) lttng_ust_ctl_get_mmap_base(stream->ustream); if (!mmap_base) { - ERR("Failed to get mmap base for stream `%s`", - stream->name); + ERR("Failed to get mmap base for stream `%s`", stream->name); ret = -EPERM; goto error; } @@ -1034,7 +1042,6 @@ int get_current_subbuf_addr(struct lttng_consumer_stream *stream, *addr = mmap_base + mmap_offset; error: return ret; - } /* @@ -1044,9 +1051,11 @@ error: * Returns 0 on success, < 0 on error */ static int snapshot_channel(struct lttng_consumer_channel *channel, - uint64_t key, char *path, uint64_t relayd_id, - uint64_t nb_packets_per_stream, - struct lttng_consumer_local_data *ctx) + uint64_t key, + char *path, + uint64_t relayd_id, + uint64_t nb_packets_per_stream, + struct lttng_consumer_local_data *ctx) { int ret; unsigned use_relayd = 0; @@ -1066,7 +1075,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, LTTNG_ASSERT(!channel->monitor); DBG("UST consumer snapshot channel %" PRIu64, key); - cds_list_for_each_entry(stream, &channel->streams.head, send_node) { + cds_list_for_each_entry (stream, &channel->streams.head, send_node) { health_code_update(); /* Lock stream because we are about to change its state. */ @@ -1092,13 +1101,11 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, goto error_close_stream; } } else { - ret = consumer_stream_create_output_files(stream, - false); + ret = consumer_stream_create_output_files(stream, false); if (ret < 0) { goto error_close_stream; } - DBG("UST consumer snapshot stream (%" PRIu64 ")", - stream->key); + DBG("UST consumer snapshot stream (%" PRIu64 ")", stream->key); } /* @@ -1108,8 +1115,10 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, if (!stream->quiescent) { ret = lttng_ust_ctl_flush_buffer(stream->ustream, 0); if (ret < 0) { - ERR("Failed to flush buffer during snapshot of channel: channel key = %" PRIu64 ", channel name = '%s'", - channel->key, channel->name); + ERR("Failed to flush buffer during snapshot of channel: channel key = %" PRIu64 + ", channel name = '%s'", + channel->key, + channel->name); goto error_unlock; } } @@ -1138,9 +1147,8 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, * daemon should never send a maximum stream size that is lower than * subbuffer size. */ - consumed_pos = consumer_get_consume_start_pos(consumed_pos, - produced_pos, nb_packets_per_stream, - stream->max_sb_size); + consumed_pos = consumer_get_consume_start_pos( + consumed_pos, produced_pos, nb_packets_per_stream, stream->max_sb_size); while ((long) (consumed_pos - produced_pos) < 0) { ssize_t read_len; @@ -1181,10 +1189,9 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, goto error_put_subbuf; } - subbuf_view = lttng_buffer_view_init( - subbuf_addr, 0, padded_len); + subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len); read_len = lttng_consumer_on_read_subbuffer_mmap( - stream, &subbuf_view, padded_len - len); + stream, &subbuf_view, padded_len - len); if (use_relayd) { if (read_len != len) { ret = -EPERM; @@ -1225,14 +1232,11 @@ error_unlock: return ret; } -static -void metadata_stream_reset_cache_consumed_position( - struct lttng_consumer_stream *stream) +static void metadata_stream_reset_cache_consumed_position(struct lttng_consumer_stream *stream) { ASSERT_LOCKED(stream->lock); - DBG("Reset metadata cache of session %" PRIu64, - stream->chan->session_id); + DBG("Reset metadata cache of session %" PRIu64, stream->chan->session_id); stream->ust_metadata_pushed = 0; } @@ -1244,9 +1248,14 @@ void metadata_stream_reset_cache_consumed_position( * the metadata cache flush to concurrently progress in order to * complete. */ -int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, - uint64_t len, uint64_t version, - struct lttng_consumer_channel *channel, int timer, int wait) +int lttng_ustconsumer_recv_metadata(int sock, + uint64_t key, + uint64_t offset, + uint64_t len, + uint64_t version, + struct lttng_consumer_channel *channel, + int timer, + int wait) { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; char *metadata_str; @@ -1275,8 +1284,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, pthread_mutex_lock(&channel->metadata_cache->lock); cache_write_status = consumer_metadata_cache_write( - channel->metadata_cache, offset, len, version, - metadata_str); + channel->metadata_cache, offset, len, version, metadata_str); pthread_mutex_unlock(&channel->metadata_cache->lock); switch (cache_write_status) { case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE: @@ -1300,8 +1308,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, */ if (channel->metadata_stream != NULL) { pthread_mutex_lock(&channel->metadata_stream->lock); - metadata_stream_reset_cache_consumed_position( - channel->metadata_stream); + metadata_stream_reset_cache_consumed_position(channel->metadata_stream); pthread_mutex_unlock(&channel->metadata_stream->lock); } else { /* Validate we are in snapshot mode. */ @@ -1355,7 +1362,8 @@ end: * Return 1 on success else a negative value or 0. */ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, - int sock, struct pollfd *consumer_sockpoll) + int sock, + struct pollfd *consumer_sockpoll) { int ret_func; enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; @@ -1370,14 +1378,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret_recv != sizeof(msg)) { DBG("Consumer received unexpected message size %zd (expects %zu)", - ret_recv, sizeof(msg)); + ret_recv, + sizeof(msg)); /* * The ret value might 0 meaning an orderly shutdown but this is ok * since the caller handles this. */ if (ret_recv > 0) { - lttng_consumer_send_error(ctx, - LTTCOMM_CONSUMERD_ERROR_RECV_CMD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); ret_recv = -1; } return ret_recv; @@ -1400,15 +1408,19 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, uint32_t major = msg.u.relayd_sock.major; uint32_t minor = msg.u.relayd_sock.minor; enum lttcomm_sock_proto protocol = - (enum lttcomm_sock_proto) msg.u.relayd_sock - .relayd_socket_protocol; + (enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol; /* Session daemon status message are handled in the following call. */ consumer_add_relayd_socket(msg.u.relayd_sock.net_index, - msg.u.relayd_sock.type, ctx, sock, - consumer_sockpoll, msg.u.relayd_sock.session_id, - msg.u.relayd_sock.relayd_session_id, major, - minor, protocol); + msg.u.relayd_sock.type, + ctx, + sock, + consumer_sockpoll, + msg.u.relayd_sock.session_id, + msg.u.relayd_sock.relayd_session_id, + major, + minor, + protocol); goto end_nosignal; } case LTTNG_CONSUMER_DESTROY_RELAYD: @@ -1457,11 +1469,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, is_data_pending = consumer_data_pending(id); /* Send back returned value to session daemon */ - ret_send = lttcomm_send_unix_sock(sock, &is_data_pending, - sizeof(is_data_pending)); + ret_send = lttcomm_send_unix_sock(sock, &is_data_pending, sizeof(is_data_pending)); if (ret_send < 0) { - DBG("Error when sending the data pending ret code: %zd", - ret_send); + DBG("Error when sending the data pending ret code: %zd", ret_send); goto error_fatal; } @@ -1483,28 +1493,26 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Create a plain object and reserve a channel key. */ channel = consumer_allocate_channel( - msg.u.ask_channel.key, - msg.u.ask_channel.session_id, - msg.u.ask_channel.chunk_id.is_set ? - &chunk_id : NULL, - msg.u.ask_channel.pathname, - msg.u.ask_channel.name, - msg.u.ask_channel.relayd_id, - (enum lttng_event_output) msg.u.ask_channel.output, - msg.u.ask_channel.tracefile_size, - msg.u.ask_channel.tracefile_count, - msg.u.ask_channel.session_id_per_pid, - msg.u.ask_channel.monitor, - msg.u.ask_channel.live_timer_interval, - msg.u.ask_channel.is_live, - msg.u.ask_channel.root_shm_path, - msg.u.ask_channel.shm_path); + msg.u.ask_channel.key, + msg.u.ask_channel.session_id, + msg.u.ask_channel.chunk_id.is_set ? &chunk_id : NULL, + msg.u.ask_channel.pathname, + msg.u.ask_channel.name, + msg.u.ask_channel.relayd_id, + (enum lttng_event_output) msg.u.ask_channel.output, + msg.u.ask_channel.tracefile_size, + msg.u.ask_channel.tracefile_count, + msg.u.ask_channel.session_id_per_pid, + msg.u.ask_channel.monitor, + msg.u.ask_channel.live_timer_interval, + msg.u.ask_channel.is_live, + msg.u.ask_channel.root_shm_path, + msg.u.ask_channel.shm_path); if (!channel) { goto end_channel_error; } - LTTNG_OPTIONAL_SET(&channel->buffer_credentials, - buffer_credentials); + LTTNG_OPTIONAL_SET(&channel->buffer_credentials, buffer_credentials); /* * Assign UST application UID to the channel. This value is ignored for @@ -1521,7 +1529,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, attr.read_timer_interval = msg.u.ask_channel.read_timer_interval; attr.chan_id = msg.u.ask_channel.chan_id; memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid)); - attr.blocking_timeout= msg.u.ask_channel.blocking_timeout; + attr.blocking_timeout = msg.u.ask_channel.blocking_timeout; /* Match channel buffer type to the UST abi. */ switch (msg.u.ask_channel.output) { @@ -1562,8 +1570,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (msg.u.ask_channel.type == LTTNG_UST_ABI_CHAN_METADATA) { int ret_allocate; - ret_allocate = consumer_metadata_cache_allocate( - channel); + ret_allocate = consumer_metadata_cache_allocate(channel); if (ret_allocate < 0) { ERR("Allocating metadata cache"); goto end_channel_error; @@ -1573,11 +1580,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { int monitor_start_ret; - consumer_timer_live_start(channel, - msg.u.ask_channel.live_timer_interval); + consumer_timer_live_start(channel, msg.u.ask_channel.live_timer_interval); monitor_start_ret = consumer_timer_monitor_start( - channel, - msg.u.ask_channel.monitor_timer_interval); + channel, msg.u.ask_channel.monitor_timer_interval); if (monitor_start_ret < 0) { ERR("Starting channel monitoring timer failed"); goto end_channel_error; @@ -1643,8 +1648,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); /* Send the channel to sessiond (and relayd, if applicable). */ - ret = send_channel_to_sessiond_and_relayd( - sock, found_channel, ctx, &relayd_err); + ret = send_channel_to_sessiond_and_relayd(sock, found_channel, ctx, &relayd_err); if (ret < 0) { if (relayd_err) { /* @@ -1682,11 +1686,11 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } /* List MUST be empty after or else it could be reused. */ LTTNG_ASSERT(cds_list_empty(&found_channel->streams.head)); -end_get_channel: + end_get_channel: goto end_msg_sessiond; -error_get_channel_fatal: + error_get_channel_fatal: goto error_fatal; -end_get_channel_nosignal: + end_get_channel_nosignal: goto end_nosignal; } case LTTNG_CONSUMER_DESTROY_CHANNEL: @@ -1727,8 +1731,7 @@ end_get_channel_nosignal: { int ret; - ret = clear_quiescent_channel( - msg.u.clear_quiescent_channel.key); + ret = clear_quiescent_channel(msg.u.clear_quiescent_channel.key); if (ret != 0) { ret_code = (lttcomm_return_code) ret; } @@ -1744,8 +1747,7 @@ end_get_channel_nosignal: uint64_t version = msg.u.push_metadata.version; struct lttng_consumer_channel *found_channel; - DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, - len); + DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len); found_channel = consumer_find_channel(key); if (!found_channel) { @@ -1791,8 +1793,8 @@ end_get_channel_nosignal: health_code_update(); - ret = lttng_ustconsumer_recv_metadata(sock, key, offset, len, - version, found_channel, 0, 1); + ret = lttng_ustconsumer_recv_metadata( + sock, key, offset, len, version, found_channel, 0, 1); if (ret < 0) { /* error receiving from sessiond */ goto error_push_metadata_fatal; @@ -1800,9 +1802,9 @@ end_get_channel_nosignal: ret_code = (lttcomm_return_code) ret; goto end_push_metadata_msg_sessiond; } -end_push_metadata_msg_sessiond: + end_push_metadata_msg_sessiond: goto end_msg_sessiond; -error_push_metadata_fatal: + error_push_metadata_fatal: goto error_fatal; } case LTTNG_CONSUMER_SETUP_METADATA: @@ -1830,10 +1832,10 @@ error_push_metadata_fatal: int ret_snapshot; ret_snapshot = snapshot_metadata(found_channel, - key, - msg.u.snapshot_channel.pathname, - msg.u.snapshot_channel.relayd_id, - ctx); + key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, + ctx); if (ret_snapshot < 0) { ERR("Snapshot metadata failed"); ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED; @@ -1841,13 +1843,13 @@ error_push_metadata_fatal: } else { int ret_snapshot; - ret_snapshot = snapshot_channel(found_channel, - key, - msg.u.snapshot_channel.pathname, - msg.u.snapshot_channel.relayd_id, - msg.u.snapshot_channel - .nb_packets_per_stream, - ctx); + ret_snapshot = snapshot_channel( + found_channel, + key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, + msg.u.snapshot_channel.nb_packets_per_stream, + ctx); if (ret_snapshot < 0) { ERR("Snapshot channel failed"); ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED; @@ -1873,8 +1875,7 @@ error_push_metadata_fatal: uint64_t id = msg.u.discarded_events.session_id; uint64_t key = msg.u.discarded_events.channel_key; - DBG("UST consumer discarded events command for session id %" - PRIu64, id); + DBG("UST consumer discarded events command for session id %" PRIu64, id); rcu_read_lock(); pthread_mutex_lock(&the_consumer_data.lock); @@ -1889,9 +1890,13 @@ error_push_metadata_fatal: */ discarded_events = 0; cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&id, lttng_ht_seed), - ht->match_fct, &id, - &iter.iter, stream, node_session_id.node) { + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct, + &id, + &iter.iter, + stream, + node_session_id.node) + { if (stream->chan->key == key) { discarded_events = stream->chan->discarded_events; break; @@ -1900,8 +1905,10 @@ error_push_metadata_fatal: pthread_mutex_unlock(&the_consumer_data.lock); rcu_read_unlock(); - DBG("UST consumer discarded events command for session id %" - PRIu64 ", channel key %" PRIu64, id, key); + DBG("UST consumer discarded events command for session id %" PRIu64 + ", channel key %" PRIu64, + id, + key); health_code_update(); @@ -1916,7 +1923,7 @@ error_push_metadata_fatal: } case LTTNG_CONSUMER_LOST_PACKETS: { - int ret; + int ret; uint64_t lost_packets; struct lttng_ht_iter iter; struct lttng_ht *ht; @@ -1924,8 +1931,7 @@ error_push_metadata_fatal: uint64_t id = msg.u.lost_packets.session_id; uint64_t key = msg.u.lost_packets.channel_key; - DBG("UST consumer lost packets command for session id %" - PRIu64, id); + DBG("UST consumer lost packets command for session id %" PRIu64, id); rcu_read_lock(); pthread_mutex_lock(&the_consumer_data.lock); @@ -1937,27 +1943,32 @@ error_push_metadata_fatal: * to extract the information we need, we default to 0 if not * found (no packets lost if the channel is not yet in use). */ - lost_packets = 0; + lost_packets = 0; cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&id, lttng_ht_seed), - ht->match_fct, &id, - &iter.iter, stream, node_session_id.node) { + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct, + &id, + &iter.iter, + stream, + node_session_id.node) + { if (stream->chan->key == key) { - lost_packets = stream->chan->lost_packets; + lost_packets = stream->chan->lost_packets; break; } } pthread_mutex_unlock(&the_consumer_data.lock); rcu_read_unlock(); - DBG("UST consumer lost packets command for session id %" - PRIu64 ", channel key %" PRIu64, id, key); + DBG("UST consumer lost packets command for session id %" PRIu64 + ", channel key %" PRIu64, + id, + key); health_code_update(); /* Send back returned value to session daemon */ - ret = lttcomm_send_unix_sock(sock, &lost_packets, - sizeof(lost_packets)); + ret = lttcomm_send_unix_sock(sock, &lost_packets, sizeof(lost_packets)); if (ret < 0) { PERROR("send lost packets"); goto error_fatal; @@ -1967,8 +1978,7 @@ error_push_metadata_fatal: } case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE: { - int channel_monitor_pipe, ret_send, - ret_set_channel_monitor_pipe; + int channel_monitor_pipe, ret_send, ret_set_channel_monitor_pipe; ssize_t ret_recv; ret_code = LTTCOMM_CONSUMERD_SUCCESS; @@ -1978,8 +1988,7 @@ error_push_metadata_fatal: goto error_fatal; } - ret_recv = lttcomm_recv_fds_unix_sock( - sock, &channel_monitor_pipe, 1); + ret_recv = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe, 1); if (ret_recv != sizeof(channel_monitor_pipe)) { ERR("Failed to receive channel monitor pipe"); goto error_fatal; @@ -1987,8 +1996,7 @@ error_push_metadata_fatal: DBG("Received channel monitor pipe (%d)", channel_monitor_pipe); ret_set_channel_monitor_pipe = - consumer_timer_thread_set_channel_monitor_pipe( - channel_monitor_pipe); + consumer_timer_thread_set_channel_monitor_pipe(channel_monitor_pipe); if (!ret_set_channel_monitor_pipe) { int flags; int ret_fcntl; @@ -2002,8 +2010,7 @@ error_push_metadata_fatal: } flags = ret_fcntl; - ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL, - flags | O_NONBLOCK); + ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL, flags | O_NONBLOCK); if (ret_fcntl == -1) { PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe"); goto error_fatal; @@ -2032,8 +2039,7 @@ error_push_metadata_fatal: * this channel. */ rotate_channel = lttng_consumer_rotate_channel( - found_channel, key, - msg.u.rotate_channel.relayd_id); + found_channel, key, msg.u.rotate_channel.relayd_id); if (rotate_channel < 0) { ERR("Rotate channel failed"); ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL; @@ -2059,14 +2065,13 @@ error_push_metadata_fatal: int ret_rotate_read_streams; ret_rotate_read_streams = - lttng_consumer_rotate_ready_streams( - found_channel, key); + lttng_consumer_rotate_ready_streams(found_channel, key); if (ret_rotate_read_streams < 0) { ERR("Rotate channel failed"); } } break; -end_rotate_channel_nosignal: + end_rotate_channel_nosignal: goto end_nosignal; } case LTTNG_CONSUMER_CLEAR_CHANNEL: @@ -2082,8 +2087,7 @@ end_rotate_channel_nosignal: } else { int ret_clear_channel; - ret_clear_channel = lttng_consumer_clear_channel( - found_channel); + ret_clear_channel = lttng_consumer_clear_channel(found_channel); if (ret_clear_channel) { ERR("Clear channel failed key %" PRIu64, key); ret_code = (lttcomm_return_code) ret_clear_channel; @@ -2103,8 +2107,9 @@ end_rotate_channel_nosignal: int ret_send_status; lttng_uuid sessiond_uuid; - std::copy(std::begin(msg.u.init.sessiond_uuid), std::end(msg.u.init.sessiond_uuid), - sessiond_uuid.begin()); + std::copy(std::begin(msg.u.init.sessiond_uuid), + std::end(msg.u.init.sessiond_uuid), + sessiond_uuid.begin()); ret_code = lttng_consumer_init_command(ctx, sessiond_uuid); health_code_update(); ret_send_status = consumer_send_status_msg(sock, ret_code); @@ -2117,17 +2122,16 @@ end_rotate_channel_nosignal: case LTTNG_CONSUMER_CREATE_TRACE_CHUNK: { const struct lttng_credentials credentials = { - .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid), - .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid), + .uid = LTTNG_OPTIONAL_INIT_VALUE( + msg.u.create_trace_chunk.credentials.value.uid), + .gid = LTTNG_OPTIONAL_INIT_VALUE( + msg.u.create_trace_chunk.credentials.value.gid), }; - const bool is_local_trace = - !msg.u.create_trace_chunk.relayd_id.is_set; - const uint64_t relayd_id = - msg.u.create_trace_chunk.relayd_id.value; - const char *chunk_override_name = - *msg.u.create_trace_chunk.override_name ? - msg.u.create_trace_chunk.override_name : - NULL; + const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set; + const uint64_t relayd_id = msg.u.create_trace_chunk.relayd_id.value; + const char *chunk_override_name = *msg.u.create_trace_chunk.override_name ? + msg.u.create_trace_chunk.override_name : + NULL; struct lttng_directory_handle *chunk_directory_handle = NULL; /* @@ -2140,8 +2144,7 @@ end_rotate_channel_nosignal: ssize_t ret_recv; /* Acnowledge the reception of the command. */ - ret_send_status = consumer_send_status_msg( - sock, LTTCOMM_CONSUMERD_SUCCESS); + ret_send_status = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS); if (ret_send_status < 0) { /* Somehow, the session daemon is not responding anymore. */ goto end_nosignal; @@ -2150,17 +2153,15 @@ end_rotate_channel_nosignal: /* * Receive trace chunk domain dirfd. */ - ret_recv = lttcomm_recv_fds_unix_sock( - sock, &chunk_dirfd, 1); + ret_recv = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1); if (ret_recv != sizeof(chunk_dirfd)) { ERR("Failed to receive trace chunk domain directory file descriptor"); goto error_fatal; } - DBG("Received trace chunk domain directory fd (%d)", - chunk_dirfd); - chunk_directory_handle = lttng_directory_handle_create_from_dirfd( - chunk_dirfd); + DBG("Received trace chunk domain directory fd (%d)", chunk_dirfd); + chunk_directory_handle = + lttng_directory_handle_create_from_dirfd(chunk_dirfd); if (!chunk_directory_handle) { ERR("Failed to initialize chunk domain directory handle from directory file descriptor"); if (close(chunk_dirfd)) { @@ -2171,48 +2172,39 @@ end_rotate_channel_nosignal: } ret_code = lttng_consumer_create_trace_chunk( - !is_local_trace ? &relayd_id : NULL, - msg.u.create_trace_chunk.session_id, - msg.u.create_trace_chunk.chunk_id, - (time_t) msg.u.create_trace_chunk - .creation_timestamp, - chunk_override_name, - msg.u.create_trace_chunk.credentials.is_set ? - &credentials : - NULL, - chunk_directory_handle); + !is_local_trace ? &relayd_id : NULL, + msg.u.create_trace_chunk.session_id, + msg.u.create_trace_chunk.chunk_id, + (time_t) msg.u.create_trace_chunk.creation_timestamp, + chunk_override_name, + msg.u.create_trace_chunk.credentials.is_set ? &credentials : NULL, + chunk_directory_handle); lttng_directory_handle_put(chunk_directory_handle); goto end_msg_sessiond; } case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK: { enum lttng_trace_chunk_command_type close_command = - (lttng_trace_chunk_command_type) - msg.u.close_trace_chunk.close_command.value; - const uint64_t relayd_id = - msg.u.close_trace_chunk.relayd_id.value; + (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value; + const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value; struct lttcomm_consumer_close_trace_chunk_reply reply; char closed_trace_chunk_path[LTTNG_PATH_MAX] = {}; int ret; ret_code = lttng_consumer_close_trace_chunk( - msg.u.close_trace_chunk.relayd_id.is_set ? - &relayd_id : - NULL, - msg.u.close_trace_chunk.session_id, - msg.u.close_trace_chunk.chunk_id, - (time_t) msg.u.close_trace_chunk.close_timestamp, - msg.u.close_trace_chunk.close_command.is_set ? - &close_command : - NULL, closed_trace_chunk_path); + msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : NULL, + msg.u.close_trace_chunk.session_id, + msg.u.close_trace_chunk.chunk_id, + (time_t) msg.u.close_trace_chunk.close_timestamp, + msg.u.close_trace_chunk.close_command.is_set ? &close_command : NULL, + closed_trace_chunk_path); reply.ret_code = ret_code; reply.path_length = strlen(closed_trace_chunk_path) + 1; ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply)); if (ret != sizeof(reply)) { goto error_fatal; } - ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path, - reply.path_length); + ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path, reply.path_length); if (ret != reply.path_length) { goto error_fatal; } @@ -2220,26 +2212,22 @@ end_rotate_channel_nosignal: } case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: { - const uint64_t relayd_id = - msg.u.trace_chunk_exists.relayd_id.value; + const uint64_t relayd_id = msg.u.trace_chunk_exists.relayd_id.value; ret_code = lttng_consumer_trace_chunk_exists( - msg.u.trace_chunk_exists.relayd_id.is_set ? - &relayd_id : NULL, - msg.u.trace_chunk_exists.session_id, - msg.u.trace_chunk_exists.chunk_id); + msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : NULL, + msg.u.trace_chunk_exists.session_id, + msg.u.trace_chunk_exists.chunk_id); goto end_msg_sessiond; } case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS: { const uint64_t key = msg.u.open_channel_packets.key; - struct lttng_consumer_channel *found_channel = - consumer_find_channel(key); + struct lttng_consumer_channel *found_channel = consumer_find_channel(key); if (found_channel) { pthread_mutex_lock(&found_channel->lock); - ret_code = lttng_consumer_open_channel_packets( - found_channel); + ret_code = lttng_consumer_open_channel_packets(found_channel); pthread_mutex_unlock(&found_channel->lock); } else { /* @@ -2312,8 +2300,7 @@ end: return ret_func; } -int lttng_ust_flush_buffer(struct lttng_consumer_stream *stream, - int producer_active) +int lttng_ust_flush_buffer(struct lttng_consumer_stream *stream, int producer_active) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2339,8 +2326,7 @@ int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream) * * Returns 0 on success, < 0 on error. */ -int lttng_ustconsumer_sample_snapshot_positions( - struct lttng_consumer_stream *stream) +int lttng_ustconsumer_sample_snapshot_positions(struct lttng_consumer_stream *stream) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2353,8 +2339,8 @@ int lttng_ustconsumer_sample_snapshot_positions( * * Returns 0 on success, < 0 on error */ -int lttng_ustconsumer_get_produced_snapshot( - struct lttng_consumer_stream *stream, unsigned long *pos) +int lttng_ustconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2368,8 +2354,8 @@ int lttng_ustconsumer_get_produced_snapshot( * * Returns 0 on success, < 0 on error */ -int lttng_ustconsumer_get_consumed_snapshot( - struct lttng_consumer_stream *stream, unsigned long *pos) +int lttng_ustconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2378,8 +2364,7 @@ int lttng_ustconsumer_get_consumed_snapshot( return lttng_ust_ctl_snapshot_get_consumed(stream->ustream, pos); } -int lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream, - int producer) +int lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream, int producer) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2395,8 +2380,7 @@ int lttng_ustconsumer_clear_buffer(struct lttng_consumer_stream *stream) return lttng_ust_ctl_clear_buffer(stream->ustream); } -int lttng_ustconsumer_get_current_timestamp( - struct lttng_consumer_stream *stream, uint64_t *ts) +int lttng_ustconsumer_get_current_timestamp(struct lttng_consumer_stream *stream, uint64_t *ts) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2405,8 +2389,7 @@ int lttng_ustconsumer_get_current_timestamp( return lttng_ust_ctl_get_current_timestamp(stream->ustream, ts); } -int lttng_ustconsumer_get_sequence_number( - struct lttng_consumer_stream *stream, uint64_t *seq) +int lttng_ustconsumer_get_sequence_number(struct lttng_consumer_stream *stream, uint64_t *seq) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream->ustream); @@ -2462,10 +2445,10 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) ERR("Cannot get stream shm path"); } ret = run_as_unlink(shm_path, - lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( - chan->buffer_credentials)), - lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( - chan->buffer_credentials))); + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( + chan->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( + chan->buffer_credentials))); if (ret) { PERROR("unlink %s", shm_path); } @@ -2483,12 +2466,11 @@ void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan) lttng_ust_ctl_destroy_channel(chan->uchan); /* Try to rmdir all directories under shm_path root. */ if (chan->root_shm_path[0]) { - (void) run_as_rmdir_recursive(chan->root_shm_path, - lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR( - chan->buffer_credentials)), - lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR( - chan->buffer_credentials)), - LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); + (void) run_as_rmdir_recursive( + chan->root_shm_path, + lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(chan->buffer_credentials)), + lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(chan->buffer_credentials)), + LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG); } free(chan->stream_fds); } @@ -2526,15 +2508,13 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream) * Returns the number of bytes pushed from the cache into the ring buffer, or a * negative value on error. */ -static -int commit_one_metadata_packet(struct lttng_consumer_stream *stream) +static int commit_one_metadata_packet(struct lttng_consumer_stream *stream) { ssize_t write_len; int ret; pthread_mutex_lock(&stream->chan->metadata_cache->lock); - if (stream->chan->metadata_cache->contents.size == - stream->ust_metadata_pushed) { + if (stream->chan->metadata_cache->contents.size == stream->ust_metadata_pushed) { /* * In the context of a user space metadata channel, a * change in version can be detected in two ways: @@ -2559,21 +2539,20 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) * occur as part of the pre-consume) until the metadata size * exceeded the cache size. */ - if (stream->metadata_version != - stream->chan->metadata_cache->version) { + if (stream->metadata_version != stream->chan->metadata_cache->version) { metadata_stream_reset_cache_consumed_position(stream); consumer_stream_metadata_set_version(stream, - stream->chan->metadata_cache->version); + stream->chan->metadata_cache->version); } else { ret = 0; goto end; } } - write_len = lttng_ust_ctl_write_one_packet_to_channel(stream->chan->uchan, - &stream->chan->metadata_cache->contents.data[stream->ust_metadata_pushed], - stream->chan->metadata_cache->contents.size - - stream->ust_metadata_pushed); + write_len = lttng_ust_ctl_write_one_packet_to_channel( + stream->chan->uchan, + &stream->chan->metadata_cache->contents.data[stream->ust_metadata_pushed], + stream->chan->metadata_cache->contents.size - stream->ust_metadata_pushed); LTTNG_ASSERT(write_len != 0); if (write_len < 0) { ERR("Writing one metadata packet"); @@ -2582,8 +2561,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) } stream->ust_metadata_pushed += write_len; - LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= - stream->ust_metadata_pushed); + LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed); ret = write_len; /* @@ -2602,7 +2580,6 @@ end: return ret; } - /* * Sync metadata meaning request them to the session daemon and snapshot to the * metadata thread can consumer them. @@ -2613,9 +2590,9 @@ end: * * The RCU read side lock must be held by the caller. */ -enum sync_metadata_status lttng_ustconsumer_sync_metadata( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *metadata_stream) +enum sync_metadata_status +lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *metadata_stream) { int ret; enum sync_metadata_status status; @@ -2651,7 +2628,7 @@ enum sync_metadata_status lttng_ustconsumer_sync_metadata( */ if (consumer_stream_is_deleted(metadata_stream)) { DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization", - metadata_stream->key); + metadata_stream->key); status = SYNC_METADATA_STATUS_NO_DATA; goto end; } @@ -2669,7 +2646,8 @@ enum sync_metadata_status lttng_ustconsumer_sync_metadata( ret = lttng_ust_ctl_snapshot(metadata_stream->ustream); if (ret < 0) { - ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d", ret); + ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d", + ret); status = SYNC_METADATA_STATUS_ERROR; goto end; } @@ -2682,7 +2660,7 @@ end: * Return 0 on success else a negative value. */ static int notify_if_more_data(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx) { int ret; struct lttng_ust_ctl_consumer_stream *ustream; @@ -2758,18 +2736,17 @@ static int consumer_stream_ust_on_wake_up(struct lttng_consumer_stream *stream) } static int extract_common_subbuffer_info(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuf) + struct stream_subbuffer *subbuf) { int ret; - ret = lttng_ust_ctl_get_subbuf_size( - stream->ustream, &subbuf->info.data.subbuf_size); + ret = lttng_ust_ctl_get_subbuf_size(stream->ustream, &subbuf->info.data.subbuf_size); if (ret) { goto end; } - ret = lttng_ust_ctl_get_padded_subbuf_size( - stream->ustream, &subbuf->info.data.padded_subbuf_size); + ret = lttng_ust_ctl_get_padded_subbuf_size(stream->ustream, + &subbuf->info.data.padded_subbuf_size); if (ret) { goto end; } @@ -2779,7 +2756,7 @@ end: } static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuf) + struct stream_subbuffer *subbuf) { int ret; @@ -2795,7 +2772,7 @@ end: } static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuf) + struct stream_subbuffer *subbuf) { int ret; @@ -2804,43 +2781,40 @@ static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, goto end; } - ret = lttng_ust_ctl_get_packet_size( - stream->ustream, &subbuf->info.data.packet_size); + ret = lttng_ust_ctl_get_packet_size(stream->ustream, &subbuf->info.data.packet_size); if (ret < 0) { PERROR("Failed to get sub-buffer packet size"); goto end; } - ret = lttng_ust_ctl_get_content_size( - stream->ustream, &subbuf->info.data.content_size); + ret = lttng_ust_ctl_get_content_size(stream->ustream, &subbuf->info.data.content_size); if (ret < 0) { PERROR("Failed to get sub-buffer content size"); goto end; } - ret = lttng_ust_ctl_get_timestamp_begin( - stream->ustream, &subbuf->info.data.timestamp_begin); + ret = lttng_ust_ctl_get_timestamp_begin(stream->ustream, + &subbuf->info.data.timestamp_begin); if (ret < 0) { PERROR("Failed to get sub-buffer begin timestamp"); goto end; } - ret = lttng_ust_ctl_get_timestamp_end( - stream->ustream, &subbuf->info.data.timestamp_end); + ret = lttng_ust_ctl_get_timestamp_end(stream->ustream, &subbuf->info.data.timestamp_end); if (ret < 0) { PERROR("Failed to get sub-buffer end timestamp"); goto end; } - ret = lttng_ust_ctl_get_events_discarded( - stream->ustream, &subbuf->info.data.events_discarded); + ret = lttng_ust_ctl_get_events_discarded(stream->ustream, + &subbuf->info.data.events_discarded); if (ret) { PERROR("Failed to get sub-buffer events discarded count"); goto end; } ret = lttng_ust_ctl_get_sequence_number(stream->ustream, - &subbuf->info.data.sequence_number.value); + &subbuf->info.data.sequence_number.value); if (ret) { /* May not be supported by older LTTng-modules. */ if (ret != -ENOTTY) { @@ -2851,15 +2825,14 @@ static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, subbuf->info.data.sequence_number.is_set = true; } - ret = lttng_ust_ctl_get_stream_id( - stream->ustream, &subbuf->info.data.stream_id); + ret = lttng_ust_ctl_get_stream_id(stream->ustream, &subbuf->info.data.stream_id); if (ret < 0) { PERROR("Failed to get stream id"); goto end; } ret = lttng_ust_ctl_get_instance_id(stream->ustream, - &subbuf->info.data.stream_instance_id.value); + &subbuf->info.data.stream_instance_id.value); if (ret) { /* May not be supported by older LTTng-modules. */ if (ret != -ENOTTY) { @@ -2874,13 +2847,12 @@ end: } static int get_next_subbuffer_common(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer) + struct stream_subbuffer *subbuffer) { int ret; const char *addr; - ret = stream->read_subbuffer_ops.extract_subbuffer_info( - stream, subbuffer); + ret = stream->read_subbuffer_ops.extract_subbuffer_info(stream, subbuffer); if (ret) { goto end; } @@ -2890,16 +2862,15 @@ static int get_next_subbuffer_common(struct lttng_consumer_stream *stream, goto end; } - subbuffer->buffer.buffer = lttng_buffer_view_init( - addr, 0, subbuffer->info.data.padded_subbuf_size); + subbuffer->buffer.buffer = + lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size); LTTNG_ASSERT(subbuffer->buffer.buffer.data != NULL); end: return ret; } -static enum get_next_subbuffer_status get_next_subbuffer( - struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer) +static enum get_next_subbuffer_status get_next_subbuffer(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) { int ret; enum get_next_subbuffer_status status; @@ -2910,7 +2881,7 @@ static enum get_next_subbuffer_status get_next_subbuffer( status = GET_NEXT_SUBBUFFER_STATUS_OK; break; case -ENODATA: - case -EAGAIN: + case -EAGAIN: /* * The caller only expects -ENODATA when there is no data to * read, but the kernel tracer returns -EAGAIN when there is @@ -2934,9 +2905,9 @@ end: return status; } -static enum get_next_subbuffer_status get_next_subbuffer_metadata( - struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer) +static enum get_next_subbuffer_status +get_next_subbuffer_metadata(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) { int ret; bool cache_empty; @@ -2979,7 +2950,7 @@ static enum get_next_subbuffer_status get_next_subbuffer_metadata( } else { pthread_mutex_lock(&stream->chan->metadata_cache->lock); cache_empty = stream->chan->metadata_cache->contents.size == - stream->ust_metadata_pushed; + stream->ust_metadata_pushed; pthread_mutex_unlock(&stream->chan->metadata_cache->lock); } } while (!got_subbuffer); @@ -3034,7 +3005,7 @@ end: } static int put_next_subbuffer(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer __attribute__((unused))) + struct stream_subbuffer *subbuffer __attribute__((unused))) { const int ret = lttng_ust_ctl_put_next_subbuf(stream->ustream); @@ -3043,42 +3014,35 @@ static int put_next_subbuffer(struct lttng_consumer_stream *stream, } static int signal_metadata(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx __attribute__((unused))) + struct lttng_consumer_local_data *ctx __attribute__((unused))) { ASSERT_LOCKED(stream->metadata_rdv_lock); return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0; } -static int lttng_ustconsumer_set_stream_ops( - struct lttng_consumer_stream *stream) +static int lttng_ustconsumer_set_stream_ops(struct lttng_consumer_stream *stream) { int ret = 0; stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up; if (stream->metadata_flag) { - stream->read_subbuffer_ops.get_next_subbuffer = - get_next_subbuffer_metadata; - stream->read_subbuffer_ops.extract_subbuffer_info = - extract_metadata_subbuffer_info; + stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer_metadata; + stream->read_subbuffer_ops.extract_subbuffer_info = extract_metadata_subbuffer_info; stream->read_subbuffer_ops.reset_metadata = - metadata_stream_reset_cache_consumed_position; + metadata_stream_reset_cache_consumed_position; if (stream->chan->is_live) { stream->read_subbuffer_ops.on_sleep = signal_metadata; - ret = consumer_stream_enable_metadata_bucketization( - stream); + ret = consumer_stream_enable_metadata_bucketization(stream); if (ret) { goto end; } } } else { - stream->read_subbuffer_ops.get_next_subbuffer = - get_next_subbuffer; - stream->read_subbuffer_ops.extract_subbuffer_info = - extract_data_subbuffer_info; + stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer; + stream->read_subbuffer_ops.extract_subbuffer_info = extract_data_subbuffer_info; stream->read_subbuffer_ops.on_sleep = notify_if_more_data; if (stream->chan->is_live) { - stream->read_subbuffer_ops.send_live_beacon = - consumer_flush_ust_index; + stream->read_subbuffer_ops.send_live_beacon = consumer_flush_ust_index; } } @@ -3103,7 +3067,7 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) * no current trace chunk on the parent channel. */ if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor && - stream->chan->trace_chunk) { + stream->chan->trace_chunk) { ret = consumer_stream_create_output_files(stream, true); if (ret) { goto error; @@ -3159,12 +3123,14 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) * whetnever ust_metadata_pushed is incremented, the associated * metadata has been consumed from the metadata stream. */ - DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64, - contiguous, pushed); + DBG("UST consumer metadata pending check: contiguous %" PRIu64 + " vs pushed %" PRIu64, + contiguous, + pushed); LTTNG_ASSERT(((int64_t) (contiguous - pushed)) >= 0); if ((contiguous != pushed) || - (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) { - ret = 1; /* Data is pending */ + (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) { + ret = 1; /* Data is pending */ goto end; } } else { @@ -3176,7 +3142,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) */ ret = lttng_ust_ctl_put_subbuf(stream->ustream); LTTNG_ASSERT(ret == 0); - ret = 1; /* Data is pending */ + ret = 1; /* Data is pending */ goto end; } } @@ -3247,15 +3213,12 @@ void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht) DBG("UST consumer closing all metadata streams"); rcu_read_lock(); - cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, - node.node) { - + cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { health_code_update(); pthread_mutex_lock(&stream->chan->lock); lttng_ustconsumer_close_metadata(stream->chan); pthread_mutex_unlock(&stream->chan->lock); - } rcu_read_unlock(); } @@ -3281,7 +3244,9 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) * pushed out due to concurrent interaction with the session daemon. */ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_channel *channel, int timer, int wait) + struct lttng_consumer_channel *channel, + int timer, + int wait) { struct lttcomm_metadata_request_msg request; struct lttcomm_consumer_msg msg; @@ -3317,17 +3282,18 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, request.uid = channel->ust_app_uid; request.key = channel->key; - DBG("Sending metadata request to sessiond, session id %" PRIu64 - ", per-pid %" PRIu64 ", app UID %u and channel key %" PRIu64, - request.session_id, request.session_id_per_pid, request.uid, - request.key); + DBG("Sending metadata request to sessiond, session id %" PRIu64 ", per-pid %" PRIu64 + ", app UID %u and channel key %" PRIu64, + request.session_id, + request.session_id_per_pid, + request.uid, + request.key); pthread_mutex_lock(&ctx->metadata_socket_lock); health_code_update(); - ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request, - sizeof(request)); + ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request, sizeof(request)); if (ret < 0) { ERR("Asking metadata to sessiond"); goto end; @@ -3336,11 +3302,9 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, health_code_update(); /* Receive the metadata from sessiond */ - ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg, - sizeof(msg)); + ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - DBG("Consumer received unexpected message size %d (expects %zu)", - ret, sizeof(msg)); + DBG("Consumer received unexpected message size %d (expects %zu)", ret, sizeof(msg)); lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); /* * The ret value might 0 meaning an orderly shutdown but this is ok @@ -3353,8 +3317,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, if (msg.cmd_type == LTTNG_ERR_UND) { /* No registry found */ - (void) consumer_send_status_msg(ctx->consumer_metadata_socket, - ret_code); + (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret_code); ret = 0; goto end; } else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) { @@ -3376,8 +3339,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, health_code_update(); /* Tell session daemon we are ready to receive the metadata. */ - ret = consumer_send_status_msg(ctx->consumer_metadata_socket, - LTTCOMM_CONSUMERD_SUCCESS); + ret = consumer_send_status_msg(ctx->consumer_metadata_socket, LTTCOMM_CONSUMERD_SUCCESS); if (ret < 0 || len == 0) { /* * Somehow, the session daemon is not responding anymore or there is @@ -3388,8 +3350,8 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, health_code_update(); - ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket, - key, offset, len, version, channel, timer, wait); + ret = lttng_ustconsumer_recv_metadata( + ctx->consumer_metadata_socket, key, offset, len, version, channel, timer, wait); if (ret >= 0) { /* * Only send the status msg if the sessiond is alive meaning a positive @@ -3409,8 +3371,7 @@ end: /* * Return the ustctl call for the get stream id. */ -int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream, - uint64_t *stream_id) +int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream, uint64_t *stream_id) { LTTNG_ASSERT(stream); LTTNG_ASSERT(stream_id);