On-disk multiple tracefiles circular buffer
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
index 5d690548c4e1153492246d485ee69f214088d75a..05e30e49cb0b9ff65e88534ae9cec25de2222693 100644 (file)
 #include <common/defaults.h>
 
 #include "consumer.h"
+#include "health.h"
 #include "kernel-consumer.h"
 
 /*
  * Sending a single channel to the consumer with command ADD_CHANNEL.
  */
-int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel)
+int kernel_consumer_add_channel(struct consumer_socket *sock,
+               struct ltt_kernel_channel *channel, struct ltt_kernel_session *session)
 {
        int ret;
+       char tmp_path[PATH_MAX];
+       const char *pathname;
        struct lttcomm_consumer_msg lkm;
+       struct consumer_output *consumer;
 
        /* Safety net */
        assert(channel);
+       assert(session);
+       assert(session->consumer);
+
+       consumer = session->consumer;
 
        DBG("Kernel consumer adding channel %s to kernel consumer",
                        channel->channel->name);
 
+       /* Get the right path name destination */
+       if (consumer->type == CONSUMER_DST_LOCAL) {
+               /* Set application path to the destination path */
+               ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
+                               consumer->dst.trace_path, consumer->subdir);
+               if (ret < 0) {
+                       PERROR("snprintf metadata path");
+                       goto error;
+               }
+               pathname = tmp_path;
+
+               /* Create directory */
+               ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
+                               session->uid, session->gid);
+               if (ret < 0) {
+                       if (ret != -EEXIST) {
+                               ERR("Trace directory creation error");
+                               goto error;
+                       }
+               }
+               DBG3("Kernel local consumer tracefile path: %s", pathname);
+       } else {
+               ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
+               if (ret < 0) {
+                       PERROR("snprintf metadata path");
+                       goto error;
+               }
+               pathname = tmp_path;
+               DBG3("Kernel network consumer subdir path: %s", pathname);
+       }
+
        /* Prep channel message structure */
        consumer_init_channel_comm_msg(&lkm,
                        LTTNG_CONSUMER_ADD_CHANNEL,
                        channel->fd,
-                       channel->channel->attr.subbuf_size,
-                       0, /* Kernel */
-                       channel->channel->name);
+                       session->id,
+                       pathname,
+                       session->uid,
+                       session->gid,
+                       consumer->net_seq_index,
+                       channel->channel->name,
+                       channel->stream_count,
+                       channel->channel->attr.output,
+                       CONSUMER_CHANNEL_TYPE_DATA,
+                       channel->channel->attr.tracefile_size,
+                       channel->channel->attr.tracefile_count);
+
+       health_code_update();
 
        ret = consumer_send_channel(sock, &lkm);
        if (ret < 0) {
                goto error;
        }
 
+       health_code_update();
+
 error:
        return ret;
 }
@@ -62,64 +114,98 @@ error:
 /*
  * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
  */
-int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session)
+int kernel_consumer_add_metadata(struct consumer_socket *sock,
+               struct ltt_kernel_session *session)
 {
        int ret;
+       char tmp_path[PATH_MAX];
        const char *pathname;
        struct lttcomm_consumer_msg lkm;
-       struct consumer_output *output;
+       struct consumer_output *consumer;
 
        /* Safety net */
        assert(session);
        assert(session->consumer);
+       assert(sock);
 
        DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
 
        /* Get consumer output pointer */
-       output = session->consumer;
+       consumer = session->consumer;
+
+       /* Get the right path name destination */
+       if (consumer->type == CONSUMER_DST_LOCAL) {
+               /* Set application path to the destination path */
+               ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
+                               consumer->dst.trace_path, consumer->subdir);
+               if (ret < 0) {
+                       PERROR("snprintf metadata path");
+                       goto error;
+               }
+               pathname = tmp_path;
 
-       /* Get correct path name destination */
-       if (output->type == CONSUMER_DST_LOCAL) {
-               pathname = output->dst.trace_path;
+               /* Create directory */
+               ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
+                               session->uid, session->gid);
+               if (ret < 0) {
+                       if (ret != -EEXIST) {
+                               ERR("Trace directory creation error");
+                               goto error;
+                       }
+               }
+               DBG3("Kernel local consumer tracefile path: %s", pathname);
        } else {
-               pathname = output->subdir;
+               ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
+               if (ret < 0) {
+                       PERROR("snprintf metadata path");
+                       goto error;
+               }
+               pathname = tmp_path;
+               DBG3("Kernel network consumer subdir path: %s", pathname);
        }
 
        /* Prep channel message structure */
        consumer_init_channel_comm_msg(&lkm,
                        LTTNG_CONSUMER_ADD_CHANNEL,
                        session->metadata->fd,
-                       session->metadata->conf->attr.subbuf_size,
-                       0, /* for kernel */
-                       "metadata");
+                       session->id,
+                       pathname,
+                       session->uid,
+                       session->gid,
+                       consumer->net_seq_index,
+                       DEFAULT_METADATA_NAME,
+                       1,
+                       DEFAULT_KERNEL_CHANNEL_OUTPUT,
+                       CONSUMER_CHANNEL_TYPE_METADATA,
+                       0, 0);
+
+       health_code_update();
 
        ret = consumer_send_channel(sock, &lkm);
        if (ret < 0) {
                goto error;
        }
 
+       health_code_update();
+
        /* Prep stream message structure */
        consumer_init_stream_comm_msg(&lkm,
                        LTTNG_CONSUMER_ADD_STREAM,
                        session->metadata->fd,
                        session->metadata_stream_fd,
-                       LTTNG_CONSUMER_ACTIVE_STREAM,
-                       DEFAULT_KERNEL_CHANNEL_OUTPUT,
-                       0, /* Kernel */
-                       session->uid,
-                       session->gid,
-                       output->net_seq_index,
-                       1, /* Metadata flag set */
-                       "metadata",
-                       pathname);
+                       0); /* CPU: 0 for metadata. */
+
+       health_code_update();
 
        /* Send stream and file descriptor */
-       ret = consumer_send_stream(sock, output, &lkm,
+       ret = consumer_send_stream(sock, consumer, &lkm,
                        &session->metadata_stream_fd, 1);
        if (ret < 0) {
                goto error;
        }
 
+       health_code_update();
+
 error:
        return ret;
 }
@@ -127,54 +213,43 @@ error:
 /*
  * Sending a single stream to the consumer with command ADD_STREAM.
  */
-int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel,
-               struct ltt_kernel_stream *stream, struct ltt_kernel_session *session)
+int kernel_consumer_add_stream(struct consumer_socket *sock,
+               struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
+               struct ltt_kernel_session *session)
 {
        int ret;
-       const char *pathname;
        struct lttcomm_consumer_msg lkm;
-       struct consumer_output *output;
+       struct consumer_output *consumer;
 
        assert(channel);
        assert(stream);
        assert(session);
        assert(session->consumer);
+       assert(sock);
 
        DBG("Sending stream %d of channel %s to kernel consumer",
                        stream->fd, channel->channel->name);
 
        /* Get consumer output pointer */
-       output = session->consumer;
-
-       /* Get correct path name destination */
-       if (output->type == CONSUMER_DST_LOCAL) {
-               pathname = output->dst.trace_path;
-               DBG3("Consumer is local to %s", pathname);
-       } else {
-               pathname = output->subdir;
-               DBG3("Consumer is network to subdir %s", pathname);
-       }
+       consumer = session->consumer;
 
        /* Prep stream consumer message */
-       consumer_init_stream_comm_msg(&lkm, LTTNG_CONSUMER_ADD_STREAM,
+       consumer_init_stream_comm_msg(&lkm,
+                       LTTNG_CONSUMER_ADD_STREAM,
                        channel->fd,
                        stream->fd,
-                       stream->state,
-                       channel->channel->attr.output,
-                       0, /* Kernel */
-                       session->uid,
-                       session->gid,
-                       output->net_seq_index,
-                       0, /* Metadata flag unset */
-                       stream->name,
-                       pathname);
+                       stream->cpu);
+
+       health_code_update();
 
        /* Send stream and file descriptor */
-       ret = consumer_send_stream(sock, output, &lkm, &stream->fd, 1);
+       ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
        if (ret < 0) {
                goto error;
        }
 
+       health_code_update();
+
 error:
        return ret;
 }
@@ -182,7 +257,7 @@ error:
 /*
  * Send all stream fds of kernel channel to the consumer.
  */
-int kernel_consumer_send_channel_stream(int sock,
+int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
                struct ltt_kernel_channel *channel, struct ltt_kernel_session *session)
 {
        int ret;
@@ -192,17 +267,18 @@ int kernel_consumer_send_channel_stream(int sock,
        assert(channel);
        assert(session);
        assert(session->consumer);
+       assert(sock);
 
        /* Bail out if consumer is disabled */
        if (!session->consumer->enabled) {
-               ret = LTTCOMM_OK;
+               ret = LTTNG_OK;
                goto error;
        }
 
        DBG("Sending streams of channel %s to kernel consumer",
                        channel->channel->name);
 
-       ret = kernel_consumer_add_channel(sock, channel);
+       ret = kernel_consumer_add_channel(sock, channel, session);
        if (ret < 0) {
                goto error;
        }
@@ -227,7 +303,8 @@ error:
 /*
  * Send all stream fds of the kernel session to the consumer.
  */
-int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session)
+int kernel_consumer_send_session(struct consumer_socket *sock,
+               struct ltt_kernel_session *session)
 {
        int ret;
        struct ltt_kernel_channel *chan;
@@ -235,10 +312,11 @@ int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session)
        /* Safety net */
        assert(session);
        assert(session->consumer);
+       assert(sock);
 
        /* Bail out if consumer is disabled */
        if (!session->consumer->enabled) {
-               ret = LTTCOMM_OK;
+               ret = LTTNG_OK;
                goto error;
        }
 
@@ -269,54 +347,3 @@ int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session)
 error:
        return ret;
 }
-
-/*
- * Send relayd socket to consumer associated with a session name.
- *
- * On success return positive value. On error, negative value.
- */
-int kernel_consumer_send_relayd_socket(int consumer_sock,
-               struct lttcomm_sock *sock, struct consumer_output *consumer,
-               enum lttng_stream_type type)
-{
-       int ret;
-       struct lttcomm_consumer_msg msg;
-
-       /* Code flow error. Safety net. */
-       assert(sock);
-       assert(consumer);
-
-       /* Bail out if consumer is disabled */
-       if (!consumer->enabled) {
-               ret = LTTCOMM_OK;
-               goto error;
-       }
-
-       msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
-       /*
-        * Assign network consumer output index using the temporary consumer since
-        * this call should only be made from within a set_consumer_uri() function
-        * call in the session daemon.
-        */
-       msg.u.relayd_sock.net_index = consumer->net_seq_index;
-       msg.u.relayd_sock.type = type;
-       memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
-
-       DBG2("Sending relayd sock info to consumer");
-       ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
-       if (ret < 0) {
-               PERROR("send consumer relayd socket info");
-               goto error;
-       }
-
-       DBG2("Sending relayd socket file descriptor to consumer");
-       ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
-       if (ret < 0) {
-               goto error;
-       }
-
-       DBG("Kernel consumer relayd socket sent");
-
-error:
-       return ret;
-}
This page took 0.02691 seconds and 4 git commands to generate.