X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Flib%2Flttng-ctl%2Fchannel.c;h=a4e910e1d93db1b02b35f75824ddfb7f714cf5a1;hp=5487d051b5013fd60b080491ce865966bc6ffeab;hb=882093eef6fdd658833928a62be5d42fc0cdcb00;hpb=f83bcd90ceb5ce659ffe9d7747d6f3366a09748a diff --git a/src/lib/lttng-ctl/channel.c b/src/lib/lttng-ctl/channel.c index 5487d051b..a4e910e1d 100644 --- a/src/lib/lttng-ctl/channel.c +++ b/src/lib/lttng-ctl/channel.c @@ -1,18 +1,8 @@ /* - * Copyright (C) 2017 - Jérémie Galarneau + * Copyright (C) 2017 Jérémie Galarneau * - * This library is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License, version 2.1 only, - * as published by the Free Software Foundation. + * SPDX-License-Identifier: LGPL-2.1-only * - * This library is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License - * for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this library; if not, write to the Free Software Foundation, - * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #include @@ -24,10 +14,12 @@ #include #include #include +#include +#include +#include #include #include "lttng-ctl-helper.h" -#include -#include +#include static int handshake(struct lttng_notification_channel *channel); @@ -42,10 +34,7 @@ int receive_message(struct lttng_notification_channel *channel) ssize_t ret; struct lttng_notification_channel_message msg; - if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) { - ret = -1; - goto end; - } + lttng_payload_clear(&channel->reception_payload); ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg)); if (ret <= 0) { @@ -59,33 +48,41 @@ int receive_message(struct lttng_notification_channel *channel) } /* Add message header at buffer's start. */ - ret = lttng_dynamic_buffer_append(&channel->reception_buffer, &msg, + ret = lttng_dynamic_buffer_append(&channel->reception_payload.buffer, &msg, sizeof(msg)); if (ret) { goto error; } /* Reserve space for the payload. */ - ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer, - channel->reception_buffer.size + msg.size); + ret = lttng_dynamic_buffer_set_size(&channel->reception_payload.buffer, + channel->reception_payload.buffer.size + msg.size); if (ret) { goto error; } /* Receive message payload. */ ret = lttcomm_recv_unix_sock(channel->socket, - channel->reception_buffer.data + sizeof(msg), msg.size); + channel->reception_payload.buffer.data + sizeof(msg), msg.size); if (ret < (ssize_t) msg.size) { ret = -1; goto error; } + + /* Receive message fds. */ + if (msg.fds != 0) { + ret = lttcomm_recv_payload_fds_unix_sock(channel->socket, + msg.fds, &channel->reception_payload); + if (ret < sizeof(int) * msg.fds) { + ret = -1; + goto error; + } + } ret = 0; end: return ret; error: - if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) { - ret = -1; - } + lttng_payload_clear(&channel->reception_payload); goto end; } @@ -95,10 +92,10 @@ enum lttng_notification_channel_message_type get_current_message_type( { struct lttng_notification_channel_message *msg; - assert(channel->reception_buffer.size >= sizeof(*msg)); + assert(channel->reception_payload.buffer.size >= sizeof(*msg)); msg = (struct lttng_notification_channel_message *) - channel->reception_buffer.data; + channel->reception_payload.buffer.data; return (enum lttng_notification_channel_message_type) msg->type; } @@ -108,18 +105,23 @@ struct lttng_notification *create_notification_from_current_message( { ssize_t ret; struct lttng_notification *notification = NULL; - struct lttng_buffer_view view; - if (channel->reception_buffer.size <= + if (channel->reception_payload.buffer.size <= sizeof(struct lttng_notification_channel_message)) { goto end; } - view = lttng_buffer_view_from_dynamic_buffer(&channel->reception_buffer, - sizeof(struct lttng_notification_channel_message), -1); + { + struct lttng_payload_view view = lttng_payload_view_from_payload( + &channel->reception_payload, + sizeof(struct lttng_notification_channel_message), + -1); - ret = lttng_notification_create_from_buffer(&view, ¬ification); - if (ret != channel->reception_buffer.size - + ret = lttng_notification_create_from_payload( + &view, ¬ification); + } + + if (ret != channel->reception_payload.buffer.size - sizeof(struct lttng_notification_channel_message)) { lttng_notification_destroy(notification); notification = NULL; @@ -153,7 +155,7 @@ struct lttng_notification_channel *lttng_notification_channel_create( } channel->socket = -1; pthread_mutex_init(&channel->lock, NULL); - lttng_dynamic_buffer_init(&channel->reception_buffer); + lttng_payload_init(&channel->reception_payload); CDS_INIT_LIST_HEAD(&channel->pending_notifications.list); is_root = (getuid() == 0); @@ -211,7 +213,7 @@ lttng_notification_channel_get_next_notification( struct lttng_notification *notification = NULL; enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; - fd_set read_fds; + struct lttng_poll_event events; if (!channel || !_notification) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; @@ -241,9 +243,9 @@ lttng_notification_channel_get_next_notification( } /* - * Block on select() instead of the message reception itself as the - * recvmsg() wrappers always restard on EINTR. We choose to wait - * using select() in order to: + * Block on interruptible epoll/poll() instead of the message reception + * itself as the recvmsg() wrappers always restart on EINTR. We choose + * to wait using interruptible epoll/poll() in order to: * 1) Return if a signal occurs, * 2) Not deal with partially received messages. * @@ -252,20 +254,28 @@ lttng_notification_channel_get_next_notification( * announced length, receive_message() will block on recvmsg() * and never return (even if a signal is received). */ - FD_ZERO(&read_fds); - FD_SET(channel->socket, &read_fds); - ret = select(channel->socket + 1, &read_fds, NULL, NULL, NULL); - if (ret == -1) { - status = errno == EINTR ? + ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_unlock; + } + ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_clean_poll; + } + ret = lttng_poll_wait_interruptible(&events, -1); + if (ret <= 0) { + status = (ret == -1 && errno == EINTR) ? LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED : LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + goto end_clean_poll; } ret = receive_message(channel); if (ret) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + goto end_clean_poll; } switch (get_current_message_type(channel)) { @@ -274,7 +284,7 @@ lttng_notification_channel_get_next_notification( channel); if (!notification) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + goto end_clean_poll; } break; case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED: @@ -284,15 +294,15 @@ lttng_notification_channel_get_next_notification( default: /* Protocol error. */ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + goto end_clean_poll; } +end_clean_poll: + lttng_poll_clean(&events); end_unlock: pthread_mutex_unlock(&channel->lock); + *_notification = notification; end: - if (_notification) { - *_notification = notification; - } return status; } @@ -389,11 +399,7 @@ lttng_notification_channel_has_pending_notification( int ret; enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; - fd_set read_fds; - struct timeval timeout; - - FD_ZERO(&read_fds); - memset(&timeout, 0, sizeof(timeout)); + struct lttng_poll_event events; if (!channel || !_notification_pending) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; @@ -428,48 +434,57 @@ lttng_notification_channel_has_pending_notification( * message if we see data available on the socket. If the peer does * not respect the protocol, this may block indefinitely. */ - FD_SET(channel->socket, &read_fds); - do { - ret = select(channel->socket + 1, &read_fds, NULL, NULL, &timeout); - } while (ret < 0 && errno == EINTR); - + ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_unlock; + } + ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_clean_poll; + } + /* timeout = 0: return immediately. */ + ret = lttng_poll_wait_interruptible(&events, 0); if (ret == 0) { /* No data available. */ *_notification_pending = false; - goto end_unlock; + goto end_clean_poll; } else if (ret < 0) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + goto end_clean_poll; } /* Data available on socket. */ ret = receive_message(channel); if (ret) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + goto end_clean_poll; } switch (get_current_message_type(channel)) { case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION: ret = enqueue_notification_from_current_message(channel); if (ret) { - goto end_unlock; + goto end_clean_poll; } *_notification_pending = true; break; case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED: ret = enqueue_dropped_notification(channel); if (ret) { - goto end_unlock; + goto end_clean_poll; } *_notification_pending = true; break; default: /* Protocol error. */ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + goto end_clean_poll; } +end_clean_poll: + lttng_poll_clean(&events); end_unlock: pthread_mutex_unlock(&channel->lock); end: @@ -513,7 +528,7 @@ int receive_command_reply(struct lttng_notification_channel *channel, struct lttng_notification_channel_command_handshake *handshake; handshake = (struct lttng_notification_channel_command_handshake *) - (channel->reception_buffer.data + + (channel->reception_payload.buffer.data + sizeof(struct lttng_notification_channel_message)); channel->version.major = handshake->major; channel->version.minor = handshake->minor; @@ -527,7 +542,7 @@ int receive_command_reply(struct lttng_notification_channel *channel, } exit_loop: - if (channel->reception_buffer.size < + if (channel->reception_payload.buffer.size < (sizeof(struct lttng_notification_channel_message) + sizeof(*reply))) { /* Invalid message received. */ @@ -536,7 +551,7 @@ exit_loop: } reply = (struct lttng_notification_channel_command_reply *) - (channel->reception_buffer.data + + (channel->reception_payload.buffer.data + sizeof(struct lttng_notification_channel_message)); *status = (enum lttng_notification_channel_status) reply->status; end: @@ -601,12 +616,12 @@ enum lttng_notification_channel_status send_condition_command( ssize_t ret; enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; - struct lttng_dynamic_buffer buffer; + struct lttng_payload payload; struct lttng_notification_channel_message cmd_header = { .type = (int8_t) type, }; - lttng_dynamic_buffer_init(&buffer); + lttng_payload_init(&payload); if (!channel) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; @@ -618,32 +633,56 @@ enum lttng_notification_channel_status send_condition_command( pthread_mutex_lock(&channel->lock); socket = channel->socket; + if (!lttng_condition_validate(condition)) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; goto end_unlock; } - ret = lttng_dynamic_buffer_append(&buffer, &cmd_header, + ret = lttng_dynamic_buffer_append(&payload.buffer, &cmd_header, sizeof(cmd_header)); if (ret) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; goto end_unlock; } - ret = lttng_condition_serialize(condition, &buffer); + ret = lttng_condition_serialize(condition, &payload); if (ret) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; goto end_unlock; } /* Update payload length. */ - ((struct lttng_notification_channel_message *) buffer.data)->size = - (uint32_t) (buffer.size - sizeof(cmd_header)); + ((struct lttng_notification_channel_message *) payload.buffer.data)->size = + (uint32_t) (payload.buffer.size - sizeof(cmd_header)); + + { + struct lttng_payload_view pv = + lttng_payload_view_from_payload( + &payload, 0, -1); + const int fd_count = + lttng_payload_view_get_fd_handle_count(&pv); + + /* Update fd count. */ + ((struct lttng_notification_channel_message *) payload.buffer.data)->fds = + (uint32_t) fd_count; + + ret = lttcomm_send_unix_sock( + socket, pv.buffer.data, pv.buffer.size); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_unlock; + } - ret = lttcomm_send_unix_sock(socket, buffer.data, buffer.size); - if (ret < 0) { - status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + /* Pass fds if present. */ + if (fd_count > 0) { + ret = lttcomm_send_payload_view_fds_unix_sock(socket, + &pv); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_unlock; + } + } } ret = receive_command_reply(channel, &status); @@ -654,7 +693,7 @@ enum lttng_notification_channel_status send_condition_command( end_unlock: pthread_mutex_unlock(&channel->lock); end: - lttng_dynamic_buffer_reset(&buffer); + lttng_payload_reset(&payload); return status; } @@ -687,6 +726,6 @@ void lttng_notification_channel_destroy( (void) lttcomm_close_unix_sock(channel->socket); } pthread_mutex_destroy(&channel->lock); - lttng_dynamic_buffer_reset(&channel->reception_buffer); + lttng_payload_reset(&channel->reception_payload); free(channel); }