X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=ca37b8bc41ed842111ee603ff2860e90b288b758;hp=bb038a670b0a763afd41b1343cbbb1e919fbe1f7;hb=1c20f0e29cbf8627bfb1ff444572d52d6655c4e2;hpb=3c4599b9a5c12ceff19368c6cd51e01d81824726 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index bb038a670..ca37b8bc4 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -52,6 +52,7 @@ #include #include "cmd.h" +#include "index.h" #include "utils.h" #include "lttng-relayd.h" @@ -62,7 +63,6 @@ static struct lttng_uri *control_uri; static struct lttng_uri *data_uri; const char *progname; -static int is_root; /* Set to 1 if the daemon is running as root */ /* * Quit pipe for all threads. This permits a single cancellation point @@ -98,6 +98,13 @@ static struct relay_cmd_queue relay_cmd_queue; static char *data_buffer; static unsigned int data_buffer_size; +/* Global hash table that stores relay index object. */ +static struct lttng_ht *indexes_ht; + +/* We need those values for the file/dir creation. */ +static uid_t relayd_uid; +static gid_t relayd_gid; + /* * usage function on stderr */ @@ -758,6 +765,9 @@ void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht call_rcu(&stream->rcu_node, deferred_free_stream); } + /* Cleanup index of that stream. */ + relay_index_destroy_by_stream_id(stream->stream_handle, + indexes_ht); } } rcu_read_unlock(); @@ -765,6 +775,28 @@ void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht free(cmd->session); } +/* + * Copy index data from the control port to a given index object. + */ +static void copy_index_control_data(struct relay_index *index, + struct lttcomm_relayd_index *data) +{ + assert(index); + assert(data); + + /* + * The index on disk is encoded in big endian, so we don't need to convert + * the data received on the network. The data_offset value is NEVER + * modified here and is updated by the data thread. + */ + index->index_data.packet_size = data->packet_size; + index->index_data.content_size = data->content_size; + index->index_data.timestamp_begin = data->timestamp_begin; + index->index_data.timestamp_end = data->timestamp_end; + index->index_data.events_discarded = data->events_discarded; + index->index_data.stream_id = data->stream_id; +} + /* * Handle the RELAYD_CREATE_SESSION command. * @@ -856,6 +888,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->stream_handle = ++last_relay_stream_id; stream->prev_seq = -1ULL; stream->session = session; + stream->index_fd = -1; ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG); if (ret < 0) { @@ -868,7 +901,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, * uses its own credentials for the stream files. */ ret = utils_create_stream_file(stream->path_name, stream->channel_name, - stream->tracefile_size, 0, -1, -1, NULL); + stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL); if (ret < 0) { ERR("Create output file"); goto end; @@ -885,7 +918,8 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, lttng_ht_add_unique_ulong(streams_ht, &stream->stream_n); - DBG("Relay new stream added %s", stream->channel_name); + DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name, + stream->stream_handle); end: reply.handle = htobe64(stream->stream_handle); @@ -969,6 +1003,13 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, if (delret < 0) { PERROR("close stream"); } + + if (stream->index_fd >= 0) { + delret = close(stream->index_fd); + if (delret < 0) { + PERROR("close stream index_fd"); + } + } iter.iter.node = &stream->stream_n.node; delret = lttng_ht_del(streams_ht, &iter); assert(!delret); @@ -1503,12 +1544,132 @@ end_no_session: return ret; } +/* + * Receive an index for a specific stream. + * + * Return 0 on success else a negative value. + */ +static +int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_command *cmd, struct lttng_ht *streams_ht, + struct lttng_ht *indexes_ht) +{ + int ret, send_ret, index_created = 0; + struct relay_session *session = cmd->session; + struct lttcomm_relayd_index index_info; + struct relay_index *index, *wr_index = NULL; + struct lttcomm_relayd_generic_reply reply; + struct relay_stream *stream; + uint64_t net_seq_num; + + assert(cmd); + assert(streams_ht); + assert(indexes_ht); + + DBG("Relay receiving index"); + + if (!session || cmd->version_check_done == 0) { + ERR("Trying to close a stream before version check"); + ret = -1; + goto end_no_session; + } + + ret = cmd->sock->ops->recvmsg(cmd->sock, &index_info, + sizeof(index_info), 0); + if (ret < sizeof(index_info)) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } else { + ERR("Relay didn't receive valid index struct size : %d", ret); + } + ret = -1; + goto end_no_session; + } + + net_seq_num = be64toh(index_info.net_seq_num); + + rcu_read_lock(); + stream = relay_stream_from_stream_id(be64toh(index_info.relay_stream_id), + streams_ht); + if (!stream) { + ret = -1; + goto end_rcu_unlock; + } + + index = relay_index_find(stream->stream_handle, net_seq_num, indexes_ht); + if (!index) { + /* A successful creation will add the object to the HT. */ + index = relay_index_create(stream->stream_handle, net_seq_num); + if (!index) { + goto end_rcu_unlock; + } + index_created = 1; + } + + copy_index_control_data(index, &index_info); + + if (index_created) { + /* + * Try to add the relay index object to the hash table. If an object + * already exist, destroy back the index created, set the data in this + * object and write it on disk. + */ + relay_index_add(index, indexes_ht, &wr_index); + if (wr_index) { + copy_index_control_data(wr_index, &index_info); + free(index); + } + } else { + /* The index already exists so write it on disk. */ + wr_index = index; + } + + /* Do we have a writable ready index to write on disk. */ + if (wr_index) { + /* Starting at 2.4, create the index file if none available. */ + if (cmd->minor >= 4 && stream->index_fd < 0) { + ret = index_create_file(stream->path_name, stream->channel_name, + relayd_uid, relayd_gid, stream->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + goto end_rcu_unlock; + } + stream->index_fd = ret; + } + + ret = relay_index_write(wr_index->fd, wr_index, indexes_ht); + if (ret < 0) { + goto end_rcu_unlock; + } + } + +end_rcu_unlock: + rcu_read_unlock(); + + if (ret < 0) { + reply.ret_code = htobe32(LTTNG_ERR_UNK); + } else { + reply.ret_code = htobe32(LTTNG_OK); + } + send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0); + if (send_ret < 0) { + ERR("Relay sending close index id reply"); + ret = send_ret; + } + +end_no_session: + return ret; +} + /* * relay_process_control: Process the commands received on the control socket */ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *streams_ht) + struct relay_command *cmd, struct lttng_ht *streams_ht, + struct lttng_ht *index_streams_ht, + struct lttng_ht *indexes_ht) { int ret = 0; @@ -1543,6 +1704,9 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, case RELAYD_END_DATA_PENDING: ret = relay_end_data_pending(recv_hdr, cmd, streams_ht); break; + case RELAYD_SEND_INDEX: + ret = relay_recv_index(recv_hdr, cmd, streams_ht, indexes_ht); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd)); @@ -1559,12 +1723,14 @@ end: * relay_process_data: Process the data received on the data socket */ static -int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) +int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht, + struct lttng_ht *indexes_ht) { - int ret = 0; + int ret = 0, rotate_index = 0, index_created = 0; struct relay_stream *stream; + struct relay_index *index, *wr_index = NULL; struct lttcomm_relayd_data_hdr data_hdr; - uint64_t stream_id; + uint64_t stream_id, data_offset; uint64_t net_seq_num; uint32_t data_size; @@ -1587,7 +1753,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) stream = relay_stream_from_stream_id(stream_id, streams_ht); if (!stream) { ret = -1; - goto end_unlock; + goto end_rcu_unlock; } data_size = be32toh(data_hdr.data_size); @@ -1599,7 +1765,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) ERR("Allocating data buffer"); free(data_buffer); ret = -1; - goto end_unlock; + goto end_rcu_unlock; } data_buffer = tmp_data_ptr; data_buffer_size = data_size; @@ -1617,33 +1783,103 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) DBG("Socket %d did an orderly shutdown", cmd->sock->fd); } ret = -1; - goto end_unlock; + goto end_rcu_unlock; } + /* Check if a rotation is needed. */ if (stream->tracefile_size > 0 && (stream->tracefile_size_current + data_size) > stream->tracefile_size) { - ret = utils_rotate_stream_file(stream->path_name, - stream->channel_name, stream->tracefile_size, - stream->tracefile_count, -1, -1, - stream->fd, &(stream->tracefile_count_current), - &stream->fd); + ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, + stream->tracefile_size, stream->tracefile_count, + relayd_uid, relayd_gid, stream->fd, + &(stream->tracefile_count_current), &stream->fd); if (ret < 0) { - ERR("Rotating output file"); - goto end; + ERR("Rotating stream output file"); + goto end_rcu_unlock; } - stream->fd = ret; /* Reset current size because we just perform a stream rotation. */ stream->tracefile_size_current = 0; + rotate_index = 1; + } + + /* Get data offset because we are about to update the index. */ + data_offset = htobe64(stream->tracefile_size_current); + + /* + * Lookup for an existing index for that stream id/sequence number. If on + * exists, the control thread already received the data for it thus we need + * to write it on disk. + */ + index = relay_index_find(stream_id, net_seq_num, indexes_ht); + if (!index) { + /* A successful creation will add the object to the HT. */ + index = relay_index_create(stream->stream_handle, net_seq_num); + if (!index) { + goto end_rcu_unlock; + } + index_created = 1; + } + + if (rotate_index || stream->index_fd < 0) { + index->to_close_fd = stream->index_fd; + ret = index_create_file(stream->path_name, stream->channel_name, + relayd_uid, relayd_gid, stream->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + /* This will close the stream's index fd if one. */ + relay_index_free_safe(index); + goto end_rcu_unlock; + } + stream->index_fd = ret; + } + index->fd = stream->index_fd; + index->index_data.offset = data_offset; + + if (index_created) { + /* + * Try to add the relay index object to the hash table. If an object + * already exist, destroy back the index created and set the data. + */ + relay_index_add(index, indexes_ht, &wr_index); + if (wr_index) { + /* Copy back data from the created index. */ + wr_index->fd = index->fd; + wr_index->to_close_fd = index->to_close_fd; + wr_index->index_data.offset = data_offset; + free(index); + } + } else { + /* The index already exists so write it on disk. */ + wr_index = index; } - stream->tracefile_size_current += data_size; + + /* Do we have a writable ready index to write on disk. */ + if (wr_index) { + /* Starting at 2.4, create the index file if none available. */ + if (cmd->minor >= 4 && stream->index_fd < 0) { + ret = index_create_file(stream->path_name, stream->channel_name, + relayd_uid, relayd_gid, stream->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + goto end_rcu_unlock; + } + stream->index_fd = ret; + } + + ret = relay_index_write(wr_index->fd, wr_index, indexes_ht); + if (ret < 0) { + goto end_rcu_unlock; + } + } + do { ret = write(stream->fd, data_buffer, data_size); } while (ret < 0 && errno == EINTR); if (ret < 0 || ret != data_size) { ERR("Relay error writing data to file"); ret = -1; - goto end_unlock; + goto end_rcu_unlock; } DBG2("Relay wrote %d bytes to tracefile for stream id %" PRIu64, @@ -1651,8 +1887,9 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) ret = write_padding_to_file(stream->fd, be32toh(data_hdr.padding_size)); if (ret < 0) { - goto end_unlock; + goto end_rcu_unlock; } + stream->tracefile_size_current += data_size + be32toh(data_hdr.padding_size); stream->prev_seq = net_seq_num; @@ -1665,6 +1902,11 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) if (cret < 0) { PERROR("close stream process data"); } + + cret = close(stream->index_fd); + if (cret < 0) { + PERROR("close stream index_fd"); + } iter.iter.node = &stream->stream_n.node; ret = lttng_ht_del(streams_ht, &iter); assert(!ret); @@ -1673,7 +1915,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) DBG("Closed tracefile %d after recv data", stream->fd); } -end_unlock: +end_rcu_unlock: rcu_read_unlock(); end: return ret; @@ -1769,6 +2011,7 @@ void *relay_thread_worker(void *data) struct lttng_ht_node_ulong *node; struct lttng_ht_iter iter; struct lttng_ht *streams_ht; + struct lttng_ht *index_streams_ht; struct lttcomm_relayd_hdr recv_hdr; DBG("[thread] Relay worker started"); @@ -1787,6 +2030,12 @@ void *relay_thread_worker(void *data) goto streams_ht_error; } + /* Tables of received indexes indexed by index handle and net_seq_num. */ + indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_TWO_U64); + if (!indexes_ht) { + goto indexes_ht_error; + } + ret = create_thread_poll_set(&events, 2); if (ret < 0) { goto error_poll_create; @@ -1898,7 +2147,9 @@ restart: } ret = relay_process_control(&recv_hdr, relay_connection, - streams_ht); + streams_ht, + index_streams_ht, + indexes_ht); if (ret < 0) { /* Clear the session on error. */ relay_cleanup_poll_connection(&events, pollfd); @@ -1970,7 +2221,8 @@ restart: continue; } - ret = relay_process_data(relay_connection, streams_ht); + ret = relay_process_data(relay_connection, streams_ht, + indexes_ht); /* connection closed */ if (ret < 0) { relay_cleanup_poll_connection(&events, pollfd); @@ -2014,6 +2266,8 @@ error: } rcu_read_unlock(); error_poll_create: + lttng_ht_destroy(indexes_ht); +indexes_ht_error: lttng_ht_destroy(streams_ht); streams_ht_error: lttng_ht_destroy(relay_connections_ht); @@ -2089,10 +2343,12 @@ int main(int argc, char **argv) } } - /* Check if daemon is UID = 0 */ - is_root = !getuid(); + /* We need those values for the file/dir creation. */ + relayd_uid = getuid(); + relayd_gid = getgid(); - if (!is_root) { + /* Check if daemon is UID = 0 */ + if (relayd_uid == 0) { if (control_uri->port < 1024 || data_uri->port < 1024) { ERR("Need to be root to use ports < 1024"); ret = -1;