Fix: Add missing health code update for consumer command
[lttng-tools.git] / src / bin / lttng-sessiond / ust-consumer.c
index b7e0d359261dbdec4a686a893dbb3a0416ea04ac..e804f4b5ac74284c1c3b0b9ea23a9317b505649f 100644 (file)
 /*
  * Send a single channel to the consumer using command ADD_CHANNEL.
  */
-static int send_channel(int sock, struct ust_app_channel *uchan)
+static int send_channel(struct consumer_socket *sock,
+               struct ust_app_channel *uchan)
 {
        int ret, fd;
        struct lttcomm_consumer_msg msg;
 
        /* Safety net */
        assert(uchan);
+       assert(sock);
 
-       if (sock < 0) {
+       if (sock->fd < 0) {
                ret = -EINVAL;
                goto error;
        }
@@ -52,19 +54,26 @@ static int send_channel(int sock, struct ust_app_channel *uchan)
                        uchan->obj->shm_fd,
                        uchan->attr.subbuf_size,
                        uchan->obj->memory_map_size,
-                       uchan->name);
+                       uchan->name,
+                       uchan->streams.count);
+
+       health_code_update(&health_thread_cmd);
 
        ret = consumer_send_channel(sock, &msg);
        if (ret < 0) {
                goto error;
        }
 
+       health_code_update(&health_thread_cmd);
+
        fd = uchan->obj->shm_fd;
        ret = consumer_send_fds(sock, &fd, 1);
        if (ret < 0) {
                goto error;
        }
 
+       health_code_update(&health_thread_cmd);
+
 error:
        return ret;
 }
@@ -72,9 +81,10 @@ error:
 /*
  * Send a single stream to the consumer using ADD_STREAM command.
  */
-static int send_channel_stream(int sock, struct ust_app_channel *uchan,
-               struct ust_app_session *usess, struct ltt_ust_stream *stream,
-               struct consumer_output *consumer, const char *pathname)
+static int send_channel_stream(struct consumer_socket *sock,
+               struct ust_app_channel *uchan, struct ust_app_session *usess,
+               struct ltt_ust_stream *stream, struct consumer_output *consumer,
+               const char *pathname)
 {
        int ret, fds[2];
        struct lttcomm_consumer_msg msg;
@@ -84,6 +94,7 @@ static int send_channel_stream(int sock, struct ust_app_channel *uchan,
        assert(usess);
        assert(stream);
        assert(consumer);
+       assert(sock);
 
        DBG2("Sending stream %d of channel %s to kernel consumer",
                        stream->obj->shm_fd, uchan->name);
@@ -100,7 +111,10 @@ static int send_channel_stream(int sock, struct ust_app_channel *uchan,
                        consumer->net_seq_index,
                        0, /* Metadata flag unset */
                        stream->name,
-                       pathname);
+                       pathname,
+                       usess->id);
+
+       health_code_update(&health_thread_cmd);
 
        /* Send stream and file descriptor */
        fds[0] = stream->obj->shm_fd;
@@ -110,6 +124,8 @@ static int send_channel_stream(int sock, struct ust_app_channel *uchan,
                goto error;
        }
 
+       health_code_update(&health_thread_cmd);
+
 error:
        return ret;
 }
@@ -117,7 +133,7 @@ error:
 /*
  * Send all stream fds of UST channel to the consumer.
  */
-static int send_channel_streams(int sock,
+static int send_channel_streams(struct consumer_socket *sock,
                struct ust_app_channel *uchan, struct ust_app_session *usess,
                struct consumer_output *consumer)
 {
@@ -126,6 +142,8 @@ static int send_channel_streams(int sock,
        const char *pathname;
        struct ltt_ust_stream *stream, *tmp;
 
+       assert(sock);
+
        DBG("Sending streams of channel %s to UST consumer", uchan->name);
 
        ret = send_channel(sock, uchan);
@@ -136,8 +154,8 @@ static int send_channel_streams(int sock,
        /* Get the right path name destination */
        if (consumer->type == CONSUMER_DST_LOCAL) {
                /* Set application path to the destination path */
-               ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
-                               consumer->dst.trace_path, usess->path);
+               ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s/%s",
+                               consumer->dst.trace_path, consumer->subdir, usess->path);
                if (ret < 0) {
                        PERROR("snprintf stream path");
                        goto error;
@@ -178,8 +196,8 @@ error:
 /*
  * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
  */
-static int send_metadata(int sock, struct ust_app_session *usess,
-               struct consumer_output *consumer)
+static int send_metadata(struct consumer_socket *sock,
+               struct ust_app_session *usess, struct consumer_output *consumer)
 {
        int ret, fd, fds[2];
        char tmp_path[PATH_MAX];
@@ -189,9 +207,10 @@ static int send_metadata(int sock, struct ust_app_session *usess,
        /* Safety net */
        assert(usess);
        assert(consumer);
+       assert(sock);
 
-       if (sock < 0) {
-               ERR("Consumer socket is negative (%d)", sock);
+       if (sock->fd < 0) {
+               ERR("Consumer socket is negative (%d)", sock->fd);
                return -EINVAL;
        }
 
@@ -208,13 +227,18 @@ static int send_metadata(int sock, struct ust_app_session *usess,
                        usess->metadata->obj->shm_fd,
                        usess->metadata->attr.subbuf_size,
                        usess->metadata->obj->memory_map_size,
-                       "metadata");
+                       "metadata",
+                       1);
+
+       health_code_update(&health_thread_cmd);
 
        ret = consumer_send_channel(sock, &msg);
        if (ret < 0) {
                goto error;
        }
 
+       health_code_update(&health_thread_cmd);
+
        /* Sending metadata shared memory fd */
        fd = usess->metadata->obj->shm_fd;
        ret = consumer_send_fds(sock, &fd, 1);
@@ -222,11 +246,13 @@ static int send_metadata(int sock, struct ust_app_session *usess,
                goto error;
        }
 
+       health_code_update(&health_thread_cmd);
+
        /* Get correct path name destination */
        if (consumer->type == CONSUMER_DST_LOCAL) {
                /* Set application path to the destination path */
-               ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
-                               consumer->dst.trace_path, usess->path);
+               ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s/%s",
+                               consumer->dst.trace_path, consumer->subdir, usess->path);
                if (ret < 0) {
                        PERROR("snprintf stream path");
                        goto error;
@@ -264,7 +290,10 @@ static int send_metadata(int sock, struct ust_app_session *usess,
                        consumer->net_seq_index,
                        1, /* Flag metadata set */
                        "metadata",
-                       pathname);
+                       pathname,
+                       usess->id);
+
+       health_code_update(&health_thread_cmd);
 
        /* Send stream and file descriptor */
        fds[0] = usess->metadata->stream_obj->shm_fd;
@@ -274,6 +303,8 @@ static int send_metadata(int sock, struct ust_app_session *usess,
                goto error;
        }
 
+       health_code_update(&health_thread_cmd);
+
 error:
        return ret;
 }
@@ -289,15 +320,19 @@ int ust_consumer_send_session(struct ust_app_session *usess,
        struct ust_app_channel *ua_chan;
 
        assert(usess);
-       assert(consumer);
-       assert(sock);
+
+       if (consumer == NULL || sock == NULL) {
+               /* There is no consumer so just ignoring the command. */
+               DBG("UST consumer does not exist. Not sending streams");
+               return 0;
+       }
 
        DBG("Sending metadata stream fd to consumer on %d", sock->fd);
 
        pthread_mutex_lock(sock->lock);
 
        /* Sending metadata information to the consumer */
-       ret = send_metadata(sock->fd, usess, consumer);
+       ret = send_metadata(sock, usess, consumer);
        if (ret < 0) {
                goto error;
        }
@@ -314,7 +349,7 @@ int ust_consumer_send_session(struct ust_app_session *usess,
                        continue;
                }
 
-               ret = send_channel_streams(sock->fd, ua_chan, usess, consumer);
+               ret = send_channel_streams(sock, ua_chan, usess, consumer);
                if (ret < 0) {
                        rcu_read_unlock();
                        goto error;
This page took 0.025891 seconds and 4 git commands to generate.