Dedicated function to wakeup the consumer metadata pipe
[lttng-tools.git] / src / common / consumer / consumer.c
index b449c1637875ec3d9fd480e49982975ca17ab193..2fa65f4d4e53666e18b0278cea052b20177a6158 100644 (file)
@@ -541,6 +541,16 @@ void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
        consumer_stream_destroy(stream, metadata_ht);
 }
 
+void consumer_stream_update_channel_attributes(
+               struct lttng_consumer_stream *stream,
+               struct lttng_consumer_channel *channel)
+{
+       stream->channel_read_only_attributes.tracefile_size =
+                       channel->tracefile_size;
+       memcpy(stream->channel_read_only_attributes.path, channel->pathname,
+                       sizeof(stream->channel_read_only_attributes.path));
+}
+
 struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                uint64_t stream_key,
                enum lttng_consumer_stream_state state,
@@ -2457,7 +2467,7 @@ void *consumer_thread_data_poll(void *data)
        /* local view of the streams */
        struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
        /* local view of consumer_data.fds_count */
-       int nb_fd = 0;
+       int nb_fd = 0, nb_pipes_fd;
        /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
        int nb_inactive_fd = 0;
        struct lttng_consumer_local_data *ctx = data;
@@ -2498,17 +2508,19 @@ void *consumer_thread_data_poll(void *data)
                        local_stream = NULL;
 
                        /*
-                        * Allocate for all fds +1 for the consumer_data_pipe and +1 for
-                        * wake up pipe.
+                        * Allocate for all fds + 2:
+                        *   +1 for the consumer_data_pipe
+                        *   +1 for wake up pipe
                         */
-                       pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
+                       nb_pipes_fd = 2;
+                       pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                PERROR("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
 
-                       local_stream = zmalloc((consumer_data.stream_count + 2) *
+                       local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) *
                                        sizeof(struct lttng_consumer_stream *));
                        if (local_stream == NULL) {
                                PERROR("local_stream malloc");
@@ -2536,12 +2548,12 @@ void *consumer_thread_data_poll(void *data)
                }
                /* poll on the array of fds */
        restart:
-               DBG("polling on %d fd", nb_fd + 2);
+               DBG("polling on %d fd", nb_fd + nb_pipes_fd);
                if (testpoint(consumerd_thread_data_poll)) {
                        goto end;
                }
                health_poll_entry();
-               num_rdy = poll(pollfd, nb_fd + 2, -1);
+               num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
                health_poll_exit();
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
@@ -3784,6 +3796,63 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
        return start_pos;
 }
 
+static
+int rotate_rename_local(const char *old_path, const char *new_path,
+               uid_t uid, gid_t gid)
+{
+       int ret;
+
+       assert(old_path);
+       assert(new_path);
+
+       ret = utils_mkdir_recursive(new_path, S_IRWXU | S_IRWXG, uid, gid);
+       if (ret < 0) {
+               ERR("Create directory on rotate");
+               goto end;
+       }
+
+       ret = rename(old_path, new_path);
+       if (ret < 0 && errno != ENOENT) {
+               PERROR("Rename completed rotation chunk");
+               goto end;
+       }
+
+       ret = 0;
+end:
+       return ret;
+}
+
+static
+int rotate_rename_relay(const char *old_path, const char *new_path,
+               uint64_t relayd_id)
+{
+       int ret;
+       struct consumer_relayd_sock_pair *relayd;
+
+       relayd = consumer_find_relayd(relayd_id);
+       if (!relayd) {
+               ERR("Failed to find relayd while running rotate_rename_relay command");
+               ret = -1;
+               goto end;
+       }
+
+       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+       ret = relayd_rotate_rename(&relayd->control_sock, old_path, new_path);
+       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+end:
+       return ret;
+}
+
+int lttng_consumer_rotate_rename(const char *old_path, const char *new_path,
+               uid_t uid, gid_t gid, uint64_t relayd_id)
+{
+       if (relayd_id != -1ULL) {
+               return rotate_rename_relay(old_path, new_path, relayd_id);
+       } else {
+               return rotate_rename_local(old_path, new_path, uid, gid);
+       }
+}
+
 static
 int mkdir_local(const char *path, uid_t uid, gid_t gid)
 {
This page took 0.026311 seconds and 4 git commands to generate.