#include "stream.h"
#include "viewer-stream.h"
+#include <sys/types.h>
+#include <fcntl.h>
+
/* Should be called with RCU read-side lock held. */
bool stream_get(struct relay_stream *stream)
{
return stream;
}
+static int stream_create_data_output_file(struct relay_stream *stream)
+{
+ int ret, fd;
+ enum lttng_trace_chunk_status status;
+ const int flags = O_RDWR | O_CREAT | O_TRUNC;
+ const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
+ char stream_path[LTTNG_PATH_MAX];
+
+ ASSERT_LOCKED(stream->lock);
+ assert(stream->trace_chunk);
+
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+
+ ret = utils_stream_file_path(stream->path_name, stream->channel_name,
+ stream->tracefile_size, stream->tracefile_count, NULL,
+ stream_path, sizeof(stream_path));
+ if (ret < 0) {
+ goto end;
+ }
+
+ DBG("Opening stream output file \"%s\"", stream_path);
+ status = lttng_trace_chunk_open_file(
+ stream->trace_chunk, stream_path, flags, mode, &fd);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to open stream file \"%s\"", stream->channel_name);
+ ret = -1;
+ goto end;
+ }
+
+ stream->stream_fd = stream_fd_create(fd);
+ if (!stream->stream_fd) {
+ if (close(ret)) {
+ PERROR("Error closing stream file descriptor %d", ret);
+ }
+ ret = -1;
+ goto end;
+ }
+end:
+ return ret;
+}
+
+static int stream_set_trace_chunk(struct relay_stream *stream,
+ struct lttng_trace_chunk *chunk)
+{
+ int ret = 0;
+ enum lttng_trace_chunk_status status;
+ bool acquired_reference;
+
+ pthread_mutex_lock(&stream->lock);
+ status = lttng_trace_chunk_create_subdirectory(chunk,
+ stream->path_name);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ goto end;
+ }
+
+ lttng_trace_chunk_put(stream->trace_chunk);
+ acquired_reference = lttng_trace_chunk_get(chunk);
+ assert(acquired_reference);
+ stream->trace_chunk = chunk;
+ ret = stream_create_data_output_file(stream);
+end:
+ pthread_mutex_unlock(&stream->lock);
+ return ret;
+}
+
/*
* We keep ownership of path_name and channel_name.
*/
struct relay_stream *stream_create(struct ctf_trace *trace,
uint64_t stream_handle, char *path_name,
char *channel_name, uint64_t tracefile_size,
- uint64_t tracefile_count,
- const struct relay_stream_chunk_id *chunk_id)
+ uint64_t tracefile_count)
{
int ret;
struct relay_stream *stream = NULL;
struct relay_session *session = trace->session;
+ bool acquired_reference = false;
+ struct lttng_trace_chunk *current_trace_chunk;
stream = zmalloc(sizeof(struct relay_stream));
if (stream == NULL) {
urcu_ref_init(&stream->ref);
ctf_trace_get(trace);
stream->trace = trace;
- stream->current_chunk_id = *chunk_id;
- stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
- if (!stream->indexes_ht) {
- ERR("Cannot created indexes_ht");
+ pthread_mutex_lock(&trace->session->lock);
+ current_trace_chunk = trace->session->current_trace_chunk;
+ if (current_trace_chunk) {
+ acquired_reference = lttng_trace_chunk_get(current_trace_chunk);
+ }
+ pthread_mutex_unlock(&trace->session->lock);
+ if (!acquired_reference) {
+ ERR("Cannot create stream for channel \"%s\" as a reference to the session's current trace chunk could not be acquired",
+ channel_name);
ret = -1;
goto end;
}
- ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG,
- -1, -1);
- if (ret < 0) {
- ERR("relay creating output directory");
+ stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!stream->indexes_ht) {
+ ERR("Cannot created indexes_ht");
+ ret = -1;
goto end;
}
- /*
- * No need to use run_as API here because whatever we receive,
- * the relayd uses its own credentials for the stream files.
- */
- ret = utils_create_stream_file(stream->path_name, stream->channel_name,
- stream->tracefile_size, 0, -1, -1, NULL);
- if (ret < 0) {
- ERR("Create output file");
- goto end;
- }
- stream->stream_fd = stream_fd_create(ret);
- if (!stream->stream_fd) {
- if (close(ret)) {
- PERROR("Error closing file %d", ret);
- }
+ ret = stream_set_trace_chunk(stream, current_trace_chunk);
+ if (ret) {
+ ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
+ trace->session->session_name,
+ stream->channel_name);
ret = -1;
goto end;
}
ret = -1;
goto end;
}
- if (stream->tracefile_size) {
- DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
- } else {
- DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
- }
-
- if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, LTTNG_NAME_MAX)) {
- stream->is_metadata = 1;
- }
+ stream->is_metadata = !strcmp(stream->channel_name,
+ DEFAULT_METADATA_NAME);
stream->in_recv_list = true;
/*
stream_put(stream);
stream = NULL;
}
+ lttng_trace_chunk_put(current_trace_chunk);
return stream;
error_no_alloc:
ctf_trace_put(stream->trace);
stream->trace = NULL;
}
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = NULL;
call_rcu(&stream->rcu_node, stream_destroy_rcu);
}