Fix: consumer should await for initial streams
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
index 5d690548c4e1153492246d485ee69f214088d75a..33cbbed3e679a42df9db3951b0a2040b681b481c 100644 (file)
@@ -48,7 +48,8 @@ int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel)
                        channel->fd,
                        channel->channel->attr.subbuf_size,
                        0, /* Kernel */
-                       channel->channel->name);
+                       channel->channel->name,
+                       channel->stream_count);
 
        ret = consumer_send_channel(sock, &lkm);
        if (ret < 0) {
@@ -65,9 +66,10 @@ error:
 int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session)
 {
        int ret;
+       char tmp_path[PATH_MAX];
        const char *pathname;
        struct lttcomm_consumer_msg lkm;
-       struct consumer_output *output;
+       struct consumer_output *consumer;
 
        /* Safety net */
        assert(session);
@@ -76,13 +78,37 @@ int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session)
        DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
 
        /* Get consumer output pointer */
-       output = session->consumer;
+       consumer = session->consumer;
+
+       /* Get the right path name destination */
+       if (consumer->type == CONSUMER_DST_LOCAL) {
+               /* Set application path to the destination path */
+               ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
+                               consumer->dst.trace_path, consumer->subdir);
+               if (ret < 0) {
+                       PERROR("snprintf metadata path");
+                       goto error;
+               }
+               pathname = tmp_path;
 
-       /* Get correct path name destination */
-       if (output->type == CONSUMER_DST_LOCAL) {
-               pathname = output->dst.trace_path;
+               /* Create directory */
+               ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
+                               session->uid, session->gid);
+               if (ret < 0) {
+                       if (ret != -EEXIST) {
+                               ERR("Trace directory creation error");
+                               goto error;
+                       }
+               }
+               DBG3("Kernel local consumer tracefile path: %s", pathname);
        } else {
-               pathname = output->subdir;
+               ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
+               if (ret < 0) {
+                       PERROR("snprintf metadata path");
+                       goto error;
+               }
+               pathname = tmp_path;
+               DBG3("Kernel network consumer subdir path: %s", pathname);
        }
 
        /* Prep channel message structure */
@@ -91,7 +117,8 @@ int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session)
                        session->metadata->fd,
                        session->metadata->conf->attr.subbuf_size,
                        0, /* for kernel */
-                       "metadata");
+                       "metadata",
+                       1);
 
        ret = consumer_send_channel(sock, &lkm);
        if (ret < 0) {
@@ -108,13 +135,13 @@ int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session)
                        0, /* Kernel */
                        session->uid,
                        session->gid,
-                       output->net_seq_index,
+                       consumer->net_seq_index,
                        1, /* Metadata flag set */
                        "metadata",
                        pathname);
 
        /* Send stream and file descriptor */
-       ret = consumer_send_stream(sock, output, &lkm,
+       ret = consumer_send_stream(sock, consumer, &lkm,
                        &session->metadata_stream_fd, 1);
        if (ret < 0) {
                goto error;
@@ -131,9 +158,10 @@ int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel,
                struct ltt_kernel_stream *stream, struct ltt_kernel_session *session)
 {
        int ret;
+       char tmp_path[PATH_MAX];
        const char *pathname;
        struct lttcomm_consumer_msg lkm;
-       struct consumer_output *output;
+       struct consumer_output *consumer;
 
        assert(channel);
        assert(stream);
@@ -144,15 +172,27 @@ int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel,
                        stream->fd, channel->channel->name);
 
        /* Get consumer output pointer */
-       output = session->consumer;
+       consumer = session->consumer;
 
-       /* Get correct path name destination */
-       if (output->type == CONSUMER_DST_LOCAL) {
-               pathname = output->dst.trace_path;
-               DBG3("Consumer is local to %s", pathname);
+       /* Get the right path name destination */
+       if (consumer->type == CONSUMER_DST_LOCAL) {
+               /* Set application path to the destination path */
+               ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
+                               consumer->dst.trace_path, consumer->subdir);
+               if (ret < 0) {
+                       PERROR("snprintf stream path");
+                       goto error;
+               }
+               pathname = tmp_path;
+               DBG3("Kernel local consumer tracefile path: %s", pathname);
        } else {
-               pathname = output->subdir;
-               DBG3("Consumer is network to subdir %s", pathname);
+               ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
+               if (ret < 0) {
+                       PERROR("snprintf stream path");
+                       goto error;
+               }
+               pathname = tmp_path;
+               DBG3("Kernel network consumer subdir path: %s", pathname);
        }
 
        /* Prep stream consumer message */
@@ -164,13 +204,13 @@ int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel,
                        0, /* Kernel */
                        session->uid,
                        session->gid,
-                       output->net_seq_index,
+                       consumer->net_seq_index,
                        0, /* Metadata flag unset */
                        stream->name,
                        pathname);
 
        /* Send stream and file descriptor */
-       ret = consumer_send_stream(sock, output, &lkm, &stream->fd, 1);
+       ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
        if (ret < 0) {
                goto error;
        }
@@ -195,7 +235,7 @@ int kernel_consumer_send_channel_stream(int sock,
 
        /* Bail out if consumer is disabled */
        if (!session->consumer->enabled) {
-               ret = LTTCOMM_OK;
+               ret = LTTNG_OK;
                goto error;
        }
 
@@ -238,7 +278,7 @@ int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session)
 
        /* Bail out if consumer is disabled */
        if (!session->consumer->enabled) {
-               ret = LTTCOMM_OK;
+               ret = LTTNG_OK;
                goto error;
        }
 
@@ -269,54 +309,3 @@ int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session)
 error:
        return ret;
 }
-
-/*
- * Send relayd socket to consumer associated with a session name.
- *
- * On success return positive value. On error, negative value.
- */
-int kernel_consumer_send_relayd_socket(int consumer_sock,
-               struct lttcomm_sock *sock, struct consumer_output *consumer,
-               enum lttng_stream_type type)
-{
-       int ret;
-       struct lttcomm_consumer_msg msg;
-
-       /* Code flow error. Safety net. */
-       assert(sock);
-       assert(consumer);
-
-       /* Bail out if consumer is disabled */
-       if (!consumer->enabled) {
-               ret = LTTCOMM_OK;
-               goto error;
-       }
-
-       msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
-       /*
-        * Assign network consumer output index using the temporary consumer since
-        * this call should only be made from within a set_consumer_uri() function
-        * call in the session daemon.
-        */
-       msg.u.relayd_sock.net_index = consumer->net_seq_index;
-       msg.u.relayd_sock.type = type;
-       memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
-
-       DBG2("Sending relayd sock info to consumer");
-       ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
-       if (ret < 0) {
-               PERROR("send consumer relayd socket info");
-               goto error;
-       }
-
-       DBG2("Sending relayd socket file descriptor to consumer");
-       ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
-       if (ret < 0) {
-               goto error;
-       }
-
-       DBG("Kernel consumer relayd socket sent");
-
-error:
-       return ret;
-}
This page took 0.025738 seconds and 4 git commands to generate.