Implement the relayd live features
[lttng-tools.git] / src / common / consumer-stream.c
index 02887fcc457aaa3deb435d94bfc3ccb04fa2496b..920948760264405f16941f0069617513e9d27d6f 100644 (file)
@@ -24,6 +24,7 @@
 #include <unistd.h>
 
 #include <common/common.h>
+#include <common/index/index.h>
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
 
@@ -135,6 +136,14 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
                stream->out_fd = -1;
        }
 
+       if (stream->index_fd >= 0) {
+               ret = close(stream->index_fd);
+               if (ret) {
+                       PERROR("close stream index_fd");
+               }
+               stream->index_fd = -1;
+       }
+
        /* Check and cleanup relayd if needed. */
        rcu_read_lock();
        relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -281,7 +290,6 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
                if (stream->globally_visible) {
                        pthread_mutex_lock(&consumer_data.lock);
                        pthread_mutex_lock(&stream->chan->lock);
-                       pthread_mutex_lock(&stream->chan->timer_lock);
                        pthread_mutex_lock(&stream->lock);
                        /* Remove every reference of the stream in the consumer. */
                        consumer_stream_delete(stream, ht);
@@ -295,7 +303,6 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
                        consumer_data.need_update = 1;
 
                        pthread_mutex_unlock(&stream->lock);
-                       pthread_mutex_unlock(&stream->chan->timer_lock);
                        pthread_mutex_unlock(&stream->chan->lock);
                        pthread_mutex_unlock(&consumer_data.lock);
                } else {
@@ -316,3 +323,35 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
        /* Free stream within a RCU call. */
        consumer_stream_free(stream);
 }
+
+/*
+ * Write index of a specific stream either on the relayd or local disk.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int consumer_stream_write_index(struct lttng_consumer_stream *stream,
+               struct lttng_packet_index *index)
+{
+       int ret;
+       struct consumer_relayd_sock_pair *relayd;
+
+       assert(stream);
+       assert(index);
+
+       rcu_read_lock();
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd) {
+               ret = relayd_send_index(&relayd->control_sock, index,
+                               stream->relayd_stream_id, stream->next_net_seq_num - 1);
+       } else {
+               ret = index_write(stream->index_fd, index,
+                               sizeof(struct lttng_packet_index));
+       }
+       if (ret < 0) {
+               goto error;
+       }
+
+error:
+       rcu_read_unlock();
+       return ret;
+}
This page took 0.023165 seconds and 4 git commands to generate.