relayd: create stream files relative to a session's trace chunk
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index 4716f9d7923f8d5ddd4ea8a2cf6ee52c790bf846..6650700838fc27596e8c8f086cc7815c8a14a098 100644 (file)
@@ -29,6 +29,9 @@
 #include "stream.h"
 #include "viewer-stream.h"
 
+#include <sys/types.h>
+#include <fcntl.h>
+
 /* Should be called with RCU read-side lock held. */
 bool stream_get(struct relay_stream *stream)
 {
@@ -62,18 +65,88 @@ end:
        return stream;
 }
 
+static int stream_create_data_output_file(struct relay_stream *stream)
+{
+       int ret, fd;
+       enum lttng_trace_chunk_status status;
+       const int flags = O_RDWR | O_CREAT | O_TRUNC;
+       const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
+       char stream_path[LTTNG_PATH_MAX];
+
+       ASSERT_LOCKED(stream->lock);
+       assert(stream->trace_chunk);
+
+       if (stream->stream_fd) {
+               stream_fd_put(stream->stream_fd);
+               stream->stream_fd = NULL;
+       }
+
+       ret = utils_stream_file_path(stream->path_name, stream->channel_name,
+                       stream->tracefile_size, stream->tracefile_count, NULL,
+                       stream_path, sizeof(stream_path));
+       if (ret < 0) {
+               goto end;
+       }
+
+       DBG("Opening stream output file \"%s\"", stream_path);
+       status = lttng_trace_chunk_open_file(
+                       stream->trace_chunk, stream_path, flags, mode, &fd);
+       if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               ERR("Failed to open stream file \"%s\"", stream->channel_name);
+               ret = -1;
+               goto end;
+       }
+
+       stream->stream_fd = stream_fd_create(fd);
+       if (!stream->stream_fd) {
+               if (close(ret)) {
+                       PERROR("Error closing stream file descriptor %d", ret);
+               }
+               ret = -1;
+               goto end;
+       }
+end:
+       return ret;
+}
+
+static int stream_set_trace_chunk(struct relay_stream *stream,
+               struct lttng_trace_chunk *chunk)
+{
+       int ret = 0;
+       enum lttng_trace_chunk_status status;
+       bool acquired_reference;
+
+       pthread_mutex_lock(&stream->lock);
+       status = lttng_trace_chunk_create_subdirectory(chunk,
+                       stream->path_name);
+       if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               ret = -1;
+               goto end;
+       }
+
+       lttng_trace_chunk_put(stream->trace_chunk);
+       acquired_reference = lttng_trace_chunk_get(chunk);
+       assert(acquired_reference);
+       stream->trace_chunk = chunk;
+       ret = stream_create_data_output_file(stream);
+end:
+       pthread_mutex_unlock(&stream->lock);
+       return ret;
+}
+
 /*
  * We keep ownership of path_name and channel_name.
  */
 struct relay_stream *stream_create(struct ctf_trace *trace,
        uint64_t stream_handle, char *path_name,
        char *channel_name, uint64_t tracefile_size,
-       uint64_t tracefile_count,
-       const struct relay_stream_chunk_id *chunk_id)
+       uint64_t tracefile_count)
 {
        int ret;
        struct relay_stream *stream = NULL;
        struct relay_session *session = trace->session;
+       bool acquired_reference = false;
+       struct lttng_trace_chunk *current_trace_chunk;
 
        stream = zmalloc(sizeof(struct relay_stream));
        if (stream == NULL) {
@@ -98,37 +171,32 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
        urcu_ref_init(&stream->ref);
        ctf_trace_get(trace);
        stream->trace = trace;
-       stream->current_chunk_id = *chunk_id;
 
-       stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
-       if (!stream->indexes_ht) {
-               ERR("Cannot created indexes_ht");
+       pthread_mutex_lock(&trace->session->lock);
+       current_trace_chunk = trace->session->current_trace_chunk;
+       if (current_trace_chunk) {
+               acquired_reference = lttng_trace_chunk_get(current_trace_chunk);
+       }
+       pthread_mutex_unlock(&trace->session->lock);
+       if (!acquired_reference) {
+               ERR("Cannot create stream for channel \"%s\" as a reference to the session's current trace chunk could not be acquired",
+                               channel_name);
                ret = -1;
                goto end;
        }
 
-       ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG,
-                       -1, -1);
-       if (ret < 0) {
-               ERR("relay creating output directory");
+       stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!stream->indexes_ht) {
+               ERR("Cannot created indexes_ht");
+               ret = -1;
                goto end;
        }
 
-       /*
-        * No need to use run_as API here because whatever we receive,
-        * the relayd uses its own credentials for the stream files.
-        */
-       ret = utils_create_stream_file(stream->path_name, stream->channel_name,
-                       stream->tracefile_size, 0, -1, -1, NULL);
-       if (ret < 0) {
-               ERR("Create output file");
-               goto end;
-       }
-       stream->stream_fd = stream_fd_create(ret);
-       if (!stream->stream_fd) {
-               if (close(ret)) {
-                       PERROR("Error closing file %d", ret);
-               }
+       ret = stream_set_trace_chunk(stream, current_trace_chunk);
+       if (ret) {
+               ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
+                               trace->session->session_name,
+                               stream->channel_name);
                ret = -1;
                goto end;
        }
@@ -137,16 +205,9 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
                ret = -1;
                goto end;
        }
-       if (stream->tracefile_size) {
-               DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
-       } else {
-               DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
-       }
-
-       if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, LTTNG_NAME_MAX)) {
-               stream->is_metadata = 1;
-       }
 
+       stream->is_metadata = !strcmp(stream->channel_name,
+                       DEFAULT_METADATA_NAME);
        stream->in_recv_list = true;
 
        /*
@@ -178,6 +239,7 @@ end:
                stream_put(stream);
                stream = NULL;
        }
+       lttng_trace_chunk_put(current_trace_chunk);
        return stream;
 
 error_no_alloc:
@@ -306,6 +368,8 @@ static void stream_release(struct urcu_ref *ref)
                ctf_trace_put(stream->trace);
                stream->trace = NULL;
        }
+       lttng_trace_chunk_put(stream->trace_chunk);
+       stream->trace_chunk = NULL;
 
        call_rcu(&stream->rcu_node, stream_destroy_rcu);
 }
This page took 0.024998 seconds and 4 git commands to generate.