X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=7c36d61be793d669fa6ef5eeb779c887d469c744;hb=a0377dfefe40662ba7d68617bce6ff467114136c;hp=2e31279247ddf84335bc1a2391bf47a29732f612;hpb=eacb7b6f2773556e31efb5e5d53d888c004b8f2f;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 2e3127924..7c36d61be 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -1,23 +1,12 @@ /* - * Copyright (C) 2012 - David Goulet - * 2018 - Jérémie Galarneau + * Copyright (C) 2012 David Goulet + * Copyright (C) 2018 Jérémie Galarneau * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License, version 2 only, as - * published by the Free Software Foundation. + * SPDX-License-Identifier: GPL-2.0-only * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along with - * this program; if not, write to the Free Software Foundation, Inc., 51 - * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #define _LGPL_SOURCE -#include #include #include #include @@ -51,8 +40,8 @@ char *setup_channel_trace_path(struct consumer_output *consumer, int ret; char *pathname; - assert(consumer); - assert(session_path); + LTTNG_ASSERT(consumer); + LTTNG_ASSERT(session_path); health_code_update(); @@ -103,14 +92,15 @@ error: * * Return 0 on success else a negative value on error. */ -int consumer_socket_send(struct consumer_socket *socket, void *msg, size_t len) +int consumer_socket_send( + struct consumer_socket *socket, const void *msg, size_t len) { int fd; ssize_t size; - assert(socket); - assert(socket->fd_ptr); - assert(msg); + LTTNG_ASSERT(socket); + LTTNG_ASSERT(socket->fd_ptr); + LTTNG_ASSERT(msg); /* Consumer socket is invalid. Stopping. */ fd = *socket->fd_ptr; @@ -152,9 +142,9 @@ int consumer_socket_recv(struct consumer_socket *socket, void *msg, size_t len) int fd; ssize_t size; - assert(socket); - assert(socket->fd_ptr); - assert(msg); + LTTNG_ASSERT(socket); + LTTNG_ASSERT(socket->fd_ptr); + LTTNG_ASSERT(msg); /* Consumer socket is invalid. Stopping. */ fd = *socket->fd_ptr; @@ -195,7 +185,7 @@ int consumer_recv_status_reply(struct consumer_socket *sock) int ret; struct lttcomm_consumer_status_msg reply; - assert(sock); + LTTNG_ASSERT(sock); ret = consumer_socket_recv(sock, &reply, sizeof(reply)); if (ret < 0) { @@ -228,9 +218,9 @@ int consumer_recv_status_channel(struct consumer_socket *sock, int ret; struct lttcomm_consumer_status_channel reply; - assert(sock); - assert(stream_count); - assert(key); + LTTNG_ASSERT(sock); + LTTNG_ASSERT(stream_count); + LTTNG_ASSERT(key); ret = consumer_socket_recv(sock, &reply, sizeof(reply)); if (ret < 0) { @@ -262,8 +252,8 @@ int consumer_send_destroy_relayd(struct consumer_socket *sock, int ret; struct lttcomm_consumer_msg msg; - assert(consumer); - assert(sock); + LTTNG_ASSERT(consumer); + LTTNG_ASSERT(sock); DBG2("Sending destroy relayd command to consumer sock %d", *sock->fd_ptr); @@ -296,7 +286,7 @@ void consumer_output_send_destroy_relayd(struct consumer_output *consumer) struct lttng_ht_iter iter; struct consumer_socket *socket; - assert(consumer); + LTTNG_ASSERT(consumer); /* Destroy any relayd connection */ if (consumer->type == CONSUMER_DST_NET) { @@ -328,7 +318,7 @@ int consumer_create_socket(struct consumer_data *data, int ret = 0; struct consumer_socket *socket; - assert(data); + LTTNG_ASSERT(data); if (output == NULL || data->cmd_sock < 0) { /* @@ -379,13 +369,13 @@ struct consumer_socket *consumer_find_socket_by_bitness(int bits, switch (bits) { case 64: - consumer_fd = uatomic_read(&ust_consumerd64_fd); + consumer_fd = uatomic_read(&the_ust_consumerd64_fd); break; case 32: - consumer_fd = uatomic_read(&ust_consumerd32_fd); + consumer_fd = uatomic_read(&the_ust_consumerd32_fd); break; default: - assert(0); + abort(); goto end; } @@ -433,7 +423,7 @@ struct consumer_socket *consumer_allocate_socket(int *fd) { struct consumer_socket *socket = NULL; - assert(fd); + LTTNG_ASSERT(fd); socket = zmalloc(sizeof(struct consumer_socket)); if (socket == NULL) { @@ -455,8 +445,8 @@ error: void consumer_add_socket(struct consumer_socket *sock, struct consumer_output *consumer) { - assert(sock); - assert(consumer); + LTTNG_ASSERT(sock); + LTTNG_ASSERT(consumer); lttng_ht_add_unique_ulong(consumer->socks, &sock->node); } @@ -471,12 +461,12 @@ void consumer_del_socket(struct consumer_socket *sock, int ret; struct lttng_ht_iter iter; - assert(sock); - assert(consumer); + LTTNG_ASSERT(sock); + LTTNG_ASSERT(consumer); iter.iter.node = &sock->node.node; ret = lttng_ht_del(consumer->socks, &iter); - assert(!ret); + LTTNG_ASSERT(!ret); } /* @@ -498,7 +488,7 @@ static void destroy_socket_rcu(struct rcu_head *head) */ void consumer_destroy_socket(struct consumer_socket *sock) { - assert(sock); + LTTNG_ASSERT(sock); /* * We DO NOT close the file descriptor here since it is global to the @@ -613,7 +603,7 @@ struct consumer_output *consumer_copy_output(struct consumer_output *src) int ret; struct consumer_output *output; - assert(src); + LTTNG_ASSERT(src); output = consumer_create_output(src->type); if (output == NULL) { @@ -652,8 +642,8 @@ int consumer_copy_sockets(struct consumer_output *dst, struct lttng_ht_iter iter; struct consumer_socket *socket, *copy_sock; - assert(dst); - assert(src); + LTTNG_ASSERT(dst); + LTTNG_ASSERT(src); rcu_read_lock(); cds_lfht_for_each_entry(src->socks->ht, &iter.iter, socket, node.node) { @@ -700,8 +690,8 @@ int consumer_set_network_uri(const struct ltt_session *session, struct lttng_uri *dst_uri = NULL; /* Code flow error safety net. */ - assert(output); - assert(uri); + LTTNG_ASSERT(output); + LTTNG_ASSERT(uri); switch (uri->stype) { case LTTNG_STREAM_CONTROL: @@ -848,10 +838,10 @@ int consumer_send_fds(struct consumer_socket *sock, const int *fds, { int ret; - assert(fds); - assert(sock); - assert(nb_fd > 0); - assert(pthread_mutex_trylock(sock->lock) == EBUSY); + LTTNG_ASSERT(fds); + LTTNG_ASSERT(sock); + LTTNG_ASSERT(nb_fd > 0); + LTTNG_ASSERT(pthread_mutex_trylock(sock->lock) == EBUSY); ret = lttcomm_send_fds_unix_sock(*sock->fd_ptr, fds, nb_fd); if (ret < 0) { @@ -871,13 +861,13 @@ error: * The consumer socket lock must be held by the caller. */ int consumer_send_msg(struct consumer_socket *sock, - struct lttcomm_consumer_msg *msg) + const struct lttcomm_consumer_msg *msg) { int ret; - assert(msg); - assert(sock); - assert(pthread_mutex_trylock(sock->lock) == EBUSY); + LTTNG_ASSERT(msg); + LTTNG_ASSERT(sock); + LTTNG_ASSERT(pthread_mutex_trylock(sock->lock) == EBUSY); ret = consumer_socket_send(sock, msg, sizeof(struct lttcomm_consumer_msg)); if (ret < 0) { @@ -900,8 +890,8 @@ int consumer_send_channel(struct consumer_socket *sock, { int ret; - assert(msg); - assert(sock); + LTTNG_ASSERT(msg); + LTTNG_ASSERT(sock); ret = consumer_send_msg(sock, msg); if (ret < 0) { @@ -923,6 +913,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, unsigned int switch_timer_interval, unsigned int read_timer_interval, unsigned int live_timer_interval, + bool is_in_live_session, unsigned int monitor_timer_interval, int output, int type, @@ -944,23 +935,25 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, struct lttng_trace_chunk *trace_chunk, const struct lttng_credentials *buffer_credentials) { - assert(msg); + LTTNG_ASSERT(msg); - /* Zeroed structure */ + /* Zeroed structure */ memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); msg->u.ask_channel.buffer_credentials.uid = UINT32_MAX; msg->u.ask_channel.buffer_credentials.gid = UINT32_MAX; - if (trace_chunk) { + if (trace_chunk) { uint64_t chunk_id; enum lttng_trace_chunk_status chunk_status; chunk_status = lttng_trace_chunk_get_id(trace_chunk, &chunk_id); - assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); LTTNG_OPTIONAL_SET(&msg->u.ask_channel.chunk_id, chunk_id); - } - msg->u.ask_channel.buffer_credentials.uid = buffer_credentials->uid; - msg->u.ask_channel.buffer_credentials.gid = buffer_credentials->gid; + } + msg->u.ask_channel.buffer_credentials.uid = + lttng_credentials_get_uid(buffer_credentials); + msg->u.ask_channel.buffer_credentials.gid = + lttng_credentials_get_gid(buffer_credentials); msg->cmd_type = LTTNG_CONSUMER_ASK_CHANNEL_CREATION; msg->u.ask_channel.subbuf_size = subbuf_size; @@ -969,6 +962,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.switch_timer_interval = switch_timer_interval; msg->u.ask_channel.read_timer_interval = read_timer_interval; msg->u.ask_channel.live_timer_interval = live_timer_interval; + msg->u.ask_channel.is_live = is_in_live_session; msg->u.ask_channel.monitor_timer_interval = monitor_timer_interval; msg->u.ask_channel.output = output; msg->u.ask_channel.type = type; @@ -1024,22 +1018,23 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t tracefile_count, unsigned int monitor, unsigned int live_timer_interval, + bool is_in_live_session, unsigned int monitor_timer_interval, struct lttng_trace_chunk *trace_chunk) { - assert(msg); + LTTNG_ASSERT(msg); /* Zeroed structure */ memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); - if (trace_chunk) { + if (trace_chunk) { uint64_t chunk_id; enum lttng_trace_chunk_status chunk_status; chunk_status = lttng_trace_chunk_get_id(trace_chunk, &chunk_id); - assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); LTTNG_OPTIONAL_SET(&msg->u.channel.chunk_id, chunk_id); - } + } /* Send channel */ msg->cmd_type = LTTNG_CONSUMER_ADD_CHANNEL; @@ -1053,6 +1048,7 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.channel.tracefile_count = tracefile_count; msg->u.channel.monitor = monitor; msg->u.channel.live_timer_interval = live_timer_interval; + msg->u.channel.is_live = is_in_live_session; msg->u.channel.monitor_timer_interval = monitor_timer_interval; strncpy(msg->u.channel.pathname, pathname, @@ -1071,7 +1067,7 @@ void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t stream_key, int32_t cpu) { - assert(msg); + LTTNG_ASSERT(msg); memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); @@ -1085,7 +1081,7 @@ void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_consumer_command cmd, uint64_t channel_key, uint64_t net_seq_idx) { - assert(msg); + LTTNG_ASSERT(msg); memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); @@ -1103,10 +1099,10 @@ int consumer_send_stream(struct consumer_socket *sock, { int ret; - assert(msg); - assert(dst); - assert(sock); - assert(fds); + LTTNG_ASSERT(msg); + LTTNG_ASSERT(dst); + LTTNG_ASSERT(sock); + LTTNG_ASSERT(fds); ret = consumer_send_msg(sock, msg); if (ret < 0) { @@ -1141,9 +1137,9 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, struct lttcomm_consumer_msg msg; /* Code flow error. Safety net. */ - assert(rsock); - assert(consumer); - assert(consumer_sock); + LTTNG_ASSERT(rsock); + LTTNG_ASSERT(consumer); + LTTNG_ASSERT(consumer_sock); memset(&msg, 0, sizeof(msg)); /* Bail out if consumer is disabled */ @@ -1156,12 +1152,10 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, char output_path[LTTNG_PATH_MAX] = {}; uint64_t relayd_session_id; - ret = relayd_create_session(rsock, - &relayd_session_id, + ret = relayd_create_session(rsock, &relayd_session_id, session_name, hostname, base_path, - session_live_timer, - consumer->snapshot, session_id, - sessiond_uuid, current_chunk_id, + session_live_timer, consumer->snapshot, + session_id, the_sessiond_uuid, current_chunk_id, session_creation_time, session_name_contains_creation_time, output_path); @@ -1270,7 +1264,7 @@ int consumer_is_data_pending(uint64_t session_id, struct lttng_ht_iter iter; struct lttcomm_consumer_msg msg; - assert(consumer); + LTTNG_ASSERT(consumer); DBG3("Consumer data pending for id %" PRIu64, session_id); @@ -1326,7 +1320,7 @@ int consumer_flush_channel(struct consumer_socket *socket, uint64_t key) int ret; struct lttcomm_consumer_msg msg; - assert(socket); + LTTNG_ASSERT(socket); DBG2("Consumer flush channel key %" PRIu64, key); @@ -1358,7 +1352,7 @@ int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t ke int ret; struct lttcomm_consumer_msg msg; - assert(socket); + LTTNG_ASSERT(socket); DBG2("Consumer clear quiescent channel key %" PRIu64, key); @@ -1392,7 +1386,7 @@ int consumer_close_metadata(struct consumer_socket *socket, int ret; struct lttcomm_consumer_msg msg; - assert(socket); + LTTNG_ASSERT(socket); DBG2("Consumer close metadata channel key %" PRIu64, metadata_key); @@ -1425,7 +1419,7 @@ int consumer_setup_metadata(struct consumer_socket *socket, int ret; struct lttcomm_consumer_msg msg; - assert(socket); + LTTNG_ASSERT(socket); DBG2("Consumer setup metadata channel key %" PRIu64, metadata_key); @@ -1460,7 +1454,7 @@ int consumer_push_metadata(struct consumer_socket *socket, int ret; struct lttcomm_consumer_msg msg; - assert(socket); + LTTNG_ASSERT(socket); DBG2("Consumer push metadata to consumer socket %d", *socket->fd_ptr); @@ -1513,8 +1507,8 @@ enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket, enum lttng_error_code status = LTTNG_OK; struct lttcomm_consumer_msg msg; - assert(socket); - assert(output); + LTTNG_ASSERT(socket); + LTTNG_ASSERT(output); DBG("Consumer snapshot channel key %" PRIu64, key); @@ -1575,7 +1569,7 @@ int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key, struct lttng_ht_iter iter; struct lttcomm_consumer_msg msg; - assert(consumer); + LTTNG_ASSERT(consumer); DBG3("Consumer discarded events id %" PRIu64, session_id); @@ -1632,7 +1626,7 @@ int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, struct lttng_ht_iter iter; struct lttcomm_consumer_msg msg; - assert(consumer); + LTTNG_ASSERT(consumer); DBG3("Consumer lost packets id %" PRIu64, session_id); @@ -1692,7 +1686,7 @@ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, int ret; struct lttcomm_consumer_msg msg; - assert(socket); + LTTNG_ASSERT(socket); DBG("Consumer rotate channel key %" PRIu64, key); @@ -1727,6 +1721,60 @@ error: return ret; } +int consumer_open_channel_packets(struct consumer_socket *socket, uint64_t key) +{ + int ret; + const struct lttcomm_consumer_msg msg = { + .cmd_type = LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS, + .u.open_channel_packets.key = key, + }; + + LTTNG_ASSERT(socket); + + DBG("Consumer open channel packets: channel key = %" PRIu64, key); + + health_code_update(); + + pthread_mutex_lock(socket->lock); + ret = consumer_send_msg(socket, &msg); + pthread_mutex_unlock(socket->lock); + if (ret < 0) { + goto error_socket; + } + +error_socket: + health_code_update(); + return ret; +} + +int consumer_clear_channel(struct consumer_socket *socket, uint64_t key) +{ + int ret; + struct lttcomm_consumer_msg msg; + + LTTNG_ASSERT(socket); + + DBG("Consumer clear channel %" PRIu64, key); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_CLEAR_CHANNEL; + msg.u.clear_channel.key = key; + + health_code_update(); + + pthread_mutex_lock(socket->lock); + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto error_socket; + } + +error_socket: + pthread_mutex_unlock(socket->lock); + + health_code_update(); + return ret; +} + int consumer_init(struct consumer_socket *socket, const lttng_uuid sessiond_uuid) { @@ -1735,7 +1783,7 @@ int consumer_init(struct consumer_socket *socket, .cmd_type = LTTNG_CONSUMER_INIT, }; - assert(socket); + LTTNG_ASSERT(socket); DBG("Sending consumer initialization command"); lttng_uuid_copy(msg.u.init.sessiond_uuid, sessiond_uuid); @@ -1774,13 +1822,14 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, char creation_timestamp_buffer[ISO8601_STR_LEN]; const char *creation_timestamp_str = "(none)"; const bool chunk_has_local_output = relayd_id == -1ULL; + enum lttng_trace_chunk_status tc_status; struct lttcomm_consumer_msg msg = { .cmd_type = LTTNG_CONSUMER_CREATE_TRACE_CHUNK, .u.create_trace_chunk.session_id = session_id, }; - assert(socket); - assert(chunk); + LTTNG_ASSERT(socket); + LTTNG_ASSERT(chunk); if (relayd_id != -1ULL) { LTTNG_OPTIONAL_SET(&msg.u.create_trace_chunk.relayd_id, @@ -1851,12 +1900,9 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, ret = -LTTNG_ERR_FATAL; goto error; } - ret = lttng_directory_handle_create_subdirectory_as_user( - chunk_directory_handle, - domain_subdir, - S_IRWXU | S_IRWXG, - &chunk_credentials); - if (ret) { + tc_status = lttng_trace_chunk_create_subdirectory( + chunk, domain_subdir); + if (tc_status != LTTNG_TRACE_CHUNK_STATUS_OK) { PERROR("Failed to create chunk domain output directory \"%s\"", domain_subdir); ret = -LTTNG_ERR_FATAL; @@ -1880,12 +1926,12 @@ int consumer_create_trace_chunk(struct consumer_socket *socket, */ domain_dirfd = lttng_directory_handle_get_dirfd( domain_handle); - assert(domain_dirfd >= 0); + LTTNG_ASSERT(domain_dirfd >= 0); msg.u.create_trace_chunk.credentials.value.uid = - chunk_credentials.uid; + lttng_credentials_get_uid(&chunk_credentials); msg.u.create_trace_chunk.credentials.value.gid = - chunk_credentials.gid; + lttng_credentials_get_gid(&chunk_credentials); msg.u.create_trace_chunk.credentials.is_set = 1; } @@ -1942,7 +1988,7 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, const char *close_command_name = "none"; struct lttng_dynamic_buffer path_reception_buffer; - assert(socket); + LTTNG_ASSERT(socket); lttng_dynamic_buffer_init(&path_reception_buffer); if (relayd_id != -1ULL) { @@ -1971,7 +2017,7 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, * (consumerd and relayd). They are used internally for * backward-compatibility purposes. */ - assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); msg.u.close_trace_chunk.chunk_id = chunk_id; chunk_status = lttng_trace_chunk_get_close_timestamp(chunk, @@ -1981,7 +2027,7 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, * Otherwise, the close timestamp would never be transmitted to the * peers. */ - assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); msg.u.close_trace_chunk.close_timestamp = (uint64_t) close_timestamp; if (msg.u.close_trace_chunk.close_command.is_set) { @@ -2063,7 +2109,7 @@ int consumer_trace_chunk_exists(struct consumer_socket *socket, uint64_t chunk_id; const char *consumer_reply_str; - assert(socket); + LTTNG_ASSERT(socket); if (relayd_id != -1ULL) { LTTNG_OPTIONAL_SET(&msg.u.trace_chunk_exists.relayd_id,