Fix: send per-pid session id in channel creation
[lttng-tools.git] / src / common / consumer.c
index a856f030efe63c3553c820d96012b065e9817e8b..57322ae08dbb41685f17bcadb3b59ccc378c2431 100644 (file)
@@ -41,6 +41,7 @@
 #include <common/ust-consumer/ust-consumer.h>
 
 #include "consumer.h"
+#include "consumer-stream.h"
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -254,10 +255,8 @@ static void free_relayd_rcu(struct rcu_head *head)
 
 /*
  * Destroy and free relayd socket pair object.
- *
- * This function MUST be called with the consumer_data lock acquired.
  */
-static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
+void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        int ret;
        struct lttng_ht_iter iter;
@@ -287,6 +286,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
 {
        int ret;
        struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream, *stmp;
 
        DBG("Consumer delete channel key %" PRIu64, channel->key);
 
@@ -297,6 +297,13 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
+               /* Delete streams that might have been left in the stream list. */
+               cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+                               send_node) {
+                       cds_list_del(&stream->send_node);
+                       lttng_ustconsumer_del_stream(stream);
+                       free(stream);
+               }
                lttng_ustconsumer_del_channel(channel);
                break;
        default:
@@ -305,6 +312,25 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                goto end;
        }
 
+       /* Empty no monitor streams list. */
+       if (!channel->monitor) {
+               struct lttng_consumer_stream *stream, *stmp;
+
+               /*
+                * So, these streams are not visible to any data thread. This is why we
+                * close and free them because they were never added to any data
+                * structure apart from this one.
+                */
+               cds_list_for_each_entry_safe(stream, stmp,
+                               &channel->stream_no_monitor_list.head, no_monitor_node) {
+                       cds_list_del(&stream->no_monitor_node);
+                       /* Close everything in that stream. */
+                       consumer_stream_close(stream);
+                       /* Free the ressource. */
+                       consumer_stream_free(stream);
+               }
+       }
+
        rcu_read_lock();
        iter.iter.node = &channel->node.node;
        ret = lttng_ht_del(consumer_data.channel_ht, &iter);
@@ -329,7 +355,7 @@ static void cleanup_relayd_ht(void)
 
        cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
                        node.node) {
-               destroy_relayd(relayd);
+               consumer_destroy_relayd(relayd);
        }
 
        rcu_read_unlock();
@@ -396,7 +422,7 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
         * Delete the relayd from the relayd hash table, close the sockets and free
         * the object in a RCU call.
         */
-       destroy_relayd(relayd);
+       consumer_destroy_relayd(relayd);
 
        /* Set inactive endpoint to all streams */
        update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
@@ -428,130 +454,20 @@ void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
 
        /* Destroy the relayd if refcount is 0 */
        if (uatomic_read(&relayd->refcount) == 0) {
-               destroy_relayd(relayd);
+               consumer_destroy_relayd(relayd);
        }
 }
 
 /*
- * Remove a stream from the global list protected by a mutex. This
- * function is also responsible for freeing its data structures.
+ * Completly destroy stream from every visiable data structure and the given
+ * hash table if one.
+ *
+ * One this call returns, the stream object is not longer usable nor visible.
  */
 void consumer_del_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht)
 {
-       int ret;
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_channel *free_chan = NULL;
-       struct consumer_relayd_sock_pair *relayd;
-
-       assert(stream);
-
-       DBG("Consumer del stream %d", stream->wait_fd);
-
-       if (ht == NULL) {
-               /* Means the stream was allocated but not successfully added */
-               goto free_stream_rcu;
-       }
-
-       pthread_mutex_lock(&consumer_data.lock);
-       pthread_mutex_lock(&stream->lock);
-
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               if (stream->mmap_base != NULL) {
-                       ret = munmap(stream->mmap_base, stream->mmap_len);
-                       if (ret != 0) {
-                               PERROR("munmap");
-                       }
-               }
-
-               if (stream->wait_fd >= 0) {
-                       ret = close(stream->wait_fd);
-                       if (ret) {
-                               PERROR("close");
-                       }
-               }
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               lttng_ustconsumer_del_stream(stream);
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               assert(0);
-               goto end;
-       }
-
-       rcu_read_lock();
-       iter.iter.node = &stream->node.node;
-       ret = lttng_ht_del(ht, &iter);
-       assert(!ret);
-
-       iter.iter.node = &stream->node_channel_id.node;
-       ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
-       assert(!ret);
-
-       iter.iter.node = &stream->node_session_id.node;
-       ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
-       assert(!ret);
-       rcu_read_unlock();
-
-       assert(consumer_data.stream_count > 0);
-       consumer_data.stream_count--;
-
-       if (stream->out_fd >= 0) {
-               ret = close(stream->out_fd);
-               if (ret) {
-                       PERROR("close");
-               }
-       }
-
-       /* Check and cleanup relayd */
-       rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               uatomic_dec(&relayd->refcount);
-               assert(uatomic_read(&relayd->refcount) >= 0);
-
-               /* Closing streams requires to lock the control socket. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_send_close_stream(&relayd->control_sock,
-                               stream->relayd_stream_id,
-                               stream->next_net_seq_num - 1);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       DBG("Unable to close stream on the relayd. Continuing");
-                       /*
-                        * Continue here. There is nothing we can do for the relayd.
-                        * Chances are that the relayd has closed the socket so we just
-                        * continue cleaning up.
-                        */
-               }
-
-               /* Both conditions are met, we destroy the relayd. */
-               if (uatomic_read(&relayd->refcount) == 0 &&
-                               uatomic_read(&relayd->destroy_flag)) {
-                       destroy_relayd(relayd);
-               }
-       }
-       rcu_read_unlock();
-
-       if (!uatomic_sub_return(&stream->chan->refcount, 1)
-                       && !uatomic_read(&stream->chan->nb_init_stream_left)) {
-               free_chan = stream->chan;
-       }
-
-end:
-       consumer_data.need_update = 1;
-       pthread_mutex_unlock(&stream->lock);
-       pthread_mutex_unlock(&consumer_data.lock);
-
-       if (free_chan) {
-               consumer_del_channel(free_chan);
-       }
-
-free_stream_rcu:
-       call_rcu(&stream->node.head, free_stream_rcu);
+       consumer_stream_destroy(stream, ht);
 }
 
 struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
@@ -560,7 +476,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                const char *channel_name,
                uid_t uid,
                gid_t gid,
-               int relayd_id,
+               uint64_t relayd_id,
                uint64_t session_id,
                int cpu,
                int *alloc_ret,
@@ -612,8 +528,10 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        /* Init session id node with the stream session id */
        lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
 
-       DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 " relayd_id %" PRIu64 ", session_id %" PRIu64,
-                       stream->name, stream->key, channel_key, stream->net_seq_idx, stream->session_id);
+       DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
+                       " relayd_id %" PRIu64 ", session_id %" PRIu64,
+                       stream->name, stream->key, channel_key,
+                       stream->net_seq_idx, stream->session_id);
 
        rcu_read_unlock();
        return stream;
@@ -847,10 +765,12 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                const char *name,
                uid_t uid,
                gid_t gid,
-               int relayd_id,
+               uint64_t relayd_id,
                enum lttng_event_output output,
                uint64_t tracefile_size,
-               uint64_t tracefile_count)
+               uint64_t tracefile_count,
+               uint64_t session_id_per_pid,
+               unsigned int monitor)
 {
        struct lttng_consumer_channel *channel;
 
@@ -863,12 +783,41 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->key = key;
        channel->refcount = 0;
        channel->session_id = session_id;
+       channel->session_id_per_pid = session_id_per_pid;
        channel->uid = uid;
        channel->gid = gid;
        channel->relayd_id = relayd_id;
        channel->output = output;
        channel->tracefile_size = tracefile_size;
        channel->tracefile_count = tracefile_count;
+       channel->monitor = monitor;
+
+       /*
+        * In monitor mode, the streams associated with the channel will be put in
+        * a special list ONLY owned by this channel. So, the refcount is set to 1
+        * here meaning that the channel itself has streams that are referenced.
+        *
+        * On a channel deletion, once the channel is no longer visible, the
+        * refcount is decremented and checked for a zero value to delete it. With
+        * streams in no monitor mode, it will now be safe to destroy the channel.
+        */
+       if (!channel->monitor) {
+               channel->refcount = 1;
+       }
+
+       switch (output) {
+       case LTTNG_EVENT_SPLICE:
+               channel->output = CONSUMER_CHANNEL_SPLICE;
+               break;
+       case LTTNG_EVENT_MMAP:
+               channel->output = CONSUMER_CHANNEL_MMAP;
+               break;
+       default:
+               ERR("Allocate channel output unknown %d", output);
+               free(channel);
+               channel = NULL;
+               goto end;
+       }
 
        strncpy(channel->pathname, pathname, sizeof(channel->pathname));
        channel->pathname[sizeof(channel->pathname) - 1] = '\0';
@@ -881,6 +830,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->wait_fd = -1;
 
        CDS_INIT_LIST_HEAD(&channel->streams.head);
+       CDS_INIT_LIST_HEAD(&channel->stream_no_monitor_list.head);
 
        DBG("Allocated channel (key %" PRIu64 ")", channel->key)
 
@@ -890,6 +840,8 @@ end:
 
 /*
  * Add a channel to the global list protected by a mutex.
+ *
+ * On success 0 is returned else a negative value.
  */
 int consumer_add_channel(struct lttng_consumer_channel *channel,
                struct lttng_consumer_local_data *ctx)
@@ -907,7 +859,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
                /* Channel already exist. Ignore the insertion */
                ERR("Consumer add channel key %" PRIu64 " already exists!",
                        channel->key);
-               ret = LTTNG_ERR_KERN_CHAN_EXIST;
+               ret = -EEXIST;
                goto end;
        }
 
@@ -1884,7 +1836,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                                PERROR("munmap metadata stream");
                        }
                }
-
                if (stream->wait_fd >= 0) {
                        ret = close(stream->wait_fd);
                        if (ret < 0) {
@@ -1947,7 +1898,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                /* Both conditions are met, we destroy the relayd. */
                if (uatomic_read(&relayd->refcount) == 0 &&
                                uatomic_read(&relayd->destroy_flag)) {
-                       destroy_relayd(relayd);
+                       consumer_destroy_relayd(relayd);
                }
        }
        rcu_read_unlock();
@@ -2019,9 +1970,6 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
                uatomic_inc(&relayd->refcount);
        }
 
-       /* Update channel refcount once added without error(s). */
-       uatomic_inc(&stream->chan->refcount);
-
        /*
         * When nb_init_stream_left reaches 0, we don't need to trigger any action
         * in terms of destroying the associated channel, because the action that
@@ -2352,7 +2300,7 @@ void *consumer_thread_data_poll(void *data)
 
                        /* allocate for all fds + 1 for the consumer_data_pipe */
                        local_stream = zmalloc((consumer_data.stream_count + 1) *
-                                       sizeof(struct lttng_consumer_stream));
+                                       sizeof(struct lttng_consumer_stream *));
                        if (local_stream == NULL) {
                                PERROR("local_stream malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
@@ -2719,24 +2667,51 @@ restart:
 
                                                lttng_ht_node_init_u64(&chan->wait_fd_node,
                                                        chan->wait_fd);
+                                               rcu_read_lock();
                                                lttng_ht_add_unique_u64(channel_ht,
                                                                &chan->wait_fd_node);
+                                               rcu_read_unlock();
                                                /* Add channel to the global poll events list */
                                                lttng_poll_add(&events, chan->wait_fd,
                                                                LPOLLIN | LPOLLPRI);
                                                break;
                                        case CONSUMER_CHANNEL_DEL:
                                        {
+                                               struct lttng_consumer_stream *stream, *stmp;
+
+                                               rcu_read_lock();
                                                chan = consumer_find_channel(key);
                                                if (!chan) {
+                                                       rcu_read_unlock();
                                                        ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
                                                        break;
                                                }
                                                lttng_poll_del(&events, chan->wait_fd);
+                                               iter.iter.node = &chan->wait_fd_node.node;
                                                ret = lttng_ht_del(channel_ht, &iter);
                                                assert(ret == 0);
                                                consumer_close_channel_streams(chan);
 
+                                               switch (consumer_data.type) {
+                                               case LTTNG_CONSUMER_KERNEL:
+                                                       break;
+                                               case LTTNG_CONSUMER32_UST:
+                                               case LTTNG_CONSUMER64_UST:
+                                                       /* Delete streams that might have been left in the stream list. */
+                                                       cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
+                                                                       send_node) {
+                                                               cds_list_del(&stream->send_node);
+                                                               lttng_ustconsumer_del_stream(stream);
+                                                               uatomic_sub(&stream->chan->refcount, 1);
+                                                               assert(&chan->refcount);
+                                                               free(stream);
+                                                       }
+                                                       break;
+                                               default:
+                                                       ERR("Unknown consumer_data type");
+                                                       assert(0);
+                                               }
+
                                                /*
                                                 * Release our own refcount. Force channel deletion even if
                                                 * streams were not initialized.
@@ -2744,6 +2719,7 @@ restart:
                                                if (!uatomic_sub_return(&chan->refcount, 1)) {
                                                        consumer_del_channel(chan);
                                                }
+                                               rcu_read_unlock();
                                                goto restart;
                                        }
                                        case CONSUMER_CHANNEL_QUIT:
@@ -2782,6 +2758,7 @@ restart:
                                lttng_poll_del(&events, chan->wait_fd);
                                ret = lttng_ht_del(channel_ht, &iter);
                                assert(ret == 0);
+                               assert(cds_list_empty(&chan->streams.head));
                                consumer_close_channel_streams(chan);
 
                                /* Release our own refcount */
@@ -2870,12 +2847,6 @@ void *consumer_thread_sessiond_poll(void *data)
                goto end;
        }
 
-       ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
-       if (ret < 0) {
-               PERROR("fcntl O_NONBLOCK");
-               goto end;
-       }
-
        /* prepare the FDs to poll : to client socket and the should_quit pipe */
        consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
        consumer_sockpoll[0].events = POLLIN | POLLPRI;
@@ -2893,11 +2864,6 @@ void *consumer_thread_sessiond_poll(void *data)
                WARN("On accept");
                goto end;
        }
-       ret = fcntl(sock, F_SETFL, O_NONBLOCK);
-       if (ret < 0) {
-               PERROR("fcntl O_NONBLOCK");
-               goto end;
-       }
 
        /*
         * Setup metadata socket which is the second socket connection on the
@@ -3058,29 +3024,36 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
 
        DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
 
-       /* First send a status message before receiving the fds. */
-       ret = consumer_send_status_msg(sock, ret_code);
-       if (ret < 0) {
-               /* Somehow, the session daemon is not responding anymore. */
-               goto error;
-       }
-
        /* Get relayd reference if exists. */
        relayd = consumer_find_relayd(net_seq_idx);
        if (relayd == NULL) {
                /* Not found. Allocate one. */
                relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
                if (relayd == NULL) {
-                       lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
-                       ret = -1;
-                       goto error;
+                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
+                       ret = -ENOMEM;
+               } else {
+                       relayd->sessiond_session_id = (uint64_t) sessiond_id;
+                       relayd_created = 1;
                }
-               relayd->sessiond_session_id = (uint64_t) sessiond_id;
-               relayd_created = 1;
+
+               /*
+                * This code path MUST continue to the consumer send status message to
+                * we can notify the session daemon and continue our work without
+                * killing everything.
+                */
+       }
+
+       /* First send a status message before receiving the fds. */
+       ret = consumer_send_status_msg(sock, ret_code);
+       if (ret < 0 || ret_code != LTTNG_OK) {
+               /* Somehow, the session daemon is not responding anymore. */
+               goto error;
        }
 
        /* Poll on consumer socket. */
        if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
                ret = -EINTR;
                goto error;
        }
@@ -3088,15 +3061,31 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
        /* Get relayd socket from session daemon */
        ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
        if (ret != sizeof(fd)) {
-               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
+               ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
                ret = -1;
                fd = -1;        /* Just in case it gets set with an invalid value. */
-               goto error_close;
+
+               /*
+                * Failing to receive FDs might indicate a major problem such as
+                * reaching a fd limit during the receive where the kernel returns a
+                * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
+                * don't take any chances and stop everything.
+                *
+                * XXX: Feature request #558 will fix that and avoid this possible
+                * issue when reaching the fd limit.
+                */
+               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
+
+               /*
+                * This code path MUST continue to the consumer send status message so
+                * we can send the error to the thread expecting a reply. The above
+                * call will make everything stop.
+                */
        }
 
        /* We have the fds without error. Send status back. */
        ret = consumer_send_status_msg(sock, ret_code);
-       if (ret < 0) {
+       if (ret < 0 || ret_code != LTTNG_OK) {
                /* Somehow, the session daemon is not responding anymore. */
                goto error;
        }
@@ -3120,6 +3109,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
 
                /* Assign new file descriptor */
                relayd->control_sock.sock.fd = fd;
+               fd = -1;        /* For error path */
                /* Assign version values. */
                relayd->control_sock.major = relayd_sock->major;
                relayd->control_sock.minor = relayd_sock->minor;
@@ -3165,6 +3155,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
 
                /* Assign new file descriptor */
                relayd->data_sock.sock.fd = fd;
+               fd = -1;        /* for eventual error paths */
                /* Assign version values. */
                relayd->data_sock.major = relayd_sock->major;
                relayd->data_sock.minor = relayd_sock->minor;
@@ -3196,7 +3187,6 @@ error:
                }
        }
 
-error_close:
        if (relayd_created) {
                free(relayd);
        }
This page took 0.032584 seconds and 4 git commands to generate.