Port: Remove _GNU_SOURCE, defined in config.h
[lttng-tools.git] / src / common / consumer-stream.c
index 717e0a7351a49461e4702c872fc91914b8efaffe..a62cef272294d360953e5d685fca745d7509bcff 100644 (file)
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#define _GNU_SOURCE
+#define _LGPL_SOURCE
 #include <assert.h>
 #include <inttypes.h>
 #include <sys/mman.h>
 #include <unistd.h>
 
 #include <common/common.h>
+#include <common/index/index.h>
+#include <common/kernel-consumer/kernel-consumer.h>
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
+#include <common/utils.h>
 
 #include "consumer-stream.h"
 
@@ -117,10 +120,35 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
                        }
                        stream->wait_fd = -1;
                }
+               if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
+                       utils_close_pipe(stream->splice_pipe);
+               }
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
+       {
+               /*
+                * Special case for the metadata since the wait fd is an internal pipe
+                * polled in the metadata thread.
+                */
+               if (stream->metadata_flag && stream->chan->monitor) {
+                       int rpipe = stream->ust_metadata_poll_pipe[0];
+
+                       /*
+                        * This will stop the channel timer if one and close the write side
+                        * of the metadata poll pipe.
+                        */
+                       lttng_ustconsumer_close_metadata(stream->chan);
+                       if (rpipe >= 0) {
+                               ret = close(rpipe);
+                               if (ret < 0) {
+                                       PERROR("closing metadata pipe read side");
+                               }
+                               stream->ust_metadata_poll_pipe[0] = -1;
+                       }
+               }
                break;
+       }
        default:
                ERR("Unknown consumer_data type");
                assert(0);
@@ -135,6 +163,14 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
                stream->out_fd = -1;
        }
 
+       if (stream->index_fd >= 0) {
+               ret = close(stream->index_fd);
+               if (ret) {
+                       PERROR("close stream index_fd");
+               }
+               stream->index_fd = -1;
+       }
+
        /* Check and cleanup relayd if needed. */
        rcu_read_lock();
        relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -185,9 +221,11 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream,
 
        rcu_read_unlock();
 
-       /* Decrement the stream count of the global consumer data. */
-       assert(consumer_data.stream_count > 0);
-       consumer_data.stream_count--;
+       if (!stream->metadata_flag) {
+               /* Decrement the stream count of the global consumer data. */
+               assert(consumer_data.stream_count > 0);
+               consumer_data.stream_count--;
+       }
 }
 
 /*
@@ -314,3 +352,203 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
        /* Free stream within a RCU call. */
        consumer_stream_free(stream);
 }
+
+/*
+ * Write index of a specific stream either on the relayd or local disk.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int consumer_stream_write_index(struct lttng_consumer_stream *stream,
+               struct ctf_packet_index *index)
+{
+       int ret;
+       struct consumer_relayd_sock_pair *relayd;
+
+       assert(stream);
+       assert(index);
+
+       rcu_read_lock();
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd) {
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_send_index(&relayd->control_sock, index,
+                               stream->relayd_stream_id, stream->next_net_seq_num - 1);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       } else {
+               ssize_t size_ret;
+
+               size_ret = index_write(stream->index_fd, index,
+                               sizeof(struct ctf_packet_index));
+               if (size_ret < sizeof(struct ctf_packet_index)) {
+                       ret = -1;
+               } else {
+                       ret = 0;
+               }
+       }
+       if (ret < 0) {
+               goto error;
+       }
+
+error:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Actually do the metadata sync using the given metadata stream.
+ *
+ * Return 0 on success else a negative value. ENODATA can be returned also
+ * indicating that there is no metadata available for that stream.
+ */
+static int do_sync_metadata(struct lttng_consumer_stream *metadata,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret;
+
+       assert(metadata);
+       assert(metadata->metadata_flag);
+       assert(ctx);
+
+       /*
+        * In UST, since we have to write the metadata from the cache packet
+        * by packet, we might need to start this procedure multiple times
+        * until all the metadata from the cache has been extracted.
+        */
+       do {
+               /*
+                * Steps :
+                * - Lock the metadata stream
+                * - Check if metadata stream node was deleted before locking.
+                *   - if yes, release and return success
+                * - Check if new metadata is ready (flush + snapshot pos)
+                * - If nothing : release and return.
+                * - Lock the metadata_rdv_lock
+                * - Unlock the metadata stream
+                * - cond_wait on metadata_rdv to wait the wakeup from the
+                *   metadata thread
+                * - Unlock the metadata_rdv_lock
+                */
+               pthread_mutex_lock(&metadata->lock);
+
+               /*
+                * There is a possibility that we were able to acquire a reference on the
+                * stream from the RCU hash table but between then and now, the node might
+                * have been deleted just before the lock is acquired. Thus, after locking,
+                * we make sure the metadata node has not been deleted which means that the
+                * buffers are closed.
+                *
+                * In that case, there is no need to sync the metadata hence returning a
+                * success return code.
+                */
+               ret = cds_lfht_is_node_deleted(&metadata->node.node);
+               if (ret) {
+                       ret = 0;
+                       goto end_unlock_mutex;
+               }
+
+               switch (ctx->type) {
+               case LTTNG_CONSUMER_KERNEL:
+                       /*
+                        * Empty the metadata cache and flush the current stream.
+                        */
+                       ret = lttng_kconsumer_sync_metadata(metadata);
+                       break;
+               case LTTNG_CONSUMER32_UST:
+               case LTTNG_CONSUMER64_UST:
+                       /*
+                        * Ask the sessiond if we have new metadata waiting and update the
+                        * consumer metadata cache.
+                        */
+                       ret = lttng_ustconsumer_sync_metadata(ctx, metadata);
+                       break;
+               default:
+                       assert(0);
+                       ret = -1;
+                       break;
+               }
+               /*
+                * Error or no new metadata, we exit here.
+                */
+               if (ret <= 0 || ret == ENODATA) {
+                       goto end_unlock_mutex;
+               }
+
+               /*
+                * At this point, new metadata have been flushed, so we wait on the
+                * rendez-vous point for the metadata thread to wake us up when it
+                * finishes consuming the metadata and continue execution.
+                */
+
+               pthread_mutex_lock(&metadata->metadata_rdv_lock);
+
+               /*
+                * Release metadata stream lock so the metadata thread can process it.
+                */
+               pthread_mutex_unlock(&metadata->lock);
+
+               /*
+                * Wait on the rendez-vous point. Once woken up, it means the metadata was
+                * consumed and thus synchronization is achieved.
+                */
+               pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock);
+               pthread_mutex_unlock(&metadata->metadata_rdv_lock);
+       } while (ret == EAGAIN);
+
+       /* Success */
+       return 0;
+
+end_unlock_mutex:
+       pthread_mutex_unlock(&metadata->lock);
+       return ret;
+}
+
+/*
+ * Synchronize the metadata using a given session ID. A successful acquisition
+ * of a metadata stream will trigger a request to the session daemon and a
+ * snapshot so the metadata thread can consume it.
+ *
+ * This function call is a rendez-vous point between the metadata thread and
+ * the data thread.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
+               uint64_t session_id)
+{
+       int ret;
+       struct lttng_consumer_stream *stream = NULL;
+       struct lttng_ht_iter iter;
+       struct lttng_ht *ht;
+
+       assert(ctx);
+
+       /* Ease our life a bit. */
+       ht = consumer_data.stream_list_ht;
+
+       rcu_read_lock();
+
+       /* Search the metadata associated with the session id of the given stream. */
+
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct,
+                       &session_id, &iter.iter, stream, node_session_id.node) {
+               if (!stream->metadata_flag) {
+                       continue;
+               }
+
+               ret = do_sync_metadata(stream, ctx);
+               if (ret < 0) {
+                       goto end;
+               }
+       }
+
+       /*
+        * Force return code to 0 (success) since ret might be ENODATA for instance
+        * which is not an error but rather that we should come back.
+        */
+       ret = 0;
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
This page took 0.025648 seconds and 4 git commands to generate.