Consumer daemon data available command support
[lttng-tools.git] / src / bin / lttng-sessiond / consumer.c
index 9ff4eceb2c84ba3469ef22a5c59b59e57ca16672..071135bda6975b09abf4f4db7841454dfecdc269 100644 (file)
 
 #include "consumer.h"
 
+/*
+ * Send destroy relayd command to consumer.
+ *
+ * On success return positive value. On error, negative value.
+ */
+int consumer_send_destroy_relayd(struct consumer_socket *sock,
+               struct consumer_output *consumer)
+{
+       int ret;
+       struct lttcomm_consumer_msg msg;
+
+       assert(consumer);
+       assert(sock);
+
+       DBG2("Sending destroy relayd command to consumer...");
+
+       /* Bail out if consumer is disabled */
+       if (!consumer->enabled) {
+               ret = LTTNG_OK;
+               DBG3("Consumer is disabled");
+               goto error;
+       }
+
+       msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
+       msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
+
+       pthread_mutex_lock(sock->lock);
+       ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
+       pthread_mutex_unlock(sock->lock);
+       if (ret < 0) {
+               PERROR("send consumer destroy relayd command");
+               goto error;
+       }
+
+       DBG2("Consumer send destroy relayd command done");
+
+error:
+       return ret;
+}
+
+/*
+ * For each consumer socket in the consumer output object, send a destroy
+ * relayd command.
+ */
+void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
+{
+       int ret;
+       struct lttng_ht_iter iter;
+       struct consumer_socket *socket;
+
+       assert(consumer);
+
+       /* Destroy any relayd connection */
+       if (consumer && consumer->type == CONSUMER_DST_NET) {
+               rcu_read_lock();
+               cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
+                               node.node) {
+                       /* Send destroy relayd command */
+                       ret = consumer_send_destroy_relayd(socket, consumer);
+                       if (ret < 0) {
+                               ERR("Unable to send destroy relayd command to consumer");
+                               /* Continue since we MUST delete everything at this point. */
+                       }
+               }
+               rcu_read_unlock();
+       }
+}
+
 /*
  * From a consumer_data structure, allocate and add a consumer socket to the
  * consumer output.
@@ -62,6 +130,7 @@ int consumer_create_socket(struct consumer_data *data,
                        goto error;
                }
 
+               socket->registered = 0;
                socket->lock = &data->lock;
                rcu_read_lock();
                consumer_add_socket(socket, output);
@@ -176,8 +245,13 @@ void consumer_destroy_socket(struct consumer_socket *sock)
 
        /*
         * We DO NOT close the file descriptor here since it is global to the
-        * session daemon and is closed only if the consumer dies.
+        * session daemon and is closed only if the consumer dies or a custom
+        * consumer was registered,
         */
+       if (sock->registered) {
+               DBG3("Consumer socket was registered. Closing fd %d", sock->fd);
+               lttcomm_close_unix_sock(sock->fd);
+       }
 
        call_rcu(&sock->node.head, destroy_socket_rcu);
 }
@@ -221,9 +295,15 @@ void consumer_destroy_output(struct consumer_output *obj)
                struct lttng_ht_iter iter;
                struct consumer_socket *socket;
 
+               rcu_read_lock();
                cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) {
+                       consumer_del_socket(socket, obj);
                        consumer_destroy_socket(socket);
                }
+               rcu_read_unlock();
+
+               /* Finally destroy HT */
+               lttng_ht_destroy(obj->socks);
        }
 
        free(obj);
@@ -272,7 +352,8 @@ malloc_error:
 /*
  * Set network URI to the consumer output object.
  *
- * Return 0 on success. Negative value on error.
+ * Return 0 on success. Return 1 if the URI were equal. Else, negative value on
+ * error.
  */
 int consumer_set_network_uri(struct consumer_output *obj,
                struct lttng_uri *uri)
@@ -294,6 +375,7 @@ int consumer_set_network_uri(struct consumer_output *obj,
                        /* Assign default port. */
                        uri->port = DEFAULT_NETWORK_CONTROL_PORT;
                }
+               DBG3("Consumer control URI set with port %d", uri->port);
                break;
        case LTTNG_STREAM_DATA:
                dst_uri = &obj->dst.net.data;
@@ -302,6 +384,7 @@ int consumer_set_network_uri(struct consumer_output *obj,
                        /* Assign default port. */
                        uri->port = DEFAULT_NETWORK_DATA_PORT;
                }
+               DBG3("Consumer data URI set with port %d", uri->port);
                break;
        default:
                ERR("Set network uri type unknown %d", uri->stype);
@@ -312,7 +395,7 @@ int consumer_set_network_uri(struct consumer_output *obj,
        if (!ret) {
                /* Same URI, don't touch it and return success. */
                DBG3("URI network compare are the same");
-               goto end;
+               goto equal;
        }
 
        /* URIs were not equal, replacing it. */
@@ -347,9 +430,9 @@ int consumer_set_network_uri(struct consumer_output *obj,
                DBG3("Consumer set network uri subdir path %s", tmp_path);
        }
 
-end:
        return 0;
-
+equal:
+       return 1;
 error:
        return -1;
 }
@@ -403,7 +486,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                int channel_key,
                uint64_t max_sb_size,
                uint64_t mmap_len,
-               const char *name)
+               const char *name,
+               unsigned int nb_init_streams)
 {
        assert(msg);
 
@@ -417,6 +501,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.channel.channel_key = channel_key;
        msg->u.channel.max_sb_size = max_sb_size;
        msg->u.channel.mmap_len = mmap_len;
+       msg->u.channel.nb_init_streams = nb_init_streams;
 }
 
 /*
@@ -434,7 +519,8 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
                int net_index,
                unsigned int metadata_flag,
                const char *name,
-               const char *pathname)
+               const char *pathname,
+               unsigned int session_id)
 {
        assert(msg);
 
@@ -452,6 +538,7 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.stream.gid = gid;
        msg->u.stream.net_index = net_index;
        msg->u.stream.metadata_flag = metadata_flag;
+       msg->u.stream.session_id = (uint64_t) session_id;
        strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name));
        msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0';
        strncpy(msg->u.stream.path_name, pathname,
@@ -477,9 +564,12 @@ int consumer_send_stream(int sock, struct consumer_output *dst,
                break;
        case CONSUMER_DST_LOCAL:
                /* Add stream file name to stream path */
-               strncat(msg->u.stream.path_name, "/", sizeof(msg->u.stream.path_name));
+               strncat(msg->u.stream.path_name, "/",
+                               sizeof(msg->u.stream.path_name) -
+                               strlen(msg->u.stream.path_name) - 1);
                strncat(msg->u.stream.path_name, msg->u.stream.name,
-                               sizeof(msg->u.stream.path_name));
+                               sizeof(msg->u.stream.path_name) -
+                               strlen(msg->u.stream.path_name) - 1);
                msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0';
                /* Indicate that the stream is NOT network */
                msg->u.stream.net_index = -1;
@@ -525,7 +615,7 @@ int consumer_send_relayd_socket(int consumer_sock,
 
        /* Bail out if consumer is disabled */
        if (!consumer->enabled) {
-               ret = LTTCOMM_OK;
+               ret = LTTNG_OK;
                goto error;
        }
 
@@ -559,40 +649,48 @@ error:
 }
 
 /*
- * Send destroy relayd command to consumer.
- *
- * On success return positive value. On error, negative value.
+ * Set consumer subdirectory using the session name and a generated datetime if
+ * needed. This is appended to the current subdirectory.
  */
-int consumer_send_destroy_relayd(struct consumer_socket *sock,
-               struct consumer_output *consumer)
+int consumer_set_subdir(struct consumer_output *consumer,
+               const char *session_name)
 {
-       int ret;
-       struct lttcomm_consumer_msg msg;
+       int ret = 0;
+       unsigned int have_default_name = 0;
+       char datetime[16], tmp_path[PATH_MAX];
+       time_t rawtime;
+       struct tm *timeinfo;
 
        assert(consumer);
-       assert(sock);
-
-       DBG2("Sending destroy relayd command to consumer...");
-
-       /* Bail out if consumer is disabled */
-       if (!consumer->enabled) {
-               ret = LTTCOMM_OK;
-               DBG3("Consumer is disabled");
-               goto error;
+       assert(session_name);
+
+       memset(tmp_path, 0, sizeof(tmp_path));
+
+       /* Flag if we have a default session. */
+       if (strncmp(session_name, DEFAULT_SESSION_NAME "-",
+                               strlen(DEFAULT_SESSION_NAME) + 1) == 0) {
+               have_default_name = 1;
+       } else {
+               /* Get date and time for session path */
+               time(&rawtime);
+               timeinfo = localtime(&rawtime);
+               strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo);
        }
 
-       msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
-       msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
-
-       pthread_mutex_lock(sock->lock);
-       ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg));
-       pthread_mutex_unlock(sock->lock);
+       if (have_default_name) {
+               ret = snprintf(tmp_path, sizeof(tmp_path),
+                               "%s/%s", consumer->subdir, session_name);
+       } else {
+               ret = snprintf(tmp_path, sizeof(tmp_path),
+                               "%s/%s-%s/", consumer->subdir, session_name, datetime);
+       }
        if (ret < 0) {
-               PERROR("send consumer destroy relayd command");
+               PERROR("snprintf session name date");
                goto error;
        }
 
-       DBG2("Consumer send destroy relayd command done");
+       strncpy(consumer->subdir, tmp_path, sizeof(consumer->subdir));
+       DBG2("Consumer subdir set to %s", consumer->subdir);
 
 error:
        return ret;
This page took 0.026122 seconds and 4 git commands to generate.