Add lttng-error.h containing every API err. code
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
index 56d938114cd18d4f112f9455d5f1ba80e28ce7b6..b69df16fdb43fcf29c4aa60098924b3c8a675681 100644 (file)
 
 #include "consumer.h"
 
+/*
+ * Send destroy relayd command to consumer.
+ *
+ * On success return positive value. On error, negative value.
+ */
+int consumer_send_destroy_relayd(struct consumer_socket *sock,
+               struct consumer_output *consumer)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       assert(consumer);
+       assert(sock);
+
+       DBG2("Sending destroy relayd command to consumer...");
+
+       /* Bail out if consumer is disabled */
+       if (!consumer->enabled) {
+               ret = LTTNG_OK;
+               DBG3("Consumer is disabled");
+               goto error;
+       }
+
+       msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
+       msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
+
+       pthread_mutex_lock(sock->lock);
+       ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
+       pthread_mutex_unlock(sock->lock);
+       if (ret < 0) {
+               PERROR("send consumer destroy relayd command");
+               goto error;
+       }
+
+       DBG2("Consumer send destroy relayd command done");
+
+error:
+       return ret;
+}
+
+/*
+ * For each consumer socket in the consumer output object, send a destroy
+ * relayd command.
+ */
+void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+       struct consumer_socket *socket;
+
+       assert(consumer);
+
+       /* Destroy any relayd connection */
+       if (consumer && consumer->type == CONSUMER_DST_NET) {
+               rcu_read_lock();
+               cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
+                               node.node) {
+                       /* Send destroy relayd command */
+                       ret = consumer_send_destroy_relayd(socket, consumer);
+                       if (ret < 0) {
+                               ERR("Unable to send destroy relayd command to consumer");
+                               /* Continue since we MUST delete everything at this point. */
+                       }
+               }
+               rcu_read_unlock();
+       }
+}
+
+/*
+ * From a consumer_data structure, allocate and add a consumer socket to the
+ * consumer output.
+ *
+ * Return 0 on success, else negative value on error
+ */
+int consumer_create_socket(struct consumer_data *data,
+               struct consumer_output *output)
+{
+       int ret = 0;
+       struct consumer_socket *socket;
+
+       assert(data);
+
+       if (output == NULL || data->cmd_sock < 0) {
+               /*
+                * Not an error. Possible there is simply not spawned consumer or it's
+                * disabled for the tracing session asking the socket.
+                */
+               goto error;
+       }
+
+       rcu_read_lock();
+       socket = consumer_find_socket(data->cmd_sock, output);
+       rcu_read_unlock();
+       if (socket == NULL) {
+               socket = consumer_allocate_socket(data->cmd_sock);
+               if (socket == NULL) {
+                       ret = -1;
+                       goto error;
+               }
+
+               socket->registered = 0;
+               socket->lock = &data->lock;
+               rcu_read_lock();
+               consumer_add_socket(socket, output);
+               rcu_read_unlock();
+       }
+
+       DBG3("Consumer socket created (fd: %d) and added to output",
+                       data->cmd_sock);
+
+error:
+       return ret;
+}
+
+/*
+ * Find a consumer_socket in a consumer_output hashtable. Read side lock must
+ * be acquired before calling this function and across use of the
+ * returned consumer_socket.
+ */
+struct consumer_socket *consumer_find_socket(int key,
+               struct consumer_output *consumer)
+{
+       struct lttng_ht_iter iter;
+       struct lttng_ht_node_ulong *node;
+       struct consumer_socket *socket = NULL;
+
+       /* Negative keys are lookup failures */
+       if (key < 0 || consumer == NULL) {
+               return NULL;
+       }
+
+       lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key),
+                       &iter);
+       node = lttng_ht_iter_get_node_ulong(&iter);
+       if (node != NULL) {
+               socket = caa_container_of(node, struct consumer_socket, node);
+       }
+
+       return socket;
+}
+
+/*
+ * Allocate a new consumer_socket and return the pointer.
+ */
+struct consumer_socket *consumer_allocate_socket(int fd)
+{
+       struct consumer_socket *socket = NULL;
+
+       socket = zmalloc(sizeof(struct consumer_socket));
+       if (socket == NULL) {
+               PERROR("zmalloc consumer socket");
+               goto error;
+       }
+
+       socket->fd = fd;
+       lttng_ht_node_init_ulong(&socket->node, fd);
+
+error:
+       return socket;
+}
+
+/*
+ * Add consumer socket to consumer output object. Read side lock must be
+ * acquired before calling this function.
+ */
+void consumer_add_socket(struct consumer_socket *sock,
+               struct consumer_output *consumer)
+{
+       assert(sock);
+       assert(consumer);
+
+       lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
+}
+
+/*
+ * Delte consumer socket to consumer output object. Read side lock must be
+ * acquired before calling this function.
+ */
+void consumer_del_socket(struct consumer_socket *sock,
+               struct consumer_output *consumer)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+
+       assert(sock);
+       assert(consumer);
+
+       iter.iter.node = &sock->node.node;
+       ret = lttng_ht_del(consumer->socks, &iter);
+       assert(!ret);
+}
+
+/*
+ * RCU destroy call function.
+ */
+static void destroy_socket_rcu(struct rcu_head *head)
+{
+       struct lttng_ht_node_ulong *node =
+               caa_container_of(head, struct lttng_ht_node_ulong, head);
+       struct consumer_socket *socket =
+               caa_container_of(node, struct consumer_socket, node);
+
+       free(socket);
+}
+
+/*
+ * Destroy and free socket pointer in a call RCU. Read side lock must be
+ * acquired before calling this function.
+ */
+void consumer_destroy_socket(struct consumer_socket *sock)
+{
+       assert(sock);
+
+       /*
+        * We DO NOT close the file descriptor here since it is global to the
+        * session daemon and is closed only if the consumer dies or a custom
+        * consumer was registered,
+        */
+       if (sock->registered) {
+               DBG3("Consumer socket was registered. Closing fd %d", sock->fd);
+               lttcomm_close_unix_sock(sock->fd);
+       }
+
+       call_rcu(&sock->node.head, destroy_socket_rcu);
+}
+
 /*
  * Allocate and assign data to a consumer_output object.
  *
@@ -49,11 +275,8 @@ struct consumer_output *consumer_create_output(enum consumer_dst_type type)
        output->enabled = 1;
        output->type = type;
        output->net_seq_index = -1;
-       /*
-        * Important to keep it to a negative value on creation since it was zeroed
-        * during allocation and the file descriptor 0 is a valid one.
-        */
-       output->sock = -1;
+
+       output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
 
 error:
        return output;
@@ -68,9 +291,21 @@ void consumer_destroy_output(struct consumer_output *obj)
                return;
        }
 
-       if (obj->sock >= 0) {
-               (void) close(obj->sock);
+       if (obj->socks) {
+               struct lttng_ht_iter iter;
+               struct consumer_socket *socket;
+
+               rcu_read_lock();
+               cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
+                       consumer_del_socket(socket, obj);
+                       consumer_destroy_socket(socket);
+               }
+               rcu_read_unlock();
+
+               /* Finally destroy HT */
+               lttng_ht_destroy(obj->socks);
        }
+
        free(obj);
 }
 
@@ -79,6 +314,8 @@ void consumer_destroy_output(struct consumer_output *obj)
  */
 struct consumer_output *consumer_copy_output(struct consumer_output *obj)
 {
+       struct lttng_ht_iter iter;
+       struct consumer_socket *socket, *copy_sock;
        struct consumer_output *output;
 
        assert(obj);
@@ -90,14 +327,33 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj)
 
        memcpy(output, obj, sizeof(struct consumer_output));
 
+       /* Copy sockets */
+       output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+
+       cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
+               /* Create new socket object. */
+               copy_sock = consumer_allocate_socket(socket->fd);
+               if (copy_sock == NULL) {
+                       goto malloc_error;
+               }
+
+               copy_sock->lock = socket->lock;
+               consumer_add_socket(copy_sock, output);
+       }
+
 error:
        return output;
+
+malloc_error:
+       consumer_destroy_output(output);
+       return NULL;
 }
 
 /*
  * Set network URI to the consumer output object.
  *
- * Return 0 on success. Negative value on error.
+ * Return 0 on success. Return 1 if the URI were equal. Else, negative value on
+ * error.
  */
 int consumer_set_network_uri(struct consumer_output *obj,
                struct lttng_uri *uri)
@@ -119,6 +375,7 @@ int consumer_set_network_uri(struct consumer_output *obj,
                        /* Assign default port. */
                        uri->port = DEFAULT_NETWORK_CONTROL_PORT;
                }
+               DBG3("Consumer control URI set with port %d", uri->port);
                break;
        case LTTNG_STREAM_DATA:
                dst_uri = &obj->dst.net.data;
@@ -127,6 +384,7 @@ int consumer_set_network_uri(struct consumer_output *obj,
                        /* Assign default port. */
                        uri->port = DEFAULT_NETWORK_DATA_PORT;
                }
+               DBG3("Consumer data URI set with port %d", uri->port);
                break;
        default:
                ERR("Set network uri type unknown %d", uri->stype);
@@ -137,7 +395,7 @@ int consumer_set_network_uri(struct consumer_output *obj,
        if (!ret) {
                /* Same URI, don't touch it and return success. */
                DBG3("URI network compare are the same");
-               goto end;
+               goto equal;
        }
 
        /* URIs were not equal, replacing it. */
@@ -172,9 +430,9 @@ int consumer_set_network_uri(struct consumer_output *obj,
                DBG3("Consumer set network uri subdir path %s", tmp_path);
        }
 
-end:
        return 0;
-
+equal:
+       return 1;
 error:
        return -1;
 }
@@ -302,9 +560,12 @@ int consumer_send_stream(int sock, struct consumer_output *dst,
                break;
        case CONSUMER_DST_LOCAL:
                /* Add stream file name to stream path */
-               strncat(msg->u.stream.path_name, "/", sizeof(msg->u.stream.path_name));
+               strncat(msg->u.stream.path_name, "/",
+                               sizeof(msg->u.stream.path_name) -
+                               strlen(msg->u.stream.path_name) - 1);
                strncat(msg->u.stream.path_name, msg->u.stream.name,
-                               sizeof(msg->u.stream.path_name));
+                               sizeof(msg->u.stream.path_name) -
+                               strlen(msg->u.stream.path_name) - 1);
                msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
                /* Indicate that the stream is NOT network */
                msg->u.stream.net_index = -1;
@@ -331,3 +592,102 @@ int consumer_send_stream(int sock, struct consumer_output *dst,
 error:
        return ret;
 }
+
+/*
+ * Send relayd socket to consumer associated with a session name.
+ *
+ * On success return positive value. On error, negative value.
+ */
+int consumer_send_relayd_socket(int consumer_sock,
+               struct lttcomm_sock *sock, struct consumer_output *consumer,
+               enum lttng_stream_type type)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       /* Code flow error. Safety net. */
+       assert(sock);
+       assert(consumer);
+
+       /* Bail out if consumer is disabled */
+       if (!consumer->enabled) {
+               ret = LTTNG_OK;
+               goto error;
+       }
+
+       msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
+       /*
+        * Assign network consumer output index using the temporary consumer since
+        * this call should only be made from within a set_consumer_uri() function
+        * call in the session daemon.
+        */
+       msg.u.relayd_sock.net_index = consumer->net_seq_index;
+       msg.u.relayd_sock.type = type;
+       memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
+
+       DBG3("Sending relayd sock info to consumer on %d", consumer_sock);
+       ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
+       if (ret < 0) {
+               PERROR("send consumer relayd socket info");
+               goto error;
+       }
+
+       DBG3("Sending relayd socket file descriptor to consumer");
+       ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
+       if (ret < 0) {
+               goto error;
+       }
+
+       DBG2("Consumer relayd socket sent");
+
+error:
+       return ret;
+}
+
+/*
+ * Set consumer subdirectory using the session name and a generated datetime if
+ * needed. This is appended to the current subdirectory.
+ */
+int consumer_set_subdir(struct consumer_output *consumer,
+               const char *session_name)
+{
+       int ret = 0;
+       unsigned int have_default_name = 0;
+       char datetime[16], tmp_path[PATH_MAX];
+       time_t rawtime;
+       struct tm *timeinfo;
+
+       assert(consumer);
+       assert(session_name);
+
+       memset(tmp_path, 0, sizeof(tmp_path));
+
+       /* Flag if we have a default session. */
+       if (strncmp(session_name, DEFAULT_SESSION_NAME "-",
+                               strlen(DEFAULT_SESSION_NAME) + 1) == 0) {
+               have_default_name = 1;
+       } else {
+               /* Get date and time for session path */
+               time(&rawtime);
+               timeinfo = localtime(&rawtime);
+               strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo);
+       }
+
+       if (have_default_name) {
+               ret = snprintf(tmp_path, sizeof(tmp_path),
+                               "%s/%s", consumer->subdir, session_name);
+       } else {
+               ret = snprintf(tmp_path, sizeof(tmp_path),
+                               "%s/%s-%s/", consumer->subdir, session_name, datetime);
+       }
+       if (ret < 0) {
+               PERROR("snprintf session name date");
+               goto error;
+       }
+
+       strncpy(consumer->subdir, tmp_path, sizeof(consumer->subdir));
+       DBG2("Consumer subdir set to %s", consumer->subdir);
+
+error:
+       return ret;
+}
This page took 0.027677 seconds and 4 git commands to generate.