consumer: explicitly set endpoint status to active at init
[lttng-tools.git] / src / common / consumer.c
index 6830084e8c5a959f7407f80e0a35061089a738a2..94a0cc3efb1d557da732ed9a201848b0ee4dc3e4 100644 (file)
@@ -291,19 +291,24 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        DBG("Consumer delete channel key %" PRIu64, channel->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&channel->lock);
+
+       /* Delete streams that might have been left in the stream list. */
+       cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+                       send_node) {
+               cds_list_del(&stream->send_node);
+               /*
+                * Once a stream is added to this list, the buffers were created so
+                * we have a guarantee that this call will succeed.
+                */
+               consumer_stream_destroy(stream, NULL);
+       }
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               /* Delete streams that might have been left in the stream list. */
-               cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
-                               send_node) {
-                       cds_list_del(&stream->send_node);
-                       lttng_ustconsumer_del_stream(stream);
-                       free(stream);
-               }
                lttng_ustconsumer_del_channel(channel);
                break;
        default:
@@ -312,25 +317,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                goto end;
        }
 
-       /* Empty no monitor streams list. */
-       if (!channel->monitor) {
-               struct lttng_consumer_stream *stream, *stmp;
-
-               /*
-                * So, these streams are not visible to any data thread. This is why we
-                * close and free them because they were never added to any data
-                * structure apart from this one.
-                */
-               cds_list_for_each_entry_safe(stream, stmp,
-                               &channel->stream_no_monitor_list.head, no_monitor_node) {
-                       cds_list_del(&stream->no_monitor_node);
-                       /* Close everything in that stream. */
-                       consumer_stream_close(stream);
-                       /* Free the ressource. */
-                       consumer_stream_free(stream);
-               }
-       }
-
        rcu_read_lock();
        iter.iter.node = &channel->node.node;
        ret = lttng_ht_del(consumer_data.channel_ht, &iter);
@@ -339,6 +325,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
 
        call_rcu(&channel->node.head, free_channel_rcu);
 end:
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 }
 
@@ -480,7 +467,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                uint64_t session_id,
                int cpu,
                int *alloc_ret,
-               enum consumer_channel_type type)
+               enum consumer_channel_type type,
+               unsigned int monitor)
 {
        int ret;
        struct lttng_consumer_stream *stream;
@@ -502,6 +490,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->gid = gid;
        stream->net_seq_idx = relayd_id;
        stream->session_id = session_id;
+       stream->monitor = monitor;
+       stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
        pthread_mutex_init(&stream->lock, NULL);
 
        /* If channel is the metadata, flag this stream as metadata. */
@@ -553,7 +543,6 @@ static int add_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht)
 {
        int ret = 0;
-       struct consumer_relayd_sock_pair *relayd;
 
        assert(stream);
        assert(ht);
@@ -561,6 +550,7 @@ static int add_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding consumer stream %" PRIu64, stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
        rcu_read_lock();
 
@@ -579,12 +569,6 @@ static int add_stream(struct lttng_consumer_stream *stream,
         */
        lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
 
-       /* Check and cleanup relayd */
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               uatomic_inc(&relayd->refcount);
-       }
-
        /*
         * When nb_init_stream_left reaches 0, we don't need to trigger any action
         * in terms of destroying the associated channel, because the action that
@@ -604,6 +588,7 @@ static int add_stream(struct lttng_consumer_stream *stream,
 
        rcu_read_unlock();
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        return ret;
@@ -693,6 +678,67 @@ error:
        return relayd;
 }
 
+/*
+ * Find a relayd and send the stream
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
+               char *path)
+{
+       int ret = 0;
+       struct consumer_relayd_sock_pair *relayd;
+
+       assert(stream);
+       assert(stream->net_seq_idx != -1ULL);
+       assert(path);
+
+       /* The stream is not metadata. Get relayd reference if exists. */
+       rcu_read_lock();
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd != NULL) {
+               /* Add stream on the relayd */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_add_stream(&relayd->control_sock, stream->name,
+                               path, &stream->relayd_stream_id,
+                               stream->chan->tracefile_size, stream->chan->tracefile_count);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0) {
+                       goto end;
+               }
+               uatomic_inc(&relayd->refcount);
+               stream->sent_to_relayd = 1;
+       } else {
+               ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
+                               stream->key, stream->net_seq_idx);
+               ret = -1;
+               goto end;
+       }
+
+       DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
+                       stream->name, stream->key, stream->net_seq_idx);
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Find a relayd and close the stream
+ */
+void close_relayd_stream(struct lttng_consumer_stream *stream)
+{
+       struct consumer_relayd_sock_pair *relayd;
+
+       /* The stream is not metadata. Get relayd reference if exists. */
+       rcu_read_lock();
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd) {
+               consumer_stream_relayd_close(stream, relayd);
+       }
+       rcu_read_unlock();
+}
+
 /*
  * Handle stream for relayd transmission if the stream applies for network
  * streaming where the net sequence index is set.
@@ -791,6 +837,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->tracefile_size = tracefile_size;
        channel->tracefile_count = tracefile_count;
        channel->monitor = monitor;
+       pthread_mutex_init(&channel->lock, NULL);
 
        /*
         * In monitor mode, the streams associated with the channel will be put in
@@ -816,7 +863,6 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->wait_fd = -1;
 
        CDS_INIT_LIST_HEAD(&channel->streams.head);
-       CDS_INIT_LIST_HEAD(&channel->stream_no_monitor_list.head);
 
        DBG("Allocated channel (key %" PRIu64 ")", channel->key)
 
@@ -837,6 +883,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
        struct lttng_ht_iter iter;
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&channel->lock);
        rcu_read_lock();
 
        lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
@@ -853,10 +900,11 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
 
 end:
        rcu_read_unlock();
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        if (!ret && channel->wait_fd != -1 &&
-                       channel->metadata_stream == NULL) {
+                       channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
                notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
        }
        return ret;
@@ -1088,7 +1136,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                        struct lttng_consumer_local_data *ctx),
                int (*recv_channel)(struct lttng_consumer_channel *channel),
                int (*recv_stream)(struct lttng_consumer_stream *stream),
-               int (*update_stream)(int stream_key, uint32_t state))
+               int (*update_stream)(uint64_t stream_key, uint32_t state))
 {
        int ret;
        struct lttng_consumer_local_data *ctx;
@@ -1812,6 +1860,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        }
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
 
        switch (consumer_data.type) {
@@ -1905,6 +1954,7 @@ end:
        stream->chan->metadata_stream = NULL;
 
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        if (free_chan) {
@@ -1923,7 +1973,6 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht)
 {
        int ret = 0;
-       struct consumer_relayd_sock_pair *relayd;
        struct lttng_ht_iter iter;
        struct lttng_ht_node_u64 *node;
 
@@ -1933,6 +1982,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
 
        /*
@@ -1950,12 +2000,6 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        node = lttng_ht_iter_get_node_u64(&iter);
        assert(!node);
 
-       /* Find relayd and, if one is found, increment refcount. */
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               uatomic_inc(&relayd->refcount);
-       }
-
        /*
         * When nb_init_stream_left reaches 0, we don't need to trigger any action
         * in terms of destroying the associated channel, because the action that
@@ -1984,6 +2028,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        rcu_read_unlock();
 
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
        return ret;
 }
@@ -2258,7 +2303,11 @@ void *consumer_thread_data_poll(void *data)
                goto end;
        }
 
-       local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
+       local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
+       if (local_stream == NULL) {
+               PERROR("local_stream malloc");
+               goto end;
+       }
 
        while (1) {
                high_prio = 0;
@@ -2744,7 +2793,6 @@ restart:
                                lttng_poll_del(&events, chan->wait_fd);
                                ret = lttng_ht_del(channel_ht, &iter);
                                assert(ret == 0);
-                               assert(cds_list_empty(&chan->streams.head));
                                consumer_close_channel_streams(chan);
 
                                /* Release our own refcount */
@@ -2999,7 +3047,7 @@ void lttng_consumer_init(void)
 int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
                struct pollfd *consumer_sockpoll,
-               struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id)
+               struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id)
 {
        int fd = -1, ret = -1, relayd_created = 0;
        enum lttng_error_code ret_code = LTTNG_OK;
@@ -3017,10 +3065,11 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                /* Not found. Allocate one. */
                relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
                if (relayd == NULL) {
-                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
                        ret = -ENOMEM;
+                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
+                       goto error;
                } else {
-                       relayd->sessiond_session_id = (uint64_t) sessiond_id;
+                       relayd->sessiond_session_id = sessiond_id;
                        relayd_created = 1;
                }
 
@@ -3037,23 +3086,23 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
        }
 
        /* First send a status message before receiving the fds. */
-       ret = consumer_send_status_msg(sock, ret_code);
-       if (ret < 0 || ret_code != LTTNG_OK) {
+       ret = consumer_send_status_msg(sock, LTTNG_OK);
+       if (ret < 0) {
                /* Somehow, the session daemon is not responding anymore. */
-               goto error;
+               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
+               goto error_nosignal;
        }
 
        /* Poll on consumer socket. */
        if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
                ret = -EINTR;
-               goto error;
+               goto error_nosignal;
        }
 
        /* Get relayd socket from session daemon */
        ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
        if (ret != sizeof(fd)) {
-               ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
                ret = -1;
                fd = -1;        /* Just in case it gets set with an invalid value. */
 
@@ -3067,18 +3116,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                 * issue when reaching the fd limit.
                 */
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
-
-               /*
-                * This code path MUST continue to the consumer send status message so
-                * we can send the error to the thread expecting a reply. The above
-                * call will make everything stop.
-                */
-       }
-
-       /* We have the fds without error. Send status back. */
-       ret = consumer_send_status_msg(sock, ret_code);
-       if (ret < 0 || ret_code != LTTNG_OK) {
-               /* Somehow, the session daemon is not responding anymore. */
+               ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
                goto error;
        }
 
@@ -3090,6 +3128,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                ret = lttcomm_create_sock(&relayd->control_sock.sock);
                /* Handle create_sock error. */
                if (ret < 0) {
+                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
                        goto error;
                }
                /*
@@ -3128,6 +3167,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                         */
                        (void) relayd_close(&relayd->control_sock);
                        (void) relayd_close(&relayd->data_sock);
+                       ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
                        goto error;
                }
 
@@ -3138,6 +3178,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                ret = lttcomm_create_sock(&relayd->data_sock.sock);
                /* Handle create_sock error. */
                if (ret < 0) {
+                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
                        goto error;
                }
                /*
@@ -3159,6 +3200,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
        default:
                ERR("Unknown relayd socket type (%d)", sock_type);
                ret = -1;
+               ret_code = LTTCOMM_CONSUMERD_FATAL;
                goto error;
        }
 
@@ -3166,6 +3208,14 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                        sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
                        relayd->net_seq_idx, fd);
 
+       /* We successfully added the socket. Send status back. */
+       ret = consumer_send_status_msg(sock, ret_code);
+       if (ret < 0) {
+               /* Somehow, the session daemon is not responding anymore. */
+               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
+               goto error_nosignal;
+       }
+
        /*
         * Add relayd socket pair to consumer data hashtable. If object already
         * exists or on error, the function gracefully returns.
@@ -3176,6 +3226,11 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
        return 0;
 
 error:
+       if (consumer_send_status_msg(sock, ret_code) < 0) {
+               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
+       }
+
+error_nosignal:
        /* Close received socket if valid. */
        if (fd >= 0) {
                if (close(fd)) {
@@ -3414,3 +3469,23 @@ int consumer_send_status_channel(int sock,
 
        return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
 }
+
+/*
+ * Using a maximum stream size with the produced and consumed position of a
+ * stream, computes the new consumed position to be as close as possible to the
+ * maximum possible stream size.
+ *
+ * If maximum stream size is lower than the possible buffer size (produced -
+ * consumed), the consumed_pos given is returned untouched else the new value
+ * is returned.
+ */
+unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
+               unsigned long produced_pos, uint64_t max_stream_size)
+{
+       if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) {
+               /* Offset from the produced position to get the latest buffers. */
+               return produced_pos - max_stream_size;
+       }
+
+       return consumed_pos;
+}
This page took 0.02891 seconds and 4 git commands to generate.