Run clang-format on the whole tree
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.cpp
index 872bc1abf8f89c322e217b365138e88c5341d7f8..ea3e3619106654d556bdbf543afca0ce75308bd2 100644 (file)
@@ -8,41 +8,42 @@
  */
 
 #define _LGPL_SOURCE
+#include "ust-consumer.hpp"
+
+#include <common/common.hpp>
+#include <common/compat/endian.hpp>
+#include <common/compat/fcntl.hpp>
+#include <common/consumer/consumer-metadata-cache.hpp>
+#include <common/consumer/consumer-stream.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/index/index.hpp>
+#include <common/optional.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/shm.hpp>
+#include <common/utils.hpp>
+
 #include <lttng/ust-ctl.h>
 #include <lttng/ust-sigbus.h>
+
+#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <inttypes.h>
 #include <poll.h>
 #include <pthread.h>
+#include <signal.h>
+#include <stdbool.h>
+#include <stdint.h>
 #include <stdlib.h>
 #include <string.h>
 #include <sys/mman.h>
 #include <sys/socket.h>
 #include <sys/stat.h>
 #include <sys/types.h>
-#include <inttypes.h>
 #include <unistd.h>
 #include <urcu/list.h>
-#include <signal.h>
-#include <stdbool.h>
-#include <stdint.h>
 
-#include <bin/lttng-consumerd/health-consumerd.hpp>
-#include <common/common.hpp>
-#include <common/sessiond-comm/sessiond-comm.hpp>
-#include <common/relayd/relayd.hpp>
-#include <common/compat/fcntl.hpp>
-#include <common/compat/endian.hpp>
-#include <common/consumer/consumer-metadata-cache.hpp>
-#include <common/consumer/consumer-stream.hpp>
-#include <common/consumer/consumer-timer.hpp>
-#include <common/utils.hpp>
-#include <common/index/index.hpp>
-#include <common/consumer/consumer.hpp>
-#include <common/shm.hpp>
-#include <common/optional.hpp>
-
-#include "ust-consumer.hpp"
-
-#define INT_MAX_STR_LEN 12     /* includes \0 */
+#define INT_MAX_STR_LEN 12 /* includes \0 */
 
 extern struct lttng_consumer_global_data the_consumer_data;
 extern int consumer_poll_timeout;
@@ -55,7 +56,7 @@ LTTNG_EXPORT DEFINE_LTTNG_UST_SIGBUS_STATE();
  * Returns 0 on success or else a negative value.
  */
 static int add_channel(struct lttng_consumer_channel *channel,
-               struct lttng_consumer_local_data *ctx)
+                      struct lttng_consumer_local_data *ctx)
 {
        int ret = 0;
 
@@ -87,9 +88,11 @@ error:
  *
  * Return NULL on error else the newly allocated stream object.
  */
-static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
-               struct lttng_consumer_channel *channel,
-               struct lttng_consumer_local_data *ctx, int *_alloc_ret)
+static struct lttng_consumer_stream *allocate_stream(int cpu,
+                                                    int key,
+                                                    struct lttng_consumer_channel *channel,
+                                                    struct lttng_consumer_local_data *ctx,
+                                                    int *_alloc_ret)
 {
        int alloc_ret;
        struct lttng_consumer_stream *stream = NULL;
@@ -97,18 +100,17 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
        LTTNG_ASSERT(channel);
        LTTNG_ASSERT(ctx);
 
-       stream = consumer_stream_create(
-                       channel,
-                       channel->key,
-                       key,
-                       channel->name,
-                       channel->relayd_id,
-                       channel->session_id,
-                       channel->trace_chunk,
-                       cpu,
-                       &alloc_ret,
-                       channel->type,
-                       channel->monitor);
+       stream = consumer_stream_create(channel,
+                                       channel->key,
+                                       key,
+                                       channel->name,
+                                       channel->relayd_id,
+                                       channel->session_id,
+                                       channel->trace_chunk,
+                                       cpu,
+                                       &alloc_ret,
+                                       channel->type,
+                                       channel->monitor);
        if (stream == NULL) {
                switch (alloc_ret) {
                case -ENOENT:
@@ -142,7 +144,7 @@ error:
  * Returns 0 on success else a negative value.
  */
 static int send_stream_to_thread(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx)
+                                struct lttng_consumer_local_data *ctx)
 {
        int ret;
        struct lttng_pipe *stream_pipe;
@@ -168,8 +170,8 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream,
        ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
        if (ret < 0) {
                ERR("Consumer write %s stream to pipe %d",
-                               stream->metadata_flag ? "metadata" : "data",
-                               lttng_pipe_get_writefd(stream_pipe));
+                   stream->metadata_flag ? "metadata" : "data",
+                   lttng_pipe_get_writefd(stream_pipe));
                if (stream->metadata_flag) {
                        consumer_del_stream_for_metadata(stream);
                } else {
@@ -182,10 +184,9 @@ error:
        return ret;
 }
 
-static
-int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu)
+static int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu)
 {
-       char cpu_nr[INT_MAX_STR_LEN];  /* int max len */
+       char cpu_nr[INT_MAX_STR_LEN]; /* int max len */
        int ret;
 
        strncpy(stream_shm_path, shm_path, PATH_MAX);
@@ -195,8 +196,7 @@ int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu)
                PERROR("snprintf");
                goto end;
        }
-       strncat(stream_shm_path, cpu_nr,
-               PATH_MAX - strlen(stream_shm_path) - 1);
+       strncat(stream_shm_path, cpu_nr, PATH_MAX - strlen(stream_shm_path) - 1);
        ret = 0;
 end:
        return ret;
@@ -209,7 +209,7 @@ end:
  * Return 0 on success else a negative value.
  */
 static int create_ust_streams(struct lttng_consumer_channel *channel,
-               struct lttng_consumer_local_data *ctx)
+                             struct lttng_consumer_local_data *ctx)
 {
        int ret, cpu = 0;
        struct lttng_ust_ctl_consumer_stream *ustream;
@@ -269,11 +269,9 @@ static int create_ust_streams(struct lttng_consumer_channel *channel,
                 */
                cds_list_add_tail(&stream->send_node, &channel->streams.head);
 
-               ret = lttng_ust_ctl_get_max_subbuf_size(stream->ustream,
-                               &stream->max_sb_size);
+               ret = lttng_ust_ctl_get_max_subbuf_size(stream->ustream, &stream->max_sb_size);
                if (ret < 0) {
-                       ERR("lttng_ust_ctl_get_max_subbuf_size failed for stream %s",
-                                       stream->name);
+                       ERR("lttng_ust_ctl_get_max_subbuf_size failed for stream %s", stream->name);
                        goto error;
                }
 
@@ -286,7 +284,9 @@ static int create_ust_streams(struct lttng_consumer_channel *channel,
                }
 
                DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
-                               stream->name, stream->key, stream->relayd_stream_id);
+                   stream->name,
+                   stream->key,
+                   stream->relayd_stream_id);
 
                /* Set next CPU stream. */
                channel->streams.count = ++cpu;
@@ -297,8 +297,8 @@ static int create_ust_streams(struct lttng_consumer_channel *channel,
                        if (channel->monitor) {
                                /* Set metadata poll pipe if we created one */
                                memcpy(stream->ust_metadata_poll_pipe,
-                                               ust_metadata_pipe,
-                                               sizeof(ust_metadata_pipe));
+                                      ust_metadata_pipe,
+                                      sizeof(ust_metadata_pipe));
                        }
                }
                pthread_mutex_unlock(&stream->lock);
@@ -315,8 +315,9 @@ error_alloc:
        return ret;
 }
 
-static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu,
-               const struct lttng_credentials *session_credentials)
+static int open_ust_stream_fd(struct lttng_consumer_channel *channel,
+                             int cpu,
+                             const struct lttng_credentials *session_credentials)
 {
        char shm_path[PATH_MAX];
        int ret;
@@ -329,9 +330,10 @@ static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu,
                goto error_shm_path;
        }
        return run_as_open(shm_path,
-               O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR,
-               lttng_credentials_get_uid(session_credentials),
-               lttng_credentials_get_gid(session_credentials));
+                          O_RDWR | O_CREAT | O_EXCL,
+                          S_IRUSR | S_IWUSR,
+                          lttng_credentials_get_uid(session_credentials),
+                          lttng_credentials_get_gid(session_credentials));
 
 error_shm_path:
        return -1;
@@ -344,8 +346,8 @@ error_shm_path:
  * Return 0 on success or else a negative value.
  */
 static int create_ust_channel(struct lttng_consumer_channel *channel,
-               struct lttng_ust_ctl_consumer_channel_attr *attr,
-               struct lttng_ust_ctl_consumer_channel **ust_chanp)
+                             struct lttng_ust_ctl_consumer_channel_attr *attr,
+                             struct lttng_ust_ctl_consumer_channel **ust_chanp)
 {
        int ret, nr_stream_fds, i, j;
        int *stream_fds;
@@ -357,11 +359,16 @@ static int create_ust_channel(struct lttng_consumer_channel *channel,
        LTTNG_ASSERT(channel->buffer_credentials.is_set);
 
        DBG3("Creating channel to ustctl with attr: [overwrite: %d, "
-                       "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
-                       "switch_timer_interval: %u, read_timer_interval: %u, "
-                       "output: %d, type: %d", attr->overwrite, attr->subbuf_size,
-                       attr->num_subbuf, attr->switch_timer_interval,
-                       attr->read_timer_interval, attr->output, attr->type);
+            "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
+            "switch_timer_interval: %u, read_timer_interval: %u, "
+            "output: %d, type: %d",
+            attr->overwrite,
+            attr->subbuf_size,
+            attr->num_subbuf,
+            attr->switch_timer_interval,
+            attr->read_timer_interval,
+            attr->output,
+            attr->type);
 
        if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA)
                nr_stream_fds = 1;
@@ -373,8 +380,7 @@ static int create_ust_channel(struct lttng_consumer_channel *channel,
                goto error_alloc;
        }
        for (i = 0; i < nr_stream_fds; i++) {
-               stream_fds[i] = open_ust_stream_fd(channel, i,
-                               &channel->buffer_credentials.value);
+               stream_fds[i] = open_ust_stream_fd(channel, i, &channel->buffer_credentials.value);
                if (stream_fds[i] < 0) {
                        ret = -1;
                        goto error_open;
@@ -403,16 +409,15 @@ error_open:
                if (channel->shm_path[0]) {
                        char shm_path[PATH_MAX];
 
-                       closeret = get_stream_shm_path(shm_path,
-                                       channel->shm_path, j);
+                       closeret = get_stream_shm_path(shm_path, channel->shm_path, j);
                        if (closeret) {
                                ERR("Cannot get stream shm path");
                        }
                        closeret = run_as_unlink(shm_path,
-                                       lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
-                                                       channel->buffer_credentials)),
-                                       lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
-                                                       channel->buffer_credentials)));
+                                                lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
+                                                        channel->buffer_credentials)),
+                                                lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
+                                                        channel->buffer_credentials)));
                        if (closeret) {
                                PERROR("unlink %s", shm_path);
                        }
@@ -421,11 +426,11 @@ error_open:
        /* Try to rmdir all directories under shm_path root. */
        if (channel->root_shm_path[0]) {
                (void) run_as_rmdir_recursive(channel->root_shm_path,
-                               lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
-                                               channel->buffer_credentials)),
-                               lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
-                                               channel->buffer_credentials)),
-                               LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
+                                             lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
+                                                     channel->buffer_credentials)),
+                                             lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
+                                                     channel->buffer_credentials)),
+                                             LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
        }
        free(stream_fds);
 error_alloc:
@@ -462,8 +467,9 @@ error:
  * Return 0 on success or else a negative value.
  */
 static int send_channel_to_sessiond_and_relayd(int sock,
-               struct lttng_consumer_channel *channel,
-               struct lttng_consumer_local_data *ctx, int *relayd_error)
+                                              struct lttng_consumer_channel *channel,
+                                              struct lttng_consumer_local_data *ctx,
+                                              int *relayd_error)
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttng_consumer_stream *stream;
@@ -476,13 +482,13 @@ static int send_channel_to_sessiond_and_relayd(int sock,
        DBG("UST consumer sending channel %s to sessiond", channel->name);
 
        if (channel->relayd_id != (uint64_t) -1ULL) {
-               cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
-
+               cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
                        health_code_update();
 
                        /* Try to send the stream to the relayd if one is available. */
                        DBG("Sending stream %" PRIu64 " of channel \"%s\" to relayd",
-                                       stream->key, channel->name);
+                           stream->key,
+                           channel->name);
                        ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
                        if (ret < 0) {
                                /*
@@ -522,8 +528,7 @@ static int send_channel_to_sessiond_and_relayd(int sock,
        }
 
        /* The channel was sent successfully to the sessiond at this point. */
-       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
-
+       cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
                health_code_update();
 
                /* Send stream to session daemon. */
@@ -559,8 +564,8 @@ error:
  * MUST be destroyed by consumer_del_channel().
  */
 static int ask_channel(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_channel *channel,
-               struct lttng_ust_ctl_consumer_channel_attr *attr)
+                      struct lttng_consumer_channel *channel,
+                      struct lttng_ust_ctl_consumer_channel_attr *attr)
 {
        int ret;
 
@@ -618,7 +623,7 @@ end:
  * On error, return a negative value else 0 on success.
  */
 static int send_streams_to_thread(struct lttng_consumer_channel *channel,
-               struct lttng_consumer_local_data *ctx)
+                                 struct lttng_consumer_local_data *ctx)
 {
        int ret = 0;
        struct lttng_consumer_stream *stream, *stmp;
@@ -627,9 +632,7 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel,
        LTTNG_ASSERT(ctx);
 
        /* Send streams to the corresponding thread. */
-       cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
-                       send_node) {
-
+       cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
                health_code_update();
 
                /* Sending the stream to the thread. */
@@ -674,9 +677,13 @@ static int flush_channel(uint64_t chan_key)
 
        /* For each stream of the channel id, flush it. */
        cds_lfht_for_each_entry_duplicate(ht->ht,
-                       ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
-                       &channel->key, &iter.iter, stream, node_channel_id.node) {
-
+                                         ht->hash_fct(&channel->key, lttng_ht_seed),
+                                         ht->match_fct,
+                                         &channel->key,
+                                         &iter.iter,
+                                         stream,
+                                         node_channel_id.node)
+       {
                health_code_update();
 
                pthread_mutex_lock(&stream->lock);
@@ -691,15 +698,17 @@ static int flush_channel(uint64_t chan_key)
                if (!stream->quiescent) {
                        ret = lttng_ust_ctl_flush_buffer(stream->ustream, 0);
                        if (ret) {
-                               ERR("Failed to flush buffer while flushing channel: channel key = %" PRIu64 ", channel name = '%s'",
-                                               chan_key, channel->name);
+                               ERR("Failed to flush buffer while flushing channel: channel key = %" PRIu64
+                                   ", channel name = '%s'",
+                                   chan_key,
+                                   channel->name);
                                ret = LTTNG_ERR_BUFFER_FLUSH_FAILED;
                                pthread_mutex_unlock(&stream->lock);
                                goto error;
                        }
                        stream->quiescent = true;
                }
-next:
+       next:
                pthread_mutex_unlock(&stream->lock);
        }
 
@@ -743,9 +752,13 @@ static int clear_quiescent_channel(uint64_t chan_key)
 
        /* For each stream of the channel id, clear quiescent state. */
        cds_lfht_for_each_entry_duplicate(ht->ht,
-                       ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
-                       &channel->key, &iter.iter, stream, node_channel_id.node) {
-
+                                         ht->hash_fct(&channel->key, lttng_ht_seed),
+                                         ht->match_fct,
+                                         &channel->key,
+                                         &iter.iter,
+                                         stream,
+                                         node_channel_id.node)
+       {
                health_code_update();
 
                pthread_mutex_lock(&stream->lock);
@@ -877,14 +890,12 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
 
        /* Send metadata stream to relayd if needed. */
        if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) {
-               ret = consumer_send_relayd_stream(metadata->metadata_stream,
-                               metadata->pathname);
+               ret = consumer_send_relayd_stream(metadata->metadata_stream, metadata->pathname);
                if (ret < 0) {
                        ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
                        goto error;
                }
-               ret = consumer_send_relayd_streams_sent(
-                               metadata->metadata_stream->net_seq_idx);
+               ret = consumer_send_relayd_streams_sent(metadata->metadata_stream->net_seq_idx);
                if (ret < 0) {
                        ret = LTTCOMM_CONSUMERD_RELAYD_FAIL;
                        goto error;
@@ -932,8 +943,10 @@ end:
  * Returns 0 on success, < 0 on error
  */
 static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
-               uint64_t key, char *path, uint64_t relayd_id,
-               struct lttng_consumer_local_data *ctx)
+                            uint64_t key,
+                            char *path,
+                            uint64_t relayd_id,
+                            struct lttng_consumer_local_data *ctx)
 {
        int ret = 0;
        struct lttng_consumer_stream *metadata_stream;
@@ -942,8 +955,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
        LTTNG_ASSERT(ctx);
        ASSERT_RCU_READ_LOCKED();
 
-       DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s",
-                       key, path);
+       DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path);
 
        rcu_read_lock();
 
@@ -979,8 +991,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
                metadata_stream->net_seq_idx = relayd_id;
                ret = consumer_send_relayd_stream(metadata_stream, path);
        } else {
-               ret = consumer_stream_create_output_files(metadata_stream,
-                               false);
+               ret = consumer_stream_create_output_files(metadata_stream, false);
        }
        if (ret < 0) {
                goto error_stream;
@@ -1008,9 +1019,7 @@ error:
        return ret;
 }
 
-static
-int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
-               const char **addr)
+static int get_current_subbuf_addr(struct lttng_consumer_stream *stream, const char **addr)
 {
        int ret;
        unsigned long mmap_offset;
@@ -1018,8 +1027,7 @@ int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
 
        mmap_base = (const char *) lttng_ust_ctl_get_mmap_base(stream->ustream);
        if (!mmap_base) {
-               ERR("Failed to get mmap base for stream `%s`",
-                               stream->name);
+               ERR("Failed to get mmap base for stream `%s`", stream->name);
                ret = -EPERM;
                goto error;
        }
@@ -1034,7 +1042,6 @@ int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
        *addr = mmap_base + mmap_offset;
 error:
        return ret;
-
 }
 
 /*
@@ -1044,9 +1051,11 @@ error:
  * Returns 0 on success, < 0 on error
  */
 static int snapshot_channel(struct lttng_consumer_channel *channel,
-               uint64_t key, char *path, uint64_t relayd_id,
-               uint64_t nb_packets_per_stream,
-               struct lttng_consumer_local_data *ctx)
+                           uint64_t key,
+                           char *path,
+                           uint64_t relayd_id,
+                           uint64_t nb_packets_per_stream,
+                           struct lttng_consumer_local_data *ctx)
 {
        int ret;
        unsigned use_relayd = 0;
@@ -1066,7 +1075,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
        LTTNG_ASSERT(!channel->monitor);
        DBG("UST consumer snapshot channel %" PRIu64, key);
 
-       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+       cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
                health_code_update();
 
                /* Lock stream because we are about to change its state. */
@@ -1092,13 +1101,11 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                                goto error_close_stream;
                        }
                } else {
-                       ret = consumer_stream_create_output_files(stream,
-                                       false);
+                       ret = consumer_stream_create_output_files(stream, false);
                        if (ret < 0) {
                                goto error_close_stream;
                        }
-                       DBG("UST consumer snapshot stream (%" PRIu64 ")",
-                                       stream->key);
+                       DBG("UST consumer snapshot stream (%" PRIu64 ")", stream->key);
                }
 
                /*
@@ -1108,8 +1115,10 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                if (!stream->quiescent) {
                        ret = lttng_ust_ctl_flush_buffer(stream->ustream, 0);
                        if (ret < 0) {
-                               ERR("Failed to flush buffer during snapshot of channel: channel key = %" PRIu64 ", channel name = '%s'",
-                                               channel->key, channel->name);
+                               ERR("Failed to flush buffer during snapshot of channel: channel key = %" PRIu64
+                                   ", channel name = '%s'",
+                                   channel->key,
+                                   channel->name);
                                goto error_unlock;
                        }
                }
@@ -1138,9 +1147,8 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                 * daemon should never send a maximum stream size that is lower than
                 * subbuffer size.
                 */
-               consumed_pos = consumer_get_consume_start_pos(consumed_pos,
-                               produced_pos, nb_packets_per_stream,
-                               stream->max_sb_size);
+               consumed_pos = consumer_get_consume_start_pos(
+                       consumed_pos, produced_pos, nb_packets_per_stream, stream->max_sb_size);
 
                while ((long) (consumed_pos - produced_pos) < 0) {
                        ssize_t read_len;
@@ -1181,10 +1189,9 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                                goto error_put_subbuf;
                        }
 
-                       subbuf_view = lttng_buffer_view_init(
-                                       subbuf_addr, 0, padded_len);
+                       subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len);
                        read_len = lttng_consumer_on_read_subbuffer_mmap(
-                                       stream, &subbuf_view, padded_len - len);
+                               stream, &subbuf_view, padded_len - len);
                        if (use_relayd) {
                                if (read_len != len) {
                                        ret = -EPERM;
@@ -1225,14 +1232,11 @@ error_unlock:
        return ret;
 }
 
-static
-void metadata_stream_reset_cache_consumed_position(
-               struct lttng_consumer_stream *stream)
+static void metadata_stream_reset_cache_consumed_position(struct lttng_consumer_stream *stream)
 {
        ASSERT_LOCKED(stream->lock);
 
-       DBG("Reset metadata cache of session %" PRIu64,
-                       stream->chan->session_id);
+       DBG("Reset metadata cache of session %" PRIu64, stream->chan->session_id);
        stream->ust_metadata_pushed = 0;
 }
 
@@ -1244,9 +1248,14 @@ void metadata_stream_reset_cache_consumed_position(
  * the metadata cache flush to concurrently progress in order to
  * complete.
  */
-int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
-               uint64_t len, uint64_t version,
-               struct lttng_consumer_channel *channel, int timer, int wait)
+int lttng_ustconsumer_recv_metadata(int sock,
+                                   uint64_t key,
+                                   uint64_t offset,
+                                   uint64_t len,
+                                   uint64_t version,
+                                   struct lttng_consumer_channel *channel,
+                                   int timer,
+                                   int wait)
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        char *metadata_str;
@@ -1275,8 +1284,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
 
        pthread_mutex_lock(&channel->metadata_cache->lock);
        cache_write_status = consumer_metadata_cache_write(
-                       channel->metadata_cache, offset, len, version,
-                       metadata_str);
+               channel->metadata_cache, offset, len, version, metadata_str);
        pthread_mutex_unlock(&channel->metadata_cache->lock);
        switch (cache_write_status) {
        case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE:
@@ -1300,8 +1308,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                 */
                if (channel->metadata_stream != NULL) {
                        pthread_mutex_lock(&channel->metadata_stream->lock);
-                       metadata_stream_reset_cache_consumed_position(
-                                       channel->metadata_stream);
+                       metadata_stream_reset_cache_consumed_position(channel->metadata_stream);
                        pthread_mutex_unlock(&channel->metadata_stream->lock);
                } else {
                        /* Validate we are in snapshot mode. */
@@ -1355,7 +1362,8 @@ end:
  * Return 1 on success else a negative value or 0.
  */
 int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
-               int sock, struct pollfd *consumer_sockpoll)
+                              int sock,
+                              struct pollfd *consumer_sockpoll)
 {
        int ret_func;
        enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
@@ -1370,14 +1378,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
                if (ret_recv != sizeof(msg)) {
                        DBG("Consumer received unexpected message size %zd (expects %zu)",
-                                       ret_recv, sizeof(msg));
+                           ret_recv,
+                           sizeof(msg));
                        /*
                         * The ret value might 0 meaning an orderly shutdown but this is ok
                         * since the caller handles this.
                         */
                        if (ret_recv > 0) {
-                               lttng_consumer_send_error(ctx,
-                                               LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
+                               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
                                ret_recv = -1;
                        }
                        return ret_recv;
@@ -1400,15 +1408,19 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                uint32_t major = msg.u.relayd_sock.major;
                uint32_t minor = msg.u.relayd_sock.minor;
                enum lttcomm_sock_proto protocol =
-                               (enum lttcomm_sock_proto) msg.u.relayd_sock
-                                               .relayd_socket_protocol;
+                       (enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol;
 
                /* Session daemon status message are handled in the following call. */
                consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
-                               msg.u.relayd_sock.type, ctx, sock,
-                               consumer_sockpoll, msg.u.relayd_sock.session_id,
-                               msg.u.relayd_sock.relayd_session_id, major,
-                               minor, protocol);
+                                          msg.u.relayd_sock.type,
+                                          ctx,
+                                          sock,
+                                          consumer_sockpoll,
+                                          msg.u.relayd_sock.session_id,
+                                          msg.u.relayd_sock.relayd_session_id,
+                                          major,
+                                          minor,
+                                          protocol);
                goto end_nosignal;
        }
        case LTTNG_CONSUMER_DESTROY_RELAYD:
@@ -1457,11 +1469,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                is_data_pending = consumer_data_pending(id);
 
                /* Send back returned value to session daemon */
-               ret_send = lttcomm_send_unix_sock(sock, &is_data_pending,
-                               sizeof(is_data_pending));
+               ret_send = lttcomm_send_unix_sock(sock, &is_data_pending, sizeof(is_data_pending));
                if (ret_send < 0) {
-                       DBG("Error when sending the data pending ret code: %zd",
-                                       ret_send);
+                       DBG("Error when sending the data pending ret code: %zd", ret_send);
                        goto error_fatal;
                }
 
@@ -1483,28 +1493,26 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* Create a plain object and reserve a channel key. */
                channel = consumer_allocate_channel(
-                               msg.u.ask_channel.key,
-                               msg.u.ask_channel.session_id,
-                               msg.u.ask_channel.chunk_id.is_set ?
-                                               &chunk_id : NULL,
-                               msg.u.ask_channel.pathname,
-                               msg.u.ask_channel.name,
-                               msg.u.ask_channel.relayd_id,
-                               (enum lttng_event_output) msg.u.ask_channel.output,
-                               msg.u.ask_channel.tracefile_size,
-                               msg.u.ask_channel.tracefile_count,
-                               msg.u.ask_channel.session_id_per_pid,
-                               msg.u.ask_channel.monitor,
-                               msg.u.ask_channel.live_timer_interval,
-                               msg.u.ask_channel.is_live,
-                               msg.u.ask_channel.root_shm_path,
-                               msg.u.ask_channel.shm_path);
+                       msg.u.ask_channel.key,
+                       msg.u.ask_channel.session_id,
+                       msg.u.ask_channel.chunk_id.is_set ? &chunk_id : NULL,
+                       msg.u.ask_channel.pathname,
+                       msg.u.ask_channel.name,
+                       msg.u.ask_channel.relayd_id,
+                       (enum lttng_event_output) msg.u.ask_channel.output,
+                       msg.u.ask_channel.tracefile_size,
+                       msg.u.ask_channel.tracefile_count,
+                       msg.u.ask_channel.session_id_per_pid,
+                       msg.u.ask_channel.monitor,
+                       msg.u.ask_channel.live_timer_interval,
+                       msg.u.ask_channel.is_live,
+                       msg.u.ask_channel.root_shm_path,
+                       msg.u.ask_channel.shm_path);
                if (!channel) {
                        goto end_channel_error;
                }
 
-               LTTNG_OPTIONAL_SET(&channel->buffer_credentials,
-                               buffer_credentials);
+               LTTNG_OPTIONAL_SET(&channel->buffer_credentials, buffer_credentials);
 
                /*
                 * Assign UST application UID to the channel. This value is ignored for
@@ -1521,7 +1529,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
                attr.chan_id = msg.u.ask_channel.chan_id;
                memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
-               attr.blocking_timeout= msg.u.ask_channel.blocking_timeout;
+               attr.blocking_timeout = msg.u.ask_channel.blocking_timeout;
 
                /* Match channel buffer type to the UST abi. */
                switch (msg.u.ask_channel.output) {
@@ -1562,8 +1570,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                if (msg.u.ask_channel.type == LTTNG_UST_ABI_CHAN_METADATA) {
                        int ret_allocate;
 
-                       ret_allocate = consumer_metadata_cache_allocate(
-                                       channel);
+                       ret_allocate = consumer_metadata_cache_allocate(channel);
                        if (ret_allocate < 0) {
                                ERR("Allocating metadata cache");
                                goto end_channel_error;
@@ -1573,11 +1580,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                } else {
                        int monitor_start_ret;
 
-                       consumer_timer_live_start(channel,
-                                       msg.u.ask_channel.live_timer_interval);
+                       consumer_timer_live_start(channel, msg.u.ask_channel.live_timer_interval);
                        monitor_start_ret = consumer_timer_monitor_start(
-                                       channel,
-                                       msg.u.ask_channel.monitor_timer_interval);
+                               channel, msg.u.ask_channel.monitor_timer_interval);
                        if (monitor_start_ret < 0) {
                                ERR("Starting channel monitoring timer failed");
                                goto end_channel_error;
@@ -1643,8 +1648,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                health_code_update();
 
                /* Send the channel to sessiond (and relayd, if applicable). */
-               ret = send_channel_to_sessiond_and_relayd(
-                               sock, found_channel, ctx, &relayd_err);
+               ret = send_channel_to_sessiond_and_relayd(sock, found_channel, ctx, &relayd_err);
                if (ret < 0) {
                        if (relayd_err) {
                                /*
@@ -1682,11 +1686,11 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                /* List MUST be empty after or else it could be reused. */
                LTTNG_ASSERT(cds_list_empty(&found_channel->streams.head));
-end_get_channel:
+       end_get_channel:
                goto end_msg_sessiond;
-error_get_channel_fatal:
+       error_get_channel_fatal:
                goto error_fatal;
-end_get_channel_nosignal:
+       end_get_channel_nosignal:
                goto end_nosignal;
        }
        case LTTNG_CONSUMER_DESTROY_CHANNEL:
@@ -1727,8 +1731,7 @@ end_get_channel_nosignal:
        {
                int ret;
 
-               ret = clear_quiescent_channel(
-                               msg.u.clear_quiescent_channel.key);
+               ret = clear_quiescent_channel(msg.u.clear_quiescent_channel.key);
                if (ret != 0) {
                        ret_code = (lttcomm_return_code) ret;
                }
@@ -1744,8 +1747,7 @@ end_get_channel_nosignal:
                uint64_t version = msg.u.push_metadata.version;
                struct lttng_consumer_channel *found_channel;
 
-               DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key,
-                               len);
+               DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
 
                found_channel = consumer_find_channel(key);
                if (!found_channel) {
@@ -1791,8 +1793,8 @@ end_get_channel_nosignal:
 
                health_code_update();
 
-               ret = lttng_ustconsumer_recv_metadata(sock, key, offset, len,
-                               version, found_channel, 0, 1);
+               ret = lttng_ustconsumer_recv_metadata(
+                       sock, key, offset, len, version, found_channel, 0, 1);
                if (ret < 0) {
                        /* error receiving from sessiond */
                        goto error_push_metadata_fatal;
@@ -1800,9 +1802,9 @@ end_get_channel_nosignal:
                        ret_code = (lttcomm_return_code) ret;
                        goto end_push_metadata_msg_sessiond;
                }
-end_push_metadata_msg_sessiond:
+       end_push_metadata_msg_sessiond:
                goto end_msg_sessiond;
-error_push_metadata_fatal:
+       error_push_metadata_fatal:
                goto error_fatal;
        }
        case LTTNG_CONSUMER_SETUP_METADATA:
@@ -1830,10 +1832,10 @@ error_push_metadata_fatal:
                                int ret_snapshot;
 
                                ret_snapshot = snapshot_metadata(found_channel,
-                                               key,
-                                               msg.u.snapshot_channel.pathname,
-                                               msg.u.snapshot_channel.relayd_id,
-                                               ctx);
+                                                                key,
+                                                                msg.u.snapshot_channel.pathname,
+                                                                msg.u.snapshot_channel.relayd_id,
+                                                                ctx);
                                if (ret_snapshot < 0) {
                                        ERR("Snapshot metadata failed");
                                        ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
@@ -1841,13 +1843,13 @@ error_push_metadata_fatal:
                        } else {
                                int ret_snapshot;
 
-                               ret_snapshot = snapshot_channel(found_channel,
-                                               key,
-                                               msg.u.snapshot_channel.pathname,
-                                               msg.u.snapshot_channel.relayd_id,
-                                               msg.u.snapshot_channel
-                                                               .nb_packets_per_stream,
-                                               ctx);
+                               ret_snapshot = snapshot_channel(
+                                       found_channel,
+                                       key,
+                                       msg.u.snapshot_channel.pathname,
+                                       msg.u.snapshot_channel.relayd_id,
+                                       msg.u.snapshot_channel.nb_packets_per_stream,
+                                       ctx);
                                if (ret_snapshot < 0) {
                                        ERR("Snapshot channel failed");
                                        ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
@@ -1873,8 +1875,7 @@ error_push_metadata_fatal:
                uint64_t id = msg.u.discarded_events.session_id;
                uint64_t key = msg.u.discarded_events.channel_key;
 
-               DBG("UST consumer discarded events command for session id %"
-                               PRIu64, id);
+               DBG("UST consumer discarded events command for session id %" PRIu64, id);
                rcu_read_lock();
                pthread_mutex_lock(&the_consumer_data.lock);
 
@@ -1889,9 +1890,13 @@ error_push_metadata_fatal:
                 */
                discarded_events = 0;
                cds_lfht_for_each_entry_duplicate(ht->ht,
-                               ht->hash_fct(&id, lttng_ht_seed),
-                               ht->match_fct, &id,
-                               &iter.iter, stream, node_session_id.node) {
+                                                 ht->hash_fct(&id, lttng_ht_seed),
+                                                 ht->match_fct,
+                                                 &id,
+                                                 &iter.iter,
+                                                 stream,
+                                                 node_session_id.node)
+               {
                        if (stream->chan->key == key) {
                                discarded_events = stream->chan->discarded_events;
                                break;
@@ -1900,8 +1905,10 @@ error_push_metadata_fatal:
                pthread_mutex_unlock(&the_consumer_data.lock);
                rcu_read_unlock();
 
-               DBG("UST consumer discarded events command for session id %"
-                               PRIu64 ", channel key %" PRIu64, id, key);
+               DBG("UST consumer discarded events command for session id %" PRIu64
+                   ", channel key %" PRIu64,
+                   id,
+                   key);
 
                health_code_update();
 
@@ -1916,7 +1923,7 @@ error_push_metadata_fatal:
        }
        case LTTNG_CONSUMER_LOST_PACKETS:
        {
-               int ret;
+               int ret;
                uint64_t lost_packets;
                struct lttng_ht_iter iter;
                struct lttng_ht *ht;
@@ -1924,8 +1931,7 @@ error_push_metadata_fatal:
                uint64_t id = msg.u.lost_packets.session_id;
                uint64_t key = msg.u.lost_packets.channel_key;
 
-               DBG("UST consumer lost packets command for session id %"
-                               PRIu64, id);
+               DBG("UST consumer lost packets command for session id %" PRIu64, id);
                rcu_read_lock();
                pthread_mutex_lock(&the_consumer_data.lock);
 
@@ -1937,27 +1943,32 @@ error_push_metadata_fatal:
                 * to extract the information we need, we default to 0 if not
                 * found (no packets lost if the channel is not yet in use).
                 */
-               lost_packets = 0;
+               lost_packets = 0;
                cds_lfht_for_each_entry_duplicate(ht->ht,
-                               ht->hash_fct(&id, lttng_ht_seed),
-                               ht->match_fct, &id,
-                               &iter.iter, stream, node_session_id.node) {
+                                                 ht->hash_fct(&id, lttng_ht_seed),
+                                                 ht->match_fct,
+                                                 &id,
+                                                 &iter.iter,
+                                                 stream,
+                                                 node_session_id.node)
+               {
                        if (stream->chan->key == key) {
-                               lost_packets = stream->chan->lost_packets;
+                               lost_packets = stream->chan->lost_packets;
                                break;
                        }
                }
                pthread_mutex_unlock(&the_consumer_data.lock);
                rcu_read_unlock();
 
-               DBG("UST consumer lost packets command for session id %"
-                               PRIu64 ", channel key %" PRIu64, id, key);
+               DBG("UST consumer lost packets command for session id %" PRIu64
+                   ", channel key %" PRIu64,
+                   id,
+                   key);
 
                health_code_update();
 
                /* Send back returned value to session daemon */
-               ret = lttcomm_send_unix_sock(sock, &lost_packets,
-                               sizeof(lost_packets));
+               ret = lttcomm_send_unix_sock(sock, &lost_packets, sizeof(lost_packets));
                if (ret < 0) {
                        PERROR("send lost packets");
                        goto error_fatal;
@@ -1967,8 +1978,7 @@ error_push_metadata_fatal:
        }
        case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
        {
-               int channel_monitor_pipe, ret_send,
-                               ret_set_channel_monitor_pipe;
+               int channel_monitor_pipe, ret_send, ret_set_channel_monitor_pipe;
                ssize_t ret_recv;
 
                ret_code = LTTCOMM_CONSUMERD_SUCCESS;
@@ -1978,8 +1988,7 @@ error_push_metadata_fatal:
                        goto error_fatal;
                }
 
-               ret_recv = lttcomm_recv_fds_unix_sock(
-                               sock, &channel_monitor_pipe, 1);
+               ret_recv = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe, 1);
                if (ret_recv != sizeof(channel_monitor_pipe)) {
                        ERR("Failed to receive channel monitor pipe");
                        goto error_fatal;
@@ -1987,8 +1996,7 @@ error_push_metadata_fatal:
 
                DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
                ret_set_channel_monitor_pipe =
-                               consumer_timer_thread_set_channel_monitor_pipe(
-                                               channel_monitor_pipe);
+                       consumer_timer_thread_set_channel_monitor_pipe(channel_monitor_pipe);
                if (!ret_set_channel_monitor_pipe) {
                        int flags;
                        int ret_fcntl;
@@ -2002,8 +2010,7 @@ error_push_metadata_fatal:
                        }
                        flags = ret_fcntl;
 
-                       ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL,
-                                       flags | O_NONBLOCK);
+                       ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL, flags | O_NONBLOCK);
                        if (ret_fcntl == -1) {
                                PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
                                goto error_fatal;
@@ -2032,8 +2039,7 @@ error_push_metadata_fatal:
                         * this channel.
                         */
                        rotate_channel = lttng_consumer_rotate_channel(
-                                       found_channel, key,
-                                       msg.u.rotate_channel.relayd_id);
+                               found_channel, key, msg.u.rotate_channel.relayd_id);
                        if (rotate_channel < 0) {
                                ERR("Rotate channel failed");
                                ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
@@ -2059,14 +2065,13 @@ error_push_metadata_fatal:
                        int ret_rotate_read_streams;
 
                        ret_rotate_read_streams =
-                                       lttng_consumer_rotate_ready_streams(
-                                                       found_channel, key);
+                               lttng_consumer_rotate_ready_streams(found_channel, key);
                        if (ret_rotate_read_streams < 0) {
                                ERR("Rotate channel failed");
                        }
                }
                break;
-end_rotate_channel_nosignal:
+       end_rotate_channel_nosignal:
                goto end_nosignal;
        }
        case LTTNG_CONSUMER_CLEAR_CHANNEL:
@@ -2082,8 +2087,7 @@ end_rotate_channel_nosignal:
                } else {
                        int ret_clear_channel;
 
-                       ret_clear_channel = lttng_consumer_clear_channel(
-                                       found_channel);
+                       ret_clear_channel = lttng_consumer_clear_channel(found_channel);
                        if (ret_clear_channel) {
                                ERR("Clear channel failed key %" PRIu64, key);
                                ret_code = (lttcomm_return_code) ret_clear_channel;
@@ -2103,8 +2107,9 @@ end_rotate_channel_nosignal:
                int ret_send_status;
                lttng_uuid sessiond_uuid;
 
-               std::copy(std::begin(msg.u.init.sessiond_uuid), std::end(msg.u.init.sessiond_uuid),
-                               sessiond_uuid.begin());
+               std::copy(std::begin(msg.u.init.sessiond_uuid),
+                         std::end(msg.u.init.sessiond_uuid),
+                         sessiond_uuid.begin());
                ret_code = lttng_consumer_init_command(ctx, sessiond_uuid);
                health_code_update();
                ret_send_status = consumer_send_status_msg(sock, ret_code);
@@ -2117,17 +2122,16 @@ end_rotate_channel_nosignal:
        case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
        {
                const struct lttng_credentials credentials = {
-                       .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
-                       .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
+                       .uid = LTTNG_OPTIONAL_INIT_VALUE(
+                               msg.u.create_trace_chunk.credentials.value.uid),
+                       .gid = LTTNG_OPTIONAL_INIT_VALUE(
+                               msg.u.create_trace_chunk.credentials.value.gid),
                };
-               const bool is_local_trace =
-                               !msg.u.create_trace_chunk.relayd_id.is_set;
-               const uint64_t relayd_id =
-                               msg.u.create_trace_chunk.relayd_id.value;
-               const char *chunk_override_name =
-                               *msg.u.create_trace_chunk.override_name ?
-                                       msg.u.create_trace_chunk.override_name :
-                                       NULL;
+               const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set;
+               const uint64_t relayd_id = msg.u.create_trace_chunk.relayd_id.value;
+               const char *chunk_override_name = *msg.u.create_trace_chunk.override_name ?
+                       msg.u.create_trace_chunk.override_name :
+                       NULL;
                struct lttng_directory_handle *chunk_directory_handle = NULL;
 
                /*
@@ -2140,8 +2144,7 @@ end_rotate_channel_nosignal:
                        ssize_t ret_recv;
 
                        /* Acnowledge the reception of the command. */
-                       ret_send_status = consumer_send_status_msg(
-                                       sock, LTTCOMM_CONSUMERD_SUCCESS);
+                       ret_send_status = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
                        if (ret_send_status < 0) {
                                /* Somehow, the session daemon is not responding anymore. */
                                goto end_nosignal;
@@ -2150,17 +2153,15 @@ end_rotate_channel_nosignal:
                        /*
                         * Receive trace chunk domain dirfd.
                         */
-                       ret_recv = lttcomm_recv_fds_unix_sock(
-                                       sock, &chunk_dirfd, 1);
+                       ret_recv = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1);
                        if (ret_recv != sizeof(chunk_dirfd)) {
                                ERR("Failed to receive trace chunk domain directory file descriptor");
                                goto error_fatal;
                        }
 
-                       DBG("Received trace chunk domain directory fd (%d)",
-                                       chunk_dirfd);
-                       chunk_directory_handle = lttng_directory_handle_create_from_dirfd(
-                                       chunk_dirfd);
+                       DBG("Received trace chunk domain directory fd (%d)", chunk_dirfd);
+                       chunk_directory_handle =
+                               lttng_directory_handle_create_from_dirfd(chunk_dirfd);
                        if (!chunk_directory_handle) {
                                ERR("Failed to initialize chunk domain directory handle from directory file descriptor");
                                if (close(chunk_dirfd)) {
@@ -2171,48 +2172,39 @@ end_rotate_channel_nosignal:
                }
 
                ret_code = lttng_consumer_create_trace_chunk(
-                               !is_local_trace ? &relayd_id : NULL,
-                               msg.u.create_trace_chunk.session_id,
-                               msg.u.create_trace_chunk.chunk_id,
-                               (time_t) msg.u.create_trace_chunk
-                                               .creation_timestamp,
-                               chunk_override_name,
-                               msg.u.create_trace_chunk.credentials.is_set ?
-                                               &credentials :
-                                               NULL,
-                               chunk_directory_handle);
+                       !is_local_trace ? &relayd_id : NULL,
+                       msg.u.create_trace_chunk.session_id,
+                       msg.u.create_trace_chunk.chunk_id,
+                       (time_t) msg.u.create_trace_chunk.creation_timestamp,
+                       chunk_override_name,
+                       msg.u.create_trace_chunk.credentials.is_set ? &credentials : NULL,
+                       chunk_directory_handle);
                lttng_directory_handle_put(chunk_directory_handle);
                goto end_msg_sessiond;
        }
        case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
        {
                enum lttng_trace_chunk_command_type close_command =
-                       (lttng_trace_chunk_command_type)
-                               msg.u.close_trace_chunk.close_command.value;
-               const uint64_t relayd_id =
-                               msg.u.close_trace_chunk.relayd_id.value;
+                       (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
+               const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value;
                struct lttcomm_consumer_close_trace_chunk_reply reply;
                char closed_trace_chunk_path[LTTNG_PATH_MAX] = {};
                int ret;
 
                ret_code = lttng_consumer_close_trace_chunk(
-                               msg.u.close_trace_chunk.relayd_id.is_set ?
-                                               &relayd_id :
-                                               NULL,
-                               msg.u.close_trace_chunk.session_id,
-                               msg.u.close_trace_chunk.chunk_id,
-                               (time_t) msg.u.close_trace_chunk.close_timestamp,
-                               msg.u.close_trace_chunk.close_command.is_set ?
-                                               &close_command :
-                                               NULL, closed_trace_chunk_path);
+                       msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : NULL,
+                       msg.u.close_trace_chunk.session_id,
+                       msg.u.close_trace_chunk.chunk_id,
+                       (time_t) msg.u.close_trace_chunk.close_timestamp,
+                       msg.u.close_trace_chunk.close_command.is_set ? &close_command : NULL,
+                       closed_trace_chunk_path);
                reply.ret_code = ret_code;
                reply.path_length = strlen(closed_trace_chunk_path) + 1;
                ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
                if (ret != sizeof(reply)) {
                        goto error_fatal;
                }
-               ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path,
-                               reply.path_length);
+               ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path, reply.path_length);
                if (ret != reply.path_length) {
                        goto error_fatal;
                }
@@ -2220,26 +2212,22 @@ end_rotate_channel_nosignal:
        }
        case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
        {
-               const uint64_t relayd_id =
-                               msg.u.trace_chunk_exists.relayd_id.value;
+               const uint64_t relayd_id = msg.u.trace_chunk_exists.relayd_id.value;
 
                ret_code = lttng_consumer_trace_chunk_exists(
-                               msg.u.trace_chunk_exists.relayd_id.is_set ?
-                                               &relayd_id : NULL,
-                               msg.u.trace_chunk_exists.session_id,
-                               msg.u.trace_chunk_exists.chunk_id);
+                       msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : NULL,
+                       msg.u.trace_chunk_exists.session_id,
+                       msg.u.trace_chunk_exists.chunk_id);
                goto end_msg_sessiond;
        }
        case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
        {
                const uint64_t key = msg.u.open_channel_packets.key;
-               struct lttng_consumer_channel *found_channel =
-                               consumer_find_channel(key);
+               struct lttng_consumer_channel *found_channel = consumer_find_channel(key);
 
                if (found_channel) {
                        pthread_mutex_lock(&found_channel->lock);
-                       ret_code = lttng_consumer_open_channel_packets(
-                                       found_channel);
+                       ret_code = lttng_consumer_open_channel_packets(found_channel);
                        pthread_mutex_unlock(&found_channel->lock);
                } else {
                        /*
@@ -2312,8 +2300,7 @@ end:
        return ret_func;
 }
 
-int lttng_ust_flush_buffer(struct lttng_consumer_stream *stream,
-               int producer_active)
+int lttng_ust_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
 {
        LTTNG_ASSERT(stream);
        LTTNG_ASSERT(stream->ustream);
@@ -2339,8 +2326,7 @@ int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream)
  *
  * Returns 0 on success, < 0 on error.
  */
-int lttng_ustconsumer_sample_snapshot_positions(
-               struct lttng_consumer_stream *stream)
+int lttng_ustconsumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
 {
        LTTNG_ASSERT(stream);
        LTTNG_ASSERT(stream->ustream);
@@ -2353,8 +2339,8 @@ int lttng_ustconsumer_sample_snapshot_positions(
  *
  * Returns 0 on success, < 0 on error
  */
-int lttng_ustconsumer_get_produced_snapshot(
-               struct lttng_consumer_stream *stream, unsigned long *pos)
+int lttng_ustconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
+                                           unsigned long *pos)
 {
        LTTNG_ASSERT(stream);
        LTTNG_ASSERT(stream->ustream);
@@ -2368,8 +2354,8 @@ int lttng_ustconsumer_get_produced_snapshot(
  *
  * Returns 0 on success, < 0 on error
  */
-int lttng_ustconsumer_get_consumed_snapshot(
-               struct lttng_consumer_stream *stream, unsigned long *pos)
+int lttng_ustconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
+                                           unsigned long *pos)
 {
        LTTNG_ASSERT(stream);
        LTTNG_ASSERT(stream->ustream);
@@ -2378,8 +2364,7 @@ int lttng_ustconsumer_get_consumed_snapshot(
        return lttng_ust_ctl_snapshot_get_consumed(stream->ustream, pos);
 }
 
-int lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream,
-               int producer)
+int lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream, int producer)
 {
        LTTNG_ASSERT(stream);
        LTTNG_ASSERT(stream->ustream);
@@ -2395,8 +2380,7 @@ int lttng_ustconsumer_clear_buffer(struct lttng_consumer_stream *stream)
        return lttng_ust_ctl_clear_buffer(stream->ustream);
 }
 
-int lttng_ustconsumer_get_current_timestamp(
-               struct lttng_consumer_stream *stream, uint64_t *ts)
+int lttng_ustconsumer_get_current_timestamp(struct lttng_consumer_stream *stream, uint64_t *ts)
 {
        LTTNG_ASSERT(stream);
        LTTNG_ASSERT(stream->ustream);
@@ -2405,8 +2389,7 @@ int lttng_ustconsumer_get_current_timestamp(
        return lttng_ust_ctl_get_current_timestamp(stream->ustream, ts);
 }
 
-int lttng_ustconsumer_get_sequence_number(
-               struct lttng_consumer_stream *stream, uint64_t *seq)
+int lttng_ustconsumer_get_sequence_number(struct lttng_consumer_stream *stream, uint64_t *seq)
 {
        LTTNG_ASSERT(stream);
        LTTNG_ASSERT(stream->ustream);
@@ -2462,10 +2445,10 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
                                ERR("Cannot get stream shm path");
                        }
                        ret = run_as_unlink(shm_path,
-                                       lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
-                                                       chan->buffer_credentials)),
-                                       lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
-                                                       chan->buffer_credentials)));
+                                           lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
+                                                   chan->buffer_credentials)),
+                                           lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
+                                                   chan->buffer_credentials)));
                        if (ret) {
                                PERROR("unlink %s", shm_path);
                        }
@@ -2483,12 +2466,11 @@ void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan)
        lttng_ust_ctl_destroy_channel(chan->uchan);
        /* Try to rmdir all directories under shm_path root. */
        if (chan->root_shm_path[0]) {
-               (void) run_as_rmdir_recursive(chan->root_shm_path,
-                               lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(
-                                               chan->buffer_credentials)),
-                               lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(
-                                               chan->buffer_credentials)),
-                               LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
+               (void) run_as_rmdir_recursive(
+                       chan->root_shm_path,
+                       lttng_credentials_get_uid(LTTNG_OPTIONAL_GET_PTR(chan->buffer_credentials)),
+                       lttng_credentials_get_gid(LTTNG_OPTIONAL_GET_PTR(chan->buffer_credentials)),
+                       LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
        }
        free(chan->stream_fds);
 }
@@ -2526,15 +2508,13 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
  * Returns the number of bytes pushed from the cache into the ring buffer, or a
  * negative value on error.
  */
-static
-int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
+static int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
 {
        ssize_t write_len;
        int ret;
 
        pthread_mutex_lock(&stream->chan->metadata_cache->lock);
-       if (stream->chan->metadata_cache->contents.size ==
-                       stream->ust_metadata_pushed) {
+       if (stream->chan->metadata_cache->contents.size == stream->ust_metadata_pushed) {
                /*
                 * In the context of a user space metadata channel, a
                 * change in version can be detected in two ways:
@@ -2559,21 +2539,20 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
                 * occur as part of the pre-consume) until the metadata size
                 * exceeded the cache size.
                 */
-               if (stream->metadata_version !=
-                               stream->chan->metadata_cache->version) {
+               if (stream->metadata_version != stream->chan->metadata_cache->version) {
                        metadata_stream_reset_cache_consumed_position(stream);
                        consumer_stream_metadata_set_version(stream,
-                                       stream->chan->metadata_cache->version);
+                                                            stream->chan->metadata_cache->version);
                } else {
                        ret = 0;
                        goto end;
                }
        }
 
-       write_len = lttng_ust_ctl_write_one_packet_to_channel(stream->chan->uchan,
-                       &stream->chan->metadata_cache->contents.data[stream->ust_metadata_pushed],
-                       stream->chan->metadata_cache->contents.size -
-                                       stream->ust_metadata_pushed);
+       write_len = lttng_ust_ctl_write_one_packet_to_channel(
+               stream->chan->uchan,
+               &stream->chan->metadata_cache->contents.data[stream->ust_metadata_pushed],
+               stream->chan->metadata_cache->contents.size - stream->ust_metadata_pushed);
        LTTNG_ASSERT(write_len != 0);
        if (write_len < 0) {
                ERR("Writing one metadata packet");
@@ -2582,8 +2561,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
        }
        stream->ust_metadata_pushed += write_len;
 
-       LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >=
-                       stream->ust_metadata_pushed);
+       LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed);
        ret = write_len;
 
        /*
@@ -2602,7 +2580,6 @@ end:
        return ret;
 }
 
-
 /*
  * Sync metadata meaning request them to the session daemon and snapshot to the
  * metadata thread can consumer them.
@@ -2613,9 +2590,9 @@ end:
  *
  * The RCU read side lock must be held by the caller.
  */
-enum sync_metadata_status lttng_ustconsumer_sync_metadata(
-               struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *metadata_stream)
+enum sync_metadata_status
+lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
+                               struct lttng_consumer_stream *metadata_stream)
 {
        int ret;
        enum sync_metadata_status status;
@@ -2651,7 +2628,7 @@ enum sync_metadata_status lttng_ustconsumer_sync_metadata(
         */
        if (consumer_stream_is_deleted(metadata_stream)) {
                DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization",
-                               metadata_stream->key);
+                   metadata_stream->key);
                status = SYNC_METADATA_STATUS_NO_DATA;
                goto end;
        }
@@ -2669,7 +2646,8 @@ enum sync_metadata_status lttng_ustconsumer_sync_metadata(
 
        ret = lttng_ust_ctl_snapshot(metadata_stream->ustream);
        if (ret < 0) {
-               ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d", ret);
+               ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d",
+                   ret);
                status = SYNC_METADATA_STATUS_ERROR;
                goto end;
        }
@@ -2682,7 +2660,7 @@ end:
  * Return 0 on success else a negative value.
  */
 static int notify_if_more_data(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx)
+                              struct lttng_consumer_local_data *ctx)
 {
        int ret;
        struct lttng_ust_ctl_consumer_stream *ustream;
@@ -2758,18 +2736,17 @@ static int consumer_stream_ust_on_wake_up(struct lttng_consumer_stream *stream)
 }
 
 static int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
-               struct stream_subbuffer *subbuf)
+                                        struct stream_subbuffer *subbuf)
 {
        int ret;
 
-       ret = lttng_ust_ctl_get_subbuf_size(
-                       stream->ustream, &subbuf->info.data.subbuf_size);
+       ret = lttng_ust_ctl_get_subbuf_size(stream->ustream, &subbuf->info.data.subbuf_size);
        if (ret) {
                goto end;
        }
 
-       ret = lttng_ust_ctl_get_padded_subbuf_size(
-                       stream->ustream, &subbuf->info.data.padded_subbuf_size);
+       ret = lttng_ust_ctl_get_padded_subbuf_size(stream->ustream,
+                                                  &subbuf->info.data.padded_subbuf_size);
        if (ret) {
                goto end;
        }
@@ -2779,7 +2756,7 @@ end:
 }
 
 static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
-               struct stream_subbuffer *subbuf)
+                                          struct stream_subbuffer *subbuf)
 {
        int ret;
 
@@ -2795,7 +2772,7 @@ end:
 }
 
 static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
-               struct stream_subbuffer *subbuf)
+                                      struct stream_subbuffer *subbuf)
 {
        int ret;
 
@@ -2804,43 +2781,40 @@ static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
                goto end;
        }
 
-       ret = lttng_ust_ctl_get_packet_size(
-                       stream->ustream, &subbuf->info.data.packet_size);
+       ret = lttng_ust_ctl_get_packet_size(stream->ustream, &subbuf->info.data.packet_size);
        if (ret < 0) {
                PERROR("Failed to get sub-buffer packet size");
                goto end;
        }
 
-       ret = lttng_ust_ctl_get_content_size(
-                       stream->ustream, &subbuf->info.data.content_size);
+       ret = lttng_ust_ctl_get_content_size(stream->ustream, &subbuf->info.data.content_size);
        if (ret < 0) {
                PERROR("Failed to get sub-buffer content size");
                goto end;
        }
 
-       ret = lttng_ust_ctl_get_timestamp_begin(
-                       stream->ustream, &subbuf->info.data.timestamp_begin);
+       ret = lttng_ust_ctl_get_timestamp_begin(stream->ustream,
+                                               &subbuf->info.data.timestamp_begin);
        if (ret < 0) {
                PERROR("Failed to get sub-buffer begin timestamp");
                goto end;
        }
 
-       ret = lttng_ust_ctl_get_timestamp_end(
-                       stream->ustream, &subbuf->info.data.timestamp_end);
+       ret = lttng_ust_ctl_get_timestamp_end(stream->ustream, &subbuf->info.data.timestamp_end);
        if (ret < 0) {
                PERROR("Failed to get sub-buffer end timestamp");
                goto end;
        }
 
-       ret = lttng_ust_ctl_get_events_discarded(
-                       stream->ustream, &subbuf->info.data.events_discarded);
+       ret = lttng_ust_ctl_get_events_discarded(stream->ustream,
+                                                &subbuf->info.data.events_discarded);
        if (ret) {
                PERROR("Failed to get sub-buffer events discarded count");
                goto end;
        }
 
        ret = lttng_ust_ctl_get_sequence_number(stream->ustream,
-                       &subbuf->info.data.sequence_number.value);
+                                               &subbuf->info.data.sequence_number.value);
        if (ret) {
                /* May not be supported by older LTTng-modules. */
                if (ret != -ENOTTY) {
@@ -2851,15 +2825,14 @@ static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
                subbuf->info.data.sequence_number.is_set = true;
        }
 
-       ret = lttng_ust_ctl_get_stream_id(
-                       stream->ustream, &subbuf->info.data.stream_id);
+       ret = lttng_ust_ctl_get_stream_id(stream->ustream, &subbuf->info.data.stream_id);
        if (ret < 0) {
                PERROR("Failed to get stream id");
                goto end;
        }
 
        ret = lttng_ust_ctl_get_instance_id(stream->ustream,
-                       &subbuf->info.data.stream_instance_id.value);
+                                           &subbuf->info.data.stream_instance_id.value);
        if (ret) {
                /* May not be supported by older LTTng-modules. */
                if (ret != -ENOTTY) {
@@ -2874,13 +2847,12 @@ end:
 }
 
 static int get_next_subbuffer_common(struct lttng_consumer_stream *stream,
-               struct stream_subbuffer *subbuffer)
+                                    struct stream_subbuffer *subbuffer)
 {
        int ret;
        const char *addr;
 
-       ret = stream->read_subbuffer_ops.extract_subbuffer_info(
-                       stream, subbuffer);
+       ret = stream->read_subbuffer_ops.extract_subbuffer_info(stream, subbuffer);
        if (ret) {
                goto end;
        }
@@ -2890,16 +2862,15 @@ static int get_next_subbuffer_common(struct lttng_consumer_stream *stream,
                goto end;
        }
 
-       subbuffer->buffer.buffer = lttng_buffer_view_init(
-                       addr, 0, subbuffer->info.data.padded_subbuf_size);
+       subbuffer->buffer.buffer =
+               lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size);
        LTTNG_ASSERT(subbuffer->buffer.buffer.data != NULL);
 end:
        return ret;
 }
 
-static enum get_next_subbuffer_status get_next_subbuffer(
-               struct lttng_consumer_stream *stream,
-               struct stream_subbuffer *subbuffer)
+static enum get_next_subbuffer_status get_next_subbuffer(struct lttng_consumer_stream *stream,
+                                                        struct stream_subbuffer *subbuffer)
 {
        int ret;
        enum get_next_subbuffer_status status;
@@ -2910,7 +2881,7 @@ static enum get_next_subbuffer_status get_next_subbuffer(
                status = GET_NEXT_SUBBUFFER_STATUS_OK;
                break;
        case -ENODATA:
-               case -EAGAIN:
+       case -EAGAIN:
                /*
                 * The caller only expects -ENODATA when there is no data to
                 * read, but the kernel tracer returns -EAGAIN when there is
@@ -2934,9 +2905,9 @@ end:
        return status;
 }
 
-static enum get_next_subbuffer_status get_next_subbuffer_metadata(
-               struct lttng_consumer_stream *stream,
-               struct stream_subbuffer *subbuffer)
+static enum get_next_subbuffer_status
+get_next_subbuffer_metadata(struct lttng_consumer_stream *stream,
+                           struct stream_subbuffer *subbuffer)
 {
        int ret;
        bool cache_empty;
@@ -2979,7 +2950,7 @@ static enum get_next_subbuffer_status get_next_subbuffer_metadata(
                } else {
                        pthread_mutex_lock(&stream->chan->metadata_cache->lock);
                        cache_empty = stream->chan->metadata_cache->contents.size ==
-                                       stream->ust_metadata_pushed;
+                               stream->ust_metadata_pushed;
                        pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
                }
        } while (!got_subbuffer);
@@ -3034,7 +3005,7 @@ end:
 }
 
 static int put_next_subbuffer(struct lttng_consumer_stream *stream,
-               struct stream_subbuffer *subbuffer __attribute__((unused)))
+                             struct stream_subbuffer *subbuffer __attribute__((unused)))
 {
        const int ret = lttng_ust_ctl_put_next_subbuf(stream->ustream);
 
@@ -3043,42 +3014,35 @@ static int put_next_subbuffer(struct lttng_consumer_stream *stream,
 }
 
 static int signal_metadata(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx __attribute__((unused)))
+                          struct lttng_consumer_local_data *ctx __attribute__((unused)))
 {
        ASSERT_LOCKED(stream->metadata_rdv_lock);
        return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
 }
 
-static int lttng_ustconsumer_set_stream_ops(
-               struct lttng_consumer_stream *stream)
+static int lttng_ustconsumer_set_stream_ops(struct lttng_consumer_stream *stream)
 {
        int ret = 0;
 
        stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up;
        if (stream->metadata_flag) {
-               stream->read_subbuffer_ops.get_next_subbuffer =
-                               get_next_subbuffer_metadata;
-               stream->read_subbuffer_ops.extract_subbuffer_info =
-                               extract_metadata_subbuffer_info;
+               stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer_metadata;
+               stream->read_subbuffer_ops.extract_subbuffer_info = extract_metadata_subbuffer_info;
                stream->read_subbuffer_ops.reset_metadata =
-                               metadata_stream_reset_cache_consumed_position;
+                       metadata_stream_reset_cache_consumed_position;
                if (stream->chan->is_live) {
                        stream->read_subbuffer_ops.on_sleep = signal_metadata;
-                       ret = consumer_stream_enable_metadata_bucketization(
-                                       stream);
+                       ret = consumer_stream_enable_metadata_bucketization(stream);
                        if (ret) {
                                goto end;
                        }
                }
        } else {
-               stream->read_subbuffer_ops.get_next_subbuffer =
-                               get_next_subbuffer;
-               stream->read_subbuffer_ops.extract_subbuffer_info =
-                               extract_data_subbuffer_info;
+               stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer;
+               stream->read_subbuffer_ops.extract_subbuffer_info = extract_data_subbuffer_info;
                stream->read_subbuffer_ops.on_sleep = notify_if_more_data;
                if (stream->chan->is_live) {
-                       stream->read_subbuffer_ops.send_live_beacon =
-                                       consumer_flush_ust_index;
+                       stream->read_subbuffer_ops.send_live_beacon = consumer_flush_ust_index;
                }
        }
 
@@ -3103,7 +3067,7 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
         * no current trace chunk on the parent channel.
         */
        if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
-                       stream->chan->trace_chunk) {
+           stream->chan->trace_chunk) {
                ret = consumer_stream_create_output_files(stream, true);
                if (ret) {
                        goto error;
@@ -3159,12 +3123,14 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
                 * whetnever ust_metadata_pushed is incremented, the associated
                 * metadata has been consumed from the metadata stream.
                 */
-               DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64,
-                               contiguous, pushed);
+               DBG("UST consumer metadata pending check: contiguous %" PRIu64
+                   " vs pushed %" PRIu64,
+                   contiguous,
+                   pushed);
                LTTNG_ASSERT(((int64_t) (contiguous - pushed)) >= 0);
                if ((contiguous != pushed) ||
-                               (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
-                       ret = 1;        /* Data is pending */
+                   (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
+                       ret = 1; /* Data is pending */
                        goto end;
                }
        } else {
@@ -3176,7 +3142,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
                         */
                        ret = lttng_ust_ctl_put_subbuf(stream->ustream);
                        LTTNG_ASSERT(ret == 0);
-                       ret = 1;        /* Data is pending */
+                       ret = 1; /* Data is pending */
                        goto end;
                }
        }
@@ -3247,15 +3213,12 @@ void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht)
        DBG("UST consumer closing all metadata streams");
 
        rcu_read_lock();
-       cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
-                       node.node) {
-
+       cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
                health_code_update();
 
                pthread_mutex_lock(&stream->chan->lock);
                lttng_ustconsumer_close_metadata(stream->chan);
                pthread_mutex_unlock(&stream->chan->lock);
-
        }
        rcu_read_unlock();
 }
@@ -3281,7 +3244,9 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
  * pushed out due to concurrent interaction with the session daemon.
  */
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_channel *channel, int timer, int wait)
+                                      struct lttng_consumer_channel *channel,
+                                      int timer,
+                                      int wait)
 {
        struct lttcomm_metadata_request_msg request;
        struct lttcomm_consumer_msg msg;
@@ -3317,17 +3282,18 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
        request.uid = channel->ust_app_uid;
        request.key = channel->key;
 
-       DBG("Sending metadata request to sessiond, session id %" PRIu64
-                       ", per-pid %" PRIu64 ", app UID %u and channel key %" PRIu64,
-                       request.session_id, request.session_id_per_pid, request.uid,
-                       request.key);
+       DBG("Sending metadata request to sessiond, session id %" PRIu64 ", per-pid %" PRIu64
+           ", app UID %u and channel key %" PRIu64,
+           request.session_id,
+           request.session_id_per_pid,
+           request.uid,
+           request.key);
 
        pthread_mutex_lock(&ctx->metadata_socket_lock);
 
        health_code_update();
 
-       ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request,
-                       sizeof(request));
+       ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request, sizeof(request));
        if (ret < 0) {
                ERR("Asking metadata to sessiond");
                goto end;
@@ -3336,11 +3302,9 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
        health_code_update();
 
        /* Receive the metadata from sessiond */
-       ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg,
-                       sizeof(msg));
+       ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg, sizeof(msg));
        if (ret != sizeof(msg)) {
-               DBG("Consumer received unexpected message size %d (expects %zu)",
-                       ret, sizeof(msg));
+               DBG("Consumer received unexpected message size %d (expects %zu)", ret, sizeof(msg));
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
                /*
                 * The ret value might 0 meaning an orderly shutdown but this is ok
@@ -3353,8 +3317,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
 
        if (msg.cmd_type == LTTNG_ERR_UND) {
                /* No registry found */
-               (void) consumer_send_status_msg(ctx->consumer_metadata_socket,
-                               ret_code);
+               (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret_code);
                ret = 0;
                goto end;
        } else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) {
@@ -3376,8 +3339,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
        health_code_update();
 
        /* Tell session daemon we are ready to receive the metadata. */
-       ret = consumer_send_status_msg(ctx->consumer_metadata_socket,
-                       LTTCOMM_CONSUMERD_SUCCESS);
+       ret = consumer_send_status_msg(ctx->consumer_metadata_socket, LTTCOMM_CONSUMERD_SUCCESS);
        if (ret < 0 || len == 0) {
                /*
                 * Somehow, the session daemon is not responding anymore or there is
@@ -3388,8 +3350,8 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
 
        health_code_update();
 
-       ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
-                       key, offset, len, version, channel, timer, wait);
+       ret = lttng_ustconsumer_recv_metadata(
+               ctx->consumer_metadata_socket, key, offset, len, version, channel, timer, wait);
        if (ret >= 0) {
                /*
                 * Only send the status msg if the sessiond is alive meaning a positive
@@ -3409,8 +3371,7 @@ end:
 /*
  * Return the ustctl call for the get stream id.
  */
-int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream,
-               uint64_t *stream_id)
+int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream, uint64_t *stream_id)
 {
        LTTNG_ASSERT(stream);
        LTTNG_ASSERT(stream_id);
This page took 0.08377 seconds and 4 git commands to generate.