On-disk multiple tracefiles circular buffer
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
index 4c037a9eb25b426edb4677701349d560bc06d549..05e30e49cb0b9ff65e88534ae9cec25de2222693 100644 (file)
 #include <common/defaults.h>
 
 #include "consumer.h"
+#include "health.h"
 #include "kernel-consumer.h"
 
 /*
  * Sending a single channel to the consumer with command ADD_CHANNEL.
  */
-int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel)
+int kernel_consumer_add_channel(struct consumer_socket *sock,
+               struct ltt_kernel_channel *channel, struct ltt_kernel_session *session)
 {
        int ret;
+       char tmp_path[PATH_MAX];
+       const char *pathname;
        struct lttcomm_consumer_msg lkm;
+       struct consumer_output *consumer;
 
        /* Safety net */
        assert(channel);
+       assert(session);
+       assert(session->consumer);
+
+       consumer = session->consumer;
 
        DBG("Kernel consumer adding channel %s to kernel consumer",
                        channel->channel->name);
 
+       /* Get the right path name destination */
+       if (consumer->type == CONSUMER_DST_LOCAL) {
+               /* Set application path to the destination path */
+               ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
+                               consumer->dst.trace_path, consumer->subdir);
+               if (ret < 0) {
+                       PERROR("snprintf metadata path");
+                       goto error;
+               }
+               pathname = tmp_path;
+
+               /* Create directory */
+               ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
+                               session->uid, session->gid);
+               if (ret < 0) {
+                       if (ret != -EEXIST) {
+                               ERR("Trace directory creation error");
+                               goto error;
+                       }
+               }
+               DBG3("Kernel local consumer tracefile path: %s", pathname);
+       } else {
+               ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
+               if (ret < 0) {
+                       PERROR("snprintf metadata path");
+                       goto error;
+               }
+               pathname = tmp_path;
+               DBG3("Kernel network consumer subdir path: %s", pathname);
+       }
+
        /* Prep channel message structure */
        consumer_init_channel_comm_msg(&lkm,
                        LTTNG_CONSUMER_ADD_CHANNEL,
                        channel->fd,
-                       channel->channel->attr.subbuf_size,
-                       0, /* Kernel */
-                       channel->channel->name);
+                       session->id,
+                       pathname,
+                       session->uid,
+                       session->gid,
+                       consumer->net_seq_index,
+                       channel->channel->name,
+                       channel->stream_count,
+                       channel->channel->attr.output,
+                       CONSUMER_CHANNEL_TYPE_DATA,
+                       channel->channel->attr.tracefile_size,
+                       channel->channel->attr.tracefile_count);
+
+       health_code_update();
 
        ret = consumer_send_channel(sock, &lkm);
        if (ret < 0) {
                goto error;
        }
 
+       health_code_update();
+
 error:
        return ret;
 }
@@ -62,64 +114,98 @@ error:
 /*
  * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
  */
-int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session)
+int kernel_consumer_add_metadata(struct consumer_socket *sock,
+               struct ltt_kernel_session *session)
 {
        int ret;
+       char tmp_path[PATH_MAX];
        const char *pathname;
        struct lttcomm_consumer_msg lkm;
-       struct consumer_output *output;
+       struct consumer_output *consumer;
 
        /* Safety net */
        assert(session);
        assert(session->consumer);
+       assert(sock);
 
        DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
 
        /* Get consumer output pointer */
-       output = session->consumer;
+       consumer = session->consumer;
+
+       /* Get the right path name destination */
+       if (consumer->type == CONSUMER_DST_LOCAL) {
+               /* Set application path to the destination path */
+               ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
+                               consumer->dst.trace_path, consumer->subdir);
+               if (ret < 0) {
+                       PERROR("snprintf metadata path");
+                       goto error;
+               }
+               pathname = tmp_path;
 
-       /* Get correct path name destination */
-       if (output->type == CONSUMER_DST_LOCAL) {
-               pathname = output->dst.trace_path;
+               /* Create directory */
+               ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
+                               session->uid, session->gid);
+               if (ret < 0) {
+                       if (ret != -EEXIST) {
+                               ERR("Trace directory creation error");
+                               goto error;
+                       }
+               }
+               DBG3("Kernel local consumer tracefile path: %s", pathname);
        } else {
-               pathname = output->subdir;
+               ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
+               if (ret < 0) {
+                       PERROR("snprintf metadata path");
+                       goto error;
+               }
+               pathname = tmp_path;
+               DBG3("Kernel network consumer subdir path: %s", pathname);
        }
 
        /* Prep channel message structure */
        consumer_init_channel_comm_msg(&lkm,
                        LTTNG_CONSUMER_ADD_CHANNEL,
                        session->metadata->fd,
-                       session->metadata->conf->attr.subbuf_size,
-                       0, /* for kernel */
-                       "metadata");
+                       session->id,
+                       pathname,
+                       session->uid,
+                       session->gid,
+                       consumer->net_seq_index,
+                       DEFAULT_METADATA_NAME,
+                       1,
+                       DEFAULT_KERNEL_CHANNEL_OUTPUT,
+                       CONSUMER_CHANNEL_TYPE_METADATA,
+                       0, 0);
+
+       health_code_update();
 
        ret = consumer_send_channel(sock, &lkm);
        if (ret < 0) {
                goto error;
        }
 
+       health_code_update();
+
        /* Prep stream message structure */
        consumer_init_stream_comm_msg(&lkm,
                        LTTNG_CONSUMER_ADD_STREAM,
                        session->metadata->fd,
                        session->metadata_stream_fd,
-                       LTTNG_CONSUMER_ACTIVE_STREAM,
-                       DEFAULT_KERNEL_CHANNEL_OUTPUT,
-                       0, /* Kernel */
-                       session->uid,
-                       session->gid,
-                       output->net_seq_index,
-                       1, /* Metadata flag set */
-                       "metadata",
-                       pathname);
+                       0); /* CPU: 0 for metadata. */
+
+       health_code_update();
 
        /* Send stream and file descriptor */
-       ret = consumer_send_stream(sock, output, &lkm,
+       ret = consumer_send_stream(sock, consumer, &lkm,
                        &session->metadata_stream_fd, 1);
        if (ret < 0) {
                goto error;
        }
 
+       health_code_update();
+
 error:
        return ret;
 }
@@ -127,54 +213,43 @@ error:
 /*
  * Sending a single stream to the consumer with command ADD_STREAM.
  */
-int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel,
-               struct ltt_kernel_stream *stream, struct ltt_kernel_session *session)
+int kernel_consumer_add_stream(struct consumer_socket *sock,
+               struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
+               struct ltt_kernel_session *session)
 {
        int ret;
-       const char *pathname;
        struct lttcomm_consumer_msg lkm;
-       struct consumer_output *output;
+       struct consumer_output *consumer;
 
        assert(channel);
        assert(stream);
        assert(session);
        assert(session->consumer);
+       assert(sock);
 
        DBG("Sending stream %d of channel %s to kernel consumer",
                        stream->fd, channel->channel->name);
 
        /* Get consumer output pointer */
-       output = session->consumer;
-
-       /* Get correct path name destination */
-       if (output->type == CONSUMER_DST_LOCAL) {
-               pathname = output->dst.trace_path;
-               DBG3("Consumer is local to %s", pathname);
-       } else {
-               pathname = output->subdir;
-               DBG3("Consumer is network to subdir %s", pathname);
-       }
+       consumer = session->consumer;
 
        /* Prep stream consumer message */
-       consumer_init_stream_comm_msg(&lkm, LTTNG_CONSUMER_ADD_STREAM,
+       consumer_init_stream_comm_msg(&lkm,
+                       LTTNG_CONSUMER_ADD_STREAM,
                        channel->fd,
                        stream->fd,
-                       stream->state,
-                       channel->channel->attr.output,
-                       0, /* Kernel */
-                       session->uid,
-                       session->gid,
-                       output->net_seq_index,
-                       0, /* Metadata flag unset */
-                       stream->name,
-                       pathname);
+                       stream->cpu);
+
+       health_code_update();
 
        /* Send stream and file descriptor */
-       ret = consumer_send_stream(sock, output, &lkm, &stream->fd, 1);
+       ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
        if (ret < 0) {
                goto error;
        }
 
+       health_code_update();
+
 error:
        return ret;
 }
@@ -182,7 +257,7 @@ error:
 /*
  * Send all stream fds of kernel channel to the consumer.
  */
-int kernel_consumer_send_channel_stream(int sock,
+int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
                struct ltt_kernel_channel *channel, struct ltt_kernel_session *session)
 {
        int ret;
@@ -192,17 +267,18 @@ int kernel_consumer_send_channel_stream(int sock,
        assert(channel);
        assert(session);
        assert(session->consumer);
+       assert(sock);
 
        /* Bail out if consumer is disabled */
        if (!session->consumer->enabled) {
-               ret = LTTCOMM_OK;
+               ret = LTTNG_OK;
                goto error;
        }
 
        DBG("Sending streams of channel %s to kernel consumer",
                        channel->channel->name);
 
-       ret = kernel_consumer_add_channel(sock, channel);
+       ret = kernel_consumer_add_channel(sock, channel, session);
        if (ret < 0) {
                goto error;
        }
@@ -227,7 +303,8 @@ error:
 /*
  * Send all stream fds of the kernel session to the consumer.
  */
-int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session)
+int kernel_consumer_send_session(struct consumer_socket *sock,
+               struct ltt_kernel_session *session)
 {
        int ret;
        struct ltt_kernel_channel *chan;
@@ -235,10 +312,11 @@ int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session)
        /* Safety net */
        assert(session);
        assert(session->consumer);
+       assert(sock);
 
        /* Bail out if consumer is disabled */
        if (!session->consumer->enabled) {
-               ret = LTTCOMM_OK;
+               ret = LTTNG_OK;
                goto error;
        }
 
This page took 0.032757 seconds and 4 git commands to generate.