X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-stream.c;h=8318d79d9a09e1451bf90c715d714bb9962f52e0;hp=522b3cd5cd8ac0c45876c12dd7e344c8c7c27118;hb=ab5be9fa2eb5ba9600a82cd18fd3cfcbac69169a;hpb=f8f3885cc52af9d3c951da78989d6f4a25270411 diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index 522b3cd5c..8318d79d9 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -1,20 +1,10 @@ /* - * Copyright (C) 2011 - Julien Desfossez - * Mathieu Desnoyers - * Copyright (C) 2013 - David Goulet + * Copyright (C) 2011 Julien Desfossez + * Copyright (C) 2011 Mathieu Desnoyers + * Copyright (C) 2013 David Goulet * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License, version 2 only, as - * published by the Free Software Foundation. + * SPDX-License-Identifier: GPL-2.0-only * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along with - * this program; if not, write to the Free Software Foundation, Inc., 51 - * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #define _LGPL_SOURCE @@ -73,12 +63,8 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, stream->next_net_seq_num - 1); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - DBG("Unable to close stream on the relayd. Continuing"); - /* - * Continue here. There is nothing we can do for the relayd. - * Chances are that the relayd has closed the socket so we just - * continue cleaning up. - */ + ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); } /* Both conditions are met, we destroy the relayd. */ @@ -168,6 +154,9 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) stream->index_file = NULL; } + lttng_trace_chunk_put(stream->trace_chunk); + stream->trace_chunk = NULL; + /* Check and cleanup relayd if needed. */ rcu_read_lock(); relayd = consumer_find_relayd(stream->net_seq_idx); @@ -347,6 +336,8 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, } /* Free stream within a RCU call. */ + lttng_trace_chunk_put(stream->trace_chunk); + stream->trace_chunk = NULL; consumer_stream_free(stream); } @@ -359,18 +350,33 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, struct ctf_packet_index *element) { int ret; - struct consumer_relayd_sock_pair *relayd; assert(stream); assert(element); rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_send_index(&relayd->control_sock, element, + if (stream->net_seq_idx != (uint64_t) -1ULL) { + struct consumer_relayd_sock_pair *relayd; + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_send_index(&relayd->control_sock, element, stream->relayd_stream_id, stream->next_net_seq_num - 1); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + /* + * Communication error with lttng-relayd, + * perform cleanup now + */ + ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + ret = -1; + } + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } else { + ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.", + stream->key, stream->net_seq_idx); + ret = -1; + } } else { if (lttng_index_file_write(stream->index_file, element)) { ret = -1; @@ -545,3 +551,97 @@ end: rcu_read_unlock(); return ret; } + +int consumer_stream_create_output_files(struct lttng_consumer_stream *stream, + bool create_index) +{ + int ret; + enum lttng_trace_chunk_status chunk_status; + const int flags = O_WRONLY | O_CREAT | O_TRUNC; + const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; + char stream_path[LTTNG_PATH_MAX]; + + ASSERT_LOCKED(stream->lock); + assert(stream->trace_chunk); + + ret = utils_stream_file_path(stream->chan->pathname, stream->name, + stream->chan->tracefile_size, + stream->tracefile_count_current, NULL, + stream_path, sizeof(stream_path)); + if (ret < 0) { + goto end; + } + + if (stream->out_fd >= 0) { + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Failed to close stream file \"%s\"", + stream->name); + goto end; + } + stream->out_fd = -1; + } + + DBG("Opening stream output file \"%s\"", stream_path); + chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path, + flags, mode, &stream->out_fd, false); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to open stream file \"%s\"", stream->name); + ret = -1; + goto end; + } + + if (!stream->metadata_flag && (create_index || stream->index_file)) { + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + } + chunk_status = lttng_index_file_create_from_trace_chunk( + stream->trace_chunk, + stream->chan->pathname, + stream->name, + stream->chan->tracefile_size, + stream->tracefile_count_current, + CTF_INDEX_MAJOR, CTF_INDEX_MINOR, + false, &stream->index_file); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -1; + goto end; + } + } + + /* Reset current size because we just perform a rotation. */ + stream->tracefile_size_current = 0; + stream->out_fd_offset = 0; +end: + return ret; +} + +int consumer_stream_rotate_output_files(struct lttng_consumer_stream *stream) +{ + int ret; + + stream->tracefile_count_current++; + if (stream->chan->tracefile_count > 0) { + stream->tracefile_count_current %= + stream->chan->tracefile_count; + } + + DBG("Rotating output files of stream \"%s\"", stream->name); + ret = consumer_stream_create_output_files(stream, true); + if (ret) { + goto end; + } + +end: + return ret; +} + +bool consumer_stream_is_deleted(struct lttng_consumer_stream *stream) +{ + /* + * This function does not take a const stream since + * cds_lfht_is_node_deleted was not const before liburcu 0.12. + */ + assert(stream); + return cds_lfht_is_node_deleted(&stream->node.node); +}