X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.cpp;fp=src%2Fbin%2Flttng-sessiond%2Fconsumer.cpp;h=17d2248bec8f9f2c35a1bb6e90f92b694867335c;hp=c0f8eb5894e8fae8cb070109d4820ebec62ddff2;hb=28ab034a2c3582d07d3423d2d746731f87d3969f;hpb=52e345b9ac912d033c2a2c25a170a01cf209839d diff --git a/src/bin/lttng-sessiond/consumer.cpp b/src/bin/lttng-sessiond/consumer.cpp index c0f8eb589..17d2248be 100644 --- a/src/bin/lttng-sessiond/consumer.cpp +++ b/src/bin/lttng-sessiond/consumer.cpp @@ -7,25 +7,25 @@ */ #define _LGPL_SOURCE -#include -#include -#include -#include -#include -#include -#include +#include "consumer.hpp" +#include "health-sessiond.hpp" +#include "lttng-sessiond.hpp" +#include "ust-app.hpp" +#include "utils.hpp" #include #include -#include #include #include +#include -#include "consumer.hpp" -#include "health-sessiond.hpp" -#include "ust-app.hpp" -#include "utils.hpp" -#include "lttng-sessiond.hpp" +#include +#include +#include +#include +#include +#include +#include /* * Return allocated full pathname of the session using the consumer trace path @@ -35,7 +35,8 @@ * returned. */ char *setup_channel_trace_path(struct consumer_output *consumer, - const char *session_path, size_t *consumer_path_offset) + const char *session_path, + size_t *consumer_path_offset) { int ret; char *pathname; @@ -55,21 +56,22 @@ char *setup_channel_trace_path(struct consumer_output *consumer, } /* Get correct path name destination */ - if (consumer->type == CONSUMER_DST_NET && - consumer->relay_major_version == 2 && - consumer->relay_minor_version < 11) { - ret = snprintf(pathname, LTTNG_PATH_MAX, "%s%s/%s/%s", - consumer->dst.net.base_dir, - consumer->chunk_path, consumer->domain_subdir, - session_path); + if (consumer->type == CONSUMER_DST_NET && consumer->relay_major_version == 2 && + consumer->relay_minor_version < 11) { + ret = snprintf(pathname, + LTTNG_PATH_MAX, + "%s%s/%s/%s", + consumer->dst.net.base_dir, + consumer->chunk_path, + consumer->domain_subdir, + session_path); *consumer_path_offset = 0; } else { - ret = snprintf(pathname, LTTNG_PATH_MAX, "%s/%s", - consumer->domain_subdir, session_path); + ret = snprintf( + pathname, LTTNG_PATH_MAX, "%s/%s", consumer->domain_subdir, session_path); *consumer_path_offset = strlen(consumer->domain_subdir) + 1; } - DBG3("Consumer trace path relative to current trace chunk: \"%s\"", - pathname); + DBG3("Consumer trace path relative to current trace chunk: \"%s\"", pathname); if (ret < 0) { PERROR("Failed to format channel path"); goto error; @@ -92,8 +94,7 @@ error: * * Return 0 on success else a negative value on error. */ -int consumer_socket_send( - struct consumer_socket *socket, const void *msg, size_t len) +int consumer_socket_send(struct consumer_socket *socket, const void *msg, size_t len) { int fd; ssize_t size; @@ -213,7 +214,8 @@ end: * negative value is sent back and both parameters are untouched. */ int consumer_recv_status_channel(struct consumer_socket *sock, - uint64_t *key, unsigned int *stream_count) + uint64_t *key, + unsigned int *stream_count) { int ret; struct lttcomm_consumer_status_channel reply; @@ -246,8 +248,7 @@ end: * * On success return positive value. On error, negative value. */ -int consumer_send_destroy_relayd(struct consumer_socket *sock, - struct consumer_output *consumer) +int consumer_send_destroy_relayd(struct consumer_socket *sock, struct consumer_output *consumer) { int ret; struct lttcomm_consumer_msg msg; @@ -291,8 +292,7 @@ void consumer_output_send_destroy_relayd(struct consumer_output *consumer) /* Destroy any relayd connection */ if (consumer->type == CONSUMER_DST_NET) { rcu_read_lock(); - cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, - node.node) { + cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { int ret; /* Send destroy relayd command */ @@ -312,8 +312,7 @@ void consumer_output_send_destroy_relayd(struct consumer_output *consumer) * * Return 0 on success, else negative value on error */ -int consumer_create_socket(struct consumer_data *data, - struct consumer_output *output) +int consumer_create_socket(struct consumer_data *data, struct consumer_output *output) { int ret = 0; struct consumer_socket *socket; @@ -347,8 +346,7 @@ int consumer_create_socket(struct consumer_data *data, socket->type = data->type; - DBG3("Consumer socket created (fd: %d) and added to output", - data->cmd_sock); + DBG3("Consumer socket created (fd: %d) and added to output", data->cmd_sock); error: return ret; @@ -362,7 +360,7 @@ error: * object reference is not needed anymore. */ struct consumer_socket *consumer_find_socket_by_bitness(int bits, - const struct consumer_output *consumer) + const struct consumer_output *consumer) { int consumer_fd; struct consumer_socket *socket = NULL; @@ -383,8 +381,7 @@ struct consumer_socket *consumer_find_socket_by_bitness(int bits, socket = consumer_find_socket(consumer_fd, consumer); if (!socket) { - ERR("Consumer socket fd %d not found in consumer obj %p", - consumer_fd, consumer); + ERR("Consumer socket fd %d not found in consumer obj %p", consumer_fd, consumer); } end: @@ -396,8 +393,7 @@ end: * be acquired before calling this function and across use of the * returned consumer_socket. */ -struct consumer_socket *consumer_find_socket(int key, - const struct consumer_output *consumer) +struct consumer_socket *consumer_find_socket(int key, const struct consumer_output *consumer) { struct lttng_ht_iter iter; struct lttng_ht_node_ulong *node; @@ -410,8 +406,7 @@ struct consumer_socket *consumer_find_socket(int key, return NULL; } - lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key), - &iter); + lttng_ht_lookup(consumer->socks, (void *) ((unsigned long) key), &iter); node = lttng_ht_iter_get_node_ulong(&iter); if (node != NULL) { socket = lttng::utils::container_of(node, &consumer_socket::node); @@ -446,8 +441,7 @@ error: * Add consumer socket to consumer output object. Read side lock must be * acquired before calling this function. */ -void consumer_add_socket(struct consumer_socket *sock, - struct consumer_output *consumer) +void consumer_add_socket(struct consumer_socket *sock, struct consumer_output *consumer) { LTTNG_ASSERT(sock); LTTNG_ASSERT(consumer); @@ -460,8 +454,7 @@ void consumer_add_socket(struct consumer_socket *sock, * Delete consumer socket to consumer output object. Read side lock must be * acquired before calling this function. */ -void consumer_del_socket(struct consumer_socket *sock, - struct consumer_output *consumer) +void consumer_del_socket(struct consumer_socket *sock, struct consumer_output *consumer) { int ret; struct lttng_ht_iter iter; @@ -482,8 +475,7 @@ static void destroy_socket_rcu(struct rcu_head *head) { struct lttng_ht_node_ulong *node = lttng::utils::container_of(head, <tng_ht_node_ulong::head); - struct consumer_socket *socket = - lttng::utils::container_of(node, &consumer_socket::node); + struct consumer_socket *socket = lttng::utils::container_of(node, &consumer_socket::node); free(socket); } @@ -553,7 +545,7 @@ void consumer_destroy_output_sockets(struct consumer_output *obj) } rcu_read_lock(); - cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) { + cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) { consumer_del_socket(socket, obj); consumer_destroy_socket(socket); } @@ -565,8 +557,7 @@ void consumer_destroy_output_sockets(struct consumer_output *obj) */ static void consumer_release_output(struct urcu_ref *ref) { - struct consumer_output *obj = - lttng::utils::container_of(ref, &consumer_output::ref); + struct consumer_output *obj = lttng::utils::container_of(ref, &consumer_output::ref); consumer_destroy_output_sockets(obj); @@ -613,8 +604,7 @@ struct consumer_output *consumer_copy_output(struct consumer_output *src) } output->enabled = src->enabled; output->net_seq_index = src->net_seq_index; - memcpy(output->domain_subdir, src->domain_subdir, - sizeof(output->domain_subdir)); + memcpy(output->domain_subdir, src->domain_subdir, sizeof(output->domain_subdir)); output->snapshot = src->snapshot; output->relay_major_version = src->relay_major_version; output->relay_minor_version = src->relay_minor_version; @@ -637,8 +627,7 @@ error_put: * * Return 0 on success or else a negative value. */ -int consumer_copy_sockets(struct consumer_output *dst, - struct consumer_output *src) +int consumer_copy_sockets(struct consumer_output *dst, struct consumer_output *src) { int ret = 0; struct lttng_ht_iter iter; @@ -648,7 +637,7 @@ int consumer_copy_sockets(struct consumer_output *dst, LTTNG_ASSERT(src); rcu_read_lock(); - cds_lfht_for_each_entry(src->socks->ht, &iter.iter, socket, node.node) { + cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) { /* Ignore socket that are already there. */ copy_sock = consumer_find_socket(*socket->fd_ptr, dst); if (copy_sock) { @@ -685,8 +674,8 @@ error: * error. */ int consumer_set_network_uri(const struct ltt_session *session, - struct consumer_output *output, - struct lttng_uri *uri) + struct consumer_output *output, + struct lttng_uri *uri) { int ret; struct lttng_uri *dst_uri = NULL; @@ -703,8 +692,7 @@ int consumer_set_network_uri(const struct ltt_session *session, /* Assign default port. */ uri->port = DEFAULT_NETWORK_CONTROL_PORT; } else { - if (output->dst.net.data_isset && uri->port == - output->dst.net.data.port) { + if (output->dst.net.data_isset && uri->port == output->dst.net.data.port) { ret = -LTTNG_ERR_INVALID; goto error; } @@ -718,8 +706,8 @@ int consumer_set_network_uri(const struct ltt_session *session, /* Assign default port. */ uri->port = DEFAULT_NETWORK_DATA_PORT; } else { - if (output->dst.net.control_isset && uri->port == - output->dst.net.control.port) { + if (output->dst.net.control_isset && + uri->port == output->dst.net.control.port) { ret = -LTTNG_ERR_INVALID; goto error; } @@ -776,14 +764,17 @@ int consumer_set_network_uri(const struct ltt_session *session, goto error; } ret = snprintf(output->dst.net.base_dir, - sizeof(output->dst.net.base_dir), - "/%s/%s/", session->hostname, uri->subdir); + sizeof(output->dst.net.base_dir), + "/%s/%s/", + session->hostname, + uri->subdir); } else { if (session->has_auto_generated_name) { ret = snprintf(output->dst.net.base_dir, - sizeof(output->dst.net.base_dir), - "/%s/%s/", session->hostname, - session->name); + sizeof(output->dst.net.base_dir), + "/%s/%s/", + session->hostname, + session->name); } else { char session_creation_datetime[16]; size_t strftime_ret; @@ -795,18 +786,20 @@ int consumer_set_network_uri(const struct ltt_session *session, goto error; } strftime_ret = strftime(session_creation_datetime, - sizeof(session_creation_datetime), - "%Y%m%d-%H%M%S", timeinfo); + sizeof(session_creation_datetime), + "%Y%m%d-%H%M%S", + timeinfo); if (strftime_ret == 0) { ERR("Failed to format session creation timestamp while setting network URI"); ret = -LTTNG_ERR_FATAL; goto error; } ret = snprintf(output->dst.net.base_dir, - sizeof(output->dst.net.base_dir), - "/%s/%s-%s/", session->hostname, - session->name, - session_creation_datetime); + sizeof(output->dst.net.base_dir), + "/%s/%s-%s/", + session->hostname, + session->name, + session_creation_datetime); } } if (ret >= sizeof(output->dst.net.base_dir)) { @@ -819,8 +812,7 @@ int consumer_set_network_uri(const struct ltt_session *session, goto error; } - DBG3("Consumer set network uri base_dir path %s", - output->dst.net.base_dir); + DBG3("Consumer set network uri base_dir path %s", output->dst.net.base_dir); end: return 0; @@ -835,8 +827,7 @@ error: * * The consumer socket lock must be held by the caller. */ -int consumer_send_fds(struct consumer_socket *sock, const int *fds, - size_t nb_fd) +int consumer_send_fds(struct consumer_socket *sock, const int *fds, size_t nb_fd) { int ret; @@ -862,8 +853,7 @@ error: * * The consumer socket lock must be held by the caller. */ -int consumer_send_msg(struct consumer_socket *sock, - const struct lttcomm_consumer_msg *msg) +int consumer_send_msg(struct consumer_socket *sock, const struct lttcomm_consumer_msg *msg) { int ret; @@ -887,8 +877,7 @@ error: * * The consumer socket lock must be held by the caller. */ -int consumer_send_channel(struct consumer_socket *sock, - struct lttcomm_consumer_msg *msg) +int consumer_send_channel(struct consumer_socket *sock, struct lttcomm_consumer_msg *msg) { int ret; @@ -909,33 +898,33 @@ error: * information. */ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, - uint64_t subbuf_size, - uint64_t num_subbuf, - int overwrite, - unsigned int switch_timer_interval, - unsigned int read_timer_interval, - unsigned int live_timer_interval, - bool is_in_live_session, - unsigned int monitor_timer_interval, - int output, - int type, - uint64_t session_id, - const char *pathname, - const char *name, - uint64_t relayd_id, - uint64_t key, - const lttng_uuid& uuid, - uint32_t chan_id, - uint64_t tracefile_size, - uint64_t tracefile_count, - uint64_t session_id_per_pid, - unsigned int monitor, - uint32_t ust_app_uid, - int64_t blocking_timeout, - const char *root_shm_path, - const char *shm_path, - struct lttng_trace_chunk *trace_chunk, - const struct lttng_credentials *buffer_credentials) + uint64_t subbuf_size, + uint64_t num_subbuf, + int overwrite, + unsigned int switch_timer_interval, + unsigned int read_timer_interval, + unsigned int live_timer_interval, + bool is_in_live_session, + unsigned int monitor_timer_interval, + int output, + int type, + uint64_t session_id, + const char *pathname, + const char *name, + uint64_t relayd_id, + uint64_t key, + const lttng_uuid& uuid, + uint32_t chan_id, + uint64_t tracefile_size, + uint64_t tracefile_count, + uint64_t session_id_per_pid, + unsigned int monitor, + uint32_t ust_app_uid, + int64_t blocking_timeout, + const char *root_shm_path, + const char *shm_path, + struct lttng_trace_chunk *trace_chunk, + const struct lttng_credentials *buffer_credentials) { LTTNG_ASSERT(msg); @@ -952,14 +941,12 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); LTTNG_OPTIONAL_SET(&msg->u.ask_channel.chunk_id, chunk_id); } - msg->u.ask_channel.buffer_credentials.uid = - lttng_credentials_get_uid(buffer_credentials); - msg->u.ask_channel.buffer_credentials.gid = - lttng_credentials_get_gid(buffer_credentials); + msg->u.ask_channel.buffer_credentials.uid = lttng_credentials_get_uid(buffer_credentials); + msg->u.ask_channel.buffer_credentials.gid = lttng_credentials_get_gid(buffer_credentials); msg->cmd_type = LTTNG_CONSUMER_ASK_CHANNEL_CREATION; msg->u.ask_channel.subbuf_size = subbuf_size; - msg->u.ask_channel.num_subbuf = num_subbuf ; + msg->u.ask_channel.num_subbuf = num_subbuf; msg->u.ask_channel.overwrite = overwrite; msg->u.ask_channel.switch_timer_interval = switch_timer_interval; msg->u.ask_channel.read_timer_interval = read_timer_interval; @@ -982,22 +969,22 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, std::copy(uuid.begin(), uuid.end(), msg->u.ask_channel.uuid); if (pathname) { - strncpy(msg->u.ask_channel.pathname, pathname, - sizeof(msg->u.ask_channel.pathname)); - msg->u.ask_channel.pathname[sizeof(msg->u.ask_channel.pathname)-1] = '\0'; + strncpy(msg->u.ask_channel.pathname, pathname, sizeof(msg->u.ask_channel.pathname)); + msg->u.ask_channel.pathname[sizeof(msg->u.ask_channel.pathname) - 1] = '\0'; } strncpy(msg->u.ask_channel.name, name, sizeof(msg->u.ask_channel.name)); msg->u.ask_channel.name[sizeof(msg->u.ask_channel.name) - 1] = '\0'; if (root_shm_path) { - strncpy(msg->u.ask_channel.root_shm_path, root_shm_path, + strncpy(msg->u.ask_channel.root_shm_path, + root_shm_path, sizeof(msg->u.ask_channel.root_shm_path)); - msg->u.ask_channel.root_shm_path[sizeof(msg->u.ask_channel.root_shm_path) - 1] = '\0'; + msg->u.ask_channel.root_shm_path[sizeof(msg->u.ask_channel.root_shm_path) - 1] = + '\0'; } if (shm_path) { - strncpy(msg->u.ask_channel.shm_path, shm_path, - sizeof(msg->u.ask_channel.shm_path)); + strncpy(msg->u.ask_channel.shm_path, shm_path, sizeof(msg->u.ask_channel.shm_path)); msg->u.ask_channel.shm_path[sizeof(msg->u.ask_channel.shm_path) - 1] = '\0'; } } @@ -1006,21 +993,21 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, * Init channel communication message structure. */ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, - uint64_t channel_key, - uint64_t session_id, - const char *pathname, - uint64_t relayd_id, - const char *name, - unsigned int nb_init_streams, - enum lttng_event_output output, - int type, - uint64_t tracefile_size, - uint64_t tracefile_count, - unsigned int monitor, - unsigned int live_timer_interval, - bool is_in_live_session, - unsigned int monitor_timer_interval, - struct lttng_trace_chunk *trace_chunk) + uint64_t channel_key, + uint64_t session_id, + const char *pathname, + uint64_t relayd_id, + const char *name, + unsigned int nb_init_streams, + enum lttng_event_output output, + int type, + uint64_t tracefile_size, + uint64_t tracefile_count, + unsigned int monitor, + unsigned int live_timer_interval, + bool is_in_live_session, + unsigned int monitor_timer_interval, + struct lttng_trace_chunk *trace_chunk) { LTTNG_ASSERT(msg); @@ -1051,8 +1038,7 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.channel.is_live = is_in_live_session; msg->u.channel.monitor_timer_interval = monitor_timer_interval; - strncpy(msg->u.channel.pathname, pathname, - sizeof(msg->u.channel.pathname)); + strncpy(msg->u.channel.pathname, pathname, sizeof(msg->u.channel.pathname)); msg->u.channel.pathname[sizeof(msg->u.channel.pathname) - 1] = '\0'; strncpy(msg->u.channel.name, name, sizeof(msg->u.channel.name)); @@ -1063,9 +1049,9 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, * Init stream communication message structure. */ void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg, - uint64_t channel_key, - uint64_t stream_key, - int32_t cpu) + uint64_t channel_key, + uint64_t stream_key, + int32_t cpu) { LTTNG_ASSERT(msg); @@ -1078,8 +1064,9 @@ void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg, } void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, - enum lttng_consumer_command cmd, - uint64_t channel_key, uint64_t net_seq_idx) + enum lttng_consumer_command cmd, + uint64_t channel_key, + uint64_t net_seq_idx) { LTTNG_ASSERT(msg); @@ -1094,8 +1081,10 @@ void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, * Send stream communication structure to the consumer. */ int consumer_send_stream(struct consumer_socket *sock, - struct consumer_output *dst, struct lttcomm_consumer_msg *msg, - const int *fds, size_t nb_fd) + struct consumer_output *dst, + struct lttcomm_consumer_msg *msg, + const int *fds, + size_t nb_fd) { int ret; @@ -1126,12 +1115,17 @@ error: * On success return positive value. On error, negative value. */ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, - struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer, - enum lttng_stream_type type, uint64_t session_id, - const char *session_name, const char *hostname, - const char *base_path, int session_live_timer, - const uint64_t *current_chunk_id, time_t session_creation_time, - bool session_name_contains_creation_time) + struct lttcomm_relayd_sock *rsock, + struct consumer_output *consumer, + enum lttng_stream_type type, + uint64_t session_id, + const char *session_name, + const char *hostname, + const char *base_path, + int session_live_timer, + const uint64_t *current_chunk_id, + time_t session_creation_time, + bool session_name_contains_creation_time) { int ret; int fd; @@ -1153,21 +1147,26 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, char output_path[LTTNG_PATH_MAX] = {}; uint64_t relayd_session_id; - ret = relayd_create_session(rsock, &relayd_session_id, - session_name, hostname, base_path, - session_live_timer, consumer->snapshot, - session_id, the_sessiond_uuid, current_chunk_id, - session_creation_time, - session_name_contains_creation_time, - output_path); + ret = relayd_create_session(rsock, + &relayd_session_id, + session_name, + hostname, + base_path, + session_live_timer, + consumer->snapshot, + session_id, + the_sessiond_uuid, + current_chunk_id, + session_creation_time, + session_name_contains_creation_time, + output_path); if (ret < 0) { /* Close the control socket. */ (void) relayd_close(rsock); goto error; } msg.u.relayd_sock.relayd_session_id = relayd_session_id; - DBG("Created session on relay, output path reply: %s", - output_path); + DBG("Created session on relay, output path reply: %s", output_path); } msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET; @@ -1202,9 +1201,8 @@ error: return ret; } -static -int consumer_send_pipe(struct consumer_socket *consumer_sock, - enum lttng_consumer_command cmd, int pipe) +static int +consumer_send_pipe(struct consumer_socket *consumer_sock, enum lttng_consumer_command cmd, int pipe) { int ret; struct lttcomm_consumer_msg msg; @@ -1217,8 +1215,7 @@ int consumer_send_pipe(struct consumer_socket *consumer_sock, command_name = "SET_CHANNEL_MONITOR_PIPE"; break; default: - ERR("Unexpected command received in %s (cmd = %d)", __func__, - (int) cmd); + ERR("Unexpected command received in %s (cmd = %d)", __func__, (int) cmd); abort(); } @@ -1234,9 +1231,7 @@ int consumer_send_pipe(struct consumer_socket *consumer_sock, goto error; } - DBG3("Sending %s pipe %d to consumer on socket %d", - pipe_name, - pipe, *consumer_sock->fd_ptr); + DBG3("Sending %s pipe %d to consumer on socket %d", pipe_name, pipe, *consumer_sock->fd_ptr); ret = consumer_send_fds(consumer_sock, &pipe, 1); if (ret < 0) { goto error; @@ -1248,22 +1243,19 @@ error: return ret; } -int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, - int pipe) +int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, int pipe) { - return consumer_send_pipe(consumer_sock, - LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, pipe); + return consumer_send_pipe(consumer_sock, LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, pipe); } /* * Ask the consumer if the data is pending for the specific session id. * Returns 1 if data is pending, 0 otherwise, or < 0 on error. */ -int consumer_is_data_pending(uint64_t session_id, - struct consumer_output *consumer) +int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consumer) { int ret; - int32_t ret_code = 0; /* Default is that the data is NOT pending */ + int32_t ret_code = 0; /* Default is that the data is NOT pending */ struct consumer_socket *socket; struct lttng_ht_iter iter; struct lttcomm_consumer_msg msg; @@ -1278,8 +1270,7 @@ int consumer_is_data_pending(uint64_t session_id, /* Send command for each consumer */ rcu_read_lock(); - cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, - node.node) { + cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { pthread_mutex_lock(socket->lock); ret = consumer_socket_send(socket, &msg, sizeof(msg)); if (ret < 0) { @@ -1306,7 +1297,8 @@ int consumer_is_data_pending(uint64_t session_id, rcu_read_unlock(); DBG("Consumer data is %s pending for session id %" PRIu64, - ret_code == 1 ? "" : "NOT", session_id); + ret_code == 1 ? "" : "NOT", + session_id); return ret_code; error_unlock: @@ -1384,8 +1376,7 @@ end: * * Return 0 on success else a negative value. */ -int consumer_close_metadata(struct consumer_socket *socket, - uint64_t metadata_key) +int consumer_close_metadata(struct consumer_socket *socket, uint64_t metadata_key) { int ret; struct lttcomm_consumer_msg msg; @@ -1417,8 +1408,7 @@ end: * * Return 0 on success else a negative value. */ -int consumer_setup_metadata(struct consumer_socket *socket, - uint64_t metadata_key) +int consumer_setup_metadata(struct consumer_socket *socket, uint64_t metadata_key) { int ret; struct lttcomm_consumer_msg msg; @@ -1452,8 +1442,11 @@ end: * Return 0 on success else a negative value. */ int consumer_push_metadata(struct consumer_socket *socket, - uint64_t metadata_key, char *metadata_str, size_t len, - size_t target_offset, uint64_t version) + uint64_t metadata_key, + char *metadata_str, + size_t len, + size_t target_offset, + uint64_t version) { int ret; struct lttcomm_consumer_msg msg; @@ -1478,8 +1471,7 @@ int consumer_push_metadata(struct consumer_socket *socket, goto end; } - DBG3("Consumer pushing metadata on sock %d of len %zu", *socket->fd_ptr, - len); + DBG3("Consumer pushing metadata on sock %d of len %zu", *socket->fd_ptr, len); ret = consumer_socket_send(socket, metadata_str, len); if (ret < 0) { @@ -1504,9 +1496,11 @@ end: * Returns LTTNG_OK on success or else an LTTng error code. */ enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket, - uint64_t key, const struct consumer_output *output, int metadata, - const char *channel_path, - uint64_t nb_packets_per_stream) + uint64_t key, + const struct consumer_output *output, + int metadata, + const char *channel_path, + uint64_t nb_packets_per_stream) { int ret; enum lttng_error_code status = LTTNG_OK; @@ -1524,20 +1518,19 @@ enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket, msg.u.snapshot_channel.metadata = metadata; if (output->type == CONSUMER_DST_NET) { - msg.u.snapshot_channel.relayd_id = - output->net_seq_index; + msg.u.snapshot_channel.relayd_id = output->net_seq_index; msg.u.snapshot_channel.use_relayd = 1; } else { msg.u.snapshot_channel.relayd_id = (uint64_t) -1ULL; } ret = lttng_strncpy(msg.u.snapshot_channel.pathname, - channel_path, - sizeof(msg.u.snapshot_channel.pathname)); + channel_path, + sizeof(msg.u.snapshot_channel.pathname)); if (ret < 0) { ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%zu bytes required) with path \"%s\"", - sizeof(msg.u.snapshot_channel.pathname), - strlen(channel_path), - channel_path); + sizeof(msg.u.snapshot_channel.pathname), + strlen(channel_path), + channel_path); status = LTTNG_ERR_SNAPSHOT_FAIL; goto error; } @@ -1566,8 +1559,10 @@ error: /* * Ask the consumer the number of discarded events for a channel. */ -int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key, - struct consumer_output *consumer, uint64_t *discarded) +int consumer_get_discarded_events(uint64_t session_id, + uint64_t channel_key, + struct consumer_output *consumer, + uint64_t *discarded) { int ret; struct consumer_socket *socket; @@ -1587,8 +1582,7 @@ int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key, /* Send command for each consumer */ rcu_read_lock(); - cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, - node.node) { + cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { uint64_t consumer_discarded = 0; pthread_mutex_lock(socket->lock); ret = consumer_socket_send(socket, &msg, sizeof(msg)); @@ -1601,8 +1595,7 @@ int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key, * No need for a recv reply status because the answer to the * command is the reply status message. */ - ret = consumer_socket_recv(socket, &consumer_discarded, - sizeof(consumer_discarded)); + ret = consumer_socket_recv(socket, &consumer_discarded, sizeof(consumer_discarded)); if (ret < 0) { ERR("get discarded events"); pthread_mutex_unlock(socket->lock); @@ -1612,8 +1605,7 @@ int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key, *discarded += consumer_discarded; } ret = 0; - DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, - *discarded, session_id); + DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, *discarded, session_id); end: rcu_read_unlock(); @@ -1623,8 +1615,10 @@ end: /* * Ask the consumer the number of lost packets for a channel. */ -int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, - struct consumer_output *consumer, uint64_t *lost) +int consumer_get_lost_packets(uint64_t session_id, + uint64_t channel_key, + struct consumer_output *consumer, + uint64_t *lost) { int ret; struct consumer_socket *socket; @@ -1644,8 +1638,7 @@ int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, /* Send command for each consumer */ rcu_read_lock(); - cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, - node.node) { + cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { uint64_t consumer_lost = 0; pthread_mutex_lock(socket->lock); ret = consumer_socket_send(socket, &msg, sizeof(msg)); @@ -1658,8 +1651,7 @@ int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, * No need for a recv reply status because the answer to the * command is the reply status message. */ - ret = consumer_socket_recv(socket, &consumer_lost, - sizeof(consumer_lost)); + ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost)); if (ret < 0) { ERR("get lost packets"); pthread_mutex_unlock(socket->lock); @@ -1669,8 +1661,7 @@ int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, *lost += consumer_lost; } ret = 0; - DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, - *lost, session_id); + DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, *lost, session_id); end: rcu_read_unlock(); @@ -1684,9 +1675,10 @@ end: * when the rotation started. On the relay, this allows to keep track in which * chunk each stream is currently writing to (for the rotate_pending operation). */ -int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, - struct consumer_output *output, - bool is_metadata_channel) +int consumer_rotate_channel(struct consumer_socket *socket, + uint64_t key, + struct consumer_output *output, + bool is_metadata_channel) { int ret; struct lttcomm_consumer_msg msg; @@ -1781,8 +1773,7 @@ error_socket: return ret; } -int consumer_init(struct consumer_socket *socket, - const lttng_uuid& sessiond_uuid) +int consumer_init(struct consumer_socket *socket, const lttng_uuid& sessiond_uuid) { int ret; struct lttcomm_consumer_msg msg = { @@ -1812,9 +1803,10 @@ error: * Called with the consumer socket lock held. */ int consumer_create_trace_chunk(struct consumer_socket *socket, - uint64_t relayd_id, uint64_t session_id, - struct lttng_trace_chunk *chunk, - const char *domain_subdir) + uint64_t relayd_id, + uint64_t session_id, + struct lttng_trace_chunk *chunk, + const char *domain_subdir) { int ret; enum lttng_trace_chunk_status chunk_status; @@ -1840,44 +1832,38 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, LTTNG_ASSERT(chunk); if (relayd_id != -1ULL) { - LTTNG_OPTIONAL_SET(&msg.u.create_trace_chunk.relayd_id, - relayd_id); + LTTNG_OPTIONAL_SET(&msg.u.create_trace_chunk.relayd_id, relayd_id); } - chunk_status = lttng_trace_chunk_get_name(chunk, &chunk_name, - &chunk_name_overridden); + chunk_status = lttng_trace_chunk_get_name(chunk, &chunk_name, &chunk_name_overridden); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK && - chunk_status != LTTNG_TRACE_CHUNK_STATUS_NONE) { + chunk_status != LTTNG_TRACE_CHUNK_STATUS_NONE) { ERR("Failed to get name of trace chunk"); ret = -LTTNG_ERR_FATAL; goto error; } if (chunk_name_overridden) { ret = lttng_strncpy(msg.u.create_trace_chunk.override_name, - chunk_name, - sizeof(msg.u.create_trace_chunk.override_name)); + chunk_name, + sizeof(msg.u.create_trace_chunk.override_name)); if (ret) { ERR("Trace chunk name \"%s\" exceeds the maximal length allowed by the consumer protocol", - chunk_name); + chunk_name); ret = -LTTNG_ERR_FATAL; goto error; } } - chunk_status = lttng_trace_chunk_get_creation_timestamp(chunk, - &creation_timestamp); + chunk_status = lttng_trace_chunk_get_creation_timestamp(chunk, &creation_timestamp); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ret = -LTTNG_ERR_FATAL; goto error; } - msg.u.create_trace_chunk.creation_timestamp = - (uint64_t) creation_timestamp; + msg.u.create_trace_chunk.creation_timestamp = (uint64_t) creation_timestamp; /* Only used for logging purposes. */ - ret = time_to_iso8601_str(creation_timestamp, - creation_timestamp_buffer, - sizeof(creation_timestamp_buffer)); - creation_timestamp_str = !ret ? creation_timestamp_buffer : - "(formatting error)"; + ret = time_to_iso8601_str( + creation_timestamp, creation_timestamp_buffer, sizeof(creation_timestamp_buffer)); + creation_timestamp_str = !ret ? creation_timestamp_buffer : "(formatting error)"; chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { @@ -1893,13 +1879,12 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, if (chunk_has_local_output) { chunk_status = lttng_trace_chunk_borrow_chunk_directory_handle( - chunk, &chunk_directory_handle); + chunk, &chunk_directory_handle); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ret = -LTTNG_ERR_FATAL; goto error; } - chunk_status = lttng_trace_chunk_get_credentials( - chunk, &chunk_credentials); + chunk_status = lttng_trace_chunk_get_credentials(chunk, &chunk_credentials); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { /* * Not associating credentials to a sessiond chunk is a @@ -1908,17 +1893,15 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, ret = -LTTNG_ERR_FATAL; goto error; } - tc_status = lttng_trace_chunk_create_subdirectory( - chunk, domain_subdir); + tc_status = lttng_trace_chunk_create_subdirectory(chunk, domain_subdir); if (tc_status != LTTNG_TRACE_CHUNK_STATUS_OK) { PERROR("Failed to create chunk domain output directory \"%s\"", - domain_subdir); + domain_subdir); ret = -LTTNG_ERR_FATAL; goto error; } - domain_handle = lttng_directory_handle_create_from_handle( - domain_subdir, - chunk_directory_handle); + domain_handle = lttng_directory_handle_create_from_handle(domain_subdir, + chunk_directory_handle); if (!domain_handle) { ret = -LTTNG_ERR_FATAL; goto error; @@ -1932,22 +1915,22 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, * The ownership of the chunk directory handle's is maintained * by the trace chunk. */ - domain_dirfd = lttng_directory_handle_get_dirfd( - domain_handle); + domain_dirfd = lttng_directory_handle_get_dirfd(domain_handle); LTTNG_ASSERT(domain_dirfd >= 0); msg.u.create_trace_chunk.credentials.value.uid = - lttng_credentials_get_uid(&chunk_credentials); + lttng_credentials_get_uid(&chunk_credentials); msg.u.create_trace_chunk.credentials.value.gid = - lttng_credentials_get_gid(&chunk_credentials); + lttng_credentials_get_gid(&chunk_credentials); msg.u.create_trace_chunk.credentials.is_set = 1; } DBG("Sending consumer create trace chunk command: relayd_id = %" PRId64 - ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 - ", creation_timestamp = %s", - relayd_id, session_id, chunk_id, - creation_timestamp_str); + ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", creation_timestamp = %s", + relayd_id, + session_id, + chunk_id, + creation_timestamp_str); health_code_update(); ret = consumer_send_msg(socket, &msg); health_code_update(); @@ -1979,9 +1962,10 @@ error: * Called with the consumer socket lock held. */ int consumer_close_trace_chunk(struct consumer_socket *socket, - uint64_t relayd_id, uint64_t session_id, - struct lttng_trace_chunk *chunk, - char *closed_trace_chunk_path) + uint64_t relayd_id, + uint64_t session_id, + struct lttng_trace_chunk *chunk, + char *closed_trace_chunk_path) { int ret; enum lttng_trace_chunk_status chunk_status; @@ -2002,16 +1986,14 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, lttng_dynamic_buffer_init(&path_reception_buffer); if (relayd_id != -1ULL) { - LTTNG_OPTIONAL_SET( - &msg.u.close_trace_chunk.relayd_id, relayd_id); + LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.relayd_id, relayd_id); } - chunk_status = lttng_trace_chunk_get_close_command( - chunk, &close_command); + chunk_status = lttng_trace_chunk_get_close_command(chunk, &close_command); switch (chunk_status) { case LTTNG_TRACE_CHUNK_STATUS_OK: LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.close_command, - (uint32_t) close_command); + (uint32_t) close_command); break; case LTTNG_TRACE_CHUNK_STATUS_NONE: break; @@ -2030,8 +2012,7 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); msg.u.close_trace_chunk.chunk_id = chunk_id; - chunk_status = lttng_trace_chunk_get_close_timestamp(chunk, - &close_timestamp); + chunk_status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp); /* * A trace chunk should be closed locally before being closed remotely. * Otherwise, the close timestamp would never be transmitted to the @@ -2041,13 +2022,14 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, msg.u.close_trace_chunk.close_timestamp = (uint64_t) close_timestamp; if (msg.u.close_trace_chunk.close_command.is_set) { - close_command_name = lttng_trace_chunk_command_type_get_name( - close_command); + close_command_name = lttng_trace_chunk_command_type_get_name(close_command); } DBG("Sending consumer close trace chunk command: relayd_id = %" PRId64 - ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 - ", close command = \"%s\"", - relayd_id, session_id, chunk_id, close_command_name); + ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", close command = \"%s\"", + relayd_id, + session_id, + chunk_id, + close_command_name); health_code_update(); ret = consumer_socket_send(socket, &msg, sizeof(struct lttcomm_consumer_msg)); @@ -2061,20 +2043,20 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, goto error; } if (reply.path_length >= LTTNG_PATH_MAX) { - ERR("Invalid path returned by relay daemon: %" PRIu32 "bytes exceeds maximal allowed length of %d bytes", - reply.path_length, LTTNG_PATH_MAX); + ERR("Invalid path returned by relay daemon: %" PRIu32 + "bytes exceeds maximal allowed length of %d bytes", + reply.path_length, + LTTNG_PATH_MAX); ret = -LTTNG_ERR_INVALID_PROTOCOL; goto error; } - ret = lttng_dynamic_buffer_set_size(&path_reception_buffer, - reply.path_length); + ret = lttng_dynamic_buffer_set_size(&path_reception_buffer, reply.path_length); if (ret) { ERR("Failed to allocate reception buffer of path returned by the \"close trace chunk\" command"); ret = -LTTNG_ERR_NOMEM; goto error; } - ret = consumer_socket_recv(socket, path_reception_buffer.data, - path_reception_buffer.size); + ret = consumer_socket_recv(socket, path_reception_buffer.data, path_reception_buffer.size); if (ret < 0) { ERR("Communication error while receiving path of closed trace chunk"); ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER; @@ -2090,8 +2072,9 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, * closed_trace_chunk_path is assumed to have a length >= * LTTNG_PATH_MAX */ - memcpy(closed_trace_chunk_path, path_reception_buffer.data, - path_reception_buffer.size); + memcpy(closed_trace_chunk_path, + path_reception_buffer.data, + path_reception_buffer.size); } error: lttng_dynamic_buffer_reset(&path_reception_buffer); @@ -2106,9 +2089,10 @@ error: * Returns 0 on success, or a negative value on error. */ int consumer_trace_chunk_exists(struct consumer_socket *socket, - uint64_t relayd_id, uint64_t session_id, - struct lttng_trace_chunk *chunk, - enum consumer_trace_chunk_exists_status *result) + uint64_t relayd_id, + uint64_t session_id, + struct lttng_trace_chunk *chunk, + enum consumer_trace_chunk_exists_status *result) { int ret; enum lttng_trace_chunk_status chunk_status; @@ -2124,8 +2108,7 @@ int consumer_trace_chunk_exists(struct consumer_socket *socket, LTTNG_ASSERT(socket); if (relayd_id != -1ULL) { - LTTNG_OPTIONAL_SET(&msg.u.trace_chunk_exists.relayd_id, - relayd_id); + LTTNG_OPTIONAL_SET(&msg.u.trace_chunk_exists.relayd_id, relayd_id); } chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id); @@ -2141,8 +2124,10 @@ int consumer_trace_chunk_exists(struct consumer_socket *socket, msg.u.trace_chunk_exists.chunk_id = chunk_id; DBG("Sending consumer trace chunk exists command: relayd_id = %" PRId64 - ", session_id = %" PRIu64 - ", chunk_id = %" PRIu64, relayd_id, session_id, chunk_id); + ", session_id = %" PRIu64 ", chunk_id = %" PRIu64, + relayd_id, + session_id, + chunk_id); health_code_update(); ret = consumer_send_msg(socket, &msg); @@ -2164,8 +2149,7 @@ int consumer_trace_chunk_exists(struct consumer_socket *socket, ret = -1; goto error; } - DBG("Consumer reply to TRACE_CHUNK_EXISTS command: %s", - consumer_reply_str); + DBG("Consumer reply to TRACE_CHUNK_EXISTS command: %s", consumer_reply_str); ret = 0; error: health_code_update();