From 1c20f0e29cbf8627bfb1ff444572d52d6655c4e2 Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Mon, 19 Aug 2013 11:35:43 -0400 Subject: [PATCH] Send indexes in streaming mode To write an index on disk, we need to know all the fields sent by the RELAY_SEND_INDEX command as well as the offset of the data in the tracefile. Since the control and data connection are not synchronized, this process happens in two separate steps synchronized by an HT indexed by the index_handle and the net_seq_num. When we receive data and when we receive an index, we lookup in the HT if an entry already exists. If it does, it means that we only need to fill the fields we just received and write the index on disk, otherwise, we allocate a new index, set the fields we know and store it in the HT. Signed-off-by: Julien Desfossez Signed-off-by: David Goulet --- include/lttng/lttng-error.h | 2 +- src/bin/lttng-relayd/Makefile.am | 4 +- src/bin/lttng-relayd/index.c | 218 +++++++++++++ src/bin/lttng-relayd/index.h | 60 ++++ src/bin/lttng-relayd/lttng-relayd.h | 2 + src/bin/lttng-relayd/main.c | 310 +++++++++++++++++-- src/common/consumer-stream.c | 33 ++ src/common/consumer-stream.h | 6 + src/common/consumer.c | 1 + src/common/defaults.h | 1 + src/common/index/index.c | 20 +- src/common/kernel-consumer/kernel-consumer.c | 24 +- src/common/relayd/relayd.c | 61 ++++ src/common/relayd/relayd.h | 3 + src/common/sessiond-comm/relayd.h | 15 + src/common/sessiond-comm/sessiond-comm.h | 3 + src/common/ust-consumer/ust-consumer.c | 25 +- 17 files changed, 732 insertions(+), 56 deletions(-) create mode 100644 src/bin/lttng-relayd/index.c create mode 100644 src/bin/lttng-relayd/index.h diff --git a/include/lttng/lttng-error.h b/include/lttng/lttng-error.h index 5e0dd07b2..b43c88fbe 100644 --- a/include/lttng/lttng-error.h +++ b/include/lttng/lttng-error.h @@ -82,7 +82,7 @@ enum lttng_error_code { LTTNG_ERR_KERN_STREAM_FAIL = 49, /* Kernel create stream failed */ LTTNG_ERR_START_SESSION_ONCE = 50, /* Session needs to be started once. */ LTTNG_ERR_SNAPSHOT_FAIL = 51, /* Snapshot record failed. */ - /* 52 */ + LTTNG_ERR_NO_STREAM = 52, /* Index without stream on relay. */ LTTNG_ERR_KERN_LIST_FAIL = 53, /* Kernel listing events failed */ LTTNG_ERR_UST_CALIBRATE_FAIL = 54, /* UST calibration failed */ LTTNG_ERR_UST_EVENT_ENABLED = 55, /* UST event already enabled. */ diff --git a/src/bin/lttng-relayd/Makefile.am b/src/bin/lttng-relayd/Makefile.am index ed8214429..853c21ae9 100644 --- a/src/bin/lttng-relayd/Makefile.am +++ b/src/bin/lttng-relayd/Makefile.am @@ -7,6 +7,7 @@ AM_CFLAGS = -fno-strict-aliasing bin_PROGRAMS = lttng-relayd lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \ + index.c index.h \ cmd-generic.c cmd-generic.h \ cmd-2-1.c cmd-2-1.h \ cmd-2-2.c cmd-2-2.h @@ -17,4 +18,5 @@ lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \ $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la \ $(top_builddir)/src/common/hashtable/libhashtable.la \ $(top_builddir)/src/common/libcommon.la \ - $(top_builddir)/src/common/compat/libcompat.la + $(top_builddir)/src/common/compat/libcompat.la \ + $(top_builddir)/src/common/index/libindex.la diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c new file mode 100644 index 000000000..97918cd06 --- /dev/null +++ b/src/bin/lttng-relayd/index.c @@ -0,0 +1,218 @@ +/* + * Copyright (C) 2013 - Julien Desfossez + * David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#include + +#include +#include + +#include "index.h" + +/* + * Deferred free of a relay index object. MUST only be called by a call RCU. + */ +static void deferred_free_relay_index(struct rcu_head *head) +{ + struct relay_index *index = + caa_container_of(head, struct relay_index, rcu_node); + + if (index->to_close_fd >= 0) { + int ret; + + ret = close(index->to_close_fd); + if (ret < 0) { + PERROR("Relay index to close fd %d", index->to_close_fd); + } + } + + relay_index_free(index); +} + +/* + * Allocate a new relay index object using the given stream ID and sequence + * number as the hash table key. + * + * Return allocated object or else NULL on error. + */ +struct relay_index *relay_index_create(uint64_t stream_id, + uint64_t net_seq_num) +{ + struct relay_index *index; + + DBG2("Creating relay index with stream id %" PRIu64 " and seqnum %" PRIu64, + stream_id, net_seq_num); + + index = zmalloc(sizeof(*index)); + if (index == NULL) { + PERROR("Relay index zmalloc"); + goto error; + } + + index->to_close_fd = -1; + lttng_ht_node_init_two_u64(&index->index_n, stream_id, net_seq_num); + +error: + return index; +} + +/* + * Find a relayd index in the given hash table. + * + * Return index object or else NULL on error. + */ +struct relay_index *relay_index_find(uint64_t stream_id, + uint64_t net_seq_num, struct lttng_ht *ht) +{ + struct lttng_ht_node_two_u64 *node; + struct lttng_ht_iter iter; + struct lttng_ht_two_u64 key; + struct relay_index *index = NULL; + + assert(ht); + + DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64, + stream_id, net_seq_num); + + key.key1 = stream_id; + key.key2 = net_seq_num; + + lttng_ht_lookup(ht, (void *)(&key), &iter); + node = lttng_ht_iter_get_node_two_u64(&iter); + if (node == NULL) { + goto end; + } + index = caa_container_of(node, struct relay_index, index_n); + +end: + DBG2("Index %sfound in HT for stream ID %" PRIu64 " and seqnum %" PRIu64, + (index == NULL) ? "NOT " : "", stream_id, net_seq_num); + return index; +} + +/* + * Add unique relay index to the given hash table. In case of a collision, the + * already existing object is put in the given _index variable. + * + * RCU read side lock MUST be acquired. + */ +void relay_index_add(struct relay_index *index, struct lttng_ht *ht, + struct relay_index **_index) +{ + struct cds_lfht_node *node_ptr; + + assert(index); + assert(ht); + assert(_index); + + DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64, + index->key.key1, index->key.key2); + + node_ptr = cds_lfht_add_unique(ht->ht, + ht->hash_fct((void *) &index->index_n.key, lttng_ht_seed), + ht->match_fct, (void *) &index->index_n.key, + &index->index_n.node); + if (node_ptr != &index->index_n.node) { + *_index = caa_container_of(node_ptr, struct relay_index, index_n.node); + } +} + +/* + * Write index on disk to the given fd. Once done error or not, it is removed + * from the hash table and destroy the object. + * + * MUST be called with a RCU read side lock held. + * + * Return 0 on success else a negative value. + */ +int relay_index_write(int fd, struct relay_index *index, struct lttng_ht *ht) +{ + int ret; + struct lttng_ht_iter iter; + + DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64 + " on fd %d", index->key.key1, index->key.key2, fd); + + /* Delete index from hash table. */ + iter.iter.node = &index->index_n.node; + ret = lttng_ht_del(ht, &iter); + assert(!ret); + call_rcu(&index->rcu_node, deferred_free_relay_index); + + return index_write(fd, &index->index_data, sizeof(index->index_data)); +} + +/* + * Free the given index. + */ +void relay_index_free(struct relay_index *index) +{ + free(index); +} + +/* + * Safely free the given index using a call RCU. + */ +void relay_index_free_safe(struct relay_index *index) +{ + if (!index) { + return; + } + + call_rcu(&index->rcu_node, deferred_free_relay_index); +} + +/* + * Delete index from the given hash table. + * + * RCU read side lock MUST be acquired. + */ +void relay_index_delete(struct relay_index *index, struct lttng_ht *ht) +{ + int ret; + struct lttng_ht_iter iter; + + DBG3("Relay index with stream ID %" PRIu64 " and seq num %" PRIu64 + "deleted.", index->key.key1, index->key.key2); + + /* Delete index from hash table. */ + iter.iter.node = &index->index_n.node; + ret = lttng_ht_del(ht, &iter); + assert(!ret); +} + +/* + * Destroy every relay index with the given stream id as part of the key. + */ +void relay_index_destroy_by_stream_id(uint64_t stream_id, struct lttng_ht *ht) +{ + struct lttng_ht_iter iter; + struct relay_index *index; + + assert(ht); + assert(ht->ht); + + rcu_read_lock(); + cds_lfht_for_each_entry(ht->ht, &iter.iter, index, index_n.node) { + if (index->key.key1 == stream_id) { + relay_index_delete(index, ht); + relay_index_free_safe(index); + } + } + rcu_read_unlock(); +} diff --git a/src/bin/lttng-relayd/index.h b/src/bin/lttng-relayd/index.h new file mode 100644 index 000000000..fc184e97d --- /dev/null +++ b/src/bin/lttng-relayd/index.h @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2013 - Julien Desfossez + * David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef _RELAY_INDEX_H +#define _RELAY_INDEX_H + +#include +#include + +#include +#include + +struct relay_index { + /* FD on which to write the index data. */ + int fd; + /* + * When destroying this object, this fd is checked and if valid, close it + * so this is basically a lazy close of the previous fd corresponding to + * the same stream id. This is used for the rotate file feature. + */ + int to_close_fd; + + /* Index packet data. This is the data that is written on disk. */ + struct lttng_packet_index index_data; + + /* key1 = stream_id, key2 = net_seq_num */ + struct lttng_ht_two_u64 key; + struct lttng_ht_node_two_u64 index_n; + struct rcu_head rcu_node; + pthread_mutex_t mutex; +}; + +struct relay_index *relay_index_create(uint64_t stream_id, + uint64_t net_seq_num); +struct relay_index *relay_index_find(uint64_t stream_id, + uint64_t net_seq_num, struct lttng_ht *ht); +void relay_index_add(struct relay_index *index, struct lttng_ht *ht, + struct relay_index **_index); +int relay_index_write(int fd, struct relay_index *index, struct lttng_ht *ht); +void relay_index_free(struct relay_index *index); +void relay_index_free_safe(struct relay_index *index); +void relay_index_delete(struct relay_index *index, struct lttng_ht *ht); +void relay_index_destroy_by_stream_id(uint64_t stream_id, struct lttng_ht *ht); + +#endif /* _RELAY_INDEX_H */ diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h index 61db23a77..c60280e0f 100644 --- a/src/bin/lttng-relayd/lttng-relayd.h +++ b/src/bin/lttng-relayd/lttng-relayd.h @@ -23,6 +23,7 @@ #include #include #include +#include /* * Queue used to enqueue relay requests @@ -60,6 +61,7 @@ struct relay_stream { struct relay_session *session; struct rcu_head rcu_node; int fd; + /* FD on which to write the index data. */ int index_fd; char *path_name; diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index bb038a670..ca37b8bc4 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -52,6 +52,7 @@ #include #include "cmd.h" +#include "index.h" #include "utils.h" #include "lttng-relayd.h" @@ -62,7 +63,6 @@ static struct lttng_uri *control_uri; static struct lttng_uri *data_uri; const char *progname; -static int is_root; /* Set to 1 if the daemon is running as root */ /* * Quit pipe for all threads. This permits a single cancellation point @@ -98,6 +98,13 @@ static struct relay_cmd_queue relay_cmd_queue; static char *data_buffer; static unsigned int data_buffer_size; +/* Global hash table that stores relay index object. */ +static struct lttng_ht *indexes_ht; + +/* We need those values for the file/dir creation. */ +static uid_t relayd_uid; +static gid_t relayd_gid; + /* * usage function on stderr */ @@ -758,6 +765,9 @@ void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht call_rcu(&stream->rcu_node, deferred_free_stream); } + /* Cleanup index of that stream. */ + relay_index_destroy_by_stream_id(stream->stream_handle, + indexes_ht); } } rcu_read_unlock(); @@ -765,6 +775,28 @@ void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht free(cmd->session); } +/* + * Copy index data from the control port to a given index object. + */ +static void copy_index_control_data(struct relay_index *index, + struct lttcomm_relayd_index *data) +{ + assert(index); + assert(data); + + /* + * The index on disk is encoded in big endian, so we don't need to convert + * the data received on the network. The data_offset value is NEVER + * modified here and is updated by the data thread. + */ + index->index_data.packet_size = data->packet_size; + index->index_data.content_size = data->content_size; + index->index_data.timestamp_begin = data->timestamp_begin; + index->index_data.timestamp_end = data->timestamp_end; + index->index_data.events_discarded = data->events_discarded; + index->index_data.stream_id = data->stream_id; +} + /* * Handle the RELAYD_CREATE_SESSION command. * @@ -856,6 +888,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->stream_handle = ++last_relay_stream_id; stream->prev_seq = -1ULL; stream->session = session; + stream->index_fd = -1; ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG); if (ret < 0) { @@ -868,7 +901,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, * uses its own credentials for the stream files. */ ret = utils_create_stream_file(stream->path_name, stream->channel_name, - stream->tracefile_size, 0, -1, -1, NULL); + stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL); if (ret < 0) { ERR("Create output file"); goto end; @@ -885,7 +918,8 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, lttng_ht_add_unique_ulong(streams_ht, &stream->stream_n); - DBG("Relay new stream added %s", stream->channel_name); + DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name, + stream->stream_handle); end: reply.handle = htobe64(stream->stream_handle); @@ -969,6 +1003,13 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, if (delret < 0) { PERROR("close stream"); } + + if (stream->index_fd >= 0) { + delret = close(stream->index_fd); + if (delret < 0) { + PERROR("close stream index_fd"); + } + } iter.iter.node = &stream->stream_n.node; delret = lttng_ht_del(streams_ht, &iter); assert(!delret); @@ -1503,12 +1544,132 @@ end_no_session: return ret; } +/* + * Receive an index for a specific stream. + * + * Return 0 on success else a negative value. + */ +static +int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_command *cmd, struct lttng_ht *streams_ht, + struct lttng_ht *indexes_ht) +{ + int ret, send_ret, index_created = 0; + struct relay_session *session = cmd->session; + struct lttcomm_relayd_index index_info; + struct relay_index *index, *wr_index = NULL; + struct lttcomm_relayd_generic_reply reply; + struct relay_stream *stream; + uint64_t net_seq_num; + + assert(cmd); + assert(streams_ht); + assert(indexes_ht); + + DBG("Relay receiving index"); + + if (!session || cmd->version_check_done == 0) { + ERR("Trying to close a stream before version check"); + ret = -1; + goto end_no_session; + } + + ret = cmd->sock->ops->recvmsg(cmd->sock, &index_info, + sizeof(index_info), 0); + if (ret < sizeof(index_info)) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } else { + ERR("Relay didn't receive valid index struct size : %d", ret); + } + ret = -1; + goto end_no_session; + } + + net_seq_num = be64toh(index_info.net_seq_num); + + rcu_read_lock(); + stream = relay_stream_from_stream_id(be64toh(index_info.relay_stream_id), + streams_ht); + if (!stream) { + ret = -1; + goto end_rcu_unlock; + } + + index = relay_index_find(stream->stream_handle, net_seq_num, indexes_ht); + if (!index) { + /* A successful creation will add the object to the HT. */ + index = relay_index_create(stream->stream_handle, net_seq_num); + if (!index) { + goto end_rcu_unlock; + } + index_created = 1; + } + + copy_index_control_data(index, &index_info); + + if (index_created) { + /* + * Try to add the relay index object to the hash table. If an object + * already exist, destroy back the index created, set the data in this + * object and write it on disk. + */ + relay_index_add(index, indexes_ht, &wr_index); + if (wr_index) { + copy_index_control_data(wr_index, &index_info); + free(index); + } + } else { + /* The index already exists so write it on disk. */ + wr_index = index; + } + + /* Do we have a writable ready index to write on disk. */ + if (wr_index) { + /* Starting at 2.4, create the index file if none available. */ + if (cmd->minor >= 4 && stream->index_fd < 0) { + ret = index_create_file(stream->path_name, stream->channel_name, + relayd_uid, relayd_gid, stream->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + goto end_rcu_unlock; + } + stream->index_fd = ret; + } + + ret = relay_index_write(wr_index->fd, wr_index, indexes_ht); + if (ret < 0) { + goto end_rcu_unlock; + } + } + +end_rcu_unlock: + rcu_read_unlock(); + + if (ret < 0) { + reply.ret_code = htobe32(LTTNG_ERR_UNK); + } else { + reply.ret_code = htobe32(LTTNG_OK); + } + send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0); + if (send_ret < 0) { + ERR("Relay sending close index id reply"); + ret = send_ret; + } + +end_no_session: + return ret; +} + /* * relay_process_control: Process the commands received on the control socket */ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *streams_ht) + struct relay_command *cmd, struct lttng_ht *streams_ht, + struct lttng_ht *index_streams_ht, + struct lttng_ht *indexes_ht) { int ret = 0; @@ -1543,6 +1704,9 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, case RELAYD_END_DATA_PENDING: ret = relay_end_data_pending(recv_hdr, cmd, streams_ht); break; + case RELAYD_SEND_INDEX: + ret = relay_recv_index(recv_hdr, cmd, streams_ht, indexes_ht); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd)); @@ -1559,12 +1723,14 @@ end: * relay_process_data: Process the data received on the data socket */ static -int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) +int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht, + struct lttng_ht *indexes_ht) { - int ret = 0; + int ret = 0, rotate_index = 0, index_created = 0; struct relay_stream *stream; + struct relay_index *index, *wr_index = NULL; struct lttcomm_relayd_data_hdr data_hdr; - uint64_t stream_id; + uint64_t stream_id, data_offset; uint64_t net_seq_num; uint32_t data_size; @@ -1587,7 +1753,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) stream = relay_stream_from_stream_id(stream_id, streams_ht); if (!stream) { ret = -1; - goto end_unlock; + goto end_rcu_unlock; } data_size = be32toh(data_hdr.data_size); @@ -1599,7 +1765,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) ERR("Allocating data buffer"); free(data_buffer); ret = -1; - goto end_unlock; + goto end_rcu_unlock; } data_buffer = tmp_data_ptr; data_buffer_size = data_size; @@ -1617,33 +1783,103 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) DBG("Socket %d did an orderly shutdown", cmd->sock->fd); } ret = -1; - goto end_unlock; + goto end_rcu_unlock; } + /* Check if a rotation is needed. */ if (stream->tracefile_size > 0 && (stream->tracefile_size_current + data_size) > stream->tracefile_size) { - ret = utils_rotate_stream_file(stream->path_name, - stream->channel_name, stream->tracefile_size, - stream->tracefile_count, -1, -1, - stream->fd, &(stream->tracefile_count_current), - &stream->fd); + ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, + stream->tracefile_size, stream->tracefile_count, + relayd_uid, relayd_gid, stream->fd, + &(stream->tracefile_count_current), &stream->fd); if (ret < 0) { - ERR("Rotating output file"); - goto end; + ERR("Rotating stream output file"); + goto end_rcu_unlock; } - stream->fd = ret; /* Reset current size because we just perform a stream rotation. */ stream->tracefile_size_current = 0; + rotate_index = 1; + } + + /* Get data offset because we are about to update the index. */ + data_offset = htobe64(stream->tracefile_size_current); + + /* + * Lookup for an existing index for that stream id/sequence number. If on + * exists, the control thread already received the data for it thus we need + * to write it on disk. + */ + index = relay_index_find(stream_id, net_seq_num, indexes_ht); + if (!index) { + /* A successful creation will add the object to the HT. */ + index = relay_index_create(stream->stream_handle, net_seq_num); + if (!index) { + goto end_rcu_unlock; + } + index_created = 1; + } + + if (rotate_index || stream->index_fd < 0) { + index->to_close_fd = stream->index_fd; + ret = index_create_file(stream->path_name, stream->channel_name, + relayd_uid, relayd_gid, stream->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + /* This will close the stream's index fd if one. */ + relay_index_free_safe(index); + goto end_rcu_unlock; + } + stream->index_fd = ret; + } + index->fd = stream->index_fd; + index->index_data.offset = data_offset; + + if (index_created) { + /* + * Try to add the relay index object to the hash table. If an object + * already exist, destroy back the index created and set the data. + */ + relay_index_add(index, indexes_ht, &wr_index); + if (wr_index) { + /* Copy back data from the created index. */ + wr_index->fd = index->fd; + wr_index->to_close_fd = index->to_close_fd; + wr_index->index_data.offset = data_offset; + free(index); + } + } else { + /* The index already exists so write it on disk. */ + wr_index = index; } - stream->tracefile_size_current += data_size; + + /* Do we have a writable ready index to write on disk. */ + if (wr_index) { + /* Starting at 2.4, create the index file if none available. */ + if (cmd->minor >= 4 && stream->index_fd < 0) { + ret = index_create_file(stream->path_name, stream->channel_name, + relayd_uid, relayd_gid, stream->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + goto end_rcu_unlock; + } + stream->index_fd = ret; + } + + ret = relay_index_write(wr_index->fd, wr_index, indexes_ht); + if (ret < 0) { + goto end_rcu_unlock; + } + } + do { ret = write(stream->fd, data_buffer, data_size); } while (ret < 0 && errno == EINTR); if (ret < 0 || ret != data_size) { ERR("Relay error writing data to file"); ret = -1; - goto end_unlock; + goto end_rcu_unlock; } DBG2("Relay wrote %d bytes to tracefile for stream id %" PRIu64, @@ -1651,8 +1887,9 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) ret = write_padding_to_file(stream->fd, be32toh(data_hdr.padding_size)); if (ret < 0) { - goto end_unlock; + goto end_rcu_unlock; } + stream->tracefile_size_current += data_size + be32toh(data_hdr.padding_size); stream->prev_seq = net_seq_num; @@ -1665,6 +1902,11 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) if (cret < 0) { PERROR("close stream process data"); } + + cret = close(stream->index_fd); + if (cret < 0) { + PERROR("close stream index_fd"); + } iter.iter.node = &stream->stream_n.node; ret = lttng_ht_del(streams_ht, &iter); assert(!ret); @@ -1673,7 +1915,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) DBG("Closed tracefile %d after recv data", stream->fd); } -end_unlock: +end_rcu_unlock: rcu_read_unlock(); end: return ret; @@ -1769,6 +2011,7 @@ void *relay_thread_worker(void *data) struct lttng_ht_node_ulong *node; struct lttng_ht_iter iter; struct lttng_ht *streams_ht; + struct lttng_ht *index_streams_ht; struct lttcomm_relayd_hdr recv_hdr; DBG("[thread] Relay worker started"); @@ -1787,6 +2030,12 @@ void *relay_thread_worker(void *data) goto streams_ht_error; } + /* Tables of received indexes indexed by index handle and net_seq_num. */ + indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_TWO_U64); + if (!indexes_ht) { + goto indexes_ht_error; + } + ret = create_thread_poll_set(&events, 2); if (ret < 0) { goto error_poll_create; @@ -1898,7 +2147,9 @@ restart: } ret = relay_process_control(&recv_hdr, relay_connection, - streams_ht); + streams_ht, + index_streams_ht, + indexes_ht); if (ret < 0) { /* Clear the session on error. */ relay_cleanup_poll_connection(&events, pollfd); @@ -1970,7 +2221,8 @@ restart: continue; } - ret = relay_process_data(relay_connection, streams_ht); + ret = relay_process_data(relay_connection, streams_ht, + indexes_ht); /* connection closed */ if (ret < 0) { relay_cleanup_poll_connection(&events, pollfd); @@ -2014,6 +2266,8 @@ error: } rcu_read_unlock(); error_poll_create: + lttng_ht_destroy(indexes_ht); +indexes_ht_error: lttng_ht_destroy(streams_ht); streams_ht_error: lttng_ht_destroy(relay_connections_ht); @@ -2089,10 +2343,12 @@ int main(int argc, char **argv) } } - /* Check if daemon is UID = 0 */ - is_root = !getuid(); + /* We need those values for the file/dir creation. */ + relayd_uid = getuid(); + relayd_gid = getgid(); - if (!is_root) { + /* Check if daemon is UID = 0 */ + if (relayd_uid == 0) { if (control_uri->port < 1024 || data_uri->port < 1024) { ERR("Need to be root to use ports < 1024"); ret = -1; diff --git a/src/common/consumer-stream.c b/src/common/consumer-stream.c index 2bb5ce7e8..920948760 100644 --- a/src/common/consumer-stream.c +++ b/src/common/consumer-stream.c @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -322,3 +323,35 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, /* Free stream within a RCU call. */ consumer_stream_free(stream); } + +/* + * Write index of a specific stream either on the relayd or local disk. + * + * Return 0 on success or else a negative value. + */ +int consumer_stream_write_index(struct lttng_consumer_stream *stream, + struct lttng_packet_index *index) +{ + int ret; + struct consumer_relayd_sock_pair *relayd; + + assert(stream); + assert(index); + + rcu_read_lock(); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd) { + ret = relayd_send_index(&relayd->control_sock, index, + stream->relayd_stream_id, stream->next_net_seq_num - 1); + } else { + ret = index_write(stream->index_fd, index, + sizeof(struct lttng_packet_index)); + } + if (ret < 0) { + goto error; + } + +error: + rcu_read_unlock(); + return ret; +} diff --git a/src/common/consumer-stream.h b/src/common/consumer-stream.h index 3096e1e78..956bb6328 100644 --- a/src/common/consumer-stream.h +++ b/src/common/consumer-stream.h @@ -68,4 +68,10 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, */ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream); +/* + * Write index of a specific stream either on the relayd or local disk. + */ +int consumer_stream_write_index(struct lttng_consumer_stream *stream, + struct lttng_packet_index *index); + #endif /* LTTNG_CONSUMER_STREAM_H */ diff --git a/src/common/consumer.c b/src/common/consumer.c index 0661f1264..191367cd0 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -729,6 +729,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, if (ret < 0) { goto end; } + uatomic_inc(&relayd->refcount); stream->sent_to_relayd = 1; } else { diff --git a/src/common/defaults.h b/src/common/defaults.h index 7a3d59c9f..b8db779dc 100644 --- a/src/common/defaults.h +++ b/src/common/defaults.h @@ -222,6 +222,7 @@ /* Suffix of an index file. */ #define DEFAULT_INDEX_FILE_SUFFIX ".idx" +#define DEFAULT_INDEX_DIR "index" extern size_t default_channel_subbuf_size; extern size_t default_metadata_subbuf_size; diff --git a/src/common/index/index.c b/src/common/index/index.c index 3d22ca61c..89b4fd769 100644 --- a/src/common/index/index.c +++ b/src/common/index/index.c @@ -18,6 +18,7 @@ #define _GNU_SOURCE #include +#include #include #include @@ -35,8 +36,25 @@ int index_create_file(char *path_name, char *stream_name, int uid, int gid, { int ret, fd = -1; struct lttng_packet_index_file_hdr hdr; + char fullpath[PATH_MAX]; - ret = utils_create_stream_file(path_name, stream_name, size, count, uid, + ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR, + path_name); + if (ret < 0) { + PERROR("snprintf index path"); + goto error; + } + + /* Create index directory if necessary. */ + ret = run_as_mkdir(fullpath, S_IRWXU | S_IRWXG, uid, gid); + if (ret < 0) { + if (ret != -EEXIST) { + ERR("Index trace directory creation error"); + goto error; + } + } + + ret = utils_create_stream_file(fullpath, stream_name, size, count, uid, gid, DEFAULT_INDEX_FILE_SUFFIX); if (ret < 0) { goto error; diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index eaccce3d0..315af2eb8 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -907,18 +907,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; - int err, write_index = 0; + int err, write_index = 1; ssize_t ret = 0; int infd = stream->wait_fd; struct lttng_packet_index index; DBG("In read_subbuffer (infd : %d)", infd); - /* Indicate that for this stream we have to write the index. */ - if (stream->index_fd >= 0) { - write_index = 1; - } - /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); if (err != 0) { @@ -942,11 +937,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } - if (!stream->metadata_flag && write_index) { + if (!stream->metadata_flag) { ret = get_index_values(&index, infd); if (ret < 0) { goto end; } + } else { + write_index = 0; } switch (stream->chan->output) { @@ -1028,12 +1025,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } /* Write index if needed. */ - if (write_index) { - err = index_write(stream->index_fd, &index, sizeof(index)); - if (err < 0) { - ret = -1; - goto end; - } + if (!write_index) { + goto end; + } + + err = consumer_stream_write_index(stream, &index); + if (err < 0) { + goto end; } end: diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index 2283865cf..bb20a64a0 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -26,6 +26,7 @@ #include #include #include +#include #include "relayd.h" @@ -671,3 +672,63 @@ int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id, error: return ret; } + +/* + * Send index to the relayd. + */ +int relayd_send_index(struct lttcomm_relayd_sock *rsock, + struct lttng_packet_index *index, uint64_t relay_stream_id, + uint64_t net_seq_num) +{ + int ret; + struct lttcomm_relayd_index msg; + struct lttcomm_relayd_generic_reply reply; + + /* Code flow error. Safety net. */ + assert(rsock); + + if (rsock->minor < 4) { + DBG("Not sending indexes before protocol 2.4"); + ret = 0; + goto error; + } + + DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id); + + msg.relay_stream_id = htobe64(relay_stream_id); + msg.net_seq_num = htobe64(net_seq_num); + + /* The index is already in big endian. */ + msg.packet_size = index->packet_size; + msg.content_size = index->content_size; + msg.timestamp_begin = index->timestamp_begin; + msg.timestamp_end = index->timestamp_end; + msg.events_discarded = index->events_discarded; + msg.stream_id = index->stream_id; + + /* Send command */ + ret = send_command(rsock, RELAYD_SEND_INDEX, &msg, sizeof(msg), 0); + if (ret < 0) { + goto error; + } + + /* Receive response */ + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + reply.ret_code = be32toh(reply.ret_code); + + /* Return session id or negative ret code. */ + if (reply.ret_code != LTTNG_OK) { + ret = -1; + ERR("Relayd send index replied error %d", reply.ret_code); + } else { + /* Success */ + ret = 0; + } + +error: + return ret; +} diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index dd435e905..a49bab733 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -43,5 +43,8 @@ int relayd_quiescent_control(struct lttcomm_relayd_sock *sock, int relayd_begin_data_pending(struct lttcomm_relayd_sock *sock, uint64_t id); int relayd_end_data_pending(struct lttcomm_relayd_sock *sock, uint64_t id, unsigned int *is_data_inflight); +int relayd_send_index(struct lttcomm_relayd_sock *rsock, + struct lttng_packet_index *index, uint64_t relay_stream_id, + uint64_t net_seq_num); #endif /* _RELAYD_H */ diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h index ed094ac9d..3df68682a 100644 --- a/src/common/sessiond-comm/relayd.h +++ b/src/common/sessiond-comm/relayd.h @@ -26,6 +26,7 @@ #include #include +#include #include #define RELAYD_VERSION_COMM_MAJOR VERSION_MAJOR @@ -149,4 +150,18 @@ struct lttcomm_relayd_quiescent_control { uint64_t stream_id; } LTTNG_PACKED; +/* + * Index data. + */ +struct lttcomm_relayd_index { + uint64_t relay_stream_id; + uint64_t net_seq_num; + uint64_t packet_size; + uint64_t content_size; + uint64_t timestamp_begin; + uint64_t timestamp_end; + uint64_t events_discarded; + uint64_t stream_id; +} LTTNG_PACKED; + #endif /* _RELAYD_COMM */ diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index cf6262e6a..39ab69bf9 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -103,6 +103,9 @@ enum lttcomm_relayd_command { RELAYD_QUIESCENT_CONTROL = 9, RELAYD_BEGIN_DATA_PENDING = 10, RELAYD_END_DATA_PENDING = 11, + RELAYD_ADD_INDEX = 12, + RELAYD_SEND_INDEX = 13, + RELAYD_CLOSE_INDEX = 14, }; /* diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index f0147af4e..113ae959e 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1664,7 +1664,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; - int err, write_index = 0; + int err, write_index = 1; long ret = 0; char dummy; struct ustctl_consumer_stream *ustream; @@ -1680,11 +1680,6 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* Ease our life for what's next. */ ustream = stream->ustream; - /* Indicate that for this stream we have to write the index. */ - if (stream->index_fd >= 0) { - write_index = 1; - } - /* We can consume the 1 byte written into the wait_fd by UST */ if (stream->monitor && !stream->hangup_flush_done) { ssize_t readlen; @@ -1743,12 +1738,14 @@ retry: } assert(stream->chan->output == CONSUMER_CHANNEL_MMAP); - if (!stream->metadata_flag && write_index) { + if (!stream->metadata_flag) { index.offset = htobe64(stream->out_fd_offset); ret = get_index_values(&index, ustream); if (ret < 0) { goto end; } + } else { + write_index = 0; } /* Get the full padded subbuffer size */ @@ -1788,12 +1785,14 @@ retry: assert(err == 0); /* Write index if needed. */ - if (write_index) { - err = index_write(stream->index_fd, &index, sizeof(index)); - if (err < 0) { - ret = -1; - goto end; - } + if (!write_index) { + goto end; + } + + assert(!stream->metadata_flag); + err = consumer_stream_write_index(stream, &index); + if (err < 0) { + goto end; } end: -- 2.34.1