sessiond: notification: use lttng_payload for communications
[lttng-tools.git] / src / lib / lttng-ctl / channel.c
index 75a911f2af3ec7080085c648792d23393f82ad1c..a4e910e1d93db1b02b35f75824ddfb7f714cf5a1 100644 (file)
@@ -1,18 +1,8 @@
 /*
- * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
- * This library is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License, version 2.1 only,
- * as published by the Free Software Foundation.
+ * SPDX-License-Identifier: LGPL-2.1-only
  *
- * This library is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
- * for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this library; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
 #include <lttng/notification/notification-internal.h>
 #include <common/dynamic-buffer.h>
 #include <common/utils.h>
 #include <common/defaults.h>
+#include <common/payload.h>
+#include <common/payload-view.h>
+#include <common/unix.h>
 #include <assert.h>
 #include "lttng-ctl-helper.h"
+#include <common/compat/poll.h>
 
 static
 int handshake(struct lttng_notification_channel *channel);
 
 /*
  * Populates the reception buffer with the next complete message.
- * The caller must acquire the client's lock.
+ * The caller must acquire the channel's lock.
  */
 static
 int receive_message(struct lttng_notification_channel *channel)
@@ -40,10 +34,7 @@ int receive_message(struct lttng_notification_channel *channel)
        ssize_t ret;
        struct lttng_notification_channel_message msg;
 
-       ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0);
-       if (ret) {
-               goto error;
-       }
+       lttng_payload_clear(&channel->reception_payload);
 
        ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
        if (ret <= 0) {
@@ -57,31 +48,41 @@ int receive_message(struct lttng_notification_channel *channel)
        }
 
        /* Add message header at buffer's start. */
-       ret = lttng_dynamic_buffer_append(&channel->reception_buffer, &msg,
+       ret = lttng_dynamic_buffer_append(&channel->reception_payload.buffer, &msg,
                        sizeof(msg));
        if (ret) {
                goto error;
        }
 
        /* Reserve space for the payload. */
-       ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer,
-                       channel->reception_buffer.size + msg.size);
+       ret = lttng_dynamic_buffer_set_size(&channel->reception_payload.buffer,
+                       channel->reception_payload.buffer.size + msg.size);
        if (ret) {
                goto error;
        }
 
        /* Receive message payload. */
        ret = lttcomm_recv_unix_sock(channel->socket,
-                       channel->reception_buffer.data + sizeof(msg), msg.size);
+                       channel->reception_payload.buffer.data + sizeof(msg), msg.size);
        if (ret < (ssize_t) msg.size) {
                ret = -1;
                goto error;
        }
+
+       /* Receive message fds. */
+       if (msg.fds != 0) {
+               ret = lttcomm_recv_payload_fds_unix_sock(channel->socket,
+                               msg.fds, &channel->reception_payload);
+               if (ret < sizeof(int) * msg.fds) {
+                       ret = -1;
+                       goto error;
+               }
+       }
        ret = 0;
 end:
        return ret;
 error:
-       lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0);
+       lttng_payload_clear(&channel->reception_payload);
        goto end;
 }
 
@@ -91,10 +92,10 @@ enum lttng_notification_channel_message_type get_current_message_type(
 {
        struct lttng_notification_channel_message *msg;
 
-       assert(channel->reception_buffer.size >= sizeof(*msg));
+       assert(channel->reception_payload.buffer.size >= sizeof(*msg));
 
        msg = (struct lttng_notification_channel_message *)
-                       channel->reception_buffer.data;
+                       channel->reception_payload.buffer.data;
        return (enum lttng_notification_channel_message_type) msg->type;
 }
 
@@ -104,18 +105,23 @@ struct lttng_notification *create_notification_from_current_message(
 {
        ssize_t ret;
        struct lttng_notification *notification = NULL;
-       struct lttng_buffer_view view;
 
-       if (channel->reception_buffer.size <=
+       if (channel->reception_payload.buffer.size <=
                        sizeof(struct lttng_notification_channel_message)) {
                goto end;
        }
 
-       view = lttng_buffer_view_from_dynamic_buffer(&channel->reception_buffer,
-                       sizeof(struct lttng_notification_channel_message), -1);
+       {
+               struct lttng_payload_view view = lttng_payload_view_from_payload(
+                               &channel->reception_payload,
+                               sizeof(struct lttng_notification_channel_message),
+                               -1);
+
+               ret = lttng_notification_create_from_payload(
+                               &view, &notification);
+       }
 
-       ret = lttng_notification_create_from_buffer(&view, &notification);
-       if (ret != channel->reception_buffer.size -
+       if (ret != channel->reception_payload.buffer.size -
                        sizeof(struct lttng_notification_channel_message)) {
                lttng_notification_destroy(notification);
                notification = NULL;
@@ -149,7 +155,7 @@ struct lttng_notification_channel *lttng_notification_channel_create(
        }
        channel->socket = -1;
        pthread_mutex_init(&channel->lock, NULL);
-       lttng_dynamic_buffer_init(&channel->reception_buffer);
+       lttng_payload_init(&channel->reception_payload);
        CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
 
        is_root = (getuid() == 0);
@@ -207,12 +213,15 @@ lttng_notification_channel_get_next_notification(
        struct lttng_notification *notification = NULL;
        enum lttng_notification_channel_status status =
                        LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
+       struct lttng_poll_event events;
 
        if (!channel || !_notification) {
                status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
                goto end;
        }
 
+       pthread_mutex_lock(&channel->lock);
+
        if (channel->pending_notifications.count) {
                struct pending_notification *pending_notification;
 
@@ -230,15 +239,43 @@ lttng_notification_channel_get_next_notification(
                cds_list_del(&pending_notification->node);
                channel->pending_notifications.count--;
                free(pending_notification);
-               goto end;
+               goto end_unlock;
        }
 
-       pthread_mutex_lock(&channel->lock);
+       /*
+        * Block on interruptible epoll/poll() instead of the message reception
+        * itself as the recvmsg() wrappers always restart on EINTR. We choose
+        * to wait using interruptible epoll/poll() in order to:
+        *   1) Return if a signal occurs,
+        *   2) Not deal with partially received messages.
+        *
+        * The drawback to this approach is that we assume that messages
+        * are complete/well formed. If a message is shorter than its
+        * announced length, receive_message() will block on recvmsg()
+        * and never return (even if a signal is received).
+        */
+       ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
+       if (ret < 0) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+               goto end_unlock;
+       }
+       ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
+       if (ret < 0) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+               goto end_clean_poll;
+       }
+       ret = lttng_poll_wait_interruptible(&events, -1);
+       if (ret <= 0) {
+               status = (ret == -1 && errno == EINTR) ?
+                       LTTNG_NOTIFICATION_CHANNEL_STATUS_INTERRUPTED :
+                       LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+               goto end_clean_poll;
+       }
 
        ret = receive_message(channel);
        if (ret) {
                status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
-               goto end_unlock;
+               goto end_clean_poll;
        }
 
        switch (get_current_message_type(channel)) {
@@ -247,7 +284,7 @@ lttng_notification_channel_get_next_notification(
                                channel);
                if (!notification) {
                        status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
-                       goto end_unlock;
+                       goto end_clean_poll;
                }
                break;
        case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
@@ -257,15 +294,15 @@ lttng_notification_channel_get_next_notification(
        default:
                /* Protocol error. */
                status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
-               goto end_unlock;
+               goto end_clean_poll;
        }
 
+end_clean_poll:
+       lttng_poll_clean(&events);
 end_unlock:
        pthread_mutex_unlock(&channel->lock);
+       *_notification = notification;
 end:
-       if (_notification) {
-               *_notification = notification;
-       }
        return status;
 }
 
@@ -354,6 +391,106 @@ error:
        goto end;
 }
 
+enum lttng_notification_channel_status
+lttng_notification_channel_has_pending_notification(
+               struct lttng_notification_channel *channel,
+               bool *_notification_pending)
+{
+       int ret;
+       enum lttng_notification_channel_status status =
+                       LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
+       struct lttng_poll_event events;
+
+       if (!channel || !_notification_pending) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
+               goto end;
+       }
+
+       pthread_mutex_lock(&channel->lock);
+
+       if (channel->pending_notifications.count) {
+               *_notification_pending = true;
+               goto end_unlock;
+       }
+
+       if (channel->socket < 0) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED;
+               goto end_unlock;
+       }
+
+       /*
+        * Check, without blocking, if data is available on the channel's
+        * socket. If there is data available, it is safe to read (blocking)
+        * on the socket for a message from the session daemon.
+        *
+        * Since all commands wait for the session daemon's reply before
+        * releasing the channel's lock, the protocol only allows for
+        * notifications and "notification dropped" messages to come
+        * through. If we receive a different message type, it is
+        * considered a protocol error.
+        *
+        * Note that this function is not guaranteed not to block. This
+        * will block until our peer (the session daemon) has sent a complete
+        * message if we see data available on the socket. If the peer does
+        * not respect the protocol, this may block indefinitely.
+        */
+       ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
+       if (ret < 0) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+               goto end_unlock;
+       }
+       ret = lttng_poll_add(&events, channel->socket, LPOLLIN | LPOLLERR);
+       if (ret < 0) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+               goto end_clean_poll;
+       }
+       /* timeout = 0: return immediately. */
+       ret = lttng_poll_wait_interruptible(&events, 0);
+       if (ret == 0) {
+               /* No data available. */
+               *_notification_pending = false;
+               goto end_clean_poll;
+       } else if (ret < 0) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+               goto end_clean_poll;
+       }
+
+       /* Data available on socket. */
+       ret = receive_message(channel);
+       if (ret) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+               goto end_clean_poll;
+       }
+
+       switch (get_current_message_type(channel)) {
+       case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION:
+               ret = enqueue_notification_from_current_message(channel);
+               if (ret) {
+                       goto end_clean_poll;
+               }
+               *_notification_pending = true;
+               break;
+       case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED:
+               ret = enqueue_dropped_notification(channel);
+               if (ret) {
+                       goto end_clean_poll;
+               }
+               *_notification_pending = true;
+               break;
+       default:
+               /* Protocol error. */
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+               goto end_clean_poll;
+       }
+
+end_clean_poll:
+       lttng_poll_clean(&events);
+end_unlock:
+       pthread_mutex_unlock(&channel->lock);
+end:
+       return status;
+}
+
 static
 int receive_command_reply(struct lttng_notification_channel *channel,
                enum lttng_notification_channel_status *status)
@@ -391,7 +528,7 @@ int receive_command_reply(struct lttng_notification_channel *channel,
                        struct lttng_notification_channel_command_handshake *handshake;
 
                        handshake = (struct lttng_notification_channel_command_handshake *)
-                                       (channel->reception_buffer.data +
+                                       (channel->reception_payload.buffer.data +
                                        sizeof(struct lttng_notification_channel_message));
                        channel->version.major = handshake->major;
                        channel->version.minor = handshake->minor;
@@ -405,7 +542,7 @@ int receive_command_reply(struct lttng_notification_channel *channel,
        }
 
 exit_loop:
-       if (channel->reception_buffer.size <
+       if (channel->reception_payload.buffer.size <
                        (sizeof(struct lttng_notification_channel_message) +
                        sizeof(*reply))) {
                /* Invalid message received. */
@@ -414,7 +551,7 @@ exit_loop:
        }
 
        reply = (struct lttng_notification_channel_command_reply *)
-                       (channel->reception_buffer.data +
+                       (channel->reception_payload.buffer.data +
                        sizeof(struct lttng_notification_channel_message));
        *status = (enum lttng_notification_channel_status) reply->status;
 end:
@@ -442,7 +579,7 @@ int handshake(struct lttng_notification_channel *channel)
 
        pthread_mutex_lock(&channel->lock);
 
-       ret = lttcomm_send_unix_sock(channel->socket, send_buffer,
+       ret = lttcomm_send_creds_unix_sock(channel->socket, send_buffer,
                        sizeof(send_buffer));
        if (ret < 0) {
                goto end_unlock;
@@ -476,14 +613,16 @@ enum lttng_notification_channel_status send_condition_command(
                const struct lttng_condition *condition)
 {
        int socket;
-       ssize_t command_size, ret;
+       ssize_t ret;
        enum lttng_notification_channel_status status =
                        LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
-       char *command_buffer = NULL;
-       struct lttng_notification_channel_message cmd_message = {
-               .type = type,
+       struct lttng_payload payload;
+       struct lttng_notification_channel_message cmd_header = {
+               .type = (int8_t) type,
        };
 
+       lttng_payload_init(&payload);
+
        if (!channel) {
                status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
                goto end;
@@ -494,36 +633,56 @@ enum lttng_notification_channel_status send_condition_command(
 
        pthread_mutex_lock(&channel->lock);
        socket = channel->socket;
+
        if (!lttng_condition_validate(condition)) {
                status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
                goto end_unlock;
        }
 
-       ret = lttng_condition_serialize(condition, NULL);
-       if (ret < 0) {
-               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
-               goto end_unlock;
-       }
-       assert(ret < UINT32_MAX);
-       cmd_message.size = (uint32_t) ret;
-       command_size = ret + sizeof(
-                       struct lttng_notification_channel_message);
-       command_buffer = zmalloc(command_size);
-       if (!command_buffer) {
+       ret = lttng_dynamic_buffer_append(&payload.buffer, &cmd_header,
+                       sizeof(cmd_header));
+       if (ret) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
                goto end_unlock;
        }
 
-       memcpy(command_buffer, &cmd_message, sizeof(cmd_message));
-       ret = lttng_condition_serialize(condition,
-                       command_buffer + sizeof(cmd_message));
-       if (ret < 0) {
+       ret = lttng_condition_serialize(condition, &payload);
+       if (ret) {
+               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
                goto end_unlock;
        }
 
-       ret = lttcomm_send_unix_sock(socket, command_buffer, command_size);
-       if (ret < 0) {
-               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
-               goto end_unlock;
+       /* Update payload length. */
+       ((struct lttng_notification_channel_message *) payload.buffer.data)->size =
+                       (uint32_t) (payload.buffer.size - sizeof(cmd_header));
+
+       {
+               struct lttng_payload_view pv =
+                               lttng_payload_view_from_payload(
+                                               &payload, 0, -1);
+               const int fd_count =
+                               lttng_payload_view_get_fd_handle_count(&pv);
+
+               /* Update fd count. */
+               ((struct lttng_notification_channel_message *) payload.buffer.data)->fds =
+                       (uint32_t) fd_count;
+
+               ret = lttcomm_send_unix_sock(
+                       socket, pv.buffer.data, pv.buffer.size);
+               if (ret < 0) {
+                       status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+                       goto end_unlock;
+               }
+
+               /* Pass fds if present. */
+               if (fd_count > 0) {
+                       ret = lttcomm_send_payload_view_fds_unix_sock(socket,
+                                       &pv);
+                       if (ret < 0) {
+                               status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+                               goto end_unlock;
+                       }
+               }
        }
 
        ret = receive_command_reply(channel, &status);
@@ -534,7 +693,7 @@ enum lttng_notification_channel_status send_condition_command(
 end_unlock:
        pthread_mutex_unlock(&channel->lock);
 end:
-       free(command_buffer);
+       lttng_payload_reset(&payload);
        return status;
 }
 
@@ -567,6 +726,6 @@ void lttng_notification_channel_destroy(
                (void) lttcomm_close_unix_sock(channel->socket);
        }
        pthread_mutex_destroy(&channel->lock);
-       lttng_dynamic_buffer_reset(&channel->reception_buffer);
+       lttng_payload_reset(&channel->reception_payload);
        free(channel);
 }
This page took 0.02982 seconds and 4 git commands to generate.