Use lttng_read/lttng_write wrappers
[lttng-tools.git] / src / common / consumer-stream.c
index 723ec829f80095ff17bd3e5996540d01d754072a..063ba50ab12eb968e4ee81c4f49cadefcb7f3d90 100644 (file)
@@ -24,6 +24,8 @@
 #include <unistd.h>
 
 #include <common/common.h>
+#include <common/index/index.h>
+#include <common/kernel-consumer/kernel-consumer.h>
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
 
@@ -135,6 +137,14 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
                stream->out_fd = -1;
        }
 
+       if (stream->index_fd >= 0) {
+               ret = close(stream->index_fd);
+               if (ret) {
+                       PERROR("close stream index_fd");
+               }
+               stream->index_fd = -1;
+       }
+
        /* Check and cleanup relayd if needed. */
        rcu_read_lock();
        relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -280,6 +290,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
                 */
                if (stream->globally_visible) {
                        pthread_mutex_lock(&consumer_data.lock);
+                       pthread_mutex_lock(&stream->chan->lock);
                        pthread_mutex_lock(&stream->lock);
                        /* Remove every reference of the stream in the consumer. */
                        consumer_stream_delete(stream, ht);
@@ -293,6 +304,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
                        consumer_data.need_update = 1;
 
                        pthread_mutex_unlock(&stream->lock);
+                       pthread_mutex_unlock(&stream->chan->lock);
                        pthread_mutex_unlock(&consumer_data.lock);
                } else {
                        /*
@@ -312,3 +324,177 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
        /* Free stream within a RCU call. */
        consumer_stream_free(stream);
 }
+
+/*
+ * Write index of a specific stream either on the relayd or local disk.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int consumer_stream_write_index(struct lttng_consumer_stream *stream,
+               struct lttng_packet_index *index)
+{
+       int ret;
+       struct consumer_relayd_sock_pair *relayd;
+
+       assert(stream);
+       assert(index);
+
+       rcu_read_lock();
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd) {
+               ret = relayd_send_index(&relayd->control_sock, index,
+                               stream->relayd_stream_id, stream->next_net_seq_num - 1);
+       } else {
+               ssize_t size_ret;
+
+               size_ret = index_write(stream->index_fd, index,
+                               sizeof(struct lttng_packet_index));
+               if (size_ret < sizeof(struct lttng_packet_index)) {
+                       ret = -1;
+               } else {
+                       ret = 0;
+               }
+       }
+       if (ret < 0) {
+               goto error;
+       }
+
+error:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Synchronize the metadata using a given session ID. A successful acquisition
+ * of a metadata stream will trigger a request to the session daemon and a
+ * snapshot so the metadata thread can consume it.
+ *
+ * This function call is a rendez-vous point between the metadata thread and
+ * the data thread.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
+               uint64_t session_id)
+{
+       int ret;
+       struct lttng_consumer_stream *metadata = NULL, *stream = NULL;
+       struct lttng_ht_iter iter;
+       struct lttng_ht *ht;
+
+       assert(ctx);
+
+       /* Ease our life a bit. */
+       ht = consumer_data.stream_list_ht;
+
+       rcu_read_lock();
+
+       /* Search the metadata associated with the session id of the given stream. */
+
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct,
+                       &session_id, &iter.iter, stream, node_session_id.node) {
+               if (stream->metadata_flag) {
+                       metadata = stream;
+                       break;
+               }
+       }
+       if (!metadata) {
+               ret = 0;
+               goto end_unlock_rcu;
+       }
+
+       /*
+        * In UST, since we have to write the metadata from the cache packet
+        * by packet, we might need to start this procedure multiple times
+        * until all the metadata from the cache has been extracted.
+        */
+       do {
+               /*
+                * Steps :
+                * - Lock the metadata stream
+                * - Check if metadata stream node was deleted before locking.
+                *   - if yes, release and return success
+                * - Check if new metadata is ready (flush + snapshot pos)
+                * - If nothing : release and return.
+                * - Lock the metadata_rdv_lock
+                * - Unlock the metadata stream
+                * - cond_wait on metadata_rdv to wait the wakeup from the
+                *   metadata thread
+                * - Unlock the metadata_rdv_lock
+                */
+               pthread_mutex_lock(&metadata->lock);
+
+               /*
+                * There is a possibility that we were able to acquire a reference on the
+                * stream from the RCU hash table but between then and now, the node might
+                * have been deleted just before the lock is acquired. Thus, after locking,
+                * we make sure the metadata node has not been deleted which means that the
+                * buffers are closed.
+                *
+                * In that case, there is no need to sync the metadata hence returning a
+                * success return code.
+                */
+               ret = cds_lfht_is_node_deleted(&metadata->node.node);
+               if (ret) {
+                       ret = 0;
+                       goto end_unlock_mutex;
+               }
+
+               switch (ctx->type) {
+               case LTTNG_CONSUMER_KERNEL:
+                       /*
+                        * Empty the metadata cache and flush the current stream.
+                        */
+                       ret = lttng_kconsumer_sync_metadata(metadata);
+                       break;
+               case LTTNG_CONSUMER32_UST:
+               case LTTNG_CONSUMER64_UST:
+                       /*
+                        * Ask the sessiond if we have new metadata waiting and update the
+                        * consumer metadata cache.
+                        */
+                       ret = lttng_ustconsumer_sync_metadata(ctx, metadata);
+                       break;
+               default:
+                       assert(0);
+                       ret = -1;
+                       break;
+               }
+               /*
+                * Error or no new metadata, we exit here.
+                */
+               if (ret <= 0 || ret == ENODATA) {
+                       goto end_unlock_mutex;
+               }
+
+               /*
+                * At this point, new metadata have been flushed, so we wait on the
+                * rendez-vous point for the metadata thread to wake us up when it
+                * finishes consuming the metadata and continue execution.
+                */
+
+               pthread_mutex_lock(&metadata->metadata_rdv_lock);
+
+               /*
+                * Release metadata stream lock so the metadata thread can process it.
+                */
+               pthread_mutex_unlock(&metadata->lock);
+
+               /*
+                * Wait on the rendez-vous point. Once woken up, it means the metadata was
+                * consumed and thus synchronization is achieved.
+                */
+               pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock);
+               pthread_mutex_unlock(&metadata->metadata_rdv_lock);
+       } while (ret == EAGAIN);
+
+       ret = 0;
+       goto end_unlock_rcu;
+
+end_unlock_mutex:
+       pthread_mutex_unlock(&metadata->lock);
+end_unlock_rcu:
+       rcu_read_unlock();
+       return ret;
+}
This page took 0.026158 seconds and 4 git commands to generate.