X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fkernel-consumer.cpp;h=5efe2c2ae146bcfb7e701c300dd8ce555f1db8b2;hb=cd9adb8b829564212158943a0d279bb35322ab30;hp=844f535416a80ee02dd055083e3555964b29b9f1;hpb=c9e313bc594f40a86eed237dce222c0fc99c957f;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/kernel-consumer.cpp b/src/bin/lttng-sessiond/kernel-consumer.cpp index 844f53541..5efe2c2ae 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.cpp +++ b/src/bin/lttng-sessiond/kernel-consumer.cpp @@ -6,59 +6,60 @@ */ #define _LGPL_SOURCE -#include -#include -#include -#include -#include - -#include -#include -#include - #include "consumer.hpp" #include "health-sessiond.hpp" #include "kernel-consumer.hpp" +#include "lttng-sessiond.hpp" #include "notification-thread-commands.hpp" #include "session.hpp" -#include "lttng-sessiond.hpp" -static char *create_channel_path(struct consumer_output *consumer, - size_t *consumer_path_offset) +#include +#include +#include + +#include +#include +#include +#include +#include + +static char *create_channel_path(struct consumer_output *consumer, size_t *consumer_path_offset) { int ret; char tmp_path[PATH_MAX]; - char *pathname = NULL; + char *pathname = nullptr; LTTNG_ASSERT(consumer); /* Get the right path name destination */ if (consumer->type == CONSUMER_DST_LOCAL || - (consumer->type == CONSUMER_DST_NET && - consumer->relay_major_version == 2 && - consumer->relay_minor_version >= 11)) { + (consumer->type == CONSUMER_DST_NET && consumer->relay_major_version == 2 && + consumer->relay_minor_version >= 11)) { pathname = strdup(consumer->domain_subdir); if (!pathname) { PERROR("Failed to copy domain subdirectory string %s", - consumer->domain_subdir); + consumer->domain_subdir); goto error; } *consumer_path_offset = strlen(consumer->domain_subdir); DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"", - pathname); + pathname); } else { /* Network output, relayd < 2.11. */ - ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s", - consumer->dst.net.base_dir, - consumer->domain_subdir); + ret = snprintf(tmp_path, + sizeof(tmp_path), + "%s%s", + consumer->dst.net.base_dir, + consumer->domain_subdir); if (ret < 0) { PERROR("snprintf kernel metadata path"); goto error; } else if (ret >= sizeof(tmp_path)) { ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"", - sizeof(tmp_path), ret, - consumer->dst.net.base_dir, - consumer->domain_subdir); + sizeof(tmp_path), + ret, + consumer->dst.net.base_dir, + consumer->domain_subdir); goto error; } pathname = lttng_strndup(tmp_path, sizeof(tmp_path)); @@ -74,24 +75,23 @@ static char *create_channel_path(struct consumer_output *consumer, error: free(pathname); - return NULL; + return nullptr; } /* * Sending a single channel to the consumer with command ADD_CHANNEL. */ -static -int kernel_consumer_add_channel(struct consumer_socket *sock, - struct ltt_kernel_channel *channel, - struct ltt_kernel_session *ksession, - unsigned int monitor) +static int kernel_consumer_add_channel(struct consumer_socket *sock, + struct ltt_kernel_channel *channel, + struct ltt_kernel_session *ksession, + unsigned int monitor) { int ret; - char *pathname = NULL; + char *pathname = nullptr; struct lttcomm_consumer_msg lkm; struct consumer_output *consumer; enum lttng_error_code status; - struct ltt_session *session = NULL; + struct ltt_session *session = nullptr; struct lttng_channel_extended *channel_attr_extended; bool is_local_trace; size_t consumer_path_offset = 0; @@ -102,11 +102,10 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, LTTNG_ASSERT(ksession->consumer); consumer = ksession->consumer; - channel_attr_extended = (struct lttng_channel_extended *) - channel->channel->attr.extended.ptr; + channel_attr_extended = + (struct lttng_channel_extended *) channel->channel->attr.extended.ptr; - DBG("Kernel consumer adding channel %s to kernel consumer", - channel->channel->name); + DBG("Kernel consumer adding channel %s to kernel consumer", channel->channel->name); is_local_trace = consumer->net_seq_index == -1ULL; pathname = create_channel_path(consumer, &consumer_path_offset); @@ -119,8 +118,7 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, enum lttng_trace_chunk_status chunk_status; char *pathname_index; - ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR, - pathname); + ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR, pathname); if (ret < 0) { ERR("Failed to format channel index directory"); ret = -1; @@ -131,8 +129,8 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, * Create the index subdirectory which will take care * of implicitly creating the channel's path. */ - chunk_status = lttng_trace_chunk_create_subdirectory( - ksession->current_trace_chunk, pathname_index); + chunk_status = lttng_trace_chunk_create_subdirectory(ksession->current_trace_chunk, + pathname_index); free(pathname_index); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ret = -1; @@ -142,21 +140,21 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, /* Prep channel message structure */ consumer_init_add_channel_comm_msg(&lkm, - channel->key, - ksession->id, - &pathname[consumer_path_offset], - consumer->net_seq_index, - channel->channel->name, - channel->stream_count, - channel->channel->attr.output, - CONSUMER_CHANNEL_TYPE_DATA, - channel->channel->attr.tracefile_size, - channel->channel->attr.tracefile_count, - monitor, - channel->channel->attr.live_timer_interval, - ksession->is_live_session, - channel_attr_extended->monitor_timer_interval, - ksession->current_trace_chunk); + channel->key, + ksession->id, + &pathname[consumer_path_offset], + consumer->net_seq_index, + channel->channel->name, + channel->stream_count, + channel->channel->attr.output, + CONSUMER_CHANNEL_TYPE_DATA, + channel->channel->attr.tracefile_size, + channel->channel->attr.tracefile_count, + monitor, + channel->channel->attr.live_timer_interval, + ksession->is_live_session, + channel_attr_extended->monitor_timer_interval, + ksession->current_trace_chunk); health_code_update(); @@ -169,15 +167,16 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, rcu_read_lock(); session = session_find_by_id(ksession->id); LTTNG_ASSERT(session); - LTTNG_ASSERT(pthread_mutex_trylock(&session->lock)); - LTTNG_ASSERT(session_trylock_list()); - - status = notification_thread_command_add_channel( - the_notification_thread_handle, session->name, - ksession->uid, ksession->gid, channel->channel->name, - channel->key, LTTNG_DOMAIN_KERNEL, - channel->channel->attr.subbuf_size * - channel->channel->attr.num_subbuf); + ASSERT_LOCKED(session->lock); + ASSERT_SESSION_LIST_LOCKED(); + + status = notification_thread_command_add_channel(the_notification_thread_handle, + session->id, + channel->channel->name, + channel->key, + LTTNG_DOMAIN_KERNEL, + channel->channel->attr.subbuf_size * + channel->channel->attr.num_subbuf); rcu_read_unlock(); if (status != LTTNG_OK) { ret = -1; @@ -200,7 +199,8 @@ error: * The consumer socket lock must be held by the caller. */ int kernel_consumer_add_metadata(struct consumer_socket *sock, - struct ltt_kernel_session *ksession, unsigned int monitor) + struct ltt_kernel_session *ksession, + unsigned int monitor) { int ret; struct lttcomm_consumer_msg lkm; @@ -213,29 +213,28 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, LTTNG_ASSERT(ksession->consumer); LTTNG_ASSERT(sock); - DBG("Sending metadata %d to kernel consumer", - ksession->metadata_stream_fd); + DBG("Sending metadata %d to kernel consumer", ksession->metadata_stream_fd); /* Get consumer output pointer */ consumer = ksession->consumer; /* Prep channel message structure */ consumer_init_add_channel_comm_msg(&lkm, - ksession->metadata->key, - ksession->id, - "", - consumer->net_seq_index, - ksession->metadata->conf->name, - 1, - ksession->metadata->conf->attr.output, - CONSUMER_CHANNEL_TYPE_METADATA, - ksession->metadata->conf->attr.tracefile_size, - ksession->metadata->conf->attr.tracefile_count, - monitor, - ksession->metadata->conf->attr.live_timer_interval, - ksession->is_live_session, - 0, - ksession->current_trace_chunk); + ksession->metadata->key, + ksession->id, + "", + consumer->net_seq_index, + ksession->metadata->conf->name, + 1, + ksession->metadata->conf->attr.output, + CONSUMER_CHANNEL_TYPE_METADATA, + ksession->metadata->conf->attr.tracefile_size, + ksession->metadata->conf->attr.tracefile_count, + monitor, + ksession->metadata->conf->attr.live_timer_interval, + ksession->is_live_session, + 0, + ksession->current_trace_chunk); health_code_update(); @@ -248,15 +247,14 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, /* Prep stream message structure */ consumer_init_add_stream_comm_msg(&lkm, - ksession->metadata->key, - ksession->metadata_stream_fd, - 0 /* CPU: 0 for metadata. */); + ksession->metadata->key, + ksession->metadata_stream_fd, + 0 /* CPU: 0 for metadata. */); health_code_update(); /* Send stream and file descriptor */ - ret = consumer_send_stream(sock, consumer, &lkm, - &ksession->metadata_stream_fd, 1); + ret = consumer_send_stream(sock, consumer, &lkm, &ksession->metadata_stream_fd, 1); if (ret < 0) { goto error; } @@ -271,11 +269,10 @@ error: /* * Sending a single stream to the consumer with command ADD_STREAM. */ -static -int kernel_consumer_add_stream(struct consumer_socket *sock, - struct ltt_kernel_channel *channel, - struct ltt_kernel_stream *stream, - struct ltt_kernel_session *session) +static int kernel_consumer_add_stream(struct consumer_socket *sock, + struct ltt_kernel_channel *channel, + struct ltt_kernel_stream *stream, + struct ltt_kernel_session *session) { int ret; struct lttcomm_consumer_msg lkm; @@ -288,16 +285,14 @@ int kernel_consumer_add_stream(struct consumer_socket *sock, LTTNG_ASSERT(sock); DBG("Sending stream %d of channel %s to kernel consumer", - stream->fd, channel->channel->name); + stream->fd, + channel->channel->name); /* Get consumer output pointer */ consumer = session->consumer; /* Prep stream consumer message */ - consumer_init_add_stream_comm_msg(&lkm, - channel->key, - stream->fd, - stream->cpu); + consumer_init_add_stream_comm_msg(&lkm, channel->key, stream->fd, stream->cpu); health_code_update(); @@ -317,7 +312,8 @@ error: * Sending the notification that all streams were sent with STREAMS_SENT. */ int kernel_consumer_streams_sent(struct consumer_socket *sock, - struct ltt_kernel_session *session, uint64_t channel_key) + struct ltt_kernel_session *session, + uint64_t channel_key) { int ret; struct lttcomm_consumer_msg lkm; @@ -331,9 +327,8 @@ int kernel_consumer_streams_sent(struct consumer_socket *sock, consumer = session->consumer; /* Prep stream consumer message */ - consumer_init_streams_sent_comm_msg(&lkm, - LTTNG_CONSUMER_STREAMS_SENT, - channel_key, consumer->net_seq_index); + consumer_init_streams_sent_comm_msg( + &lkm, LTTNG_CONSUMER_STREAMS_SENT, channel_key, consumer->net_seq_index); health_code_update(); @@ -353,8 +348,9 @@ error: * The consumer socket lock must be held by the caller. */ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, - struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession, - unsigned int monitor) + struct ltt_kernel_channel *channel, + struct ltt_kernel_session *ksession, + unsigned int monitor) { int ret = LTTNG_OK; struct ltt_kernel_stream *stream; @@ -373,8 +369,7 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, goto error; } - DBG("Sending streams of channel %s to kernel consumer", - channel->channel->name); + DBG("Sending streams of channel %s to kernel consumer", channel->channel->name); if (!channel->sent_to_consumer) { ret = kernel_consumer_add_channel(sock, channel, ksession, monitor); @@ -385,14 +380,13 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, } /* Send streams */ - cds_list_for_each_entry(stream, &channel->stream_list.head, list) { + cds_list_for_each_entry (stream, &channel->stream_list.head, list) { if (!stream->fd || stream->sent_to_consumer) { continue; } /* Add stream on the kernel consumer side. */ - ret = kernel_consumer_add_stream(sock, channel, stream, - ksession); + ret = kernel_consumer_add_stream(sock, channel, stream, ksession); if (ret < 0) { goto error; } @@ -409,8 +403,7 @@ error: * * The consumer socket lock must be held by the caller. */ -int kernel_consumer_send_session(struct consumer_socket *sock, - struct ltt_kernel_session *session) +int kernel_consumer_send_session(struct consumer_socket *sock, struct ltt_kernel_session *session) { int ret, monitor = 0; struct ltt_kernel_channel *chan; @@ -441,9 +434,8 @@ int kernel_consumer_send_session(struct consumer_socket *sock, } /* Send channel and streams of it */ - cds_list_for_each_entry(chan, &session->channel_list.head, list) { - ret = kernel_consumer_send_channel_streams(sock, chan, session, - monitor); + cds_list_for_each_entry (chan, &session->channel_list.head, list) { + ret = kernel_consumer_send_channel_streams(sock, chan, session, monitor); if (ret < 0) { goto error; } @@ -469,7 +461,7 @@ error: } int kernel_consumer_destroy_channel(struct consumer_socket *socket, - struct ltt_kernel_channel *channel) + struct ltt_kernel_channel *channel) { int ret; struct lttcomm_consumer_msg msg; @@ -498,7 +490,7 @@ error: } int kernel_consumer_destroy_metadata(struct consumer_socket *socket, - struct ltt_kernel_metadata *metadata) + struct ltt_kernel_metadata *metadata) { int ret; struct lttcomm_consumer_msg msg;