Fix: relayd metadata size
[lttng-tools.git] / src / common / consumer.c
index ee5575262ec5667d448007a8ab82eec0607e19c9..831592a1e96ef4a8a22b64fdc29ffeaa75f62786 100644 (file)
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
  *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *               2012 - David Goulet <dgoulet@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License, version 2 only,
 
 #include <common/common.h>
 #include <common/kernel-ctl/kernel-ctl.h>
+#include <common/sessiond-comm/relayd.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/kernel-consumer/kernel-consumer.h>
+#include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
 
 #include "consumer.h"
@@ -151,6 +154,45 @@ void consumer_free_stream(struct rcu_head *head)
        free(stream);
 }
 
+/*
+ * RCU protected relayd socket pair free.
+ */
+static void consumer_rcu_free_relayd(struct rcu_head *head)
+{
+       struct lttng_ht_node_ulong *node =
+               caa_container_of(head, struct lttng_ht_node_ulong, head);
+       struct consumer_relayd_sock_pair *relayd =
+               caa_container_of(node, struct consumer_relayd_sock_pair, node);
+
+       free(relayd);
+}
+
+/*
+ * Destroy and free relayd socket pair object.
+ *
+ * This function MUST be called with the consumer_data lock acquired.
+ */
+void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+
+       DBG("Consumer destroy and close relayd socket pair");
+
+       iter.iter.node = &relayd->node.node;
+       ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
+       assert(!ret);
+
+       /* Close all sockets */
+       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+       (void) relayd_close(&relayd->control_sock);
+       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       (void) relayd_close(&relayd->data_sock);
+
+       /* RCU free() call */
+       call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
+}
+
 /*
  * Remove a stream from the global list protected by a mutex. This
  * function is also responsible for freeing its data structures.
@@ -160,6 +202,9 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
        int ret;
        struct lttng_ht_iter iter;
        struct lttng_consumer_channel *free_chan = NULL;
+       struct consumer_relayd_sock_pair *relayd;
+
+       assert(stream);
 
        pthread_mutex_lock(&consumer_data.lock);
 
@@ -214,8 +259,24 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
                        PERROR("close");
                }
        }
-       if (!--stream->chan->refcount)
+
+       /* Check and cleanup relayd */
+       rcu_read_lock();
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd != NULL) {
+               uatomic_dec(&relayd->refcount);
+               assert(uatomic_read(&relayd->refcount) >= 0);
+               if (uatomic_read(&relayd->refcount) == 0) {
+                       /* Refcount of the relayd struct is 0, destroy it */
+                       consumer_destroy_relayd(relayd);
+               }
+       }
+       rcu_read_unlock();
+
+       if (!--stream->chan->refcount) {
                free_chan = stream->chan;
+       }
+
 
        call_rcu(&stream->node.head, consumer_free_stream);
 end:
@@ -234,7 +295,9 @@ struct lttng_consumer_stream *consumer_allocate_stream(
                enum lttng_event_output output,
                const char *path_name,
                uid_t uid,
-               gid_t gid)
+               gid_t gid,
+               int net_index,
+               int metadata_flag)
 {
        struct lttng_consumer_stream *stream;
        int ret;
@@ -261,9 +324,12 @@ struct lttng_consumer_stream *consumer_allocate_stream(
        stream->output = output;
        stream->uid = uid;
        stream->gid = gid;
-       strncpy(stream->path_name, path_name, PATH_MAX - 1);
-       stream->path_name[PATH_MAX - 1] = '\0';
+       stream->net_seq_idx = net_index;
+       stream->metadata_flag = metadata_flag;
+       strncpy(stream->path_name, path_name, sizeof(stream->path_name));
+       stream->path_name[sizeof(stream->path_name) - 1] = '\0';
        lttng_ht_node_init_ulong(&stream->node, stream->key);
+       lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -282,12 +348,13 @@ struct lttng_consumer_stream *consumer_allocate_stream(
                assert(0);
                goto end;
        }
-       DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d)",
+       DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
                        stream->path_name, stream->key,
                        stream->shm_fd,
                        stream->wait_fd,
                        (unsigned long long) stream->mmap_len,
-                       stream->out_fd);
+                       stream->out_fd,
+                       stream->net_seq_idx);
 end:
        return stream;
 }
@@ -300,12 +367,13 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
        int ret = 0;
        struct lttng_ht_node_ulong *node;
        struct lttng_ht_iter iter;
+       struct consumer_relayd_sock_pair *relayd;
 
        pthread_mutex_lock(&consumer_data.lock);
        /* Steal stream identifier, for UST */
        consumer_steal_stream_key(stream->key);
-       rcu_read_lock();
 
+       rcu_read_lock();
        lttng_ht_lookup(consumer_data.stream_ht,
                        (void *)((unsigned long) stream->key), &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
@@ -316,7 +384,15 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
        }
 
        lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
+
+       /* Check and cleanup relayd */
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd != NULL) {
+               uatomic_inc(&relayd->refcount);
+       }
        rcu_read_unlock();
+
+       /* Update consumer data */
        consumer_data.stream_count++;
        consumer_data.need_update = 1;
 
@@ -340,6 +416,153 @@ end:
        return ret;
 }
 
+/*
+ * Add relayd socket to global consumer data hashtable.
+ */
+int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
+{
+       int ret = 0;
+       struct lttng_ht_node_ulong *node;
+       struct lttng_ht_iter iter;
+
+       if (relayd == NULL) {
+               ret = -1;
+               goto end;
+       }
+
+       rcu_read_lock();
+
+       lttng_ht_lookup(consumer_data.relayd_ht,
+                       (void *)((unsigned long) relayd->net_seq_idx), &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       if (node != NULL) {
+               rcu_read_unlock();
+               /* Relayd already exist. Ignore the insertion */
+               goto end;
+       }
+       lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
+
+       rcu_read_unlock();
+
+end:
+       return ret;
+}
+
+/*
+ * Allocate and return a consumer relayd socket.
+ */
+struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
+               int net_seq_idx)
+{
+       struct consumer_relayd_sock_pair *obj = NULL;
+
+       /* Negative net sequence index is a failure */
+       if (net_seq_idx < 0) {
+               goto error;
+       }
+
+       obj = zmalloc(sizeof(struct consumer_relayd_sock_pair));
+       if (obj == NULL) {
+               PERROR("zmalloc relayd sock");
+               goto error;
+       }
+
+       obj->net_seq_idx = net_seq_idx;
+       obj->refcount = 0;
+       lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
+       pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
+
+error:
+       return obj;
+}
+
+/*
+ * Find a relayd socket pair in the global consumer data.
+ *
+ * Return the object if found else NULL.
+ * RCU read-side lock must be held across this call and while using the
+ * returned object.
+ */
+struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_ht_node_ulong *node;
+       struct consumer_relayd_sock_pair *relayd = NULL;
+
+       /* Negative keys are lookup failures */
+       if (key < 0) {
+               goto error;
+       }
+
+       lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
+                       &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       if (node != NULL) {
+               relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
+       }
+
+error:
+       return relayd;
+}
+
+/*
+ * Handle stream for relayd transmission if the stream applies for network
+ * streaming where the net sequence index is set.
+ *
+ * Return destination file descriptor or negative value on error.
+ */
+int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
+               size_t data_size)
+{
+       int outfd = -1, ret;
+       struct consumer_relayd_sock_pair *relayd;
+       struct lttcomm_relayd_data_hdr data_hdr;
+
+       /* Safety net */
+       assert(stream);
+
+       /* Reset data header */
+       memset(&data_hdr, 0, sizeof(data_hdr));
+
+       rcu_read_lock();
+       /* Get relayd reference of the stream. */
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd == NULL) {
+               /* Stream is either local or corrupted */
+               goto error;
+       }
+
+       DBG("Consumer found relayd socks with index %d", stream->net_seq_idx);
+       if (stream->metadata_flag) {
+               /* Caller MUST acquire the relayd control socket lock */
+               ret = relayd_send_metadata(&relayd->control_sock, data_size);
+               if (ret < 0) {
+                       goto error;
+               }
+
+               /* Metadata are always sent on the control socket. */
+               outfd = relayd->control_sock.fd;
+       } else {
+               /* Set header with stream information */
+               data_hdr.stream_id = htobe64(stream->relayd_stream_id);
+               data_hdr.data_size = htobe32(data_size);
+               /* Other fields are zeroed previously */
+
+               ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
+                               sizeof(data_hdr));
+               if (ret < 0) {
+                       goto error;
+               }
+
+               /* Set to go on data socket */
+               outfd = relayd->data_sock.fd;
+       }
+
+error:
+       rcu_read_unlock();
+       return outfd;
+}
+
 /*
  * Update a stream according to what we just received.
  */
@@ -464,9 +687,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(
                goto end;
        }
        DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
-                       channel->key,
-                       channel->shm_fd,
-                       channel->wait_fd,
+                       channel->key, channel->shm_fd, channel->wait_fd,
                        (unsigned long long) channel->mmap_len,
                        (unsigned long long) channel->max_sb_size);
 end:
@@ -512,7 +733,8 @@ end:
  */
 int consumer_update_poll_array(
                struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
-               struct lttng_consumer_stream **local_stream)
+               struct lttng_consumer_stream **local_stream,
+               struct lttng_ht *metadata_ht)
 {
        int i = 0;
        struct lttng_ht_iter iter;
@@ -528,6 +750,10 @@ int consumer_update_poll_array(
                DBG("Active FD %d", stream->wait_fd);
                (*pollfd)[i].fd = stream->wait_fd;
                (*pollfd)[i].events = POLLIN | POLLPRI;
+               if (stream->metadata_flag && metadata_ht) {
+                       lttng_ht_add_unique_ulong(metadata_ht, &stream->waitfd_node);
+                       DBG("Active FD added to metadata hash table");
+               }
                local_stream[i] = stream;
                i++;
        }
@@ -584,7 +810,6 @@ void lttng_consumer_set_error_sock(
 /*
  * Set the command socket path.
  */
-
 void lttng_consumer_set_command_sock_path(
                struct lttng_consumer_local_data *ctx, char *sock)
 {
@@ -636,6 +861,9 @@ void lttng_consumer_cleanup(void)
        }
 
        rcu_read_unlock();
+
+       lttng_ht_destroy(consumer_data.stream_ht);
+       lttng_ht_destroy(consumer_data.channel_ht);
 }
 
 /*
@@ -645,14 +873,16 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
 {
        int ret;
        consumer_quit = 1;
-       ret = write(ctx->consumer_should_quit[1], "4", 1);
+       do {
+               ret = write(ctx->consumer_should_quit[1], "4", 1);
+       } while (ret < 0 && errno == EINTR);
        if (ret < 0) {
                perror("write consumer quit");
        }
 }
 
-void lttng_consumer_sync_trace_file(
-               struct lttng_consumer_stream *stream, off_t orig_offset)
+void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
+               off_t orig_offset)
 {
        int outfd = stream->out_fd;
 
@@ -949,6 +1179,13 @@ void *lttng_consumer_thread_poll_fds(void *data)
        /* local view of consumer_data.fds_count */
        int nb_fd = 0;
        struct lttng_consumer_local_data *ctx = data;
+       struct lttng_ht *metadata_ht;
+       struct lttng_ht_iter iter;
+       struct lttng_ht_node_ulong *node;
+       struct lttng_consumer_stream *metadata_stream;
+       ssize_t len;
+
+       metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
 
        rcu_register_thread();
 
@@ -989,7 +1226,8 @@ void *lttng_consumer_thread_poll_fds(void *data)
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
-                       ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
+                       ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
+                                       metadata_ht);
                        if (ret < 0) {
                                ERR("Error in allocating pollfd or local_outfds");
                                lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR);
@@ -1026,10 +1264,9 @@ void *lttng_consumer_thread_poll_fds(void *data)
                }
 
                /*
-                * If the consumer_poll_pipe triggered poll go
-                * directly to the beginning of the loop to update the
-                * array. We want to prioritize array update over
-                * low-priority reads.
+                * If the consumer_poll_pipe triggered poll go directly to the
+                * beginning of the loop to update the array. We want to prioritize
+                * array update over low-priority reads.
                 */
                if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
                        size_t pipe_readlen;
@@ -1045,9 +1282,24 @@ void *lttng_consumer_thread_poll_fds(void *data)
 
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
-                       if (pollfd[i].revents & POLLPRI) {
-                               ssize_t len;
-
+                       /* Lookup for metadata which is the highest priority */
+                       lttng_ht_lookup(metadata_ht,
+                                       (void *)((unsigned long) pollfd[i].fd), &iter);
+                       node = lttng_ht_iter_get_node_ulong(&iter);
+                       if (node != NULL &&
+                                       (pollfd[i].revents & (POLLIN | POLLPRI))) {
+                               DBG("Urgent metadata read on fd %d", pollfd[i].fd);
+                               metadata_stream = caa_container_of(node,
+                                               struct lttng_consumer_stream, waitfd_node);
+                               high_prio = 1;
+                               len = ctx->on_buffer_ready(metadata_stream, ctx);
+                               /* it's ok to have an unavailable sub-buffer */
+                               if (len < 0 && len != -EAGAIN) {
+                                       goto end;
+                               } else if (len > 0) {
+                                       metadata_stream->data_read = 1;
+                               }
+                       } else if (pollfd[i].revents & POLLPRI) {
                                DBG("Urgent read on fd %d", pollfd[i].fd);
                                high_prio = 1;
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
@@ -1072,8 +1324,6 @@ void *lttng_consumer_thread_poll_fds(void *data)
                for (i = 0; i < nb_fd; i++) {
                        if ((pollfd[i].revents & POLLIN) ||
                                        local_stream[i]->hangup_flush_done) {
-                               ssize_t len;
-
                                DBG("Normal read on fd %d", pollfd[i].fd);
                                len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
@@ -1105,18 +1355,33 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        if ((pollfd[i].revents & POLLHUP)) {
                                DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
                                if (!local_stream[i]->data_read) {
+                                       if (local_stream[i]->metadata_flag) {
+                                               iter.iter.node = &local_stream[i]->waitfd_node.node;
+                                               ret = lttng_ht_del(metadata_ht, &iter);
+                                               assert(!ret);
+                                       }
                                        consumer_del_stream(local_stream[i]);
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLERR) {
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
                                if (!local_stream[i]->data_read) {
+                                       if (local_stream[i]->metadata_flag) {
+                                               iter.iter.node = &local_stream[i]->waitfd_node.node;
+                                               ret = lttng_ht_del(metadata_ht, &iter);
+                                               assert(!ret);
+                                       }
                                        consumer_del_stream(local_stream[i]);
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLNVAL) {
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
                                if (!local_stream[i]->data_read) {
+                                       if (local_stream[i]->metadata_flag) {
+                                               iter.iter.node = &local_stream[i]->waitfd_node.node;
+                                               ret = lttng_ht_del(metadata_ht, &iter);
+                                               assert(!ret);
+                                       }
                                        consumer_del_stream(local_stream[i]);
                                        num_hup++;
                                }
@@ -1257,7 +1522,7 @@ end:
         */
        do {
                ret = write(ctx->consumer_poll_pipe[1], "", 1);
-       } while (ret == -1UL && errno == EINTR);
+       } while (ret < 0 && errno == EINTR);
        rcu_unregister_thread();
        return NULL;
 }
@@ -1300,5 +1565,5 @@ void lttng_consumer_init(void)
 {
        consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+       consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
 }
-
This page took 0.029503 seconds and 4 git commands to generate.