Add filter sequence number to UST
[lttng-tools.git] / src / bin / lttng-sessiond / ust-consumer.c
index 5b909ab1594609257f1026f7d246c39f43bf8d9c..465dd07d74489cd26135126f903e1a30aa6070a2 100644 (file)
@@ -52,7 +52,8 @@ static int send_channel(int sock, struct ust_app_channel *uchan)
                        uchan->obj->shm_fd,
                        uchan->attr.subbuf_size,
                        uchan->obj->memory_map_size,
-                       uchan->name);
+                       uchan->name,
+                       uchan->streams.count);
 
        ret = consumer_send_channel(sock, &msg);
        if (ret < 0) {
@@ -100,7 +101,8 @@ static int send_channel_stream(int sock, struct ust_app_channel *uchan,
                        consumer->net_seq_index,
                        0, /* Metadata flag unset */
                        stream->name,
-                       pathname);
+                       pathname,
+                       usess->id);
 
        /* Send stream and file descriptor */
        fds[0] = stream->obj->shm_fd;
@@ -117,7 +119,7 @@ error:
 /*
  * Send all stream fds of UST channel to the consumer.
  */
-int ust_consumer_send_channel_streams(int sock,
+static int send_channel_streams(int sock,
                struct ust_app_channel *uchan, struct ust_app_session *usess,
                struct consumer_output *consumer)
 {
@@ -136,8 +138,8 @@ int ust_consumer_send_channel_streams(int sock,
        /* Get the right path name destination */
        if (consumer->type == CONSUMER_DST_LOCAL) {
                /* Set application path to the destination path */
-               ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
-                               consumer->dst.trace_path, usess->path);
+               ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s/%s",
+                               consumer->dst.trace_path, consumer->subdir, usess->path);
                if (ret < 0) {
                        PERROR("snprintf stream path");
                        goto error;
@@ -178,7 +180,7 @@ error:
 /*
  * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
  */
-int ust_consumer_send_metadata(int sock, struct ust_app_session *usess,
+static int send_metadata(int sock, struct ust_app_session *usess,
                struct consumer_output *consumer)
 {
        int ret, fd, fds[2];
@@ -208,7 +210,8 @@ int ust_consumer_send_metadata(int sock, struct ust_app_session *usess,
                        usess->metadata->obj->shm_fd,
                        usess->metadata->attr.subbuf_size,
                        usess->metadata->obj->memory_map_size,
-                       "metadata");
+                       "metadata",
+                       1);
 
        ret = consumer_send_channel(sock, &msg);
        if (ret < 0) {
@@ -225,8 +228,8 @@ int ust_consumer_send_metadata(int sock, struct ust_app_session *usess,
        /* Get correct path name destination */
        if (consumer->type == CONSUMER_DST_LOCAL) {
                /* Set application path to the destination path */
-               ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
-                               consumer->dst.trace_path, usess->path);
+               ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s/%s",
+                               consumer->dst.trace_path, consumer->subdir, usess->path);
                if (ret < 0) {
                        PERROR("snprintf stream path");
                        goto error;
@@ -264,7 +267,8 @@ int ust_consumer_send_metadata(int sock, struct ust_app_session *usess,
                        consumer->net_seq_index,
                        1, /* Flag metadata set */
                        "metadata",
-                       pathname);
+                       pathname,
+                       usess->id);
 
        /* Send stream and file descriptor */
        fds[0] = usess->metadata->stream_obj->shm_fd;
@@ -281,23 +285,27 @@ error:
 /*
  * Send all stream fds of the UST session to the consumer.
  */
-int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess,
-               struct consumer_output *consumer)
+int ust_consumer_send_session(struct ust_app_session *usess,
+               struct consumer_output *consumer, struct consumer_socket *sock)
 {
        int ret = 0;
-       int sock = consumer_fd;
        struct lttng_ht_iter iter;
        struct ust_app_channel *ua_chan;
 
-       DBG("Sending metadata stream fd");
+       assert(usess);
 
-       if (consumer_fd < 0) {
-               ERR("Consumer has negative file descriptor");
-               return -EINVAL;
+       if (consumer == NULL || sock == NULL) {
+               /* There is no consumer so just ignoring the command. */
+               DBG("UST consumer does not exist. Not sending streams");
+               return 0;
        }
 
+       DBG("Sending metadata stream fd to consumer on %d", sock->fd);
+
+       pthread_mutex_lock(sock->lock);
+
        /* Sending metadata information to the consumer */
-       ret = ust_consumer_send_metadata(consumer_fd, usess, consumer);
+       ret = send_metadata(sock->fd, usess, consumer);
        if (ret < 0) {
                goto error;
        }
@@ -314,7 +322,7 @@ int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess,
                        continue;
                }
 
-               ret = ust_consumer_send_channel_streams(sock, ua_chan, usess, consumer);
+               ret = send_channel_streams(sock->fd, ua_chan, usess, consumer);
                if (ret < 0) {
                        rcu_read_unlock();
                        goto error;
@@ -324,8 +332,10 @@ int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess,
 
        DBG("consumer fds (metadata and channel streams) sent");
 
-       return 0;
+       /* All good! */
+       ret = 0;
 
 error:
+       pthread_mutex_unlock(sock->lock);
        return ret;
 }
This page took 0.024742 seconds and 4 git commands to generate.