X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.cpp;fp=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.cpp;h=c40f3b6947ed72f3f4ae735022e3d3994398b770;hp=ce1d3d8293463898d09b1d9f12eeab018dbbbf9e;hb=28ab034a2c3582d07d3423d2d746731f87d3969f;hpb=52e345b9ac912d033c2a2c25a170a01cf209839d diff --git a/src/common/kernel-consumer/kernel-consumer.cpp b/src/common/kernel-consumer/kernel-consumer.cpp index ce1d3d829..c40f3b694 100644 --- a/src/common/kernel-consumer/kernel-consumer.cpp +++ b/src/common/kernel-consumer/kernel-consumer.cpp @@ -8,37 +8,37 @@ */ #define _LGPL_SOURCE -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include "kernel-consumer.hpp" -#include +#include #include -#include -#include -#include -#include #include -#include -#include -#include +#include #include -#include #include -#include -#include #include #include +#include +#include +#include +#include +#include +#include +#include +#include -#include "kernel-consumer.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include extern struct lttng_consumer_global_data the_consumer_data; extern int consumer_poll_timeout; @@ -70,8 +70,7 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream) * * Returns 0 on success, < 0 on error. */ -int lttng_kconsumer_sample_snapshot_positions( - struct lttng_consumer_stream *stream) +int lttng_kconsumer_sample_snapshot_positions(struct lttng_consumer_stream *stream) { LTTNG_ASSERT(stream); @@ -83,8 +82,7 @@ int lttng_kconsumer_sample_snapshot_positions( * * Returns 0 on success, < 0 on error */ -int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, - unsigned long *pos) +int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos) { int ret; int infd = stream->wait_fd; @@ -102,8 +100,7 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, * * Returns 0 on success, < 0 on error */ -int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, - unsigned long *pos) +int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos) { int ret; int infd = stream->wait_fd; @@ -116,9 +113,7 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, return ret; } -static -int get_current_subbuf_addr(struct lttng_consumer_stream *stream, - const char **addr) +static int get_current_subbuf_addr(struct lttng_consumer_stream *stream, const char **addr) { int ret; unsigned long mmap_offset; @@ -142,10 +137,11 @@ error: * * Returns 0 on success, < 0 on error */ -static int lttng_kconsumer_snapshot_channel( - struct lttng_consumer_channel *channel, - uint64_t key, char *path, uint64_t relayd_id, - uint64_t nb_packets_per_stream) +static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *channel, + uint64_t key, + char *path, + uint64_t relayd_id, + uint64_t nb_packets_per_stream) { int ret; struct lttng_consumer_stream *stream; @@ -160,12 +156,12 @@ static int lttng_kconsumer_snapshot_channel( /* Splice is not supported yet for channel snapshot. */ if (channel->output != CONSUMER_CHANNEL_MMAP) { ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot", - channel->name); + channel->name); ret = -1; goto end; } - cds_list_for_each_entry(stream, &channel->streams.head, send_node) { + cds_list_for_each_entry (stream, &channel->streams.head, send_node) { unsigned long consumed_pos, produced_pos; health_code_update(); @@ -201,13 +197,11 @@ static int lttng_kconsumer_snapshot_channel( goto error_close_stream_output; } } else { - ret = consumer_stream_create_output_files(stream, - false); + ret = consumer_stream_create_output_files(stream, false); if (ret < 0) { goto error_close_stream_output; } - DBG("Kernel consumer snapshot stream (%" PRIu64 ")", - stream->key); + DBG("Kernel consumer snapshot stream (%" PRIu64 ")", stream->key); } ret = kernctl_buffer_flush_empty(stream->wait_fd); @@ -245,9 +239,8 @@ static int lttng_kconsumer_snapshot_channel( goto error_close_stream_output; } - consumed_pos = consumer_get_consume_start_pos(consumed_pos, - produced_pos, nb_packets_per_stream, - stream->max_sb_size); + consumed_pos = consumer_get_consume_start_pos( + consumed_pos, produced_pos, nb_packets_per_stream, stream->max_sb_size); while ((long) (consumed_pos - produced_pos) < 0) { ssize_t read_len; @@ -287,11 +280,9 @@ static int lttng_kconsumer_snapshot_channel( goto error_put_subbuf; } - subbuf_view = lttng_buffer_view_init( - subbuf_addr, 0, padded_len); + subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len); read_len = lttng_consumer_on_read_subbuffer_mmap( - stream, &subbuf_view, - padded_len - len); + stream, &subbuf_view, padded_len - len); /* * We write the padded len in local tracefiles but the data len * when using a relay. Display the error but continue processing @@ -300,12 +291,14 @@ static int lttng_kconsumer_snapshot_channel( if (relayd_id != (uint64_t) -1ULL) { if (read_len != len) { ERR("Error sending to the relay (ret: %zd != len: %lu)", - read_len, len); + read_len, + len); } } else { if (read_len != padded_len) { ERR("Error writing to tracefile (ret: %zd != len: %lu)", - read_len, padded_len); + read_len, + padded_len); } } @@ -347,10 +340,11 @@ end: * * Returns 0 on success, < 0 on error */ -static int lttng_kconsumer_snapshot_metadata( - struct lttng_consumer_channel *metadata_channel, - uint64_t key, char *path, uint64_t relayd_id, - struct lttng_consumer_local_data *ctx) +static int lttng_kconsumer_snapshot_metadata(struct lttng_consumer_channel *metadata_channel, + uint64_t key, + char *path, + uint64_t relayd_id, + struct lttng_consumer_local_data *ctx) { int ret, use_relayd = 0; ssize_t ret_read; @@ -358,8 +352,7 @@ static int lttng_kconsumer_snapshot_metadata( LTTNG_ASSERT(ctx); - DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s", - key, path); + DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s", key, path); rcu_read_lock(); @@ -381,8 +374,7 @@ static int lttng_kconsumer_snapshot_metadata( goto error_snapshot; } } else { - ret = consumer_stream_create_output_files(metadata_stream, - false); + ret = consumer_stream_create_output_files(metadata_stream, false); if (ret < 0) { goto error_snapshot; } @@ -393,8 +385,7 @@ static int lttng_kconsumer_snapshot_metadata( ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true); if (ret_read < 0) { - ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", - ret_read); + ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", ret_read); ret = ret_read; goto error_snapshot; } @@ -434,7 +425,8 @@ error_snapshot: * Return 1 on success else a negative value or 0. */ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, - int sock, struct pollfd *consumer_sockpoll) + int sock, + struct pollfd *consumer_sockpoll) { int ret_func; enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; @@ -448,8 +440,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret_recv != sizeof(msg)) { if (ret_recv > 0) { - lttng_consumer_send_error(ctx, - LTTCOMM_CONSUMERD_ERROR_RECV_CMD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); ret_recv = -1; } return ret_recv; @@ -471,15 +462,20 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, { uint32_t major = msg.u.relayd_sock.major; uint32_t minor = msg.u.relayd_sock.minor; - enum lttcomm_sock_proto protocol = (enum lttcomm_sock_proto) - msg.u.relayd_sock.relayd_socket_protocol; + enum lttcomm_sock_proto protocol = + (enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol; /* Session daemon status message are handled in the following call. */ consumer_add_relayd_socket(msg.u.relayd_sock.net_index, - msg.u.relayd_sock.type, ctx, sock, - consumer_sockpoll, msg.u.relayd_sock.session_id, - msg.u.relayd_sock.relayd_session_id, major, - minor, protocol); + msg.u.relayd_sock.type, + ctx, + sock, + consumer_sockpoll, + msg.u.relayd_sock.session_id, + msg.u.relayd_sock.relayd_session_id, + major, + minor, + protocol); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: @@ -500,19 +496,22 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key); - new_channel = consumer_allocate_channel(msg.u.channel.channel_key, - msg.u.channel.session_id, - msg.u.channel.chunk_id.is_set ? - &chunk_id : NULL, - msg.u.channel.pathname, - msg.u.channel.name, - msg.u.channel.relayd_id, msg.u.channel.output, - msg.u.channel.tracefile_size, - msg.u.channel.tracefile_count, 0, - msg.u.channel.monitor, - msg.u.channel.live_timer_interval, - msg.u.channel.is_live, - NULL, NULL); + new_channel = + consumer_allocate_channel(msg.u.channel.channel_key, + msg.u.channel.session_id, + msg.u.channel.chunk_id.is_set ? &chunk_id : NULL, + msg.u.channel.pathname, + msg.u.channel.name, + msg.u.channel.relayd_id, + msg.u.channel.output, + msg.u.channel.tracefile_size, + msg.u.channel.tracefile_count, + 0, + msg.u.channel.monitor, + msg.u.channel.live_timer_interval, + msg.u.channel.is_live, + NULL, + NULL); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; @@ -544,28 +543,22 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); if (ctx->on_recv_channel != NULL) { - int ret_recv_channel = - ctx->on_recv_channel(new_channel); + int ret_recv_channel = ctx->on_recv_channel(new_channel); if (ret_recv_channel == 0) { - ret_add_channel = consumer_add_channel( - new_channel, ctx); + ret_add_channel = consumer_add_channel(new_channel, ctx); } else if (ret_recv_channel < 0) { goto end_nosignal; } } else { - ret_add_channel = - consumer_add_channel(new_channel, ctx); + ret_add_channel = consumer_add_channel(new_channel, ctx); } - if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && - !ret_add_channel) { + if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret_add_channel) { int monitor_start_ret; DBG("Consumer starting monitor timer"); - consumer_timer_live_start(new_channel, - msg.u.channel.live_timer_interval); + consumer_timer_live_start(new_channel, msg.u.channel.live_timer_interval); monitor_start_ret = consumer_timer_monitor_start( - new_channel, - msg.u.channel.monitor_timer_interval); + new_channel, msg.u.channel.monitor_timer_interval); if (monitor_start_ret < 0) { ERR("Starting channel monitoring timer failed"); goto end_nosignal; @@ -576,8 +569,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* If we received an error in add_channel, we need to report it. */ if (ret_add_channel < 0) { - ret_send_status = consumer_send_status_msg( - sock, ret_add_channel); + ret_send_status = consumer_send_status_msg(sock, ret_add_channel); if (ret_send_status < 0) { goto error_fatal; } @@ -660,18 +652,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); pthread_mutex_lock(&channel->lock); - new_stream = consumer_stream_create( - channel, - channel->key, - fd, - channel->name, - channel->relayd_id, - channel->session_id, - channel->trace_chunk, - msg.u.stream.cpu, - &alloc_ret, - channel->type, - channel->monitor); + new_stream = consumer_stream_create(channel, + channel->key, + fd, + channel->name, + channel->relayd_id, + channel->session_id, + channel->trace_chunk, + msg.u.stream.cpu, + &alloc_ret, + channel->type, + channel->monitor); if (new_stream == NULL) { switch (alloc_ret) { case -ENOMEM: @@ -685,16 +676,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } new_stream->wait_fd = fd; - ret_get_max_subbuf_size = kernctl_get_max_subbuf_size( - new_stream->wait_fd, &new_stream->max_sb_size); + ret_get_max_subbuf_size = + kernctl_get_max_subbuf_size(new_stream->wait_fd, &new_stream->max_sb_size); if (ret_get_max_subbuf_size < 0) { pthread_mutex_unlock(&channel->lock); ERR("Failed to get kernel maximal subbuffer size"); goto error_add_stream_nosignal; } - consumer_stream_update_channel_attributes(new_stream, - channel); + consumer_stream_update_channel_attributes(new_stream, channel); /* * We've just assigned the channel to the stream so increment the @@ -737,8 +727,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Do not monitor this stream. */ if (!channel->monitor) { DBG("Kernel consumer add stream %s in no monitor mode with " - "relayd id %" PRIu64, new_stream->name, - new_stream->net_seq_idx); + "relayd id %" PRIu64, + new_stream->name, + new_stream->net_seq_idx); cds_list_add(&new_stream->send_node, &channel->streams.head); pthread_mutex_unlock(&new_stream->lock); pthread_mutex_unlock(&channel->lock); @@ -749,8 +740,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (new_stream->net_seq_idx != (uint64_t) -1ULL) { int ret_send_relayd_stream; - ret_send_relayd_stream = consumer_send_relayd_stream( - new_stream, new_stream->chan->pathname); + ret_send_relayd_stream = + consumer_send_relayd_stream(new_stream, new_stream->chan->pathname); if (ret_send_relayd_stream < 0) { pthread_mutex_unlock(&new_stream->lock); pthread_mutex_unlock(&channel->lock); @@ -767,8 +758,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int ret_send_relayd_streams_sent; ret_send_relayd_streams_sent = - consumer_send_relayd_streams_sent( - new_stream->net_seq_idx); + consumer_send_relayd_streams_sent(new_stream->net_seq_idx); if (ret_send_relayd_streams_sent < 0) { pthread_mutex_unlock(&new_stream->lock); pthread_mutex_unlock(&channel->lock); @@ -793,12 +783,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); - ret_pipe_write = lttng_pipe_write( - stream_pipe, &new_stream, sizeof(new_stream)); + ret_pipe_write = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream)); if (ret_pipe_write < 0) { ERR("Consumer write %s stream to pipe %d", - new_stream->metadata_flag ? "metadata" : "data", - lttng_pipe_get_writefd(stream_pipe)); + new_stream->metadata_flag ? "metadata" : "data", + lttng_pipe_get_writefd(stream_pipe)); if (new_stream->metadata_flag) { consumer_del_stream_for_metadata(new_stream); } else { @@ -808,12 +797,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64, - new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id); -end_add_stream: + new_stream->name, + fd, + new_stream->chan->pathname, + new_stream->relayd_stream_id); + end_add_stream: break; -error_add_stream_nosignal: + error_add_stream_nosignal: goto end_nosignal; -error_add_stream_fatal: + error_add_stream_fatal: goto error_fatal; } case LTTNG_CONSUMER_STREAMS_SENT: @@ -831,8 +823,7 @@ error_add_stream_fatal: * We could not find the channel. Can happen if cpu hotplug * happens while tearing down. */ - ERR("Unable to find channel key %" PRIu64, - msg.u.sent_streams.channel_key); + ERR("Unable to find channel key %" PRIu64, msg.u.sent_streams.channel_key); ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; } @@ -842,8 +833,7 @@ error_add_stream_fatal: * Send status code to session daemon. */ ret_send_status = consumer_send_status_msg(sock, ret_code); - if (ret_send_status < 0 || - ret_code != LTTCOMM_CONSUMERD_SUCCESS) { + if (ret_send_status < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) { /* Somehow, the session daemon is not responding anymore. */ goto error_streams_sent_nosignal; } @@ -863,16 +853,16 @@ error_add_stream_fatal: if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) { int ret_send_relay_streams; - ret_send_relay_streams = consumer_send_relayd_streams_sent( - msg.u.sent_streams.net_seq_idx); + ret_send_relay_streams = + consumer_send_relayd_streams_sent(msg.u.sent_streams.net_seq_idx); if (ret_send_relay_streams < 0) { goto error_streams_sent_nosignal; } channel->streams_sent_to_relayd = true; } -end_error_streams_sent: + end_error_streams_sent: break; -error_streams_sent_nosignal: + error_streams_sent_nosignal: goto end_nosignal; } case LTTNG_CONSUMER_UPDATE_STREAM: @@ -932,8 +922,8 @@ error_streams_sent_nosignal: health_code_update(); /* Send back returned value to session daemon */ - ret_send = lttcomm_send_unix_sock(sock, &ret_data_pending, - sizeof(ret_data_pending)); + ret_send = + lttcomm_send_unix_sock(sock, &ret_data_pending, sizeof(ret_data_pending)); if (ret_send < 0) { PERROR("send data pending ret code"); goto error_fatal; @@ -960,10 +950,11 @@ error_streams_sent_nosignal: int ret_snapshot; ret_snapshot = lttng_kconsumer_snapshot_metadata( - channel, key, - msg.u.snapshot_channel.pathname, - msg.u.snapshot_channel.relayd_id, - ctx); + channel, + key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, + ctx); if (ret_snapshot < 0) { ERR("Snapshot metadata failed"); ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED; @@ -972,11 +963,11 @@ error_streams_sent_nosignal: int ret_snapshot; ret_snapshot = lttng_kconsumer_snapshot_channel( - channel, key, - msg.u.snapshot_channel.pathname, - msg.u.snapshot_channel.relayd_id, - msg.u.snapshot_channel - .nb_packets_per_stream); + channel, + key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, + msg.u.snapshot_channel.nb_packets_per_stream); if (ret_snapshot < 0) { ERR("Snapshot channel failed"); ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED; @@ -1032,7 +1023,7 @@ error_streams_sent_nosignal: LTTNG_ASSERT(!uatomic_sub_return(&channel->refcount, 1)); consumer_del_channel(channel); -end_destroy_channel: + end_destroy_channel: goto end_nosignal; } case LTTNG_CONSUMER_DISCARDED_EVENTS: @@ -1043,13 +1034,14 @@ end_destroy_channel: uint64_t id = msg.u.discarded_events.session_id; uint64_t key = msg.u.discarded_events.channel_key; - DBG("Kernel consumer discarded events command for session id %" - PRIu64 ", channel key %" PRIu64, id, key); + DBG("Kernel consumer discarded events command for session id %" PRIu64 + ", channel key %" PRIu64, + id, + key); channel = consumer_find_channel(key); if (!channel) { - ERR("Kernel consumer discarded events channel %" - PRIu64 " not found", key); + ERR("Kernel consumer discarded events channel %" PRIu64 " not found", key); count = 0; } else { count = channel->discarded_events; @@ -1074,13 +1066,14 @@ end_destroy_channel: uint64_t id = msg.u.lost_packets.session_id; uint64_t key = msg.u.lost_packets.channel_key; - DBG("Kernel consumer lost packets command for session id %" - PRIu64 ", channel key %" PRIu64, id, key); + DBG("Kernel consumer lost packets command for session id %" PRIu64 + ", channel key %" PRIu64, + id, + key); channel = consumer_find_channel(key); if (!channel) { - ERR("Kernel consumer lost packets channel %" - PRIu64 " not found", key); + ERR("Kernel consumer lost packets channel %" PRIu64 " not found", key); count = 0; } else { count = channel->lost_packets; @@ -1110,8 +1103,7 @@ end_destroy_channel: goto error_fatal; } - ret_recv = lttcomm_recv_fds_unix_sock( - sock, &channel_monitor_pipe, 1); + ret_recv = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe, 1); if (ret_recv != sizeof(channel_monitor_pipe)) { ERR("Failed to receive channel monitor pipe"); goto error_fatal; @@ -1119,8 +1111,7 @@ end_destroy_channel: DBG("Received channel monitor pipe (%d)", channel_monitor_pipe); ret_set_channel_monitor_pipe = - consumer_timer_thread_set_channel_monitor_pipe( - channel_monitor_pipe); + consumer_timer_thread_set_channel_monitor_pipe(channel_monitor_pipe); if (!ret_set_channel_monitor_pipe) { int flags; int ret_fcntl; @@ -1134,8 +1125,7 @@ end_destroy_channel: } flags = ret_fcntl; - ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL, - flags | O_NONBLOCK); + ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL, flags | O_NONBLOCK); if (ret_fcntl == -1) { PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe"); goto error_fatal; @@ -1169,8 +1159,7 @@ end_destroy_channel: int ret_rotate_channel; ret_rotate_channel = lttng_consumer_rotate_channel( - channel, key, - msg.u.rotate_channel.relayd_id); + channel, key, msg.u.rotate_channel.relayd_id); if (ret_rotate_channel < 0) { ERR("Rotate channel failed"); ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL; @@ -1188,14 +1177,13 @@ end_destroy_channel: /* Rotate the streams that are ready right now. */ int ret_rotate; - ret_rotate = lttng_consumer_rotate_ready_streams( - channel, key); + ret_rotate = lttng_consumer_rotate_ready_streams(channel, key); if (ret_rotate < 0) { ERR("Rotate ready streams failed"); } } break; -error_rotate_channel: + error_rotate_channel: goto end_nosignal; } case LTTNG_CONSUMER_CLEAR_CHANNEL: @@ -1211,8 +1199,7 @@ error_rotate_channel: } else { int ret_clear_channel; - ret_clear_channel = - lttng_consumer_clear_channel(channel); + ret_clear_channel = lttng_consumer_clear_channel(channel); if (ret_clear_channel) { ERR("Clear channel failed"); ret_code = (lttcomm_return_code) ret_clear_channel; @@ -1234,11 +1221,11 @@ error_rotate_channel: int ret_send_status; lttng_uuid sessiond_uuid; - std::copy(std::begin(msg.u.init.sessiond_uuid), std::end(msg.u.init.sessiond_uuid), - sessiond_uuid.begin()); + std::copy(std::begin(msg.u.init.sessiond_uuid), + std::end(msg.u.init.sessiond_uuid), + sessiond_uuid.begin()); - ret_code = lttng_consumer_init_command(ctx, - sessiond_uuid); + ret_code = lttng_consumer_init_command(ctx, sessiond_uuid); health_code_update(); ret_send_status = consumer_send_status_msg(sock, ret_code); if (ret_send_status < 0) { @@ -1250,17 +1237,16 @@ error_rotate_channel: case LTTNG_CONSUMER_CREATE_TRACE_CHUNK: { const struct lttng_credentials credentials = { - .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid), - .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid), + .uid = LTTNG_OPTIONAL_INIT_VALUE( + msg.u.create_trace_chunk.credentials.value.uid), + .gid = LTTNG_OPTIONAL_INIT_VALUE( + msg.u.create_trace_chunk.credentials.value.gid), }; - const bool is_local_trace = - !msg.u.create_trace_chunk.relayd_id.is_set; - const uint64_t relayd_id = - msg.u.create_trace_chunk.relayd_id.value; - const char *chunk_override_name = - *msg.u.create_trace_chunk.override_name ? - msg.u.create_trace_chunk.override_name : - NULL; + const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set; + const uint64_t relayd_id = msg.u.create_trace_chunk.relayd_id.value; + const char *chunk_override_name = *msg.u.create_trace_chunk.override_name ? + msg.u.create_trace_chunk.override_name : + NULL; struct lttng_directory_handle *chunk_directory_handle = NULL; /* @@ -1273,24 +1259,21 @@ error_rotate_channel: ssize_t ret_recv; /* Acnowledge the reception of the command. */ - ret_send_status = consumer_send_status_msg( - sock, LTTCOMM_CONSUMERD_SUCCESS); + ret_send_status = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS); if (ret_send_status < 0) { /* Somehow, the session daemon is not responding anymore. */ goto end_nosignal; } - ret_recv = lttcomm_recv_fds_unix_sock( - sock, &chunk_dirfd, 1); + ret_recv = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1); if (ret_recv != sizeof(chunk_dirfd)) { ERR("Failed to receive trace chunk directory file descriptor"); goto error_fatal; } - DBG("Received trace chunk directory fd (%d)", - chunk_dirfd); - chunk_directory_handle = lttng_directory_handle_create_from_dirfd( - chunk_dirfd); + DBG("Received trace chunk directory fd (%d)", chunk_dirfd); + chunk_directory_handle = + lttng_directory_handle_create_from_dirfd(chunk_dirfd); if (!chunk_directory_handle) { ERR("Failed to initialize chunk directory handle from directory file descriptor"); if (close(chunk_dirfd)) { @@ -1301,47 +1284,39 @@ error_rotate_channel: } ret_code = lttng_consumer_create_trace_chunk( - !is_local_trace ? &relayd_id : NULL, - msg.u.create_trace_chunk.session_id, - msg.u.create_trace_chunk.chunk_id, - (time_t) msg.u.create_trace_chunk - .creation_timestamp, - chunk_override_name, - msg.u.create_trace_chunk.credentials.is_set ? - &credentials : - NULL, - chunk_directory_handle); + !is_local_trace ? &relayd_id : NULL, + msg.u.create_trace_chunk.session_id, + msg.u.create_trace_chunk.chunk_id, + (time_t) msg.u.create_trace_chunk.creation_timestamp, + chunk_override_name, + msg.u.create_trace_chunk.credentials.is_set ? &credentials : NULL, + chunk_directory_handle); lttng_directory_handle_put(chunk_directory_handle); goto end_msg_sessiond; } case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK: { enum lttng_trace_chunk_command_type close_command = - (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value; - const uint64_t relayd_id = - msg.u.close_trace_chunk.relayd_id.value; + (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value; + const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value; struct lttcomm_consumer_close_trace_chunk_reply reply; char path[LTTNG_PATH_MAX]; ssize_t ret_send; ret_code = lttng_consumer_close_trace_chunk( - msg.u.close_trace_chunk.relayd_id.is_set ? - &relayd_id : - NULL, - msg.u.close_trace_chunk.session_id, - msg.u.close_trace_chunk.chunk_id, - (time_t) msg.u.close_trace_chunk.close_timestamp, - msg.u.close_trace_chunk.close_command.is_set ? - &close_command : - NULL, path); + msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : NULL, + msg.u.close_trace_chunk.session_id, + msg.u.close_trace_chunk.chunk_id, + (time_t) msg.u.close_trace_chunk.close_timestamp, + msg.u.close_trace_chunk.close_command.is_set ? &close_command : NULL, + path); reply.ret_code = ret_code; reply.path_length = strlen(path) + 1; ret_send = lttcomm_send_unix_sock(sock, &reply, sizeof(reply)); if (ret_send != sizeof(reply)) { goto error_fatal; } - ret_send = lttcomm_send_unix_sock( - sock, path, reply.path_length); + ret_send = lttcomm_send_unix_sock(sock, path, reply.path_length); if (ret_send != reply.path_length) { goto error_fatal; } @@ -1349,21 +1324,18 @@ error_rotate_channel: } case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: { - const uint64_t relayd_id = - msg.u.trace_chunk_exists.relayd_id.value; + const uint64_t relayd_id = msg.u.trace_chunk_exists.relayd_id.value; ret_code = lttng_consumer_trace_chunk_exists( - msg.u.trace_chunk_exists.relayd_id.is_set ? - &relayd_id : NULL, - msg.u.trace_chunk_exists.session_id, - msg.u.trace_chunk_exists.chunk_id); + msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : NULL, + msg.u.trace_chunk_exists.session_id, + msg.u.trace_chunk_exists.chunk_id); goto end_msg_sessiond; } case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS: { const uint64_t key = msg.u.open_channel_packets.key; - struct lttng_consumer_channel *channel = - consumer_find_channel(key); + struct lttng_consumer_channel *channel = consumer_find_channel(key); if (channel) { pthread_mutex_lock(&channel->lock); @@ -1421,8 +1393,7 @@ end: * * Metadata stream lock MUST be acquired. */ -enum sync_metadata_status lttng_kconsumer_sync_metadata( - struct lttng_consumer_stream *metadata) +enum sync_metadata_status lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata) { int ret; enum sync_metadata_status status; @@ -1454,20 +1425,18 @@ end: return status; } -static -int extract_common_subbuffer_info(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuf) +static int extract_common_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) { int ret; - ret = kernctl_get_subbuf_size( - stream->wait_fd, &subbuf->info.data.subbuf_size); + ret = kernctl_get_subbuf_size(stream->wait_fd, &subbuf->info.data.subbuf_size); if (ret) { goto end; } - ret = kernctl_get_padded_subbuf_size( - stream->wait_fd, &subbuf->info.data.padded_subbuf_size); + ret = kernctl_get_padded_subbuf_size(stream->wait_fd, + &subbuf->info.data.padded_subbuf_size); if (ret) { goto end; } @@ -1476,9 +1445,8 @@ end: return ret; } -static -int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuf) +static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) { int ret; @@ -1487,8 +1455,7 @@ int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, goto end; } - ret = kernctl_get_metadata_version( - stream->wait_fd, &subbuf->info.metadata.version); + ret = kernctl_get_metadata_version(stream->wait_fd, &subbuf->info.metadata.version); if (ret) { goto end; } @@ -1497,9 +1464,8 @@ end: return ret; } -static -int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuf) +static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) { int ret; @@ -1508,43 +1474,38 @@ int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, goto end; } - ret = kernctl_get_packet_size( - stream->wait_fd, &subbuf->info.data.packet_size); + ret = kernctl_get_packet_size(stream->wait_fd, &subbuf->info.data.packet_size); if (ret < 0) { PERROR("Failed to get sub-buffer packet size"); goto end; } - ret = kernctl_get_content_size( - stream->wait_fd, &subbuf->info.data.content_size); + ret = kernctl_get_content_size(stream->wait_fd, &subbuf->info.data.content_size); if (ret < 0) { PERROR("Failed to get sub-buffer content size"); goto end; } - ret = kernctl_get_timestamp_begin( - stream->wait_fd, &subbuf->info.data.timestamp_begin); + ret = kernctl_get_timestamp_begin(stream->wait_fd, &subbuf->info.data.timestamp_begin); if (ret < 0) { PERROR("Failed to get sub-buffer begin timestamp"); goto end; } - ret = kernctl_get_timestamp_end( - stream->wait_fd, &subbuf->info.data.timestamp_end); + ret = kernctl_get_timestamp_end(stream->wait_fd, &subbuf->info.data.timestamp_end); if (ret < 0) { PERROR("Failed to get sub-buffer end timestamp"); goto end; } - ret = kernctl_get_events_discarded( - stream->wait_fd, &subbuf->info.data.events_discarded); + ret = kernctl_get_events_discarded(stream->wait_fd, &subbuf->info.data.events_discarded); if (ret) { PERROR("Failed to get sub-buffer events discarded count"); goto end; } ret = kernctl_get_sequence_number(stream->wait_fd, - &subbuf->info.data.sequence_number.value); + &subbuf->info.data.sequence_number.value); if (ret) { /* May not be supported by older LTTng-modules. */ if (ret != -ENOTTY) { @@ -1555,15 +1516,13 @@ int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, subbuf->info.data.sequence_number.is_set = true; } - ret = kernctl_get_stream_id( - stream->wait_fd, &subbuf->info.data.stream_id); + ret = kernctl_get_stream_id(stream->wait_fd, &subbuf->info.data.stream_id); if (ret < 0) { PERROR("Failed to get stream id"); goto end; } - ret = kernctl_get_instance_id(stream->wait_fd, - &subbuf->info.data.stream_instance_id.value); + ret = kernctl_get_instance_id(stream->wait_fd, &subbuf->info.data.stream_instance_id.value); if (ret) { /* May not be supported by older LTTng-modules. */ if (ret != -ENOTTY) { @@ -1577,10 +1536,8 @@ end: return ret; } -static -enum get_next_subbuffer_status get_subbuffer_common( - struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer) +static enum get_next_subbuffer_status get_subbuffer_common(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) { int ret; enum get_next_subbuffer_status status; @@ -1591,7 +1548,7 @@ enum get_next_subbuffer_status get_subbuffer_common( status = GET_NEXT_SUBBUFFER_STATUS_OK; break; case -ENODATA: - case -EAGAIN: + case -EAGAIN: /* * The caller only expects -ENODATA when there is no data to * read, but the kernel tracer returns -EAGAIN when there is @@ -1606,8 +1563,7 @@ enum get_next_subbuffer_status get_subbuffer_common( goto end; } - ret = stream->read_subbuffer_ops.extract_subbuffer_info( - stream, subbuffer); + ret = stream->read_subbuffer_ops.extract_subbuffer_info(stream, subbuffer); if (ret) { status = GET_NEXT_SUBBUFFER_STATUS_ERROR; } @@ -1615,13 +1571,10 @@ end: return status; } -static -enum get_next_subbuffer_status get_next_subbuffer_splice( - struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer) +static enum get_next_subbuffer_status +get_next_subbuffer_splice(struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer) { - const enum get_next_subbuffer_status status = - get_subbuffer_common(stream, subbuffer); + const enum get_next_subbuffer_status status = get_subbuffer_common(stream, subbuffer); if (status != GET_NEXT_SUBBUFFER_STATUS_OK) { goto end; @@ -1632,10 +1585,8 @@ end: return status; } -static -enum get_next_subbuffer_status get_next_subbuffer_mmap( - struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer) +static enum get_next_subbuffer_status get_next_subbuffer_mmap(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) { int ret; enum get_next_subbuffer_status status; @@ -1652,29 +1603,27 @@ enum get_next_subbuffer_status get_next_subbuffer_mmap( goto end; } - subbuffer->buffer.buffer = lttng_buffer_view_init( - addr, 0, subbuffer->info.data.padded_subbuf_size); + subbuffer->buffer.buffer = + lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size); end: return status; } -static -enum get_next_subbuffer_status get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer) +static enum get_next_subbuffer_status +get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) { int ret; const char *addr; bool coherent; enum get_next_subbuffer_status status; - ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd, - &coherent); + ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd, &coherent); if (ret) { goto end; } - ret = stream->read_subbuffer_ops.extract_subbuffer_info( - stream, subbuffer); + ret = stream->read_subbuffer_ops.extract_subbuffer_info(stream, subbuffer); if (ret) { goto end; } @@ -1686,11 +1635,11 @@ enum get_next_subbuffer_status get_next_subbuffer_metadata_check(struct lttng_co goto end; } - subbuffer->buffer.buffer = lttng_buffer_view_init( - addr, 0, subbuffer->info.data.padded_subbuf_size); + subbuffer->buffer.buffer = + lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size); DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s", - subbuffer->info.metadata.padded_subbuf_size, - coherent ? "true" : "false"); + subbuffer->info.metadata.padded_subbuf_size, + coherent ? "true" : "false"); end: /* * The caller only expects -ENODATA when there is no data to read, but @@ -1721,9 +1670,8 @@ end: return status; } -static -int put_next_subbuffer(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer __attribute__((unused))) +static int put_next_subbuffer(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer __attribute__((unused))) { const int ret = kernctl_put_next_subbuf(stream->wait_fd); @@ -1739,8 +1687,7 @@ int put_next_subbuffer(struct lttng_consumer_stream *stream, return ret; } -static -bool is_get_next_check_metadata_available(int tracer_fd) +static bool is_get_next_check_metadata_available(int tracer_fd) { const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL); const bool available = ret != -ENOTTY; @@ -1753,17 +1700,14 @@ bool is_get_next_check_metadata_available(int tracer_fd) return available; } -static -int signal_metadata(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx __attribute__((unused))) +static int signal_metadata(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx __attribute__((unused))) { ASSERT_LOCKED(stream->metadata_rdv_lock); return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0; } -static -int lttng_kconsumer_set_stream_ops( - struct lttng_consumer_stream *stream) +static int lttng_kconsumer_set_stream_ops(struct lttng_consumer_stream *stream) { int ret = 0; @@ -1772,9 +1716,8 @@ int lttng_kconsumer_set_stream_ops( if (is_get_next_check_metadata_available(stream->wait_fd)) { DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached"); stream->read_subbuffer_ops.get_next_subbuffer = - get_next_subbuffer_metadata_check; - ret = consumer_stream_enable_metadata_bucketization( - stream); + get_next_subbuffer_metadata_check; + ret = consumer_stream_enable_metadata_bucketization(stream); if (ret) { goto end; } @@ -1797,23 +1740,18 @@ int lttng_kconsumer_set_stream_ops( if (!stream->read_subbuffer_ops.get_next_subbuffer) { if (stream->chan->output == CONSUMER_CHANNEL_MMAP) { - stream->read_subbuffer_ops.get_next_subbuffer = - get_next_subbuffer_mmap; + stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer_mmap; } else { - stream->read_subbuffer_ops.get_next_subbuffer = - get_next_subbuffer_splice; + stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer_splice; } } if (stream->metadata_flag) { - stream->read_subbuffer_ops.extract_subbuffer_info = - extract_metadata_subbuffer_info; + stream->read_subbuffer_ops.extract_subbuffer_info = extract_metadata_subbuffer_info; } else { - stream->read_subbuffer_ops.extract_subbuffer_info = - extract_data_subbuffer_info; + stream->read_subbuffer_ops.extract_subbuffer_info = extract_data_subbuffer_info; if (stream->chan->is_live) { - stream->read_subbuffer_ops.send_live_beacon = - consumer_flush_kernel_index; + stream->read_subbuffer_ops.send_live_beacon = consumer_flush_kernel_index; } } @@ -1833,7 +1771,7 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) * no current trace chunk on the parent channel. */ if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor && - stream->chan->trace_chunk) { + stream->chan->trace_chunk) { ret = consumer_stream_create_output_files(stream, true); if (ret) { goto error; @@ -1851,8 +1789,8 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) } stream->mmap_len = (size_t) mmap_len; - stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ, - MAP_PRIVATE, stream->wait_fd, 0); + stream->mmap_base = + mmap(NULL, stream->mmap_len, PROT_READ, MAP_PRIVATE, stream->wait_fd, 0); if (stream->mmap_base == MAP_FAILED) { PERROR("Error mmaping"); ret = -1; @@ -1904,7 +1842,7 @@ int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream) /* There is still data so let's put back this subbuffer. */ ret = kernctl_put_subbuf(stream->wait_fd); LTTNG_ASSERT(ret == 0); - ret = 1; /* Data is pending */ + ret = 1; /* Data is pending */ goto end; }