Dedicated function to wakeup the consumer metadata pipe
[lttng-tools.git] / src / common / consumer / consumer.c
index f0bacfb8ff0c897fd3259168a7733f8427243dc3..2fa65f4d4e53666e18b0278cea052b20177a6158 100644 (file)
@@ -47,6 +47,7 @@
 #include <common/consumer/consumer-stream.h>
 #include <common/consumer/consumer-testpoint.h>
 #include <common/align.h>
+#include <common/consumer/consumer-metadata-cache.h>
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -66,13 +67,16 @@ struct consumer_channel_msg {
        uint64_t key;                           /* del */
 };
 
+/* Flag used to temporarily pause data consumption from testpoints. */
+int data_consumption_paused;
+
 /*
  * Flag to inform the polling thread to quit when all fd hung up. Updated by
  * the consumer_thread_receive_fds when it notices that all fds has hung up.
  * Also updated by the signal handler (consumer_should_exit()). Read by the
  * polling threads.
  */
-volatile int consumer_quit;
+int consumer_quit;
 
 /*
  * Global hash table containing respectively metadata and data streams. The
@@ -367,6 +371,9 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        if (channel->live_timer_enabled == 1) {
                consumer_timer_live_stop(channel);
        }
+       if (channel->monitor_timer_enabled == 1) {
+               consumer_timer_monitor_stop(channel);
+       }
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -534,6 +541,16 @@ void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
        consumer_stream_destroy(stream, metadata_ht);
 }
 
+void consumer_stream_update_channel_attributes(
+               struct lttng_consumer_stream *stream,
+               struct lttng_consumer_channel *channel)
+{
+       stream->channel_read_only_attributes.tracefile_size =
+                       channel->tracefile_size;
+       memcpy(stream->channel_read_only_attributes.path, channel->pathname,
+                       sizeof(stream->channel_read_only_attributes.path));
+}
+
 struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                uint64_t stream_key,
                enum lttng_consumer_stream_state state,
@@ -570,7 +587,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->session_id = session_id;
        stream->monitor = monitor;
        stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
-       stream->index_fd = -1;
+       stream->index_file = NULL;
+       stream->last_sequence_number = -1ULL;
        pthread_mutex_init(&stream->lock, NULL);
        pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
@@ -622,10 +640,9 @@ end:
 /*
  * Add a stream to the global list protected by a mutex.
  */
-int consumer_add_data_stream(struct lttng_consumer_stream *stream)
+void consumer_add_data_stream(struct lttng_consumer_stream *stream)
 {
        struct lttng_ht *ht = data_ht;
-       int ret = 0;
 
        assert(stream);
        assert(ht);
@@ -675,8 +692,6 @@ int consumer_add_data_stream(struct lttng_consumer_stream *stream)
        pthread_mutex_unlock(&stream->chan->timer_lock);
        pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
-
-       return ret;
 }
 
 void consumer_del_data_stream(struct lttng_consumer_stream *stream)
@@ -1020,7 +1035,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
 
        CDS_INIT_LIST_HEAD(&channel->streams.head);
 
-       DBG("Allocated channel (key %" PRIu64 ")", channel->key)
+       DBG("Allocated channel (key %" PRIu64 ")", channel->key);
 
 end:
        return channel;
@@ -1069,7 +1084,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
  */
 static int update_poll_array(struct lttng_consumer_local_data *ctx,
                struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
-               struct lttng_ht *ht)
+               struct lttng_ht *ht, int *nb_inactive_fd)
 {
        int i = 0;
        struct lttng_ht_iter iter;
@@ -1081,6 +1096,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
        assert(local_stream);
 
        DBG("Updating poll fd array");
+       *nb_inactive_fd = 0;
        rcu_read_lock();
        cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
                /*
@@ -1091,9 +1107,14 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
                 * just after the check. However, this is OK since the stream(s) will
                 * be deleted once the thread is notified that the end point state has
                 * changed where this function will be called back again.
+                *
+                * We track the number of inactive FDs because they still need to be
+                * closed by the polling thread after a wakeup on the data_pipe or
+                * metadata_pipe.
                 */
                if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
                                stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+                       (*nb_inactive_fd)++;
                        continue;
                }
                /*
@@ -1219,7 +1240,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
 {
        ssize_t ret;
 
-       consumer_quit = 1;
+       CMM_STORE_SHARED(consumer_quit, 1);
        ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
        if (ret < 1) {
                PERROR("write consumer quit");
@@ -1228,9 +1249,15 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
        DBG("Consumer flag that it should quit");
 }
 
+
+/*
+ * Flush pending writes to trace output disk file.
+ */
+static
 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
                off_t orig_offset)
 {
+       int ret;
        int outfd = stream->out_fd;
 
        /*
@@ -1261,8 +1288,12 @@ void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
         * defined. So it can be expected to lead to lower throughput in
         * streaming.
         */
-       posix_fadvise(outfd, orig_offset - stream->max_sb_size,
+       ret = posix_fadvise(outfd, orig_offset - stream->max_sb_size,
                        stream->max_sb_size, POSIX_FADV_DONTNEED);
+       if (ret && ret != -ENOSYS) {
+               errno = ret;
+               PERROR("posix_fadvise on fd %i", outfd);
+       }
 }
 
 /*
@@ -1336,6 +1367,8 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_metadata_pipe;
        }
 
+       ctx->channel_monitor_pipe = -1;
+
        return ctx;
 
 error_metadata_pipe:
@@ -1519,7 +1552,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                mmap_base = stream->mmap_base;
                ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
                if (ret < 0) {
-                       ret = -errno;
                        PERROR("tracer ctl get_mmap_read_offset");
                        goto end;
                }
@@ -1555,6 +1587,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                if (stream->metadata_flag) {
                        /* Metadata requires the control socket. */
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+                       if (stream->reset_metadata_flag) {
+                               ret = relayd_reset_metadata(&relayd->control_sock,
+                                               stream->relayd_stream_id,
+                                               stream->metadata_version);
+                               if (ret < 0) {
+                                       relayd_hang_up = 1;
+                                       goto write_error;
+                               }
+                               stream->reset_metadata_flag = 0;
+                       }
                        netlen += sizeof(struct lttcomm_relayd_metadata_payload);
                }
 
@@ -1578,6 +1620,15 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                /* No streaming, we have to set the len with the full padding */
                len += padding;
 
+               if (stream->metadata_flag && stream->reset_metadata_flag) {
+                       ret = utils_truncate_stream_file(stream->out_fd, 0);
+                       if (ret < 0) {
+                               ERR("Reset metadata file");
+                               goto end;
+                       }
+                       stream->reset_metadata_flag = 0;
+               }
+
                /*
                 * Check if we need to change the tracefile before writing the packet.
                 */
@@ -1595,15 +1646,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        }
                        outfd = stream->out_fd;
 
-                       if (stream->index_fd >= 0) {
-                               ret = index_create_file(stream->chan->pathname,
+                       if (stream->index_file) {
+                               lttng_index_file_put(stream->index_file);
+                               stream->index_file = lttng_index_file_create(stream->chan->pathname,
                                                stream->name, stream->uid, stream->gid,
                                                stream->chan->tracefile_size,
-                                               stream->tracefile_count_current);
-                               if (ret < 0) {
+                                               stream->tracefile_count_current,
+                                               CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+                               if (!stream->index_file) {
                                        goto end;
                                }
-                               stream->index_fd = ret;
                        }
 
                        /* Reset current size because we just perform a rotation. */
@@ -1656,8 +1708,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                lttng_sync_file_range(outfd, stream->out_fd_offset, len,
                                SYNC_FILE_RANGE_WRITE);
                stream->out_fd_offset += len;
+               lttng_consumer_sync_trace_file(stream, orig_offset);
        }
-       lttng_consumer_sync_trace_file(stream, orig_offset);
 
 write_error:
        /*
@@ -1737,6 +1789,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                         */
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
 
+                       if (stream->reset_metadata_flag) {
+                               ret = relayd_reset_metadata(&relayd->control_sock,
+                                               stream->relayd_stream_id,
+                                               stream->metadata_version);
+                               if (ret < 0) {
+                                       relayd_hang_up = 1;
+                                       goto write_error;
+                               }
+                               stream->reset_metadata_flag = 0;
+                       }
                        ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
                                        padding);
                        if (ret < 0) {
@@ -1760,6 +1822,14 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                /* No streaming, we have to set the len with the full padding */
                len += padding;
 
+               if (stream->metadata_flag && stream->reset_metadata_flag) {
+                       ret = utils_truncate_stream_file(stream->out_fd, 0);
+                       if (ret < 0) {
+                               ERR("Reset metadata file");
+                               goto end;
+                       }
+                       stream->reset_metadata_flag = 0;
+               }
                /*
                 * Check if we need to change the tracefile before writing the packet.
                 */
@@ -1778,22 +1848,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        }
                        outfd = stream->out_fd;
 
-                       if (stream->index_fd >= 0) {
-                               ret = close(stream->index_fd);
-                               if (ret < 0) {
-                                       PERROR("Closing index");
-                                       goto end;
-                               }
-                               stream->index_fd = -1;
-                               ret = index_create_file(stream->chan->pathname,
+                       if (stream->index_file) {
+                               lttng_index_file_put(stream->index_file);
+                               stream->index_file = lttng_index_file_create(stream->chan->pathname,
                                                stream->name, stream->uid, stream->gid,
                                                stream->chan->tracefile_size,
-                                               stream->tracefile_count_current);
-                               if (ret < 0) {
-                                       written = ret;
+                                               stream->tracefile_count_current,
+                                               CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+                               if (!stream->index_file) {
                                        goto end;
                                }
-                               stream->index_fd = ret;
                        }
 
                        /* Reset current size because we just perform a rotation. */
@@ -1868,7 +1932,9 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                stream->output_written += ret_splice;
                written += ret_splice;
        }
-       lttng_consumer_sync_trace_file(stream, orig_offset);
+       if (!relayd) {
+               lttng_consumer_sync_trace_file(stream, orig_offset);
+       }
        goto end;
 
 write_error:
@@ -2008,6 +2074,10 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
+       if (stream->chan->metadata_cache) {
+               /* Only applicable to userspace consumers. */
+               pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+       }
 
        /* Remove any reference to that stream. */
        consumer_stream_delete(stream, ht);
@@ -2031,6 +2101,9 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
         */
        stream->chan->metadata_stream = NULL;
 
+       if (stream->chan->metadata_cache) {
+               pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
+       }
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
@@ -2046,10 +2119,9 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
  * Action done with the metadata stream when adding it to the consumer internal
  * data structures to handle it.
  */
-int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
+void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
 {
        struct lttng_ht *ht = metadata_ht;
-       int ret = 0;
        struct lttng_ht_iter iter;
        struct lttng_ht_node_u64 *node;
 
@@ -2093,7 +2165,7 @@ int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
 
        lttng_ht_add_unique_u64(ht, &stream->node);
 
-       lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht,
+       lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
                &stream->node_channel_id);
 
        /*
@@ -2109,7 +2181,6 @@ int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
        pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&stream->chan->timer_lock);
        pthread_mutex_unlock(&consumer_data.lock);
-       return ret;
 }
 
 /*
@@ -2217,10 +2288,10 @@ restart:
                DBG("Metadata poll return from wait with %d fd(s)",
                                LTTNG_POLL_GETNB(&events));
                health_poll_exit();
-               DBG("Metadata event catched in thread");
+               DBG("Metadata event caught in thread");
                if (ret < 0) {
                        if (errno == EINTR) {
-                               ERR("Poll EINTR catched");
+                               ERR("Poll EINTR caught");
                                goto restart;
                        }
                        if (LTTNG_POLL_GETNB(&events) == 0) {
@@ -2318,7 +2389,7 @@ restart:
                                        len = ctx->on_buffer_ready(stream, ctx);
                                        /*
                                         * We don't check the return value here since if we get
-                                        * a negative len, it means an error occured thus we
+                                        * a negative len, it means an error occurred thus we
                                         * simply remove it from the poll set and free the
                                         * stream.
                                         */
@@ -2345,7 +2416,7 @@ restart:
                                                len = ctx->on_buffer_ready(stream, ctx);
                                                /*
                                                 * We don't check the return value here since if we get
-                                                * a negative len, it means an error occured thus we
+                                                * a negative len, it means an error occurred thus we
                                                 * simply remove it from the poll set and free the
                                                 * stream.
                                                 */
@@ -2396,7 +2467,9 @@ void *consumer_thread_data_poll(void *data)
        /* local view of the streams */
        struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
        /* local view of consumer_data.fds_count */
-       int nb_fd = 0;
+       int nb_fd = 0, nb_pipes_fd;
+       /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
+       int nb_inactive_fd = 0;
        struct lttng_consumer_local_data *ctx = data;
        ssize_t len;
 
@@ -2435,17 +2508,19 @@ void *consumer_thread_data_poll(void *data)
                        local_stream = NULL;
 
                        /*
-                        * Allocate for all fds +1 for the consumer_data_pipe and +1 for
-                        * wake up pipe.
+                        * Allocate for all fds + 2:
+                        *   +1 for the consumer_data_pipe
+                        *   +1 for wake up pipe
                         */
-                       pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd));
+                       nb_pipes_fd = 2;
+                       pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                PERROR("pollfd malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
                                goto end;
                        }
 
-                       local_stream = zmalloc((consumer_data.stream_count + 2) *
+                       local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) *
                                        sizeof(struct lttng_consumer_stream *));
                        if (local_stream == NULL) {
                                PERROR("local_stream malloc");
@@ -2453,7 +2528,7 @@ void *consumer_thread_data_poll(void *data)
                                goto end;
                        }
                        ret = update_poll_array(ctx, &pollfd, local_stream,
-                                       data_ht);
+                                       data_ht, &nb_inactive_fd);
                        if (ret < 0) {
                                ERR("Error in allocating pollfd or local_outfds");
                                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
@@ -2466,15 +2541,19 @@ void *consumer_thread_data_poll(void *data)
                pthread_mutex_unlock(&consumer_data.lock);
 
                /* No FDs and consumer_quit, consumer_cleanup the thread */
-               if (nb_fd == 0 && consumer_quit == 1) {
+               if (nb_fd == 0 && nb_inactive_fd == 0 &&
+                               CMM_LOAD_SHARED(consumer_quit) == 1) {
                        err = 0;        /* All is OK */
                        goto end;
                }
                /* poll on the array of fds */
        restart:
-               DBG("polling on %d fd", nb_fd + 2);
+               DBG("polling on %d fd", nb_fd + nb_pipes_fd);
+               if (testpoint(consumerd_thread_data_poll)) {
+                       goto end;
+               }
                health_poll_entry();
-               num_rdy = poll(pollfd, nb_fd + 2, -1);
+               num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
                health_poll_exit();
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
@@ -2492,6 +2571,12 @@ void *consumer_thread_data_poll(void *data)
                        goto end;
                }
 
+               if (caa_unlikely(data_consumption_paused)) {
+                       DBG("Data consumption paused, sleeping...");
+                       sleep(1);
+                       goto restart;
+               }
+
                /*
                 * If the consumer_data_pipe triggered poll go directly to the
                 * beginning of the loop to update the array. We want to prioritize
@@ -2800,10 +2885,10 @@ restart:
                DBG("Channel poll return from wait with %d fd(s)",
                                LTTNG_POLL_GETNB(&events));
                health_poll_exit();
-               DBG("Channel event catched in thread");
+               DBG("Channel event caught in thread");
                if (ret < 0) {
                        if (errno == EINTR) {
-                               ERR("Poll EINTR catched");
+                               ERR("Poll EINTR caught");
                                goto restart;
                        }
                        if (LTTNG_POLL_GETNB(&events) == 0) {
@@ -3134,7 +3219,7 @@ void *consumer_thread_sessiond_poll(void *data)
                        err = 0;
                        goto end;
                }
-               if (consumer_quit) {
+               if (CMM_LOAD_SHARED(consumer_quit)) {
                        DBG("consumer_thread_receive_fds received quit from signal");
                        err = 0;        /* All is OK */
                        goto end;
@@ -3159,7 +3244,7 @@ end:
         * when all fds have hung up, the polling thread
         * can exit cleanly
         */
-       consumer_quit = 1;
+       CMM_STORE_SHARED(consumer_quit, 1);
 
        /*
         * Notify the data poll thread to poll back again and test the
@@ -3291,7 +3376,7 @@ error:
  * This will create a relayd socket pair and add it to the relayd hash table.
  * The caller MUST acquire a RCU read side lock before calling it.
  */
-int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
+ void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
                struct pollfd *consumer_sockpoll,
                struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
@@ -3313,7 +3398,6 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                /* Not found. Allocate one. */
                relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
                if (relayd == NULL) {
-                       ret = -ENOMEM;
                        ret_code = LTTCOMM_CONSUMERD_ENOMEM;
                        goto error;
                } else {
@@ -3346,14 +3430,12 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
        if (ret) {
                /* Needing to exit in the middle of a command: error. */
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
-               ret = -EINTR;
                goto error_nosignal;
        }
 
        /* Get relayd socket from session daemon */
        ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
        if (ret != sizeof(fd)) {
-               ret = -1;
                fd = -1;        /* Just in case it gets set with an invalid value. */
 
                /*
@@ -3427,7 +3509,6 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                break;
        default:
                ERR("Unknown relayd socket type (%d)", sock_type);
-               ret = -1;
                ret_code = LTTCOMM_CONSUMERD_FATAL;
                goto error;
        }
@@ -3451,7 +3532,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
        add_relayd(relayd);
 
        /* All good! */
-       return 0;
+       return;
 
 error:
        if (consumer_send_status_msg(sock, ret_code) < 0) {
@@ -3469,8 +3550,6 @@ error_nosignal:
        if (relayd_created) {
                free(relayd);
        }
-
-       return ret;
 }
 
 /*
@@ -3716,3 +3795,108 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
        }
        return start_pos;
 }
+
+static
+int rotate_rename_local(const char *old_path, const char *new_path,
+               uid_t uid, gid_t gid)
+{
+       int ret;
+
+       assert(old_path);
+       assert(new_path);
+
+       ret = utils_mkdir_recursive(new_path, S_IRWXU | S_IRWXG, uid, gid);
+       if (ret < 0) {
+               ERR("Create directory on rotate");
+               goto end;
+       }
+
+       ret = rename(old_path, new_path);
+       if (ret < 0 && errno != ENOENT) {
+               PERROR("Rename completed rotation chunk");
+               goto end;
+       }
+
+       ret = 0;
+end:
+       return ret;
+}
+
+static
+int rotate_rename_relay(const char *old_path, const char *new_path,
+               uint64_t relayd_id)
+{
+       int ret;
+       struct consumer_relayd_sock_pair *relayd;
+
+       relayd = consumer_find_relayd(relayd_id);
+       if (!relayd) {
+               ERR("Failed to find relayd while running rotate_rename_relay command");
+               ret = -1;
+               goto end;
+       }
+
+       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+       ret = relayd_rotate_rename(&relayd->control_sock, old_path, new_path);
+       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+end:
+       return ret;
+}
+
+int lttng_consumer_rotate_rename(const char *old_path, const char *new_path,
+               uid_t uid, gid_t gid, uint64_t relayd_id)
+{
+       if (relayd_id != -1ULL) {
+               return rotate_rename_relay(old_path, new_path, relayd_id);
+       } else {
+               return rotate_rename_local(old_path, new_path, uid, gid);
+       }
+}
+
+static
+int mkdir_local(const char *path, uid_t uid, gid_t gid)
+{
+       int ret;
+
+       ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, uid, gid);
+       if (ret < 0) {
+               /* utils_mkdir_recursive logs an error. */
+               goto end;
+       }
+
+       ret = 0;
+end:
+       return ret;
+}
+
+static
+int mkdir_relay(const char *path, uint64_t relayd_id)
+{
+       int ret;
+       struct consumer_relayd_sock_pair *relayd;
+
+       relayd = consumer_find_relayd(relayd_id);
+       if (!relayd) {
+               ERR("Failed to find relayd");
+               ret = -1;
+               goto end;
+       }
+
+       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+       ret = relayd_mkdir(&relayd->control_sock, path);
+       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+
+end:
+       return ret;
+
+}
+
+int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid,
+               uint64_t relayd_id)
+{
+       if (relayd_id != -1ULL) {
+               return mkdir_relay(path, relayd_id);
+       } else {
+               return mkdir_local(path, uid, gid);
+       }
+}
This page took 0.03243 seconds and 4 git commands to generate.