Move to kernel style SPDX license identifiers
[lttng-tools.git] / src / common / consumer / consumer-stream.c
index a62cef272294d360953e5d685fca745d7509bcff..8318d79d9a09e1451bf90c715d714bb9962f52e0 100644 (file)
@@ -1,20 +1,10 @@
 /*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
- *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
  *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License, version 2 only, as
- * published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
  *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * this program; if not, write to the Free Software Foundation, Inc., 51
- * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
 #define _LGPL_SOURCE
@@ -73,12 +63,8 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
                        stream->next_net_seq_num - 1);
        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        if (ret < 0) {
-               DBG("Unable to close stream on the relayd. Continuing");
-               /*
-                * Continue here. There is nothing we can do for the relayd.
-                * Chances are that the relayd has closed the socket so we just
-                * continue cleaning up.
-                */
+               ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
        }
 
        /* Both conditions are met, we destroy the relayd. */
@@ -163,14 +149,14 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
                stream->out_fd = -1;
        }
 
-       if (stream->index_fd >= 0) {
-               ret = close(stream->index_fd);
-               if (ret) {
-                       PERROR("close stream index_fd");
-               }
-               stream->index_fd = -1;
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+               stream->index_file = NULL;
        }
 
+       lttng_trace_chunk_put(stream->trace_chunk);
+       stream->trace_chunk = NULL;
+
        /* Check and cleanup relayd if needed. */
        rcu_read_lock();
        relayd = consumer_find_relayd(stream->net_seq_idx);
@@ -350,6 +336,8 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
        }
 
        /* Free stream within a RCU call. */
+       lttng_trace_chunk_put(stream->trace_chunk);
+       stream->trace_chunk = NULL;
        consumer_stream_free(stream);
 }
 
@@ -359,27 +347,38 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
  * Return 0 on success or else a negative value.
  */
 int consumer_stream_write_index(struct lttng_consumer_stream *stream,
-               struct ctf_packet_index *index)
+               struct ctf_packet_index *element)
 {
        int ret;
-       struct consumer_relayd_sock_pair *relayd;
 
        assert(stream);
-       assert(index);
+       assert(element);
 
        rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd) {
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_send_index(&relayd->control_sock, index,
+       if (stream->net_seq_idx != (uint64_t) -1ULL) {
+               struct consumer_relayd_sock_pair *relayd;
+               relayd = consumer_find_relayd(stream->net_seq_idx);
+               if (relayd) {
+                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+                       ret = relayd_send_index(&relayd->control_sock, element,
                                stream->relayd_stream_id, stream->next_net_seq_num - 1);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                       if (ret < 0) {
+                               /*
+                                * Communication error with lttng-relayd,
+                                * perform cleanup now
+                                */
+                               ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
+                               lttng_consumer_cleanup_relayd(relayd);
+                               ret = -1;
+                       }
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               } else {
+                       ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.",
+                                       stream->key, stream->net_seq_idx);
+                       ret = -1;
+               }
        } else {
-               ssize_t size_ret;
-
-               size_ret = index_write(stream->index_fd, index,
-                               sizeof(struct ctf_packet_index));
-               if (size_ret < sizeof(struct ctf_packet_index)) {
+               if (lttng_index_file_write(stream->index_file, element)) {
                        ret = -1;
                } else {
                        ret = 0;
@@ -552,3 +551,97 @@ end:
        rcu_read_unlock();
        return ret;
 }
+
+int consumer_stream_create_output_files(struct lttng_consumer_stream *stream,
+               bool create_index)
+{
+       int ret;
+       enum lttng_trace_chunk_status chunk_status;
+       const int flags = O_WRONLY | O_CREAT | O_TRUNC;
+       const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
+       char stream_path[LTTNG_PATH_MAX];
+
+       ASSERT_LOCKED(stream->lock);
+       assert(stream->trace_chunk);
+
+       ret = utils_stream_file_path(stream->chan->pathname, stream->name,
+                       stream->chan->tracefile_size,
+                       stream->tracefile_count_current, NULL,
+                       stream_path, sizeof(stream_path));
+       if (ret < 0) {
+               goto end;
+       }
+
+       if (stream->out_fd >= 0) {
+               ret = close(stream->out_fd);
+               if (ret < 0) {
+                       PERROR("Failed to close stream file \"%s\"",
+                                       stream->name);
+                       goto end;
+               }
+               stream->out_fd = -1;
+        }
+
+       DBG("Opening stream output file \"%s\"", stream_path);
+       chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path,
+                       flags, mode, &stream->out_fd, false);
+        if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               ERR("Failed to open stream file \"%s\"", stream->name);
+               ret = -1;
+               goto end;
+        }
+
+       if (!stream->metadata_flag && (create_index || stream->index_file)) {
+               if (stream->index_file) {
+                       lttng_index_file_put(stream->index_file);
+               }
+               chunk_status = lttng_index_file_create_from_trace_chunk(
+                               stream->trace_chunk,
+                               stream->chan->pathname,
+                               stream->name,
+                               stream->chan->tracefile_size,
+                               stream->tracefile_count_current,
+                               CTF_INDEX_MAJOR, CTF_INDEX_MINOR,
+                               false, &stream->index_file);
+               if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       ret = -1;
+                       goto end;
+               }
+       }
+
+       /* Reset current size because we just perform a rotation. */
+       stream->tracefile_size_current = 0;
+       stream->out_fd_offset = 0;
+end:
+       return ret;
+}
+
+int consumer_stream_rotate_output_files(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       stream->tracefile_count_current++;
+       if (stream->chan->tracefile_count > 0) {
+               stream->tracefile_count_current %=
+                               stream->chan->tracefile_count;
+       }
+
+       DBG("Rotating output files of stream \"%s\"", stream->name);
+       ret = consumer_stream_create_output_files(stream, true);
+       if (ret) {
+               goto end;
+       }
+
+end:
+       return ret;
+}
+
+bool consumer_stream_is_deleted(struct lttng_consumer_stream *stream)
+{
+       /*
+        * This function does not take a const stream since
+        * cds_lfht_is_node_deleted was not const before liburcu 0.12.
+        */
+       assert(stream);
+       return cds_lfht_is_node_deleted(&stream->node.node);
+}
This page took 0.025548 seconds and 4 git commands to generate.