/*
- * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
- * David Goulet <dgoulet@efficios.com>
- * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License, version 2 only, as
- * published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
*
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * this program; if not, write to the Free Software Foundation, Inc., 51
- * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#define _LGPL_SOURCE
#include <common/common.h>
#include <common/utils.h>
+#include <common/compat/endian.h>
#include "lttng-relayd.h"
#include "stream.h"
#include "index.h"
+#include "connection.h"
/*
* Allocate a new relay index object. Pass the stream in which it is
}
int relay_index_set_data(struct relay_index *index,
- const struct ctf_packet_index *data)
+ const struct ctf_packet_index *data)
{
int ret = 0;
{
int ret = 1;
bool flushed = false;
- int fd;
pthread_mutex_lock(&index->lock);
if (index->flushed) {
if (!index->has_index_data || !index->index_file) {
goto skip;
}
- fd = index->index_file->fd;
- DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
- " on fd %d", index->stream->stream_handle,
- index->index_n.key, fd);
+
+ DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64,
+ index->stream->stream_handle, index->index_n.key);
flushed = true;
index->flushed = true;
ret = lttng_index_file_write(index->index_file, &index->index_data);
rcu_read_unlock();
return net_seq_num;
}
+
+/*
+ * Update the index file of an already existing relay_index.
+ * Offsets by 'removed_data_count' the offset field of an index.
+ */
+static
+int relay_index_switch_file(struct relay_index *index,
+ struct lttng_index_file *new_index_file,
+ uint64_t removed_data_count)
+{
+ int ret = 0;
+ uint64_t offset;
+
+ pthread_mutex_lock(&index->lock);
+ if (!index->index_file) {
+ ERR("No index_file");
+ ret = 0;
+ goto end;
+ }
+
+ lttng_index_file_put(index->index_file);
+ lttng_index_file_get(new_index_file);
+ index->index_file = new_index_file;
+ offset = be64toh(index->index_data.offset);
+ index->index_data.offset = htobe64(offset - removed_data_count);
+
+end:
+ pthread_mutex_unlock(&index->lock);
+ return ret;
+}
+
+/*
+ * Switch the index file of all pending indexes for a stream and update the
+ * data offset by substracting the last safe position.
+ * Stream lock must be held.
+ */
+int relay_index_switch_all_files(struct relay_stream *stream)
+{
+ struct lttng_ht_iter iter;
+ struct relay_index *index;
+ int ret = 0;
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
+ index, index_n.node) {
+ ret = relay_index_switch_file(index, stream->index_file,
+ stream->pos_after_last_complete_data_index);
+ if (ret) {
+ goto end;
+ }
+ }
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Set index data from the control port to a given index object.
+ */
+int relay_index_set_control_data(struct relay_index *index,
+ const struct lttcomm_relayd_index *data,
+ unsigned int minor_version)
+{
+ /* The index on disk is encoded in big endian. */
+ const struct ctf_packet_index index_data = {
+ .packet_size = htobe64(data->packet_size),
+ .content_size = htobe64(data->content_size),
+ .timestamp_begin = htobe64(data->timestamp_begin),
+ .timestamp_end = htobe64(data->timestamp_end),
+ .events_discarded = htobe64(data->events_discarded),
+ .stream_id = htobe64(data->stream_id),
+ };
+
+ if (minor_version >= 8) {
+ index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
+ index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
+ } else {
+ uint64_t unset_value = -1ULL;
+
+ index->index_data.stream_instance_id = htobe64(unset_value);
+ index->index_data.packet_seq_num = htobe64(unset_value);
+ }
+
+ return relay_index_set_data(index, &index_data);
+}