X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.cpp;h=13c18325d75d363e5da76674734b4e3aedcd0c07;hp=95fc2c34875ce9fff37d3454a8c96e7de1e3a7fb;hb=HEAD;hpb=3c3390532736cfb5198f863d0d2b218e21fcf76d diff --git a/src/bin/lttng-sessiond/consumer.cpp b/src/bin/lttng-sessiond/consumer.cpp index 95fc2c348..13c18325d 100644 --- a/src/bin/lttng-sessiond/consumer.cpp +++ b/src/bin/lttng-sessiond/consumer.cpp @@ -7,25 +7,26 @@ */ #define _LGPL_SOURCE +#include "consumer.hpp" +#include "health-sessiond.hpp" +#include "lttng-sessiond.hpp" +#include "ust-app.hpp" +#include "utils.hpp" + +#include +#include +#include +#include +#include +#include + +#include #include #include #include #include #include #include -#include - -#include -#include -#include -#include -#include - -#include "consumer.h" -#include "health-sessiond.h" -#include "ust-app.h" -#include "utils.h" -#include "lttng-sessiond.h" /* * Return allocated full pathname of the session using the consumer trace path @@ -35,7 +36,8 @@ * returned. */ char *setup_channel_trace_path(struct consumer_output *consumer, - const char *session_path, size_t *consumer_path_offset) + const char *session_path, + size_t *consumer_path_offset) { int ret; char *pathname; @@ -49,27 +51,28 @@ char *setup_channel_trace_path(struct consumer_output *consumer, * Allocate the string ourself to make sure we never exceed * LTTNG_PATH_MAX. */ - pathname = (char *) zmalloc(LTTNG_PATH_MAX); + pathname = calloc(LTTNG_PATH_MAX); if (!pathname) { goto error; } /* Get correct path name destination */ - if (consumer->type == CONSUMER_DST_NET && - consumer->relay_major_version == 2 && - consumer->relay_minor_version < 11) { - ret = snprintf(pathname, LTTNG_PATH_MAX, "%s%s/%s/%s", - consumer->dst.net.base_dir, - consumer->chunk_path, consumer->domain_subdir, - session_path); + if (consumer->type == CONSUMER_DST_NET && consumer->relay_major_version == 2 && + consumer->relay_minor_version < 11) { + ret = snprintf(pathname, + LTTNG_PATH_MAX, + "%s%s/%s/%s", + consumer->dst.net.base_dir, + consumer->chunk_path, + consumer->domain_subdir, + session_path); *consumer_path_offset = 0; } else { - ret = snprintf(pathname, LTTNG_PATH_MAX, "%s/%s", - consumer->domain_subdir, session_path); + ret = snprintf( + pathname, LTTNG_PATH_MAX, "%s/%s", consumer->domain_subdir, session_path); *consumer_path_offset = strlen(consumer->domain_subdir) + 1; } - DBG3("Consumer trace path relative to current trace chunk: \"%s\"", - pathname); + DBG3("Consumer trace path relative to current trace chunk: \"%s\"", pathname); if (ret < 0) { PERROR("Failed to format channel path"); goto error; @@ -81,7 +84,7 @@ char *setup_channel_trace_path(struct consumer_output *consumer, return pathname; error: free(pathname); - return NULL; + return nullptr; } /* @@ -92,8 +95,7 @@ error: * * Return 0 on success else a negative value on error. */ -int consumer_socket_send( - struct consumer_socket *socket, const void *msg, size_t len) +int consumer_socket_send(struct consumer_socket *socket, const void *msg, size_t len) { int fd; ssize_t size; @@ -213,7 +215,8 @@ end: * negative value is sent back and both parameters are untouched. */ int consumer_recv_status_channel(struct consumer_socket *sock, - uint64_t *key, unsigned int *stream_count) + uint64_t *key, + unsigned int *stream_count) { int ret; struct lttcomm_consumer_status_channel reply; @@ -246,8 +249,7 @@ end: * * On success return positive value. On error, negative value. */ -int consumer_send_destroy_relayd(struct consumer_socket *sock, - struct consumer_output *consumer) +int consumer_send_destroy_relayd(struct consumer_socket *sock, struct consumer_output *consumer) { int ret; struct lttcomm_consumer_msg msg; @@ -290,19 +292,17 @@ void consumer_output_send_destroy_relayd(struct consumer_output *consumer) /* Destroy any relayd connection */ if (consumer->type == CONSUMER_DST_NET) { - rcu_read_lock(); - cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, - node.node) { - int ret; + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { + /* Send destroy relayd command. */ + const int ret = consumer_send_destroy_relayd(socket, consumer); - /* Send destroy relayd command */ - ret = consumer_send_destroy_relayd(socket, consumer); if (ret < 0) { DBG("Unable to send destroy relayd command to consumer"); /* Continue since we MUST delete everything at this point. */ } } - rcu_read_unlock(); } } @@ -312,15 +312,16 @@ void consumer_output_send_destroy_relayd(struct consumer_output *consumer) * * Return 0 on success, else negative value on error */ -int consumer_create_socket(struct consumer_data *data, - struct consumer_output *output) +int consumer_create_socket(struct consumer_data *data, struct consumer_output *output) { int ret = 0; struct consumer_socket *socket; LTTNG_ASSERT(data); - if (output == NULL || data->cmd_sock < 0) { + lttng::urcu::read_lock_guard read_lock; + + if (output == nullptr || data->cmd_sock < 0) { /* * Not an error. Possible there is simply not spawned consumer or it's * disabled for the tracing session asking the socket. @@ -328,27 +329,22 @@ int consumer_create_socket(struct consumer_data *data, goto error; } - rcu_read_lock(); socket = consumer_find_socket(data->cmd_sock, output); - rcu_read_unlock(); - if (socket == NULL) { + if (socket == nullptr) { socket = consumer_allocate_socket(&data->cmd_sock); - if (socket == NULL) { + if (socket == nullptr) { ret = -1; goto error; } socket->registered = 0; socket->lock = &data->lock; - rcu_read_lock(); consumer_add_socket(socket, output); - rcu_read_unlock(); } socket->type = data->type; - DBG3("Consumer socket created (fd: %d) and added to output", - data->cmd_sock); + DBG3("Consumer socket created (fd: %d) and added to output", data->cmd_sock); error: return ret; @@ -362,10 +358,12 @@ error: * object reference is not needed anymore. */ struct consumer_socket *consumer_find_socket_by_bitness(int bits, - const struct consumer_output *consumer) + const struct consumer_output *consumer) { int consumer_fd; - struct consumer_socket *socket = NULL; + struct consumer_socket *socket = nullptr; + + ASSERT_RCU_READ_LOCKED(); switch (bits) { case 64: @@ -381,8 +379,7 @@ struct consumer_socket *consumer_find_socket_by_bitness(int bits, socket = consumer_find_socket(consumer_fd, consumer); if (!socket) { - ERR("Consumer socket fd %d not found in consumer obj %p", - consumer_fd, consumer); + ERR("Consumer socket fd %d not found in consumer obj %p", consumer_fd, consumer); } end: @@ -394,23 +391,23 @@ end: * be acquired before calling this function and across use of the * returned consumer_socket. */ -struct consumer_socket *consumer_find_socket(int key, - const struct consumer_output *consumer) +struct consumer_socket *consumer_find_socket(int key, const struct consumer_output *consumer) { struct lttng_ht_iter iter; struct lttng_ht_node_ulong *node; - struct consumer_socket *socket = NULL; + struct consumer_socket *socket = nullptr; + + ASSERT_RCU_READ_LOCKED(); /* Negative keys are lookup failures */ - if (key < 0 || consumer == NULL) { - return NULL; + if (key < 0 || consumer == nullptr) { + return nullptr; } - lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key), - &iter); + lttng_ht_lookup(consumer->socks, (void *) ((unsigned long) key), &iter); node = lttng_ht_iter_get_node_ulong(&iter); - if (node != NULL) { - socket = caa_container_of(node, struct consumer_socket, node); + if (node != nullptr) { + socket = lttng::utils::container_of(node, &consumer_socket::node); } return socket; @@ -421,12 +418,12 @@ struct consumer_socket *consumer_find_socket(int key, */ struct consumer_socket *consumer_allocate_socket(int *fd) { - struct consumer_socket *socket = NULL; + struct consumer_socket *socket = nullptr; LTTNG_ASSERT(fd); - socket = (consumer_socket *) zmalloc(sizeof(struct consumer_socket)); - if (socket == NULL) { + socket = zmalloc(); + if (socket == nullptr) { PERROR("zmalloc consumer socket"); goto error; } @@ -442,11 +439,11 @@ error: * Add consumer socket to consumer output object. Read side lock must be * acquired before calling this function. */ -void consumer_add_socket(struct consumer_socket *sock, - struct consumer_output *consumer) +void consumer_add_socket(struct consumer_socket *sock, struct consumer_output *consumer) { LTTNG_ASSERT(sock); LTTNG_ASSERT(consumer); + ASSERT_RCU_READ_LOCKED(); lttng_ht_add_unique_ulong(consumer->socks, &sock->node); } @@ -455,14 +452,14 @@ void consumer_add_socket(struct consumer_socket *sock, * Delete consumer socket to consumer output object. Read side lock must be * acquired before calling this function. */ -void consumer_del_socket(struct consumer_socket *sock, - struct consumer_output *consumer) +void consumer_del_socket(struct consumer_socket *sock, struct consumer_output *consumer) { int ret; struct lttng_ht_iter iter; LTTNG_ASSERT(sock); LTTNG_ASSERT(consumer); + ASSERT_RCU_READ_LOCKED(); iter.iter.node = &sock->node.node; ret = lttng_ht_del(consumer->socks, &iter); @@ -475,16 +472,17 @@ void consumer_del_socket(struct consumer_socket *sock, static void destroy_socket_rcu(struct rcu_head *head) { struct lttng_ht_node_ulong *node = - caa_container_of(head, struct lttng_ht_node_ulong, head); - struct consumer_socket *socket = - caa_container_of(node, struct consumer_socket, node); + lttng::utils::container_of(head, <tng_ht_node_ulong::head); + struct consumer_socket *socket = lttng::utils::container_of(node, &consumer_socket::node); free(socket); } /* - * Destroy and free socket pointer in a call RCU. Read side lock must be - * acquired before calling this function. + * Destroy and free socket pointer in a call RCU. The call must either: + * - have acquired the read side lock before calling this function, or + * - guarantee the validity of the `struct consumer_socket` object for the + * duration of the call. */ void consumer_destroy_socket(struct consumer_socket *sock) { @@ -510,16 +508,16 @@ void consumer_destroy_socket(struct consumer_socket *sock) */ struct consumer_output *consumer_create_output(enum consumer_dst_type type) { - struct consumer_output *output = NULL; + struct consumer_output *output = nullptr; - output = (consumer_output *) zmalloc(sizeof(struct consumer_output)); - if (output == NULL) { + output = zmalloc(); + if (output == nullptr) { PERROR("zmalloc consumer_output"); goto error; } /* By default, consumer output is enabled */ - output->enabled = 1; + output->enabled = true; output->type = type; output->net_seq_index = (uint64_t) -1ULL; urcu_ref_init(&output->ref); @@ -544,12 +542,14 @@ void consumer_destroy_output_sockets(struct consumer_output *obj) return; } - rcu_read_lock(); - cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) { - consumer_del_socket(socket, obj); - consumer_destroy_socket(socket); + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) { + consumer_del_socket(socket, obj); + consumer_destroy_socket(socket); + } } - rcu_read_unlock(); } /* @@ -557,8 +557,7 @@ void consumer_destroy_output_sockets(struct consumer_output *obj) */ static void consumer_release_output(struct urcu_ref *ref) { - struct consumer_output *obj = - caa_container_of(ref, struct consumer_output, ref); + struct consumer_output *obj = lttng::utils::container_of(ref, &consumer_output::ref); consumer_destroy_output_sockets(obj); @@ -600,13 +599,12 @@ struct consumer_output *consumer_copy_output(struct consumer_output *src) LTTNG_ASSERT(src); output = consumer_create_output(src->type); - if (output == NULL) { + if (output == nullptr) { goto end; } output->enabled = src->enabled; output->net_seq_index = src->net_seq_index; - memcpy(output->domain_subdir, src->domain_subdir, - sizeof(output->domain_subdir)); + memcpy(output->domain_subdir, src->domain_subdir, sizeof(output->domain_subdir)); output->snapshot = src->snapshot; output->relay_major_version = src->relay_major_version; output->relay_minor_version = src->relay_minor_version; @@ -621,7 +619,7 @@ end: error_put: consumer_output_put(output); - return NULL; + return nullptr; } /* @@ -629,8 +627,7 @@ error_put: * * Return 0 on success or else a negative value. */ -int consumer_copy_sockets(struct consumer_output *dst, - struct consumer_output *src) +int consumer_copy_sockets(struct consumer_output *dst, struct consumer_output *src) { int ret = 0; struct lttng_ht_iter iter; @@ -639,32 +636,33 @@ int consumer_copy_sockets(struct consumer_output *dst, LTTNG_ASSERT(dst); LTTNG_ASSERT(src); - rcu_read_lock(); - cds_lfht_for_each_entry(src->socks->ht, &iter.iter, socket, node.node) { - /* Ignore socket that are already there. */ - copy_sock = consumer_find_socket(*socket->fd_ptr, dst); - if (copy_sock) { - continue; - } + { + lttng::urcu::read_lock_guard read_lock; - /* Create new socket object. */ - copy_sock = consumer_allocate_socket(socket->fd_ptr); - if (copy_sock == NULL) { - rcu_read_unlock(); - ret = -ENOMEM; - goto error; - } + cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) { + /* Ignore socket that are already there. */ + copy_sock = consumer_find_socket(*socket->fd_ptr, dst); + if (copy_sock) { + continue; + } - copy_sock->registered = socket->registered; - /* - * This is valid because this lock is shared accross all consumer - * object being the global lock of the consumer data structure of the - * session daemon. - */ - copy_sock->lock = socket->lock; - consumer_add_socket(copy_sock, dst); + /* Create new socket object. */ + copy_sock = consumer_allocate_socket(socket->fd_ptr); + if (copy_sock == nullptr) { + ret = -ENOMEM; + goto error; + } + + copy_sock->registered = socket->registered; + /* + * This is valid because this lock is shared accross all consumer + * object being the global lock of the consumer data structure of the + * session daemon. + */ + copy_sock->lock = socket->lock; + consumer_add_socket(copy_sock, dst); + } } - rcu_read_unlock(); error: return ret; @@ -677,11 +675,11 @@ error: * error. */ int consumer_set_network_uri(const struct ltt_session *session, - struct consumer_output *output, - struct lttng_uri *uri) + struct consumer_output *output, + struct lttng_uri *uri) { int ret; - struct lttng_uri *dst_uri = NULL; + struct lttng_uri *dst_uri = nullptr; /* Code flow error safety net. */ LTTNG_ASSERT(output); @@ -695,8 +693,7 @@ int consumer_set_network_uri(const struct ltt_session *session, /* Assign default port. */ uri->port = DEFAULT_NETWORK_CONTROL_PORT; } else { - if (output->dst.net.data_isset && uri->port == - output->dst.net.data.port) { + if (output->dst.net.data_isset && uri->port == output->dst.net.data.port) { ret = -LTTNG_ERR_INVALID; goto error; } @@ -710,8 +707,8 @@ int consumer_set_network_uri(const struct ltt_session *session, /* Assign default port. */ uri->port = DEFAULT_NETWORK_DATA_PORT; } else { - if (output->dst.net.control_isset && uri->port == - output->dst.net.control.port) { + if (output->dst.net.control_isset && + uri->port == output->dst.net.control.port) { ret = -LTTNG_ERR_INVALID; goto error; } @@ -768,14 +765,17 @@ int consumer_set_network_uri(const struct ltt_session *session, goto error; } ret = snprintf(output->dst.net.base_dir, - sizeof(output->dst.net.base_dir), - "/%s/%s/", session->hostname, uri->subdir); + sizeof(output->dst.net.base_dir), + "/%s/%s/", + session->hostname, + uri->subdir); } else { if (session->has_auto_generated_name) { ret = snprintf(output->dst.net.base_dir, - sizeof(output->dst.net.base_dir), - "/%s/%s/", session->hostname, - session->name); + sizeof(output->dst.net.base_dir), + "/%s/%s/", + session->hostname, + session->name); } else { char session_creation_datetime[16]; size_t strftime_ret; @@ -787,18 +787,20 @@ int consumer_set_network_uri(const struct ltt_session *session, goto error; } strftime_ret = strftime(session_creation_datetime, - sizeof(session_creation_datetime), - "%Y%m%d-%H%M%S", timeinfo); + sizeof(session_creation_datetime), + "%Y%m%d-%H%M%S", + timeinfo); if (strftime_ret == 0) { ERR("Failed to format session creation timestamp while setting network URI"); ret = -LTTNG_ERR_FATAL; goto error; } ret = snprintf(output->dst.net.base_dir, - sizeof(output->dst.net.base_dir), - "/%s/%s-%s/", session->hostname, - session->name, - session_creation_datetime); + sizeof(output->dst.net.base_dir), + "/%s/%s-%s/", + session->hostname, + session->name, + session_creation_datetime); } } if (ret >= sizeof(output->dst.net.base_dir)) { @@ -811,8 +813,7 @@ int consumer_set_network_uri(const struct ltt_session *session, goto error; } - DBG3("Consumer set network uri base_dir path %s", - output->dst.net.base_dir); + DBG3("Consumer set network uri base_dir path %s", output->dst.net.base_dir); end: return 0; @@ -827,8 +828,7 @@ error: * * The consumer socket lock must be held by the caller. */ -int consumer_send_fds(struct consumer_socket *sock, const int *fds, - size_t nb_fd) +int consumer_send_fds(struct consumer_socket *sock, const int *fds, size_t nb_fd) { int ret; @@ -854,8 +854,7 @@ error: * * The consumer socket lock must be held by the caller. */ -int consumer_send_msg(struct consumer_socket *sock, - const struct lttcomm_consumer_msg *msg) +int consumer_send_msg(struct consumer_socket *sock, const struct lttcomm_consumer_msg *msg) { int ret; @@ -879,8 +878,7 @@ error: * * The consumer socket lock must be held by the caller. */ -int consumer_send_channel(struct consumer_socket *sock, - struct lttcomm_consumer_msg *msg) +int consumer_send_channel(struct consumer_socket *sock, struct lttcomm_consumer_msg *msg) { int ret; @@ -901,33 +899,33 @@ error: * information. */ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, - uint64_t subbuf_size, - uint64_t num_subbuf, - int overwrite, - unsigned int switch_timer_interval, - unsigned int read_timer_interval, - unsigned int live_timer_interval, - bool is_in_live_session, - unsigned int monitor_timer_interval, - int output, - int type, - uint64_t session_id, - const char *pathname, - const char *name, - uint64_t relayd_id, - uint64_t key, - unsigned char *uuid, - uint32_t chan_id, - uint64_t tracefile_size, - uint64_t tracefile_count, - uint64_t session_id_per_pid, - unsigned int monitor, - uint32_t ust_app_uid, - int64_t blocking_timeout, - const char *root_shm_path, - const char *shm_path, - struct lttng_trace_chunk *trace_chunk, - const struct lttng_credentials *buffer_credentials) + uint64_t subbuf_size, + uint64_t num_subbuf, + int overwrite, + unsigned int switch_timer_interval, + unsigned int read_timer_interval, + unsigned int live_timer_interval, + bool is_in_live_session, + unsigned int monitor_timer_interval, + int output, + int type, + uint64_t session_id, + const char *pathname, + const char *name, + uint64_t relayd_id, + uint64_t key, + const lttng_uuid& uuid, + uint32_t chan_id, + uint64_t tracefile_size, + uint64_t tracefile_count, + uint64_t session_id_per_pid, + unsigned int monitor, + uint32_t ust_app_uid, + int64_t blocking_timeout, + const char *root_shm_path, + const char *shm_path, + struct lttng_trace_chunk *trace_chunk, + const struct lttng_credentials *buffer_credentials) { LTTNG_ASSERT(msg); @@ -944,14 +942,12 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); LTTNG_OPTIONAL_SET(&msg->u.ask_channel.chunk_id, chunk_id); } - msg->u.ask_channel.buffer_credentials.uid = - lttng_credentials_get_uid(buffer_credentials); - msg->u.ask_channel.buffer_credentials.gid = - lttng_credentials_get_gid(buffer_credentials); + msg->u.ask_channel.buffer_credentials.uid = lttng_credentials_get_uid(buffer_credentials); + msg->u.ask_channel.buffer_credentials.gid = lttng_credentials_get_gid(buffer_credentials); msg->cmd_type = LTTNG_CONSUMER_ASK_CHANNEL_CREATION; msg->u.ask_channel.subbuf_size = subbuf_size; - msg->u.ask_channel.num_subbuf = num_subbuf ; + msg->u.ask_channel.num_subbuf = num_subbuf; msg->u.ask_channel.overwrite = overwrite; msg->u.ask_channel.switch_timer_interval = switch_timer_interval; msg->u.ask_channel.read_timer_interval = read_timer_interval; @@ -971,25 +967,25 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.ust_app_uid = ust_app_uid; msg->u.ask_channel.blocking_timeout = blocking_timeout; - memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid)); + std::copy(uuid.begin(), uuid.end(), msg->u.ask_channel.uuid); if (pathname) { - strncpy(msg->u.ask_channel.pathname, pathname, - sizeof(msg->u.ask_channel.pathname)); - msg->u.ask_channel.pathname[sizeof(msg->u.ask_channel.pathname)-1] = '\0'; + strncpy(msg->u.ask_channel.pathname, pathname, sizeof(msg->u.ask_channel.pathname)); + msg->u.ask_channel.pathname[sizeof(msg->u.ask_channel.pathname) - 1] = '\0'; } strncpy(msg->u.ask_channel.name, name, sizeof(msg->u.ask_channel.name)); msg->u.ask_channel.name[sizeof(msg->u.ask_channel.name) - 1] = '\0'; if (root_shm_path) { - strncpy(msg->u.ask_channel.root_shm_path, root_shm_path, + strncpy(msg->u.ask_channel.root_shm_path, + root_shm_path, sizeof(msg->u.ask_channel.root_shm_path)); - msg->u.ask_channel.root_shm_path[sizeof(msg->u.ask_channel.root_shm_path) - 1] = '\0'; + msg->u.ask_channel.root_shm_path[sizeof(msg->u.ask_channel.root_shm_path) - 1] = + '\0'; } if (shm_path) { - strncpy(msg->u.ask_channel.shm_path, shm_path, - sizeof(msg->u.ask_channel.shm_path)); + strncpy(msg->u.ask_channel.shm_path, shm_path, sizeof(msg->u.ask_channel.shm_path)); msg->u.ask_channel.shm_path[sizeof(msg->u.ask_channel.shm_path) - 1] = '\0'; } } @@ -998,23 +994,21 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, * Init channel communication message structure. */ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, - uint64_t channel_key, - uint64_t session_id, - const char *pathname, - uid_t uid, - gid_t gid, - uint64_t relayd_id, - const char *name, - unsigned int nb_init_streams, - enum lttng_event_output output, - int type, - uint64_t tracefile_size, - uint64_t tracefile_count, - unsigned int monitor, - unsigned int live_timer_interval, - bool is_in_live_session, - unsigned int monitor_timer_interval, - struct lttng_trace_chunk *trace_chunk) + uint64_t channel_key, + uint64_t session_id, + const char *pathname, + uint64_t relayd_id, + const char *name, + unsigned int nb_init_streams, + enum lttng_event_output output, + int type, + uint64_t tracefile_size, + uint64_t tracefile_count, + unsigned int monitor, + unsigned int live_timer_interval, + bool is_in_live_session, + unsigned int monitor_timer_interval, + struct lttng_trace_chunk *trace_chunk) { LTTNG_ASSERT(msg); @@ -1045,8 +1039,7 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.channel.is_live = is_in_live_session; msg->u.channel.monitor_timer_interval = monitor_timer_interval; - strncpy(msg->u.channel.pathname, pathname, - sizeof(msg->u.channel.pathname)); + strncpy(msg->u.channel.pathname, pathname, sizeof(msg->u.channel.pathname)); msg->u.channel.pathname[sizeof(msg->u.channel.pathname) - 1] = '\0'; strncpy(msg->u.channel.name, name, sizeof(msg->u.channel.name)); @@ -1057,9 +1050,9 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, * Init stream communication message structure. */ void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg, - uint64_t channel_key, - uint64_t stream_key, - int32_t cpu) + uint64_t channel_key, + uint64_t stream_key, + int32_t cpu) { LTTNG_ASSERT(msg); @@ -1072,8 +1065,9 @@ void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg, } void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, - enum lttng_consumer_command cmd, - uint64_t channel_key, uint64_t net_seq_idx) + enum lttng_consumer_command cmd, + uint64_t channel_key, + uint64_t net_seq_idx) { LTTNG_ASSERT(msg); @@ -1088,8 +1082,10 @@ void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, * Send stream communication structure to the consumer. */ int consumer_send_stream(struct consumer_socket *sock, - struct consumer_output *dst, struct lttcomm_consumer_msg *msg, - const int *fds, size_t nb_fd) + struct consumer_output *dst, + struct lttcomm_consumer_msg *msg, + const int *fds, + size_t nb_fd) { int ret; @@ -1120,12 +1116,17 @@ error: * On success return positive value. On error, negative value. */ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, - struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer, - enum lttng_stream_type type, uint64_t session_id, - const char *session_name, const char *hostname, - const char *base_path, int session_live_timer, - const uint64_t *current_chunk_id, time_t session_creation_time, - bool session_name_contains_creation_time) + struct lttcomm_relayd_sock *rsock, + struct consumer_output *consumer, + enum lttng_stream_type type, + uint64_t session_id, + const char *session_name, + const char *hostname, + const char *base_path, + int session_live_timer, + const uint64_t *current_chunk_id, + time_t session_creation_time, + bool session_name_contains_creation_time) { int ret; int fd; @@ -1147,21 +1148,26 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, char output_path[LTTNG_PATH_MAX] = {}; uint64_t relayd_session_id; - ret = relayd_create_session(rsock, &relayd_session_id, - session_name, hostname, base_path, - session_live_timer, consumer->snapshot, - session_id, the_sessiond_uuid, current_chunk_id, - session_creation_time, - session_name_contains_creation_time, - output_path); + ret = relayd_create_session(rsock, + &relayd_session_id, + session_name, + hostname, + base_path, + session_live_timer, + consumer->snapshot, + session_id, + the_sessiond_uuid, + current_chunk_id, + session_creation_time, + session_name_contains_creation_time, + output_path); if (ret < 0) { /* Close the control socket. */ (void) relayd_close(rsock); goto error; } msg.u.relayd_sock.relayd_session_id = relayd_session_id; - DBG("Created session on relay, output path reply: %s", - output_path); + DBG("Created session on relay, output path reply: %s", output_path); } msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET; @@ -1173,7 +1179,9 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, msg.u.relayd_sock.net_index = consumer->net_seq_index; msg.u.relayd_sock.type = type; msg.u.relayd_sock.session_id = session_id; - memcpy(&msg.u.relayd_sock.sock, rsock, sizeof(msg.u.relayd_sock.sock)); + msg.u.relayd_sock.major = rsock->major; + msg.u.relayd_sock.minor = rsock->minor; + msg.u.relayd_sock.relayd_socket_protocol = rsock->sock.proto; DBG3("Sending relayd sock info to consumer on %d", *consumer_sock->fd_ptr); ret = consumer_send_msg(consumer_sock, &msg); @@ -1194,9 +1202,8 @@ error: return ret; } -static -int consumer_send_pipe(struct consumer_socket *consumer_sock, - enum lttng_consumer_command cmd, int pipe) +static int +consumer_send_pipe(struct consumer_socket *consumer_sock, enum lttng_consumer_command cmd, int pipe) { int ret; struct lttcomm_consumer_msg msg; @@ -1209,8 +1216,7 @@ int consumer_send_pipe(struct consumer_socket *consumer_sock, command_name = "SET_CHANNEL_MONITOR_PIPE"; break; default: - ERR("Unexpected command received in %s (cmd = %d)", __func__, - (int) cmd); + ERR("Unexpected command received in %s (cmd = %d)", __func__, (int) cmd); abort(); } @@ -1226,9 +1232,7 @@ int consumer_send_pipe(struct consumer_socket *consumer_sock, goto error; } - DBG3("Sending %s pipe %d to consumer on socket %d", - pipe_name, - pipe, *consumer_sock->fd_ptr); + DBG3("Sending %s pipe %d to consumer on socket %d", pipe_name, pipe, *consumer_sock->fd_ptr); ret = consumer_send_fds(consumer_sock, &pipe, 1); if (ret < 0) { goto error; @@ -1240,22 +1244,19 @@ error: return ret; } -int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, - int pipe) +int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, int pipe) { - return consumer_send_pipe(consumer_sock, - LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, pipe); + return consumer_send_pipe(consumer_sock, LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, pipe); } /* * Ask the consumer if the data is pending for the specific session id. * Returns 1 if data is pending, 0 otherwise, or < 0 on error. */ -int consumer_is_data_pending(uint64_t session_id, - struct consumer_output *consumer) +int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consumer) { int ret; - int32_t ret_code = 0; /* Default is that the data is NOT pending */ + int32_t ret_code = 0; /* Default is that the data is NOT pending */ struct consumer_socket *socket; struct lttng_ht_iter iter; struct lttcomm_consumer_msg msg; @@ -1268,41 +1269,42 @@ int consumer_is_data_pending(uint64_t session_id, msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING; msg.u.data_pending.session_id = session_id; - /* Send command for each consumer */ - rcu_read_lock(); - cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, - node.node) { - pthread_mutex_lock(socket->lock); - ret = consumer_socket_send(socket, &msg, sizeof(msg)); - if (ret < 0) { - pthread_mutex_unlock(socket->lock); - goto error_unlock; - } + { + /* Send command for each consumer. */ + lttng::urcu::read_lock_guard read_lock; - /* - * No need for a recv reply status because the answer to the command is - * the reply status message. - */ + cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto error_unlock; + } + + /* + * No need for a recv reply status because the answer to the command is + * the reply status message. + */ + ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto error_unlock; + } - ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code)); - if (ret < 0) { pthread_mutex_unlock(socket->lock); - goto error_unlock; - } - pthread_mutex_unlock(socket->lock); - if (ret_code == 1) { - break; + if (ret_code == 1) { + break; + } } } - rcu_read_unlock(); DBG("Consumer data is %s pending for session id %" PRIu64, - ret_code == 1 ? "" : "NOT", session_id); + ret_code == 1 ? "" : "NOT", + session_id); return ret_code; error_unlock: - rcu_read_unlock(); return -1; } @@ -1376,8 +1378,7 @@ end: * * Return 0 on success else a negative value. */ -int consumer_close_metadata(struct consumer_socket *socket, - uint64_t metadata_key) +int consumer_close_metadata(struct consumer_socket *socket, uint64_t metadata_key) { int ret; struct lttcomm_consumer_msg msg; @@ -1409,8 +1410,7 @@ end: * * Return 0 on success else a negative value. */ -int consumer_setup_metadata(struct consumer_socket *socket, - uint64_t metadata_key) +int consumer_setup_metadata(struct consumer_socket *socket, uint64_t metadata_key) { int ret; struct lttcomm_consumer_msg msg; @@ -1444,13 +1444,17 @@ end: * Return 0 on success else a negative value. */ int consumer_push_metadata(struct consumer_socket *socket, - uint64_t metadata_key, char *metadata_str, size_t len, - size_t target_offset, uint64_t version) + uint64_t metadata_key, + char *metadata_str, + size_t len, + size_t target_offset, + uint64_t version) { int ret; struct lttcomm_consumer_msg msg; LTTNG_ASSERT(socket); + ASSERT_RCU_READ_LOCKED(); DBG2("Consumer push metadata to consumer socket %d", *socket->fd_ptr); @@ -1469,8 +1473,7 @@ int consumer_push_metadata(struct consumer_socket *socket, goto end; } - DBG3("Consumer pushing metadata on sock %d of len %zu", *socket->fd_ptr, - len); + DBG3("Consumer pushing metadata on sock %d of len %zu", *socket->fd_ptr, len); ret = consumer_socket_send(socket, metadata_str, len); if (ret < 0) { @@ -1495,9 +1498,11 @@ end: * Returns LTTNG_OK on success or else an LTTng error code. */ enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket, - uint64_t key, const struct consumer_output *output, int metadata, - uid_t uid, gid_t gid, const char *channel_path, int wait, - uint64_t nb_packets_per_stream) + uint64_t key, + const struct consumer_output *output, + int metadata, + const char *channel_path, + uint64_t nb_packets_per_stream) { int ret; enum lttng_error_code status = LTTNG_OK; @@ -1515,20 +1520,19 @@ enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket, msg.u.snapshot_channel.metadata = metadata; if (output->type == CONSUMER_DST_NET) { - msg.u.snapshot_channel.relayd_id = - output->net_seq_index; + msg.u.snapshot_channel.relayd_id = output->net_seq_index; msg.u.snapshot_channel.use_relayd = 1; } else { msg.u.snapshot_channel.relayd_id = (uint64_t) -1ULL; } ret = lttng_strncpy(msg.u.snapshot_channel.pathname, - channel_path, - sizeof(msg.u.snapshot_channel.pathname)); + channel_path, + sizeof(msg.u.snapshot_channel.pathname)); if (ret < 0) { ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%zu bytes required) with path \"%s\"", - sizeof(msg.u.snapshot_channel.pathname), - strlen(channel_path), - channel_path); + sizeof(msg.u.snapshot_channel.pathname), + strlen(channel_path), + channel_path); status = LTTNG_ERR_SNAPSHOT_FAIL; goto error; } @@ -1557,8 +1561,10 @@ error: /* * Ask the consumer the number of discarded events for a channel. */ -int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key, - struct consumer_output *consumer, uint64_t *discarded) +int consumer_get_discarded_events(uint64_t session_id, + uint64_t channel_key, + struct consumer_output *consumer, + uint64_t *discarded) { int ret; struct consumer_socket *socket; @@ -1576,46 +1582,51 @@ int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key, *discarded = 0; - /* Send command for each consumer */ - rcu_read_lock(); - cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, - node.node) { - uint64_t consumer_discarded = 0; - pthread_mutex_lock(socket->lock); - ret = consumer_socket_send(socket, &msg, sizeof(msg)); - if (ret < 0) { - pthread_mutex_unlock(socket->lock); - goto end; - } + /* Send command for each consumer. */ + { + lttng::urcu::read_lock_guard read_lock; + + cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { + uint64_t consumer_discarded = 0; + + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv( + socket, &consumer_discarded, sizeof(consumer_discarded)); + if (ret < 0) { + ERR("get discarded events"); + pthread_mutex_unlock(socket->lock); + goto end; + } - /* - * No need for a recv reply status because the answer to the - * command is the reply status message. - */ - ret = consumer_socket_recv(socket, &consumer_discarded, - sizeof(consumer_discarded)); - if (ret < 0) { - ERR("get discarded events"); pthread_mutex_unlock(socket->lock); - goto end; + *discarded += consumer_discarded; } - pthread_mutex_unlock(socket->lock); - *discarded += consumer_discarded; } + ret = 0; - DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, - *discarded, session_id); + DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, *discarded, session_id); end: - rcu_read_unlock(); return ret; } /* * Ask the consumer the number of lost packets for a channel. */ -int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, - struct consumer_output *consumer, uint64_t *lost) +int consumer_get_lost_packets(uint64_t session_id, + uint64_t channel_key, + struct consumer_output *consumer, + uint64_t *lost) { int ret; struct consumer_socket *socket; @@ -1633,38 +1644,38 @@ int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, *lost = 0; - /* Send command for each consumer */ - rcu_read_lock(); - cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, - node.node) { - uint64_t consumer_lost = 0; - pthread_mutex_lock(socket->lock); - ret = consumer_socket_send(socket, &msg, sizeof(msg)); - if (ret < 0) { - pthread_mutex_unlock(socket->lock); - goto end; - } + /* Send command for each consumer. */ + { + lttng::urcu::read_lock_guard read_lock; - /* - * No need for a recv reply status because the answer to the - * command is the reply status message. - */ - ret = consumer_socket_recv(socket, &consumer_lost, - sizeof(consumer_lost)); - if (ret < 0) { - ERR("get lost packets"); + cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { + uint64_t consumer_lost = 0; + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost)); + if (ret < 0) { + ERR("get lost packets"); + pthread_mutex_unlock(socket->lock); + goto end; + } pthread_mutex_unlock(socket->lock); - goto end; + *lost += consumer_lost; } - pthread_mutex_unlock(socket->lock); - *lost += consumer_lost; } + ret = 0; - DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, - *lost, session_id); + DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, *lost, session_id); end: - rcu_read_unlock(); return ret; } @@ -1675,9 +1686,10 @@ end: * when the rotation started. On the relay, this allows to keep track in which * chunk each stream is currently writing to (for the rotate_pending operation). */ -int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, - uid_t uid, gid_t gid, struct consumer_output *output, - bool is_metadata_channel) +int consumer_rotate_channel(struct consumer_socket *socket, + uint64_t key, + struct consumer_output *output, + bool is_metadata_channel) { int ret; struct lttcomm_consumer_msg msg; @@ -1722,6 +1734,7 @@ int consumer_open_channel_packets(struct consumer_socket *socket, uint64_t key) int ret; lttcomm_consumer_msg msg = { .cmd_type = LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS, + .u = {}, }; msg.u.open_channel_packets.key = key; @@ -1771,18 +1784,18 @@ error_socket: return ret; } -int consumer_init(struct consumer_socket *socket, - const lttng_uuid sessiond_uuid) +int consumer_init(struct consumer_socket *socket, const lttng_uuid& sessiond_uuid) { int ret; struct lttcomm_consumer_msg msg = { .cmd_type = LTTNG_CONSUMER_INIT, + .u = {}, }; LTTNG_ASSERT(socket); DBG("Sending consumer initialization command"); - lttng_uuid_copy(msg.u.init.sessiond_uuid, sessiond_uuid); + std::copy(sessiond_uuid.begin(), sessiond_uuid.end(), msg.u.init.sessiond_uuid); health_code_update(); ret = consumer_send_msg(socket, &msg); @@ -1801,15 +1814,16 @@ error: * Called with the consumer socket lock held. */ int consumer_create_trace_chunk(struct consumer_socket *socket, - uint64_t relayd_id, uint64_t session_id, - struct lttng_trace_chunk *chunk, - const char *domain_subdir) + uint64_t relayd_id, + uint64_t session_id, + struct lttng_trace_chunk *chunk, + const char *domain_subdir) { int ret; enum lttng_trace_chunk_status chunk_status; struct lttng_credentials chunk_credentials; - const struct lttng_directory_handle *chunk_directory_handle = NULL; - struct lttng_directory_handle *domain_handle = NULL; + const struct lttng_directory_handle *chunk_directory_handle = nullptr; + struct lttng_directory_handle *domain_handle = nullptr; int domain_dirfd; const char *chunk_name; bool chunk_name_overridden; @@ -1821,6 +1835,7 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, enum lttng_trace_chunk_status tc_status; struct lttcomm_consumer_msg msg = { .cmd_type = LTTNG_CONSUMER_CREATE_TRACE_CHUNK, + .u = {}, }; msg.u.create_trace_chunk.session_id = session_id; @@ -1828,44 +1843,38 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, LTTNG_ASSERT(chunk); if (relayd_id != -1ULL) { - LTTNG_OPTIONAL_SET(&msg.u.create_trace_chunk.relayd_id, - relayd_id); + LTTNG_OPTIONAL_SET(&msg.u.create_trace_chunk.relayd_id, relayd_id); } - chunk_status = lttng_trace_chunk_get_name(chunk, &chunk_name, - &chunk_name_overridden); + chunk_status = lttng_trace_chunk_get_name(chunk, &chunk_name, &chunk_name_overridden); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK && - chunk_status != LTTNG_TRACE_CHUNK_STATUS_NONE) { + chunk_status != LTTNG_TRACE_CHUNK_STATUS_NONE) { ERR("Failed to get name of trace chunk"); ret = -LTTNG_ERR_FATAL; goto error; } if (chunk_name_overridden) { ret = lttng_strncpy(msg.u.create_trace_chunk.override_name, - chunk_name, - sizeof(msg.u.create_trace_chunk.override_name)); + chunk_name, + sizeof(msg.u.create_trace_chunk.override_name)); if (ret) { ERR("Trace chunk name \"%s\" exceeds the maximal length allowed by the consumer protocol", - chunk_name); + chunk_name); ret = -LTTNG_ERR_FATAL; goto error; } } - chunk_status = lttng_trace_chunk_get_creation_timestamp(chunk, - &creation_timestamp); + chunk_status = lttng_trace_chunk_get_creation_timestamp(chunk, &creation_timestamp); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ret = -LTTNG_ERR_FATAL; goto error; } - msg.u.create_trace_chunk.creation_timestamp = - (uint64_t) creation_timestamp; + msg.u.create_trace_chunk.creation_timestamp = (uint64_t) creation_timestamp; /* Only used for logging purposes. */ - ret = time_to_iso8601_str(creation_timestamp, - creation_timestamp_buffer, - sizeof(creation_timestamp_buffer)); - creation_timestamp_str = !ret ? creation_timestamp_buffer : - "(formatting error)"; + ret = time_to_iso8601_str( + creation_timestamp, creation_timestamp_buffer, sizeof(creation_timestamp_buffer)); + creation_timestamp_str = !ret ? creation_timestamp_buffer : "(formatting error)"; chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { @@ -1881,13 +1890,12 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, if (chunk_has_local_output) { chunk_status = lttng_trace_chunk_borrow_chunk_directory_handle( - chunk, &chunk_directory_handle); + chunk, &chunk_directory_handle); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ret = -LTTNG_ERR_FATAL; goto error; } - chunk_status = lttng_trace_chunk_get_credentials( - chunk, &chunk_credentials); + chunk_status = lttng_trace_chunk_get_credentials(chunk, &chunk_credentials); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { /* * Not associating credentials to a sessiond chunk is a @@ -1896,17 +1904,15 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, ret = -LTTNG_ERR_FATAL; goto error; } - tc_status = lttng_trace_chunk_create_subdirectory( - chunk, domain_subdir); + tc_status = lttng_trace_chunk_create_subdirectory(chunk, domain_subdir); if (tc_status != LTTNG_TRACE_CHUNK_STATUS_OK) { PERROR("Failed to create chunk domain output directory \"%s\"", - domain_subdir); + domain_subdir); ret = -LTTNG_ERR_FATAL; goto error; } - domain_handle = lttng_directory_handle_create_from_handle( - domain_subdir, - chunk_directory_handle); + domain_handle = lttng_directory_handle_create_from_handle(domain_subdir, + chunk_directory_handle); if (!domain_handle) { ret = -LTTNG_ERR_FATAL; goto error; @@ -1920,22 +1926,22 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, * The ownership of the chunk directory handle's is maintained * by the trace chunk. */ - domain_dirfd = lttng_directory_handle_get_dirfd( - domain_handle); + domain_dirfd = lttng_directory_handle_get_dirfd(domain_handle); LTTNG_ASSERT(domain_dirfd >= 0); msg.u.create_trace_chunk.credentials.value.uid = - lttng_credentials_get_uid(&chunk_credentials); + lttng_credentials_get_uid(&chunk_credentials); msg.u.create_trace_chunk.credentials.value.gid = - lttng_credentials_get_gid(&chunk_credentials); + lttng_credentials_get_gid(&chunk_credentials); msg.u.create_trace_chunk.credentials.is_set = 1; } DBG("Sending consumer create trace chunk command: relayd_id = %" PRId64 - ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 - ", creation_timestamp = %s", - relayd_id, session_id, chunk_id, - creation_timestamp_str); + ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", creation_timestamp = %s", + relayd_id, + session_id, + chunk_id, + creation_timestamp_str); health_code_update(); ret = consumer_send_msg(socket, &msg); health_code_update(); @@ -1967,14 +1973,16 @@ error: * Called with the consumer socket lock held. */ int consumer_close_trace_chunk(struct consumer_socket *socket, - uint64_t relayd_id, uint64_t session_id, - struct lttng_trace_chunk *chunk, - char *closed_trace_chunk_path) + uint64_t relayd_id, + uint64_t session_id, + struct lttng_trace_chunk *chunk, + char *closed_trace_chunk_path) { int ret; enum lttng_trace_chunk_status chunk_status; lttcomm_consumer_msg msg = { .cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK, + .u = {}, }; msg.u.close_trace_chunk.session_id = session_id; @@ -1989,16 +1997,14 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, lttng_dynamic_buffer_init(&path_reception_buffer); if (relayd_id != -1ULL) { - LTTNG_OPTIONAL_SET( - &msg.u.close_trace_chunk.relayd_id, relayd_id); + LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.relayd_id, relayd_id); } - chunk_status = lttng_trace_chunk_get_close_command( - chunk, &close_command); + chunk_status = lttng_trace_chunk_get_close_command(chunk, &close_command); switch (chunk_status) { case LTTNG_TRACE_CHUNK_STATUS_OK: LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.close_command, - (uint32_t) close_command); + (uint32_t) close_command); break; case LTTNG_TRACE_CHUNK_STATUS_NONE: break; @@ -2017,8 +2023,7 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); msg.u.close_trace_chunk.chunk_id = chunk_id; - chunk_status = lttng_trace_chunk_get_close_timestamp(chunk, - &close_timestamp); + chunk_status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp); /* * A trace chunk should be closed locally before being closed remotely. * Otherwise, the close timestamp would never be transmitted to the @@ -2028,13 +2033,14 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, msg.u.close_trace_chunk.close_timestamp = (uint64_t) close_timestamp; if (msg.u.close_trace_chunk.close_command.is_set) { - close_command_name = lttng_trace_chunk_command_type_get_name( - close_command); + close_command_name = lttng_trace_chunk_command_type_get_name(close_command); } DBG("Sending consumer close trace chunk command: relayd_id = %" PRId64 - ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 - ", close command = \"%s\"", - relayd_id, session_id, chunk_id, close_command_name); + ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 ", close command = \"%s\"", + relayd_id, + session_id, + chunk_id, + close_command_name); health_code_update(); ret = consumer_socket_send(socket, &msg, sizeof(struct lttcomm_consumer_msg)); @@ -2048,20 +2054,20 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, goto error; } if (reply.path_length >= LTTNG_PATH_MAX) { - ERR("Invalid path returned by relay daemon: %" PRIu32 "bytes exceeds maximal allowed length of %d bytes", - reply.path_length, LTTNG_PATH_MAX); + ERR("Invalid path returned by relay daemon: %" PRIu32 + "bytes exceeds maximal allowed length of %d bytes", + reply.path_length, + LTTNG_PATH_MAX); ret = -LTTNG_ERR_INVALID_PROTOCOL; goto error; } - ret = lttng_dynamic_buffer_set_size(&path_reception_buffer, - reply.path_length); + ret = lttng_dynamic_buffer_set_size(&path_reception_buffer, reply.path_length); if (ret) { ERR("Failed to allocate reception buffer of path returned by the \"close trace chunk\" command"); ret = -LTTNG_ERR_NOMEM; goto error; } - ret = consumer_socket_recv(socket, path_reception_buffer.data, - path_reception_buffer.size); + ret = consumer_socket_recv(socket, path_reception_buffer.data, path_reception_buffer.size); if (ret < 0) { ERR("Communication error while receiving path of closed trace chunk"); ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER; @@ -2077,8 +2083,9 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, * closed_trace_chunk_path is assumed to have a length >= * LTTNG_PATH_MAX */ - memcpy(closed_trace_chunk_path, path_reception_buffer.data, - path_reception_buffer.size); + memcpy(closed_trace_chunk_path, + path_reception_buffer.data, + path_reception_buffer.size); } error: lttng_dynamic_buffer_reset(&path_reception_buffer); @@ -2093,14 +2100,16 @@ error: * Returns 0 on success, or a negative value on error. */ int consumer_trace_chunk_exists(struct consumer_socket *socket, - uint64_t relayd_id, uint64_t session_id, - struct lttng_trace_chunk *chunk, - enum consumer_trace_chunk_exists_status *result) + uint64_t relayd_id, + uint64_t session_id, + struct lttng_trace_chunk *chunk, + enum consumer_trace_chunk_exists_status *result) { int ret; enum lttng_trace_chunk_status chunk_status; lttcomm_consumer_msg msg = { .cmd_type = LTTNG_CONSUMER_TRACE_CHUNK_EXISTS, + .u = {}, }; msg.u.trace_chunk_exists.session_id = session_id; @@ -2110,8 +2119,7 @@ int consumer_trace_chunk_exists(struct consumer_socket *socket, LTTNG_ASSERT(socket); if (relayd_id != -1ULL) { - LTTNG_OPTIONAL_SET(&msg.u.trace_chunk_exists.relayd_id, - relayd_id); + LTTNG_OPTIONAL_SET(&msg.u.trace_chunk_exists.relayd_id, relayd_id); } chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id); @@ -2127,8 +2135,10 @@ int consumer_trace_chunk_exists(struct consumer_socket *socket, msg.u.trace_chunk_exists.chunk_id = chunk_id; DBG("Sending consumer trace chunk exists command: relayd_id = %" PRId64 - ", session_id = %" PRIu64 - ", chunk_id = %" PRIu64, relayd_id, session_id, chunk_id); + ", session_id = %" PRIu64 ", chunk_id = %" PRIu64, + relayd_id, + session_id, + chunk_id); health_code_update(); ret = consumer_send_msg(socket, &msg); @@ -2150,8 +2160,7 @@ int consumer_trace_chunk_exists(struct consumer_socket *socket, ret = -1; goto error; } - DBG("Consumer reply to TRACE_CHUNK_EXISTS command: %s", - consumer_reply_str); + DBG("Consumer reply to TRACE_CHUNK_EXISTS command: %s", consumer_reply_str); ret = 0; error: health_code_update();