Port: Remove _GNU_SOURCE, defined in config.h
[lttng-tools.git] / src / common / consumer-stream.c
index 808cae236ded5d142a6c5b962b27e760c41eac2a..a62cef272294d360953e5d685fca745d7509bcff 100644 (file)
@@ -17,7 +17,7 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#define _GNU_SOURCE
+#define _LGPL_SOURCE
 #include <assert.h>
 #include <inttypes.h>
 #include <sys/mman.h>
@@ -28,6 +28,7 @@
 #include <common/kernel-consumer/kernel-consumer.h>
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
+#include <common/utils.h>
 
 #include "consumer-stream.h"
 
@@ -119,10 +120,35 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
                        }
                        stream->wait_fd = -1;
                }
+               if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
+                       utils_close_pipe(stream->splice_pipe);
+               }
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
+       {
+               /*
+                * Special case for the metadata since the wait fd is an internal pipe
+                * polled in the metadata thread.
+                */
+               if (stream->metadata_flag && stream->chan->monitor) {
+                       int rpipe = stream->ust_metadata_poll_pipe[0];
+
+                       /*
+                        * This will stop the channel timer if one and close the write side
+                        * of the metadata poll pipe.
+                        */
+                       lttng_ustconsumer_close_metadata(stream->chan);
+                       if (rpipe >= 0) {
+                               ret = close(rpipe);
+                               if (ret < 0) {
+                                       PERROR("closing metadata pipe read side");
+                               }
+                               stream->ust_metadata_poll_pipe[0] = -1;
+                       }
+               }
                break;
+       }
        default:
                ERR("Unknown consumer_data type");
                assert(0);
@@ -195,9 +221,11 @@ void consumer_stream_delete(struct lttng_consumer_stream *stream,
 
        rcu_read_unlock();
 
-       /* Decrement the stream count of the global consumer data. */
-       assert(consumer_data.stream_count > 0);
-       consumer_data.stream_count--;
+       if (!stream->metadata_flag) {
+               /* Decrement the stream count of the global consumer data. */
+               assert(consumer_data.stream_count > 0);
+               consumer_data.stream_count--;
+       }
 }
 
 /*
@@ -331,7 +359,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
  * Return 0 on success or else a negative value.
  */
 int consumer_stream_write_index(struct lttng_consumer_stream *stream,
-               struct lttng_packet_index *index)
+               struct ctf_packet_index *index)
 {
        int ret;
        struct consumer_relayd_sock_pair *relayd;
@@ -342,11 +370,20 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream,
        rcu_read_lock();
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd) {
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                ret = relayd_send_index(&relayd->control_sock, index,
                                stream->relayd_stream_id, stream->next_net_seq_num - 1);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        } else {
-               ret = index_write(stream->index_fd, index,
-                               sizeof(struct lttng_packet_index));
+               ssize_t size_ret;
+
+               size_ret = index_write(stream->index_fd, index,
+                               sizeof(struct ctf_packet_index));
+               if (size_ret < sizeof(struct ctf_packet_index)) {
+                       ret = -1;
+               } else {
+                       ret = 0;
+               }
        }
        if (ret < 0) {
                goto error;
@@ -358,45 +395,20 @@ error:
 }
 
 /*
- * Synchronize the metadata using a given session ID. A successful acquisition
- * of a metadata stream will trigger a request to the session daemon and a
- * snapshot so the metadata thread can consume it.
- *
- * This function call is a rendez-vous point between the metadata thread and
- * the data thread.
+ * Actually do the metadata sync using the given metadata stream.
  *
- * Return 0 on success or else a negative value.
+ * Return 0 on success else a negative value. ENODATA can be returned also
+ * indicating that there is no metadata available for that stream.
  */
-int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
-               uint64_t session_id)
+static int do_sync_metadata(struct lttng_consumer_stream *metadata,
+               struct lttng_consumer_local_data *ctx)
 {
        int ret;
-       struct lttng_consumer_stream *metadata = NULL, *stream = NULL;
-       struct lttng_ht_iter iter;
-       struct lttng_ht *ht;
 
+       assert(metadata);
+       assert(metadata->metadata_flag);
        assert(ctx);
 
-       /* Ease our life a bit. */
-       ht = consumer_data.stream_list_ht;
-
-       rcu_read_lock();
-
-       /* Search the metadata associated with the session id of the given stream. */
-
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                       ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct,
-                       &session_id, &iter.iter, stream, node_session_id.node) {
-               if (stream->metadata_flag) {
-                       metadata = stream;
-                       break;
-               }
-       }
-       if (!metadata) {
-               ret = 0;
-               goto end_unlock_rcu;
-       }
-
        /*
         * In UST, since we have to write the metadata from the cache packet
         * by packet, we might need to start this procedure multiple times
@@ -482,12 +494,61 @@ int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
                pthread_mutex_unlock(&metadata->metadata_rdv_lock);
        } while (ret == EAGAIN);
 
-       ret = 0;
-       goto end_unlock_rcu;
+       /* Success */
+       return 0;
 
 end_unlock_mutex:
        pthread_mutex_unlock(&metadata->lock);
-end_unlock_rcu:
+       return ret;
+}
+
+/*
+ * Synchronize the metadata using a given session ID. A successful acquisition
+ * of a metadata stream will trigger a request to the session daemon and a
+ * snapshot so the metadata thread can consume it.
+ *
+ * This function call is a rendez-vous point between the metadata thread and
+ * the data thread.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
+               uint64_t session_id)
+{
+       int ret;
+       struct lttng_consumer_stream *stream = NULL;
+       struct lttng_ht_iter iter;
+       struct lttng_ht *ht;
+
+       assert(ctx);
+
+       /* Ease our life a bit. */
+       ht = consumer_data.stream_list_ht;
+
+       rcu_read_lock();
+
+       /* Search the metadata associated with the session id of the given stream. */
+
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct,
+                       &session_id, &iter.iter, stream, node_session_id.node) {
+               if (!stream->metadata_flag) {
+                       continue;
+               }
+
+               ret = do_sync_metadata(stream, ctx);
+               if (ret < 0) {
+                       goto end;
+               }
+       }
+
+       /*
+        * Force return code to 0 (success) since ret might be ENODATA for instance
+        * which is not an error but rather that we should come back.
+        */
+       ret = 0;
+
+end:
        rcu_read_unlock();
        return ret;
 }
This page took 0.025162 seconds and 4 git commands to generate.