X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-stream.c;h=32441ea2cd313327cfacf22ab47772f6fea1d39f;hb=348a81dcf7b6944b10a813d93dcaf86fdb5194f6;hp=d0b1ddef2259fc57342cac15ef5003f19fc03e6f;hpb=23c910e54c8399aceb95704cb2ae1dc5cc981001;p=lttng-tools.git diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index d0b1ddef2..32441ea2c 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -73,12 +73,8 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, stream->next_net_seq_num - 1); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - DBG("Unable to close stream on the relayd. Continuing"); - /* - * Continue here. There is nothing we can do for the relayd. - * Chances are that the relayd has closed the socket so we just - * continue cleaning up. - */ + ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); } /* Both conditions are met, we destroy the relayd. */ @@ -168,6 +164,9 @@ void consumer_stream_close(struct lttng_consumer_stream *stream) stream->index_file = NULL; } + lttng_trace_chunk_put(stream->trace_chunk); + stream->trace_chunk = NULL; + /* Check and cleanup relayd if needed. */ rcu_read_lock(); relayd = consumer_find_relayd(stream->net_seq_idx); @@ -347,6 +346,8 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, } /* Free stream within a RCU call. */ + lttng_trace_chunk_put(stream->trace_chunk); + stream->trace_chunk = NULL; consumer_stream_free(stream); } @@ -371,6 +372,15 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_send_index(&relayd->control_sock, element, stream->relayd_stream_id, stream->next_net_seq_num - 1); + if (ret < 0) { + /* + * Communication error with lttng-relayd, + * perform cleanup now + */ + ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + ret = -1; + } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } else { ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.", @@ -551,3 +561,87 @@ end: rcu_read_unlock(); return ret; } + +int consumer_stream_create_output_files(struct lttng_consumer_stream *stream, + bool create_index) +{ + int ret; + enum lttng_trace_chunk_status chunk_status; + const int flags = O_WRONLY | O_CREAT | O_TRUNC; + const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; + char stream_path[LTTNG_PATH_MAX]; + + ASSERT_LOCKED(stream->lock); + assert(stream->trace_chunk); + + ret = utils_stream_file_path(stream->chan->pathname, stream->name, + stream->chan->tracefile_size, + stream->chan->tracefile_count, NULL, + stream_path, sizeof(stream_path)); + if (ret < 0) { + goto end; + } + + if (stream->out_fd >= 0) { + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Failed to close stream file \"%s\"", + stream->name); + goto end; + } + stream->out_fd = -1; + } + + DBG("Opening stream output file \"%s\"", stream_path); + chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path, + flags, mode, &stream->out_fd); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to open stream file \"%s\"", stream->name); + ret = -1; + goto end; + } + + if (!stream->metadata_flag && (create_index || stream->index_file)) { + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + } + stream->index_file = lttng_index_file_create_from_trace_chunk( + stream->trace_chunk, + stream->chan->pathname, + stream->name, + stream->chan->tracefile_size, + stream->tracefile_count_current, + CTF_INDEX_MAJOR, CTF_INDEX_MINOR, + false); + if (!stream->index_file) { + ret = -1; + goto end; + } + } + + /* Reset current size because we just perform a rotation. */ + stream->tracefile_size_current = 0; + stream->out_fd_offset = 0; +end: + return ret; +} + +int consumer_stream_rotate_output_files(struct lttng_consumer_stream *stream) +{ + int ret; + + stream->tracefile_count_current++; + if (stream->chan->tracefile_count > 0) { + stream->tracefile_count_current %= + stream->chan->tracefile_count; + } + + DBG("Rotating output files of stream \"%s\"", stream->name); + ret = consumer_stream_create_output_files(stream, true); + if (ret) { + goto end; + } + +end: + return ret; +}