#include <unistd.h>
#include <common/common.h>
+#include <common/index/index.h>
#include <common/relayd/relayd.h>
#include <common/ust-consumer/ust-consumer.h>
stream->out_fd = -1;
}
+ if (stream->index_fd >= 0) {
+ ret = close(stream->index_fd);
+ if (ret) {
+ PERROR("close stream index_fd");
+ }
+ stream->index_fd = -1;
+ }
+
/* Check and cleanup relayd if needed. */
rcu_read_lock();
relayd = consumer_find_relayd(stream->net_seq_idx);
if (stream->globally_visible) {
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
- pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
/* Remove every reference of the stream in the consumer. */
consumer_stream_delete(stream, ht);
consumer_data.need_update = 1;
pthread_mutex_unlock(&stream->lock);
- pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
} else {
/* Free stream within a RCU call. */
consumer_stream_free(stream);
}
+
+/*
+ * Write index of a specific stream either on the relayd or local disk.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int consumer_stream_write_index(struct lttng_consumer_stream *stream,
+ struct lttng_packet_index *index)
+{
+ int ret;
+ struct consumer_relayd_sock_pair *relayd;
+
+ assert(stream);
+ assert(index);
+
+ rcu_read_lock();
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd) {
+ ret = relayd_send_index(&relayd->control_sock, index,
+ stream->relayd_stream_id, stream->next_net_seq_num - 1);
+ } else {
+ ret = index_write(stream->index_fd, index,
+ sizeof(struct lttng_packet_index));
+ }
+ if (ret < 0) {
+ goto error;
+ }
+
+error:
+ rcu_read_unlock();
+ return ret;
+}