Merge duplicate code in consumer for add relayd
[lttng-tools.git] / src / common / consumer.c
index 1863cddc5757e6e1788642dd788629303a890881..363fa4eda6c5ba0dc75296d4c9f7d764c80b21da 100644 (file)
@@ -27,6 +27,7 @@
 #include <sys/socket.h>
 #include <sys/types.h>
 #include <unistd.h>
+#include <inttypes.h>
 
 #include <common/common.h>
 #include <common/kernel-ctl/kernel-ctl.h>
@@ -274,12 +275,19 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
                uatomic_dec(&relayd->refcount);
                assert(uatomic_read(&relayd->refcount) >= 0);
 
+               /* Closing streams requires to lock the control socket. */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                ret = relayd_send_close_stream(&relayd->control_sock,
                                stream->relayd_stream_id,
                                stream->next_net_seq_num - 1);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
-                       ERR("Unable to close stream on the relayd. Continuing");
-                       /* Continue here. There is nothing we can do for the relayd.*/
+                       DBG("Unable to close stream on the relayd. Continuing");
+                       /*
+                        * Continue here. There is nothing we can do for the relayd.
+                        * Chances are that the relayd has closed the socket so we just
+                        * continue cleaning up.
+                        */
                }
 
                /* Both conditions are met, we destroy the relayd. */
@@ -434,8 +442,10 @@ end:
 }
 
 /*
- * Add relayd socket to global consumer data hashtable.
+ * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
+ * be acquired before calling this.
  */
+
 int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        int ret = 0;
@@ -447,20 +457,15 @@ int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
                goto end;
        }
 
-       rcu_read_lock();
-
        lttng_ht_lookup(consumer_data.relayd_ht,
                        (void *)((unsigned long) relayd->net_seq_idx), &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
        if (node != NULL) {
-               rcu_read_unlock();
                /* Relayd already exist. Ignore the insertion */
                goto end;
        }
        lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
 
-       rcu_read_unlock();
-
 end:
        return ret;
 }
@@ -529,28 +534,19 @@ error:
  *
  * Return destination file descriptor or negative value on error.
  */
-int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
-               size_t data_size)
+static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
+               size_t data_size, struct consumer_relayd_sock_pair *relayd)
 {
        int outfd = -1, ret;
-       struct consumer_relayd_sock_pair *relayd;
        struct lttcomm_relayd_data_hdr data_hdr;
 
        /* Safety net */
        assert(stream);
+       assert(relayd);
 
        /* Reset data header */
        memset(&data_hdr, 0, sizeof(data_hdr));
 
-       rcu_read_lock();
-       /* Get relayd reference of the stream. */
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd == NULL) {
-               /* Stream is either local or corrupted */
-               goto error;
-       }
-
-       DBG("Consumer found relayd socks with index %d", stream->net_seq_idx);
        if (stream->metadata_flag) {
                /* Caller MUST acquire the relayd control socket lock */
                ret = relayd_send_metadata(&relayd->control_sock, data_size);
@@ -578,7 +574,6 @@ int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
        }
 
 error:
-       rcu_read_unlock();
        return outfd;
 }
 
@@ -1079,7 +1074,37 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
 }
 
 /*
- * Mmap the ring buffer, read it and write the data to the tracefile.
+ * Write the metadata stream id on the specified file descriptor.
+ */
+static int write_relayd_metadata_id(int fd,
+               struct lttng_consumer_stream *stream,
+               struct consumer_relayd_sock_pair *relayd)
+{
+       int ret;
+       uint64_t metadata_id;
+
+       metadata_id = htobe64(stream->relayd_stream_id);
+       do {
+               ret = write(fd, (void *) &metadata_id,
+                               sizeof(stream->relayd_stream_id));
+       } while (ret < 0 && errno == EINTR);
+       if (ret < 0) {
+               PERROR("write metadata stream id");
+               goto end;
+       }
+       DBG("Metadata stream id %" PRIu64 " written before data",
+                       stream->relayd_stream_id);
+
+end:
+       return ret;
+}
+
+/*
+ * Mmap the ring buffer, read it and write the data to the tracefile. This is a
+ * core function for writing trace buffers to either the local filesystem or
+ * the network.
+ *
+ * Careful review MUST be put if any changes occur!
  *
  * Returns the number of bytes written
  */
@@ -1092,7 +1117,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        off_t orig_offset = stream->out_fd_offset;
        /* Default is on the disk */
        int outfd = stream->out_fd;
-       uint64_t metadata_id;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
        /* RCU lock for the relayd pointer */
@@ -1141,30 +1165,18 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        netlen += sizeof(stream->relayd_stream_id);
                }
 
-               ret = consumer_handle_stream_before_relayd(stream, netlen);
+               ret = write_relayd_stream_header(stream, netlen, relayd);
                if (ret >= 0) {
                        /* Use the returned socket. */
                        outfd = ret;
 
                        /* Write metadata stream id before payload */
                        if (stream->metadata_flag) {
-                               metadata_id = htobe64(stream->relayd_stream_id);
-                               do {
-                                       ret = write(outfd, (void *) &metadata_id,
-                                                       sizeof(stream->relayd_stream_id));
-                               } while (ret < 0 && errno == EINTR);
+                               ret = write_relayd_metadata_id(outfd, stream, relayd);
                                if (ret < 0) {
-                                       PERROR("write metadata stream id");
                                        written = ret;
                                        goto end;
                                }
-                               DBG("Metadata stream id %zu written before data",
-                                               stream->relayd_stream_id);
-                               /*
-                                * We do this so the return value can match the len passed as
-                                * argument to this function.
-                                */
-                               written -= sizeof(stream->relayd_stream_id);
                        }
                }
                /* Else, use the default set before which is the filesystem. */
@@ -1181,14 +1193,14 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        }
                        goto end;
                } else if (ret > len) {
-                       PERROR("Error in file write (ret %ld > len %lu)", ret, len);
+                       PERROR("Error in file write (ret %zd > len %lu)", ret, len);
                        written += ret;
                        goto end;
                } else {
                        len -= ret;
                        mmap_offset += ret;
                }
-               DBG("Consumer mmap write() ret %ld (len %lu)", ret, len);
+               DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
 
                /* This call is useless on a socket so better save a syscall. */
                if (!relayd) {
@@ -1226,7 +1238,6 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        int fd = stream->wait_fd;
        /* Default is on the disk */
        int outfd = stream->out_fd;
-       uint64_t metadata_id;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
        switch (consumer_data.type) {
@@ -1260,18 +1271,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                 */
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
 
-               metadata_id = htobe64(stream->relayd_stream_id);
-               do {
-                       ret = write(ctx->consumer_thread_pipe[1], (void *) &metadata_id,
-                                       sizeof(stream->relayd_stream_id));
-               } while (ret < 0 && errno == EINTR);
+               ret = write_relayd_metadata_id(ctx->consumer_thread_pipe[1],
+                               stream, relayd);
                if (ret < 0) {
-                       PERROR("write metadata stream id");
                        written = ret;
                        goto end;
                }
-               DBG("Metadata stream id %zu written before data",
-                               stream->relayd_stream_id);
        }
 
        while (len > 0) {
@@ -1302,15 +1307,13 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                written -= sizeof(stream->relayd_stream_id);
                        }
 
-                       ret = consumer_handle_stream_before_relayd(stream, ret_splice);
+                       ret = write_relayd_stream_header(stream, ret_splice, relayd);
                        if (ret >= 0) {
                                /* Use the returned socket. */
                                outfd = ret;
                        } else {
-                               if (outfd == -1) {
-                                       ERR("Remote relayd disconnected. Stopping");
-                                       goto end;
-                               }
+                               ERR("Remote relayd disconnected. Stopping");
+                               goto end;
                        }
                }
 
@@ -1838,3 +1841,95 @@ void lttng_consumer_init(void)
        consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
 }
+
+/*
+ * Process the ADD_RELAYD command receive by a consumer.
+ *
+ * This will create a relayd socket pair and add it to the relayd hash table.
+ * The caller MUST acquire a RCU read side lock before calling it.
+ */
+int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
+               struct lttng_consumer_local_data *ctx, int sock,
+               struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
+{
+       int fd, ret = -1;
+       struct consumer_relayd_sock_pair *relayd;
+
+       DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
+
+       /* Get relayd reference if exists. */
+       relayd = consumer_find_relayd(net_seq_idx);
+       if (relayd == NULL) {
+               /* Not found. Allocate one. */
+               relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
+               if (relayd == NULL) {
+                       lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
+                       goto error;
+               }
+       }
+
+       /* Poll on consumer socket. */
+       if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+               ret = -EINTR;
+               goto error;
+       }
+
+       /* Get relayd socket from session daemon */
+       ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
+       if (ret != sizeof(fd)) {
+               lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+               ret = -1;
+               goto error;
+       }
+
+       /* Copy socket information and received FD */
+       switch (sock_type) {
+       case LTTNG_STREAM_CONTROL:
+               /* Copy received lttcomm socket */
+               lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
+               ret = lttcomm_create_sock(&relayd->control_sock);
+               if (ret < 0) {
+                       goto error;
+               }
+
+               /* Close the created socket fd which is useless */
+               close(relayd->control_sock.fd);
+
+               /* Assign new file descriptor */
+               relayd->control_sock.fd = fd;
+               break;
+       case LTTNG_STREAM_DATA:
+               /* Copy received lttcomm socket */
+               lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
+               ret = lttcomm_create_sock(&relayd->data_sock);
+               if (ret < 0) {
+                       goto error;
+               }
+
+               /* Close the created socket fd which is useless */
+               close(relayd->data_sock.fd);
+
+               /* Assign new file descriptor */
+               relayd->data_sock.fd = fd;
+               break;
+       default:
+               ERR("Unknown relayd socket type (%d)", sock_type);
+               goto error;
+       }
+
+       DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
+                       sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
+                       relayd->net_seq_idx, fd);
+
+       /*
+        * Add relayd socket pair to consumer data hashtable. If object already
+        * exists or on error, the function gracefully returns.
+        */
+       consumer_add_relayd(relayd);
+
+       /* All good! */
+       ret = 0;
+
+error:
+       return ret;
+}
This page took 0.026914 seconds and 4 git commands to generate.