On-disk multiple tracefiles circular buffer
[lttng-tools.git] / src / common / consumer.c
index 300fd2a2fc896108ac60c43ce920a5f633be111b..b6e440a486fc5995f9a7caca0eac39631bf3340c 100644 (file)
@@ -28,6 +28,7 @@
 #include <sys/types.h>
 #include <unistd.h>
 #include <inttypes.h>
+#include <signal.h>
 
 #include <common/common.h>
 #include <common/utils.h>
@@ -820,14 +821,16 @@ error:
  *
  * On error, return NULL.
  */
-struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key,
+struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t session_id,
                const char *pathname,
                const char *name,
                uid_t uid,
                gid_t gid,
                int relayd_id,
-               enum lttng_event_output output)
+               enum lttng_event_output output,
+               uint64_t tracefile_size,
+               uint64_t tracefile_count)
 {
        struct lttng_consumer_channel *channel;
 
@@ -844,6 +847,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key,
        channel->gid = gid;
        channel->relayd_id = relayd_id;
        channel->output = output;
+       channel->tracefile_size = tracefile_size;
+       channel->tracefile_count = tracefile_count;
 
        strncpy(channel->pathname, pathname, sizeof(channel->pathname));
        channel->pathname[sizeof(channel->pathname) - 1] = '\0';
@@ -876,8 +881,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
        pthread_mutex_lock(&consumer_data.lock);
        rcu_read_lock();
 
-       lttng_ht_lookup(consumer_data.channel_ht,
-                       &channel->key, &iter);
+       lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
        node = lttng_ht_iter_get_node_u64(&iter);
        if (node != NULL) {
                /* Channel already exist. Ignore the insertion */
@@ -936,7 +940,12 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
                                stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
                        continue;
                }
-               DBG("Active FD %d", stream->wait_fd);
+               /*
+                * This clobbers way too much the debug output. Uncomment that if you
+                * need it for debugging purposes.
+                *
+                * DBG("Active FD %d", stream->wait_fd);
+                */
                (*pollfd)[i].fd = stream->wait_fd;
                (*pollfd)[i].events = POLLIN | POLLPRI;
                local_stream[i] = stream;
@@ -1137,6 +1146,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
        }
 
        ctx->consumer_error_socket = -1;
+       ctx->consumer_metadata_socket = -1;
        /* assign the callbacks */
        ctx->on_buffer_ready = buffer_ready;
        ctx->on_recv_channel = recv_channel;
@@ -1223,6 +1233,10 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        if (ret) {
                PERROR("close");
        }
+       ret = close(ctx->consumer_metadata_socket);
+       if (ret) {
+               PERROR("close");
+       }
        utils_close_pipe(ctx->consumer_thread_pipe);
        utils_close_pipe(ctx->consumer_channel_pipe);
        utils_close_pipe(ctx->consumer_data_pipe);
@@ -1273,6 +1287,93 @@ end:
        return ret;
 }
 
+/*
+ * Create the tracefile on disk.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int lttng_create_output_file(struct lttng_consumer_stream *stream)
+{
+       int ret;
+       char full_path[PATH_MAX];
+       char *path_name_id = NULL;
+       char *path;
+
+       assert(stream);
+       assert(stream->net_seq_idx == (uint64_t) -1ULL);
+
+       ret = snprintf(full_path, sizeof(full_path), "%s/%s",
+                       stream->chan->pathname, stream->name);
+       if (ret < 0) {
+               PERROR("snprintf create output file");
+               goto error;
+       }
+
+       /*
+        * If we split the trace in multiple files, we have to add the tracefile
+        * current count at the end of the tracefile name
+        */
+       if (stream->chan->tracefile_size > 0) {
+               ret = asprintf(&path_name_id, "%s_%" PRIu64, full_path,
+                               stream->tracefile_count_current);
+               if (ret < 0) {
+                       PERROR("Allocating path name ID");
+                       goto error;
+               }
+               path = path_name_id;
+       } else {
+               path = full_path;
+       }
+
+       ret = run_as_open(path, O_WRONLY | O_CREAT | O_TRUNC,
+                       S_IRWXU | S_IRWXG | S_IRWXO, stream->uid, stream->gid);
+       if (ret < 0) {
+               PERROR("open stream path %s", path);
+               goto error_open;
+       }
+       stream->out_fd = ret;
+       stream->tracefile_size_current = 0;
+
+error_open:
+       free(path_name_id);
+error:
+       return ret;
+}
+
+/*
+ * Change the output tracefile according to the tracefile_size and
+ * tracefile_count parameters. The stream lock MUST be held before calling this
+ * function because we are modifying the stream status.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static int rotate_output_file(struct lttng_consumer_stream *stream)
+{
+       int ret;
+
+       assert(stream);
+       assert(stream->tracefile_size_current);
+
+       ret = close(stream->out_fd);
+       if (ret < 0) {
+               PERROR("Closing tracefile");
+               goto end;
+       }
+
+       if (stream->chan->tracefile_count > 0) {
+               stream->tracefile_count_current =
+                       (stream->tracefile_count_current + 1) %
+                       stream->chan->tracefile_count;
+       } else {
+               stream->tracefile_count_current++;
+       }
+
+       return lttng_create_output_file(stream);
+
+end:
+       return ret;
+}
+
 /*
  * Mmap the ring buffer, read it and write the data to the tracefile. This is a
  * core function for writing trace buffers to either the local filesystem or
@@ -1324,6 +1425,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        goto end;
                }
                ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
+
                break;
        default:
                ERR("Unknown consumer_data type");
@@ -1379,6 +1481,21 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        } else {
                /* No streaming, we have to set the len with the full padding */
                len += padding;
+
+               /*
+                * Check if we need to change the tracefile before writing the packet.
+                */
+               if (stream->chan->tracefile_size > 0 &&
+                               (stream->tracefile_size_current + len) >
+                               stream->chan->tracefile_size) {
+                       ret = rotate_output_file(stream);
+                       if (ret < 0) {
+                               ERR("Rotating output file");
+                               goto end;
+                       }
+                       outfd = stream->out_fd;
+               }
+               stream->tracefile_size_current += len;
        }
 
        while (len > 0) {
@@ -1541,6 +1658,21 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        } else {
                /* No streaming, we have to set the len with the full padding */
                len += padding;
+
+               /*
+                * Check if we need to change the tracefile before writing the packet.
+                */
+               if (stream->chan->tracefile_size > 0 &&
+                               (stream->tracefile_size_current + len) >
+                               stream->chan->tracefile_size) {
+                       ret = rotate_output_file(stream);
+                       if (ret < 0) {
+                               ERR("Rotating output file");
+                               goto end;
+                       }
+                       outfd = stream->out_fd;
+               }
+               stream->tracefile_size_current += len;
        }
 
        while (len > 0) {
@@ -2703,6 +2835,33 @@ end_ht:
        return NULL;
 }
 
+static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
+               struct pollfd *sockpoll, int client_socket)
+{
+       int ret;
+
+       assert(ctx);
+       assert(sockpoll);
+
+       if (lttng_consumer_poll_socket(sockpoll) < 0) {
+               ret = -1;
+               goto error;
+       }
+       DBG("Metadata connection on client_socket");
+
+       /* Blocking call, waiting for transmission */
+       ctx->consumer_metadata_socket = lttcomm_accept_unix_sock(client_socket);
+       if (ctx->consumer_metadata_socket < 0) {
+               WARN("On accept metadata");
+               ret = -1;
+               goto error;
+       }
+       ret = 0;
+
+error:
+       return ret;
+}
+
 /*
  * This thread listens on the consumerd socket and receives the file
  * descriptors from the session daemon.
@@ -2769,6 +2928,15 @@ void *consumer_thread_sessiond_poll(void *data)
                goto end;
        }
 
+       /*
+        * Setup metadata socket which is the second socket connection on the
+        * command unix socket.
+        */
+       ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
+       if (ret < 0) {
+               goto end;
+       }
+
        /* This socket is not useful anymore. */
        ret = close(client_socket);
        if (ret < 0) {
This page took 0.025752 seconds and 4 git commands to generate.