X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;fp=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=7c36d61be793d669fa6ef5eeb779c887d469c744;hp=4ba40a075fdae27516c53406f6cc1a1e9938f45d;hb=a0377dfefe40662ba7d68617bce6ff467114136c;hpb=cc3b9644f017a91d347d7a414387292e3175635e diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 4ba40a075..7c36d61be 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -7,7 +7,6 @@ */ #define _LGPL_SOURCE -#include #include #include #include @@ -41,8 +40,8 @@ char *setup_channel_trace_path(struct consumer_output *consumer, int ret; char *pathname; - assert(consumer); - assert(session_path); + LTTNG_ASSERT(consumer); + LTTNG_ASSERT(session_path); health_code_update(); @@ -99,9 +98,9 @@ int consumer_socket_send( int fd; ssize_t size; - assert(socket); - assert(socket->fd_ptr); - assert(msg); + LTTNG_ASSERT(socket); + LTTNG_ASSERT(socket->fd_ptr); + LTTNG_ASSERT(msg); /* Consumer socket is invalid. Stopping. */ fd = *socket->fd_ptr; @@ -143,9 +142,9 @@ int consumer_socket_recv(struct consumer_socket *socket, void *msg, size_t len) int fd; ssize_t size; - assert(socket); - assert(socket->fd_ptr); - assert(msg); + LTTNG_ASSERT(socket); + LTTNG_ASSERT(socket->fd_ptr); + LTTNG_ASSERT(msg); /* Consumer socket is invalid. Stopping. */ fd = *socket->fd_ptr; @@ -186,7 +185,7 @@ int consumer_recv_status_reply(struct consumer_socket *sock) int ret; struct lttcomm_consumer_status_msg reply; - assert(sock); + LTTNG_ASSERT(sock); ret = consumer_socket_recv(sock, &reply, sizeof(reply)); if (ret < 0) { @@ -219,9 +218,9 @@ int consumer_recv_status_channel(struct consumer_socket *sock, int ret; struct lttcomm_consumer_status_channel reply; - assert(sock); - assert(stream_count); - assert(key); + LTTNG_ASSERT(sock); + LTTNG_ASSERT(stream_count); + LTTNG_ASSERT(key); ret = consumer_socket_recv(sock, &reply, sizeof(reply)); if (ret < 0) { @@ -253,8 +252,8 @@ int consumer_send_destroy_relayd(struct consumer_socket *sock, int ret; struct lttcomm_consumer_msg msg; - assert(consumer); - assert(sock); + LTTNG_ASSERT(consumer); + LTTNG_ASSERT(sock); DBG2("Sending destroy relayd command to consumer sock %d", *sock->fd_ptr); @@ -287,7 +286,7 @@ void consumer_output_send_destroy_relayd(struct consumer_output *consumer) struct lttng_ht_iter iter; struct consumer_socket *socket; - assert(consumer); + LTTNG_ASSERT(consumer); /* Destroy any relayd connection */ if (consumer->type == CONSUMER_DST_NET) { @@ -319,7 +318,7 @@ int consumer_create_socket(struct consumer_data *data, int ret = 0; struct consumer_socket *socket; - assert(data); + LTTNG_ASSERT(data); if (output == NULL || data->cmd_sock < 0) { /* @@ -376,7 +375,7 @@ struct consumer_socket *consumer_find_socket_by_bitness(int bits, consumer_fd = uatomic_read(&the_ust_consumerd32_fd); break; default: - assert(0); + abort(); goto end; } @@ -424,7 +423,7 @@ struct consumer_socket *consumer_allocate_socket(int *fd) { struct consumer_socket *socket = NULL; - assert(fd); + LTTNG_ASSERT(fd); socket = zmalloc(sizeof(struct consumer_socket)); if (socket == NULL) { @@ -446,8 +445,8 @@ error: void consumer_add_socket(struct consumer_socket *sock, struct consumer_output *consumer) { - assert(sock); - assert(consumer); + LTTNG_ASSERT(sock); + LTTNG_ASSERT(consumer); lttng_ht_add_unique_ulong(consumer->socks, &sock->node); } @@ -462,12 +461,12 @@ void consumer_del_socket(struct consumer_socket *sock, int ret; struct lttng_ht_iter iter; - assert(sock); - assert(consumer); + LTTNG_ASSERT(sock); + LTTNG_ASSERT(consumer); iter.iter.node = &sock->node.node; ret = lttng_ht_del(consumer->socks, &iter); - assert(!ret); + LTTNG_ASSERT(!ret); } /* @@ -489,7 +488,7 @@ static void destroy_socket_rcu(struct rcu_head *head) */ void consumer_destroy_socket(struct consumer_socket *sock) { - assert(sock); + LTTNG_ASSERT(sock); /* * We DO NOT close the file descriptor here since it is global to the @@ -604,7 +603,7 @@ struct consumer_output *consumer_copy_output(struct consumer_output *src) int ret; struct consumer_output *output; - assert(src); + LTTNG_ASSERT(src); output = consumer_create_output(src->type); if (output == NULL) { @@ -643,8 +642,8 @@ int consumer_copy_sockets(struct consumer_output *dst, struct lttng_ht_iter iter; struct consumer_socket *socket, *copy_sock; - assert(dst); - assert(src); + LTTNG_ASSERT(dst); + LTTNG_ASSERT(src); rcu_read_lock(); cds_lfht_for_each_entry(src->socks->ht, &iter.iter, socket, node.node) { @@ -691,8 +690,8 @@ int consumer_set_network_uri(const struct ltt_session *session, struct lttng_uri *dst_uri = NULL; /* Code flow error safety net. */ - assert(output); - assert(uri); + LTTNG_ASSERT(output); + LTTNG_ASSERT(uri); switch (uri->stype) { case LTTNG_STREAM_CONTROL: @@ -839,10 +838,10 @@ int consumer_send_fds(struct consumer_socket *sock, const int *fds, { int ret; - assert(fds); - assert(sock); - assert(nb_fd > 0); - assert(pthread_mutex_trylock(sock->lock) == EBUSY); + LTTNG_ASSERT(fds); + LTTNG_ASSERT(sock); + LTTNG_ASSERT(nb_fd > 0); + LTTNG_ASSERT(pthread_mutex_trylock(sock->lock) == EBUSY); ret = lttcomm_send_fds_unix_sock(*sock->fd_ptr, fds, nb_fd); if (ret < 0) { @@ -866,9 +865,9 @@ int consumer_send_msg(struct consumer_socket *sock, { int ret; - assert(msg); - assert(sock); - assert(pthread_mutex_trylock(sock->lock) == EBUSY); + LTTNG_ASSERT(msg); + LTTNG_ASSERT(sock); + LTTNG_ASSERT(pthread_mutex_trylock(sock->lock) == EBUSY); ret = consumer_socket_send(sock, msg, sizeof(struct lttcomm_consumer_msg)); if (ret < 0) { @@ -891,8 +890,8 @@ int consumer_send_channel(struct consumer_socket *sock, { int ret; - assert(msg); - assert(sock); + LTTNG_ASSERT(msg); + LTTNG_ASSERT(sock); ret = consumer_send_msg(sock, msg); if (ret < 0) { @@ -936,7 +935,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, struct lttng_trace_chunk *trace_chunk, const struct lttng_credentials *buffer_credentials) { - assert(msg); + LTTNG_ASSERT(msg); /* Zeroed structure */ memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); @@ -948,7 +947,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_trace_chunk_status chunk_status; chunk_status = lttng_trace_chunk_get_id(trace_chunk, &chunk_id); - assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); LTTNG_OPTIONAL_SET(&msg->u.ask_channel.chunk_id, chunk_id); } msg->u.ask_channel.buffer_credentials.uid = @@ -1023,7 +1022,7 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, unsigned int monitor_timer_interval, struct lttng_trace_chunk *trace_chunk) { - assert(msg); + LTTNG_ASSERT(msg); /* Zeroed structure */ memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); @@ -1033,7 +1032,7 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_trace_chunk_status chunk_status; chunk_status = lttng_trace_chunk_get_id(trace_chunk, &chunk_id); - assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); LTTNG_OPTIONAL_SET(&msg->u.channel.chunk_id, chunk_id); } @@ -1068,7 +1067,7 @@ void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t stream_key, int32_t cpu) { - assert(msg); + LTTNG_ASSERT(msg); memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); @@ -1082,7 +1081,7 @@ void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_consumer_command cmd, uint64_t channel_key, uint64_t net_seq_idx) { - assert(msg); + LTTNG_ASSERT(msg); memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); @@ -1100,10 +1099,10 @@ int consumer_send_stream(struct consumer_socket *sock, { int ret; - assert(msg); - assert(dst); - assert(sock); - assert(fds); + LTTNG_ASSERT(msg); + LTTNG_ASSERT(dst); + LTTNG_ASSERT(sock); + LTTNG_ASSERT(fds); ret = consumer_send_msg(sock, msg); if (ret < 0) { @@ -1138,9 +1137,9 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, struct lttcomm_consumer_msg msg; /* Code flow error. Safety net. */ - assert(rsock); - assert(consumer); - assert(consumer_sock); + LTTNG_ASSERT(rsock); + LTTNG_ASSERT(consumer); + LTTNG_ASSERT(consumer_sock); memset(&msg, 0, sizeof(msg)); /* Bail out if consumer is disabled */ @@ -1265,7 +1264,7 @@ int consumer_is_data_pending(uint64_t session_id, struct lttng_ht_iter iter; struct lttcomm_consumer_msg msg; - assert(consumer); + LTTNG_ASSERT(consumer); DBG3("Consumer data pending for id %" PRIu64, session_id); @@ -1321,7 +1320,7 @@ int consumer_flush_channel(struct consumer_socket *socket, uint64_t key) int ret; struct lttcomm_consumer_msg msg; - assert(socket); + LTTNG_ASSERT(socket); DBG2("Consumer flush channel key %" PRIu64, key); @@ -1353,7 +1352,7 @@ int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t ke int ret; struct lttcomm_consumer_msg msg; - assert(socket); + LTTNG_ASSERT(socket); DBG2("Consumer clear quiescent channel key %" PRIu64, key); @@ -1387,7 +1386,7 @@ int consumer_close_metadata(struct consumer_socket *socket, int ret; struct lttcomm_consumer_msg msg; - assert(socket); + LTTNG_ASSERT(socket); DBG2("Consumer close metadata channel key %" PRIu64, metadata_key); @@ -1420,7 +1419,7 @@ int consumer_setup_metadata(struct consumer_socket *socket, int ret; struct lttcomm_consumer_msg msg; - assert(socket); + LTTNG_ASSERT(socket); DBG2("Consumer setup metadata channel key %" PRIu64, metadata_key); @@ -1455,7 +1454,7 @@ int consumer_push_metadata(struct consumer_socket *socket, int ret; struct lttcomm_consumer_msg msg; - assert(socket); + LTTNG_ASSERT(socket); DBG2("Consumer push metadata to consumer socket %d", *socket->fd_ptr); @@ -1508,8 +1507,8 @@ enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket, enum lttng_error_code status = LTTNG_OK; struct lttcomm_consumer_msg msg; - assert(socket); - assert(output); + LTTNG_ASSERT(socket); + LTTNG_ASSERT(output); DBG("Consumer snapshot channel key %" PRIu64, key); @@ -1570,7 +1569,7 @@ int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key, struct lttng_ht_iter iter; struct lttcomm_consumer_msg msg; - assert(consumer); + LTTNG_ASSERT(consumer); DBG3("Consumer discarded events id %" PRIu64, session_id); @@ -1627,7 +1626,7 @@ int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, struct lttng_ht_iter iter; struct lttcomm_consumer_msg msg; - assert(consumer); + LTTNG_ASSERT(consumer); DBG3("Consumer lost packets id %" PRIu64, session_id); @@ -1687,7 +1686,7 @@ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, int ret; struct lttcomm_consumer_msg msg; - assert(socket); + LTTNG_ASSERT(socket); DBG("Consumer rotate channel key %" PRIu64, key); @@ -1730,7 +1729,7 @@ int consumer_open_channel_packets(struct consumer_socket *socket, uint64_t key) .u.open_channel_packets.key = key, }; - assert(socket); + LTTNG_ASSERT(socket); DBG("Consumer open channel packets: channel key = %" PRIu64, key); @@ -1753,7 +1752,7 @@ int consumer_clear_channel(struct consumer_socket *socket, uint64_t key) int ret; struct lttcomm_consumer_msg msg; - assert(socket); + LTTNG_ASSERT(socket); DBG("Consumer clear channel %" PRIu64, key); @@ -1784,7 +1783,7 @@ int consumer_init(struct consumer_socket *socket, .cmd_type = LTTNG_CONSUMER_INIT, }; - assert(socket); + LTTNG_ASSERT(socket); DBG("Sending consumer initialization command"); lttng_uuid_copy(msg.u.init.sessiond_uuid, sessiond_uuid); @@ -1829,8 +1828,8 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, .u.create_trace_chunk.session_id = session_id, }; - assert(socket); - assert(chunk); + LTTNG_ASSERT(socket); + LTTNG_ASSERT(chunk); if (relayd_id != -1ULL) { LTTNG_OPTIONAL_SET(&msg.u.create_trace_chunk.relayd_id, @@ -1927,7 +1926,7 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, */ domain_dirfd = lttng_directory_handle_get_dirfd( domain_handle); - assert(domain_dirfd >= 0); + LTTNG_ASSERT(domain_dirfd >= 0); msg.u.create_trace_chunk.credentials.value.uid = lttng_credentials_get_uid(&chunk_credentials); @@ -1989,7 +1988,7 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, const char *close_command_name = "none"; struct lttng_dynamic_buffer path_reception_buffer; - assert(socket); + LTTNG_ASSERT(socket); lttng_dynamic_buffer_init(&path_reception_buffer); if (relayd_id != -1ULL) { @@ -2018,7 +2017,7 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, * (consumerd and relayd). They are used internally for * backward-compatibility purposes. */ - assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); msg.u.close_trace_chunk.chunk_id = chunk_id; chunk_status = lttng_trace_chunk_get_close_timestamp(chunk, @@ -2028,7 +2027,7 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, * Otherwise, the close timestamp would never be transmitted to the * peers. */ - assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); msg.u.close_trace_chunk.close_timestamp = (uint64_t) close_timestamp; if (msg.u.close_trace_chunk.close_command.is_set) { @@ -2110,7 +2109,7 @@ int consumer_trace_chunk_exists(struct consumer_socket *socket, uint64_t chunk_id; const char *consumer_reply_str; - assert(socket); + LTTNG_ASSERT(socket); if (relayd_id != -1ULL) { LTTNG_OPTIONAL_SET(&msg.u.trace_chunk_exists.relayd_id,