X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Flib%2Flttng-ctl%2Fchannel.c;h=bcecc65fbbb281d6466b6805c88f28ca1cac448e;hp=5bfb75902e2ff98155884208668125ecd9db1799;hb=d977a743144ea108584e328c32eb217d8162c137;hpb=e4f3498e493f36ac06233fa27229a71567ada3a4 diff --git a/src/lib/lttng-ctl/channel.c b/src/lib/lttng-ctl/channel.c index 5bfb75902..bcecc65fb 100644 --- a/src/lib/lttng-ctl/channel.c +++ b/src/lib/lttng-ctl/channel.c @@ -26,13 +26,14 @@ #include #include #include "lttng-ctl-helper.h" +#include static int handshake(struct lttng_notification_channel *channel); /* * Populates the reception buffer with the next complete message. - * The caller must acquire the client's lock. + * The caller must acquire the channel's lock. */ static int receive_message(struct lttng_notification_channel *channel) @@ -40,9 +41,9 @@ int receive_message(struct lttng_notification_channel *channel) ssize_t ret; struct lttng_notification_channel_message msg; - ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0); - if (ret) { - goto error; + if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) { + ret = -1; + goto end; } ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg)); @@ -209,12 +210,15 @@ lttng_notification_channel_get_next_notification( struct lttng_notification *notification = NULL; enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; + struct lttng_poll_event events; if (!channel || !_notification) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; goto end; } + pthread_mutex_lock(&channel->lock); + if (channel->pending_notifications.count) { struct pending_notification *pending_notification; @@ -232,15 +236,43 @@ lttng_notification_channel_get_next_notification( cds_list_del(&pending_notification->node); channel->pending_notifications.count--; free(pending_notification); - goto end; + goto end_unlock; } - pthread_mutex_lock(&channel->lock); + /* + * Block on interruptible epoll/poll() instead of the message reception + * itself as the recvmsg() wrappers always restart on EINTR. We choose + * to wait using interruptible epoll/poll() in order to: + * 1) Return if a signal occurs, + * 2) Not deal with partially received messages. + * + * The drawback to this approach is that we assume that messages + * are complete/well formed. If a message is shorter than its + * announced length, receive_message() will block on recvmsg() + * and never return (even if a signal is received). + */ + ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_unlock; + } + ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_clean_poll; + } + ret = lttng_poll_wait_interruptible(&events, -1); + if (ret <= 0) { + status = (ret == -1 && errno == EINTR) ? + LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED : + LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_clean_poll; + } ret = receive_message(channel); if (ret) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + goto end_clean_poll; } switch (get_current_message_type(channel)) { @@ -249,7 +281,7 @@ lttng_notification_channel_get_next_notification( channel); if (!notification) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + goto end_clean_poll; } break; case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED: @@ -259,15 +291,15 @@ lttng_notification_channel_get_next_notification( default: /* Protocol error. */ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; - goto end_unlock; + goto end_clean_poll; } +end_clean_poll: + lttng_poll_clean(&events); end_unlock: pthread_mutex_unlock(&channel->lock); + *_notification = notification; end: - if (_notification) { - *_notification = notification; - } return status; } @@ -356,6 +388,106 @@ error: goto end; } +enum lttng_notification_channel_status +lttng_notification_channel_has_pending_notification( + struct lttng_notification_channel *channel, + bool *_notification_pending) +{ + int ret; + enum lttng_notification_channel_status status = + LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; + struct lttng_poll_event events; + + if (!channel || !_notification_pending) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; + goto end; + } + + pthread_mutex_lock(&channel->lock); + + if (channel->pending_notifications.count) { + *_notification_pending = true; + goto end_unlock; + } + + if (channel->socket < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED; + goto end_unlock; + } + + /* + * Check, without blocking, if data is available on the channel's + * socket. If there is data available, it is safe to read (blocking) + * on the socket for a message from the session daemon. + * + * Since all commands wait for the session daemon's reply before + * releasing the channel's lock, the protocol only allows for + * notifications and "notification dropped" messages to come + * through. If we receive a different message type, it is + * considered a protocol error. + * + * Note that this function is not guaranteed not to block. This + * will block until our peer (the session daemon) has sent a complete + * message if we see data available on the socket. If the peer does + * not respect the protocol, this may block indefinitely. + */ + ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_unlock; + } + ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR); + if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_clean_poll; + } + /* timeout = 0: return immediately. */ + ret = lttng_poll_wait_interruptible(&events, 0); + if (ret == 0) { + /* No data available. */ + *_notification_pending = false; + goto end_clean_poll; + } else if (ret < 0) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_clean_poll; + } + + /* Data available on socket. */ + ret = receive_message(channel); + if (ret) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_clean_poll; + } + + switch (get_current_message_type(channel)) { + case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION: + ret = enqueue_notification_from_current_message(channel); + if (ret) { + goto end_clean_poll; + } + *_notification_pending = true; + break; + case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED: + ret = enqueue_dropped_notification(channel); + if (ret) { + goto end_clean_poll; + } + *_notification_pending = true; + break; + default: + /* Protocol error. */ + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; + goto end_clean_poll; + } + +end_clean_poll: + lttng_poll_clean(&events); +end_unlock: + pthread_mutex_unlock(&channel->lock); +end: + return status; +} + static int receive_command_reply(struct lttng_notification_channel *channel, enum lttng_notification_channel_status *status) @@ -444,7 +576,7 @@ int handshake(struct lttng_notification_channel *channel) pthread_mutex_lock(&channel->lock); - ret = lttcomm_send_unix_sock(channel->socket, send_buffer, + ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer, sizeof(send_buffer)); if (ret < 0) { goto end_unlock; @@ -478,14 +610,16 @@ enum lttng_notification_channel_status send_condition_command( const struct lttng_condition *condition) { int socket; - ssize_t command_size, ret; + ssize_t ret; enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; - char *command_buffer = NULL; - struct lttng_notification_channel_message cmd_message = { - .type = type, + struct lttng_dynamic_buffer buffer; + struct lttng_notification_channel_message cmd_header = { + .type = (int8_t) type, }; + lttng_dynamic_buffer_init(&buffer); + if (!channel) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; goto end; @@ -501,28 +635,24 @@ enum lttng_notification_channel_status send_condition_command( goto end_unlock; } - ret = lttng_condition_serialize(condition, NULL); - if (ret < 0) { - status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; - goto end_unlock; - } - assert(ret < UINT32_MAX); - cmd_message.size = (uint32_t) ret; - command_size = ret + sizeof( - struct lttng_notification_channel_message); - command_buffer = zmalloc(command_size); - if (!command_buffer) { + ret = lttng_dynamic_buffer_append(&buffer, &cmd_header, + sizeof(cmd_header)); + if (ret) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; goto end_unlock; } - memcpy(command_buffer, &cmd_message, sizeof(cmd_message)); - ret = lttng_condition_serialize(condition, - command_buffer + sizeof(cmd_message)); - if (ret < 0) { + ret = lttng_condition_serialize(condition, &buffer); + if (ret) { + status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID; goto end_unlock; } - ret = lttcomm_send_unix_sock(socket, command_buffer, command_size); + /* Update payload length. */ + ((struct lttng_notification_channel_message *) buffer.data)->size = + (uint32_t) (buffer.size - sizeof(cmd_header)); + + ret = lttcomm_send_unix_sock(socket, buffer.data, buffer.size); if (ret < 0) { status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR; goto end_unlock; @@ -536,7 +666,7 @@ enum lttng_notification_channel_status send_condition_command( end_unlock: pthread_mutex_unlock(&channel->lock); end: - free(command_buffer); + lttng_dynamic_buffer_reset(&buffer); return status; }