X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-stream.c;fp=src%2Fcommon%2Fconsumer%2Fconsumer-stream.c;h=32441ea2cd313327cfacf22ab47772f6fea1d39f;hp=ca2c4536cb78f99758ba1a8ec2e2b81fead12f4f;hb=d295668767ac8234e83984e1812d342d03293d88;hpb=fb9a95c4d6242bd8336b638c90a7d8f846125659 diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index ca2c4536c..32441ea2c 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -164,6 +164,9 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) stream->index_file = NULL; } + lttng_trace_chunk_put(stream->trace_chunk); + stream->trace_chunk = NULL; + /* Check and cleanup relayd if needed. */ rcu_read_lock(); relayd = consumer_find_relayd(stream->net_seq_idx); @@ -343,6 +346,8 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, } /* Free stream within a RCU call. */ + lttng_trace_chunk_put(stream->trace_chunk); + stream->trace_chunk = NULL; consumer_stream_free(stream); } @@ -556,3 +561,87 @@ end: rcu_read_unlock(); return ret; } + +int consumer_stream_create_output_files(struct lttng_consumer_stream *stream, + bool create_index) +{ + int ret; + enum lttng_trace_chunk_status chunk_status; + const int flags = O_WRONLY | O_CREAT | O_TRUNC; + const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; + char stream_path[LTTNG_PATH_MAX]; + + ASSERT_LOCKED(stream->lock); + assert(stream->trace_chunk); + + ret = utils_stream_file_path(stream->chan->pathname, stream->name, + stream->chan->tracefile_size, + stream->chan->tracefile_count, NULL, + stream_path, sizeof(stream_path)); + if (ret < 0) { + goto end; + } + + if (stream->out_fd >= 0) { + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Failed to close stream file \"%s\"", + stream->name); + goto end; + } + stream->out_fd = -1; + } + + DBG("Opening stream output file \"%s\"", stream_path); + chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path, + flags, mode, &stream->out_fd); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to open stream file \"%s\"", stream->name); + ret = -1; + goto end; + } + + if (!stream->metadata_flag && (create_index || stream->index_file)) { + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + } + stream->index_file = lttng_index_file_create_from_trace_chunk( + stream->trace_chunk, + stream->chan->pathname, + stream->name, + stream->chan->tracefile_size, + stream->tracefile_count_current, + CTF_INDEX_MAJOR, CTF_INDEX_MINOR, + false); + if (!stream->index_file) { + ret = -1; + goto end; + } + } + + /* Reset current size because we just perform a rotation. */ + stream->tracefile_size_current = 0; + stream->out_fd_offset = 0; +end: + return ret; +} + +int consumer_stream_rotate_output_files(struct lttng_consumer_stream *stream) +{ + int ret; + + stream->tracefile_count_current++; + if (stream->chan->tracefile_count > 0) { + stream->tracefile_count_current %= + stream->chan->tracefile_count; + } + + DBG("Rotating output files of stream \"%s\"", stream->name); + ret = consumer_stream_create_output_files(stream, true); + if (ret) { + goto end; + } + +end: + return ret; +}