Merge duplicate code in consumer for add relayd
[lttng-tools.git] / src / common / consumer.c
index c1dadddb3dd08854bffd8eef8cca3e9a76e82bed..363fa4eda6c5ba0dc75296d4c9f7d764c80b21da 100644 (file)
@@ -27,6 +27,7 @@
 #include <sys/socket.h>
 #include <sys/types.h>
 #include <unistd.h>
+#include <inttypes.h>
 
 #include <common/common.h>
 #include <common/kernel-ctl/kernel-ctl.h>
@@ -1091,7 +1092,7 @@ static int write_relayd_metadata_id(int fd,
                PERROR("write metadata stream id");
                goto end;
        }
-       DBG("Metadata stream id %zu written before data",
+       DBG("Metadata stream id %" PRIu64 " written before data",
                        stream->relayd_stream_id);
 
 end:
@@ -1192,14 +1193,14 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        }
                        goto end;
                } else if (ret > len) {
-                       PERROR("Error in file write (ret %ld > len %lu)", ret, len);
+                       PERROR("Error in file write (ret %zd > len %lu)", ret, len);
                        written += ret;
                        goto end;
                } else {
                        len -= ret;
                        mmap_offset += ret;
                }
-               DBG("Consumer mmap write() ret %ld (len %lu)", ret, len);
+               DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
 
                /* This call is useless on a socket so better save a syscall. */
                if (!relayd) {
@@ -1840,3 +1841,95 @@ void lttng_consumer_init(void)
        consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
 }
+
+/*
+ * Process the ADD_RELAYD command receive by a consumer.
+ *
+ * This will create a relayd socket pair and add it to the relayd hash table.
+ * The caller MUST acquire a RCU read side lock before calling it.
+ */
+int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
+               struct lttng_consumer_local_data *ctx, int sock,
+               struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
+{
+       int fd, ret = -1;
+       struct consumer_relayd_sock_pair *relayd;
+
+       DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
+
+       /* Get relayd reference if exists. */
+       relayd = consumer_find_relayd(net_seq_idx);
+       if (relayd == NULL) {
+               /* Not found. Allocate one. */
+               relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
+               if (relayd == NULL) {
+                       lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
+                       goto error;
+               }
+       }
+
+       /* Poll on consumer socket. */
+       if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+               ret = -EINTR;
+               goto error;
+       }
+
+       /* Get relayd socket from session daemon */
+       ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
+       if (ret != sizeof(fd)) {
+               lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+               ret = -1;
+               goto error;
+       }
+
+       /* Copy socket information and received FD */
+       switch (sock_type) {
+       case LTTNG_STREAM_CONTROL:
+               /* Copy received lttcomm socket */
+               lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
+               ret = lttcomm_create_sock(&relayd->control_sock);
+               if (ret < 0) {
+                       goto error;
+               }
+
+               /* Close the created socket fd which is useless */
+               close(relayd->control_sock.fd);
+
+               /* Assign new file descriptor */
+               relayd->control_sock.fd = fd;
+               break;
+       case LTTNG_STREAM_DATA:
+               /* Copy received lttcomm socket */
+               lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
+               ret = lttcomm_create_sock(&relayd->data_sock);
+               if (ret < 0) {
+                       goto error;
+               }
+
+               /* Close the created socket fd which is useless */
+               close(relayd->data_sock.fd);
+
+               /* Assign new file descriptor */
+               relayd->data_sock.fd = fd;
+               break;
+       default:
+               ERR("Unknown relayd socket type (%d)", sock_type);
+               goto error;
+       }
+
+       DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
+                       sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
+                       relayd->net_seq_idx, fd);
+
+       /*
+        * Add relayd socket pair to consumer data hashtable. If object already
+        * exists or on error, the function gracefully returns.
+        */
+       consumer_add_relayd(relayd);
+
+       /* All good! */
+       ret = 0;
+
+error:
+       return ret;
+}
This page took 0.025261 seconds and 4 git commands to generate.