X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fstream.c;h=6650700838fc27596e8c8f086cc7815c8a14a098;hp=4716f9d7923f8d5ddd4ea8a2cf6ee52c790bf846;hb=348a81dcf7b6944b10a813d93dcaf86fdb5194f6;hpb=2abe796968937298012c0ec668f7fc88305683f2 diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index 4716f9d79..665070083 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -29,6 +29,9 @@ #include "stream.h" #include "viewer-stream.h" +#include +#include + /* Should be called with RCU read-side lock held. */ bool stream_get(struct relay_stream *stream) { @@ -62,18 +65,88 @@ end: return stream; } +static int stream_create_data_output_file(struct relay_stream *stream) +{ + int ret, fd; + enum lttng_trace_chunk_status status; + const int flags = O_RDWR | O_CREAT | O_TRUNC; + const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; + char stream_path[LTTNG_PATH_MAX]; + + ASSERT_LOCKED(stream->lock); + assert(stream->trace_chunk); + + if (stream->stream_fd) { + stream_fd_put(stream->stream_fd); + stream->stream_fd = NULL; + } + + ret = utils_stream_file_path(stream->path_name, stream->channel_name, + stream->tracefile_size, stream->tracefile_count, NULL, + stream_path, sizeof(stream_path)); + if (ret < 0) { + goto end; + } + + DBG("Opening stream output file \"%s\"", stream_path); + status = lttng_trace_chunk_open_file( + stream->trace_chunk, stream_path, flags, mode, &fd); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ERR("Failed to open stream file \"%s\"", stream->channel_name); + ret = -1; + goto end; + } + + stream->stream_fd = stream_fd_create(fd); + if (!stream->stream_fd) { + if (close(ret)) { + PERROR("Error closing stream file descriptor %d", ret); + } + ret = -1; + goto end; + } +end: + return ret; +} + +static int stream_set_trace_chunk(struct relay_stream *stream, + struct lttng_trace_chunk *chunk) +{ + int ret = 0; + enum lttng_trace_chunk_status status; + bool acquired_reference; + + pthread_mutex_lock(&stream->lock); + status = lttng_trace_chunk_create_subdirectory(chunk, + stream->path_name); + if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -1; + goto end; + } + + lttng_trace_chunk_put(stream->trace_chunk); + acquired_reference = lttng_trace_chunk_get(chunk); + assert(acquired_reference); + stream->trace_chunk = chunk; + ret = stream_create_data_output_file(stream); +end: + pthread_mutex_unlock(&stream->lock); + return ret; +} + /* * We keep ownership of path_name and channel_name. */ struct relay_stream *stream_create(struct ctf_trace *trace, uint64_t stream_handle, char *path_name, char *channel_name, uint64_t tracefile_size, - uint64_t tracefile_count, - const struct relay_stream_chunk_id *chunk_id) + uint64_t tracefile_count) { int ret; struct relay_stream *stream = NULL; struct relay_session *session = trace->session; + bool acquired_reference = false; + struct lttng_trace_chunk *current_trace_chunk; stream = zmalloc(sizeof(struct relay_stream)); if (stream == NULL) { @@ -98,37 +171,32 @@ struct relay_stream *stream_create(struct ctf_trace *trace, urcu_ref_init(&stream->ref); ctf_trace_get(trace); stream->trace = trace; - stream->current_chunk_id = *chunk_id; - stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); - if (!stream->indexes_ht) { - ERR("Cannot created indexes_ht"); + pthread_mutex_lock(&trace->session->lock); + current_trace_chunk = trace->session->current_trace_chunk; + if (current_trace_chunk) { + acquired_reference = lttng_trace_chunk_get(current_trace_chunk); + } + pthread_mutex_unlock(&trace->session->lock); + if (!acquired_reference) { + ERR("Cannot create stream for channel \"%s\" as a reference to the session's current trace chunk could not be acquired", + channel_name); ret = -1; goto end; } - ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG, - -1, -1); - if (ret < 0) { - ERR("relay creating output directory"); + stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!stream->indexes_ht) { + ERR("Cannot created indexes_ht"); + ret = -1; goto end; } - /* - * No need to use run_as API here because whatever we receive, - * the relayd uses its own credentials for the stream files. - */ - ret = utils_create_stream_file(stream->path_name, stream->channel_name, - stream->tracefile_size, 0, -1, -1, NULL); - if (ret < 0) { - ERR("Create output file"); - goto end; - } - stream->stream_fd = stream_fd_create(ret); - if (!stream->stream_fd) { - if (close(ret)) { - PERROR("Error closing file %d", ret); - } + ret = stream_set_trace_chunk(stream, current_trace_chunk); + if (ret) { + ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"", + trace->session->session_name, + stream->channel_name); ret = -1; goto end; } @@ -137,16 +205,9 @@ struct relay_stream *stream_create(struct ctf_trace *trace, ret = -1; goto end; } - if (stream->tracefile_size) { - DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name); - } else { - DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name); - } - - if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, LTTNG_NAME_MAX)) { - stream->is_metadata = 1; - } + stream->is_metadata = !strcmp(stream->channel_name, + DEFAULT_METADATA_NAME); stream->in_recv_list = true; /* @@ -178,6 +239,7 @@ end: stream_put(stream); stream = NULL; } + lttng_trace_chunk_put(current_trace_chunk); return stream; error_no_alloc: @@ -306,6 +368,8 @@ static void stream_release(struct urcu_ref *ref) ctf_trace_put(stream->trace); stream->trace = NULL; } + lttng_trace_chunk_put(stream->trace_chunk); + stream->trace_chunk = NULL; call_rcu(&stream->rcu_node, stream_destroy_rcu); }