Consumer daemon data available command support
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
index 3503e0415489636654d5a3fa037b4909a68907c3..071135bda6975b09abf4f4db7841454dfecdc269 100644 (file)
 
 #include "consumer.h"
 
+/*
+ * Send destroy relayd command to consumer.
+ *
+ * On success return positive value. On error, negative value.
+ */
+int consumer_send_destroy_relayd(struct consumer_socket *sock,
+               struct consumer_output *consumer)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       assert(consumer);
+       assert(sock);
+
+       DBG2("Sending destroy relayd command to consumer...");
+
+       /* Bail out if consumer is disabled */
+       if (!consumer->enabled) {
+               ret = LTTNG_OK;
+               DBG3("Consumer is disabled");
+               goto error;
+       }
+
+       msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
+       msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
+
+       pthread_mutex_lock(sock->lock);
+       ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
+       pthread_mutex_unlock(sock->lock);
+       if (ret < 0) {
+               PERROR("send consumer destroy relayd command");
+               goto error;
+       }
+
+       DBG2("Consumer send destroy relayd command done");
+
+error:
+       return ret;
+}
+
+/*
+ * For each consumer socket in the consumer output object, send a destroy
+ * relayd command.
+ */
+void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+       struct consumer_socket *socket;
+
+       assert(consumer);
+
+       /* Destroy any relayd connection */
+       if (consumer && consumer->type == CONSUMER_DST_NET) {
+               rcu_read_lock();
+               cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
+                               node.node) {
+                       /* Send destroy relayd command */
+                       ret = consumer_send_destroy_relayd(socket, consumer);
+                       if (ret < 0) {
+                               ERR("Unable to send destroy relayd command to consumer");
+                               /* Continue since we MUST delete everything at this point. */
+                       }
+               }
+               rcu_read_unlock();
+       }
+}
+
 /*
  * From a consumer_data structure, allocate and add a consumer socket to the
  * consumer output.
@@ -62,6 +130,7 @@ int consumer_create_socket(struct consumer_data *data,
                        goto error;
                }
 
+               socket->registered = 0;
                socket->lock = &data->lock;
                rcu_read_lock();
                consumer_add_socket(socket, output);
@@ -176,8 +245,13 @@ void consumer_destroy_socket(struct consumer_socket *sock)
 
        /*
         * We DO NOT close the file descriptor here since it is global to the
-        * session daemon and is closed only if the consumer dies.
+        * session daemon and is closed only if the consumer dies or a custom
+        * consumer was registered,
         */
+       if (sock->registered) {
+               DBG3("Consumer socket was registered. Closing fd %d", sock->fd);
+               lttcomm_close_unix_sock(sock->fd);
+       }
 
        call_rcu(&sock->node.head, destroy_socket_rcu);
 }
@@ -221,9 +295,15 @@ void consumer_destroy_output(struct consumer_output *obj)
                struct lttng_ht_iter iter;
                struct consumer_socket *socket;
 
+               rcu_read_lock();
                cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
+                       consumer_del_socket(socket, obj);
                        consumer_destroy_socket(socket);
                }
+               rcu_read_unlock();
+
+               /* Finally destroy HT */
+               lttng_ht_destroy(obj->socks);
        }
 
        free(obj);
@@ -406,7 +486,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                int channel_key,
                uint64_t max_sb_size,
                uint64_t mmap_len,
-               const char *name)
+               const char *name,
+               unsigned int nb_init_streams)
 {
        assert(msg);
 
@@ -420,6 +501,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.channel.channel_key = channel_key;
        msg->u.channel.max_sb_size = max_sb_size;
        msg->u.channel.mmap_len = mmap_len;
+       msg->u.channel.nb_init_streams = nb_init_streams;
 }
 
 /*
@@ -437,7 +519,8 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
                int net_index,
                unsigned int metadata_flag,
                const char *name,
-               const char *pathname)
+               const char *pathname,
+               unsigned int session_id)
 {
        assert(msg);
 
@@ -455,6 +538,7 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.stream.gid = gid;
        msg->u.stream.net_index = net_index;
        msg->u.stream.metadata_flag = metadata_flag;
+       msg->u.stream.session_id = (uint64_t) session_id;
        strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name));
        msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0';
        strncpy(msg->u.stream.path_name, pathname,
@@ -480,9 +564,12 @@ int consumer_send_stream(int sock, struct consumer_output *dst,
                break;
        case CONSUMER_DST_LOCAL:
                /* Add stream file name to stream path */
-               strncat(msg->u.stream.path_name, "/", sizeof(msg->u.stream.path_name));
+               strncat(msg->u.stream.path_name, "/",
+                               sizeof(msg->u.stream.path_name) -
+                               strlen(msg->u.stream.path_name) - 1);
                strncat(msg->u.stream.path_name, msg->u.stream.name,
-                               sizeof(msg->u.stream.path_name));
+                               sizeof(msg->u.stream.path_name) -
+                               strlen(msg->u.stream.path_name) - 1);
                msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
                /* Indicate that the stream is NOT network */
                msg->u.stream.net_index = -1;
@@ -528,7 +615,7 @@ int consumer_send_relayd_socket(int consumer_sock,
 
        /* Bail out if consumer is disabled */
        if (!consumer->enabled) {
-               ret = LTTCOMM_OK;
+               ret = LTTNG_OK;
                goto error;
        }
 
@@ -562,40 +649,48 @@ error:
 }
 
 /*
- * Send destroy relayd command to consumer.
- *
- * On success return positive value. On error, negative value.
+ * Set consumer subdirectory using the session name and a generated datetime if
+ * needed. This is appended to the current subdirectory.
  */
-int consumer_send_destroy_relayd(struct consumer_socket *sock,
-               struct consumer_output *consumer)
+int consumer_set_subdir(struct consumer_output *consumer,
+               const char *session_name)
 {
-       int ret;
-       struct lttcomm_consumer_msg msg;
+       int ret = 0;
+       unsigned int have_default_name = 0;
+       char datetime[16], tmp_path[PATH_MAX];
+       time_t rawtime;
+       struct tm *timeinfo;
 
        assert(consumer);
-       assert(sock);
-
-       DBG2("Sending destroy relayd command to consumer...");
-
-       /* Bail out if consumer is disabled */
-       if (!consumer->enabled) {
-               ret = LTTCOMM_OK;
-               DBG3("Consumer is disabled");
-               goto error;
+       assert(session_name);
+
+       memset(tmp_path, 0, sizeof(tmp_path));
+
+       /* Flag if we have a default session. */
+       if (strncmp(session_name, DEFAULT_SESSION_NAME "-",
+                               strlen(DEFAULT_SESSION_NAME) + 1) == 0) {
+               have_default_name = 1;
+       } else {
+               /* Get date and time for session path */
+               time(&rawtime);
+               timeinfo = localtime(&rawtime);
+               strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo);
        }
 
-       msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
-       msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
-
-       pthread_mutex_lock(sock->lock);
-       ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
-       pthread_mutex_unlock(sock->lock);
+       if (have_default_name) {
+               ret = snprintf(tmp_path, sizeof(tmp_path),
+                               "%s/%s", consumer->subdir, session_name);
+       } else {
+               ret = snprintf(tmp_path, sizeof(tmp_path),
+                               "%s/%s-%s/", consumer->subdir, session_name, datetime);
+       }
        if (ret < 0) {
-               PERROR("send consumer destroy relayd command");
+               PERROR("snprintf session name date");
                goto error;
        }
 
-       DBG2("Consumer send destroy relayd command done");
+       strncpy(consumer->subdir, tmp_path, sizeof(consumer->subdir));
+       DBG2("Consumer subdir set to %s", consumer->subdir);
 
 error:
        return ret;
This page took 0.025356 seconds and 4 git commands to generate.