*/
#define _LGPL_SOURCE
-#include <stdio.h>
-#include <stdlib.h>
-#include <sys/stat.h>
-#include <unistd.h>
-#include <inttypes.h>
-
-#include <common/common.hpp>
-#include <common/defaults.hpp>
-#include <common/compat/string.hpp>
-
#include "consumer.hpp"
#include "health-sessiond.hpp"
#include "kernel-consumer.hpp"
+#include "lttng-sessiond.hpp"
#include "notification-thread-commands.hpp"
#include "session.hpp"
-#include "lttng-sessiond.hpp"
-static char *create_channel_path(struct consumer_output *consumer,
- size_t *consumer_path_offset)
+#include <common/common.hpp>
+#include <common/compat/string.hpp>
+#include <common/defaults.hpp>
+#include <common/urcu.hpp>
+
+#include <inttypes.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+static char *create_channel_path(struct consumer_output *consumer, size_t *consumer_path_offset)
{
int ret;
char tmp_path[PATH_MAX];
- char *pathname = NULL;
+ char *pathname = nullptr;
LTTNG_ASSERT(consumer);
/* Get the right path name destination */
if (consumer->type == CONSUMER_DST_LOCAL ||
- (consumer->type == CONSUMER_DST_NET &&
- consumer->relay_major_version == 2 &&
- consumer->relay_minor_version >= 11)) {
+ (consumer->type == CONSUMER_DST_NET && consumer->relay_major_version == 2 &&
+ consumer->relay_minor_version >= 11)) {
pathname = strdup(consumer->domain_subdir);
if (!pathname) {
PERROR("Failed to copy domain subdirectory string %s",
- consumer->domain_subdir);
+ consumer->domain_subdir);
goto error;
}
*consumer_path_offset = strlen(consumer->domain_subdir);
DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
- pathname);
+ pathname);
} else {
/* Network output, relayd < 2.11. */
- ret = snprintf(tmp_path, sizeof(tmp_path), "%s%s",
- consumer->dst.net.base_dir,
- consumer->domain_subdir);
+ ret = snprintf(tmp_path,
+ sizeof(tmp_path),
+ "%s%s",
+ consumer->dst.net.base_dir,
+ consumer->domain_subdir);
if (ret < 0) {
PERROR("snprintf kernel metadata path");
goto error;
} else if (ret >= sizeof(tmp_path)) {
ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
- sizeof(tmp_path), ret,
- consumer->dst.net.base_dir,
- consumer->domain_subdir);
+ sizeof(tmp_path),
+ ret,
+ consumer->dst.net.base_dir,
+ consumer->domain_subdir);
goto error;
}
pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
error:
free(pathname);
- return NULL;
+ return nullptr;
}
/*
* Sending a single channel to the consumer with command ADD_CHANNEL.
*/
-static
-int kernel_consumer_add_channel(struct consumer_socket *sock,
- struct ltt_kernel_channel *channel,
- struct ltt_kernel_session *ksession,
- unsigned int monitor)
+static int kernel_consumer_add_channel(struct consumer_socket *sock,
+ struct ltt_kernel_channel *channel,
+ struct ltt_kernel_session *ksession,
+ unsigned int monitor)
{
int ret;
- char *pathname = NULL;
+ char *pathname = nullptr;
struct lttcomm_consumer_msg lkm;
struct consumer_output *consumer;
enum lttng_error_code status;
- struct ltt_session *session = NULL;
+ struct ltt_session *session = nullptr;
struct lttng_channel_extended *channel_attr_extended;
bool is_local_trace;
size_t consumer_path_offset = 0;
+ lttng::urcu::read_lock_guard read_lock;
/* Safety net */
LTTNG_ASSERT(channel);
LTTNG_ASSERT(ksession->consumer);
consumer = ksession->consumer;
- channel_attr_extended = (struct lttng_channel_extended *)
- channel->channel->attr.extended.ptr;
+ channel_attr_extended =
+ (struct lttng_channel_extended *) channel->channel->attr.extended.ptr;
- DBG("Kernel consumer adding channel %s to kernel consumer",
- channel->channel->name);
+ DBG("Kernel consumer adding channel %s to kernel consumer", channel->channel->name);
is_local_trace = consumer->net_seq_index == -1ULL;
pathname = create_channel_path(consumer, &consumer_path_offset);
enum lttng_trace_chunk_status chunk_status;
char *pathname_index;
- ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR,
- pathname);
+ ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR, pathname);
if (ret < 0) {
ERR("Failed to format channel index directory");
ret = -1;
* Create the index subdirectory which will take care
* of implicitly creating the channel's path.
*/
- chunk_status = lttng_trace_chunk_create_subdirectory(
- ksession->current_trace_chunk, pathname_index);
+ chunk_status = lttng_trace_chunk_create_subdirectory(ksession->current_trace_chunk,
+ pathname_index);
free(pathname_index);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret = -1;
/* Prep channel message structure */
consumer_init_add_channel_comm_msg(&lkm,
- channel->key,
- ksession->id,
- &pathname[consumer_path_offset],
- consumer->net_seq_index,
- channel->channel->name,
- channel->stream_count,
- channel->channel->attr.output,
- CONSUMER_CHANNEL_TYPE_DATA,
- channel->channel->attr.tracefile_size,
- channel->channel->attr.tracefile_count,
- monitor,
- channel->channel->attr.live_timer_interval,
- ksession->is_live_session,
- channel_attr_extended->monitor_timer_interval,
- ksession->current_trace_chunk);
+ channel->key,
+ ksession->id,
+ &pathname[consumer_path_offset],
+ consumer->net_seq_index,
+ channel->channel->name,
+ channel->stream_count,
+ channel->channel->attr.output,
+ CONSUMER_CHANNEL_TYPE_DATA,
+ channel->channel->attr.tracefile_size,
+ channel->channel->attr.tracefile_count,
+ monitor,
+ channel->channel->attr.live_timer_interval,
+ ksession->is_live_session,
+ channel_attr_extended->monitor_timer_interval,
+ ksession->current_trace_chunk);
health_code_update();
}
health_code_update();
- rcu_read_lock();
session = session_find_by_id(ksession->id);
LTTNG_ASSERT(session);
- LTTNG_ASSERT(pthread_mutex_trylock(&session->lock));
- LTTNG_ASSERT(session_trylock_list());
-
- status = notification_thread_command_add_channel(
- the_notification_thread_handle, session->name,
- ksession->uid, ksession->gid, channel->channel->name,
- channel->key, LTTNG_DOMAIN_KERNEL,
- channel->channel->attr.subbuf_size *
- channel->channel->attr.num_subbuf);
- rcu_read_unlock();
+ ASSERT_LOCKED(session->lock);
+ ASSERT_SESSION_LIST_LOCKED();
+
+ status = notification_thread_command_add_channel(the_notification_thread_handle,
+ session->id,
+ channel->channel->name,
+ channel->key,
+ LTTNG_DOMAIN_KERNEL,
+ channel->channel->attr.subbuf_size *
+ channel->channel->attr.num_subbuf);
if (status != LTTNG_OK) {
ret = -1;
goto error;
* The consumer socket lock must be held by the caller.
*/
int kernel_consumer_add_metadata(struct consumer_socket *sock,
- struct ltt_kernel_session *ksession, unsigned int monitor)
+ struct ltt_kernel_session *ksession,
+ unsigned int monitor)
{
int ret;
struct lttcomm_consumer_msg lkm;
struct consumer_output *consumer;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/* Safety net */
LTTNG_ASSERT(ksession);
LTTNG_ASSERT(ksession->consumer);
LTTNG_ASSERT(sock);
- DBG("Sending metadata %d to kernel consumer",
- ksession->metadata_stream_fd);
+ DBG("Sending metadata %d to kernel consumer", ksession->metadata_stream_fd);
/* Get consumer output pointer */
consumer = ksession->consumer;
/* Prep channel message structure */
consumer_init_add_channel_comm_msg(&lkm,
- ksession->metadata->key,
- ksession->id,
- "",
- consumer->net_seq_index,
- ksession->metadata->conf->name,
- 1,
- ksession->metadata->conf->attr.output,
- CONSUMER_CHANNEL_TYPE_METADATA,
- ksession->metadata->conf->attr.tracefile_size,
- ksession->metadata->conf->attr.tracefile_count,
- monitor,
- ksession->metadata->conf->attr.live_timer_interval,
- ksession->is_live_session,
- 0,
- ksession->current_trace_chunk);
+ ksession->metadata->key,
+ ksession->id,
+ "",
+ consumer->net_seq_index,
+ ksession->metadata->conf->name,
+ 1,
+ ksession->metadata->conf->attr.output,
+ CONSUMER_CHANNEL_TYPE_METADATA,
+ ksession->metadata->conf->attr.tracefile_size,
+ ksession->metadata->conf->attr.tracefile_count,
+ monitor,
+ ksession->metadata->conf->attr.live_timer_interval,
+ ksession->is_live_session,
+ 0,
+ ksession->current_trace_chunk);
health_code_update();
/* Prep stream message structure */
consumer_init_add_stream_comm_msg(&lkm,
- ksession->metadata->key,
- ksession->metadata_stream_fd,
- 0 /* CPU: 0 for metadata. */);
+ ksession->metadata->key,
+ ksession->metadata_stream_fd,
+ 0 /* CPU: 0 for metadata. */);
health_code_update();
/* Send stream and file descriptor */
- ret = consumer_send_stream(sock, consumer, &lkm,
- &ksession->metadata_stream_fd, 1);
+ ret = consumer_send_stream(sock, consumer, &lkm, &ksession->metadata_stream_fd, 1);
if (ret < 0) {
goto error;
}
health_code_update();
error:
- rcu_read_unlock();
return ret;
}
/*
* Sending a single stream to the consumer with command ADD_STREAM.
*/
-static
-int kernel_consumer_add_stream(struct consumer_socket *sock,
- struct ltt_kernel_channel *channel,
- struct ltt_kernel_stream *stream,
- struct ltt_kernel_session *session)
+static int kernel_consumer_add_stream(struct consumer_socket *sock,
+ struct ltt_kernel_channel *channel,
+ struct ltt_kernel_stream *stream,
+ struct ltt_kernel_session *session)
{
int ret;
struct lttcomm_consumer_msg lkm;
LTTNG_ASSERT(sock);
DBG("Sending stream %d of channel %s to kernel consumer",
- stream->fd, channel->channel->name);
+ stream->fd,
+ channel->channel->name);
/* Get consumer output pointer */
consumer = session->consumer;
/* Prep stream consumer message */
- consumer_init_add_stream_comm_msg(&lkm,
- channel->key,
- stream->fd,
- stream->cpu);
+ consumer_init_add_stream_comm_msg(&lkm, channel->key, stream->fd, stream->cpu);
health_code_update();
* Sending the notification that all streams were sent with STREAMS_SENT.
*/
int kernel_consumer_streams_sent(struct consumer_socket *sock,
- struct ltt_kernel_session *session, uint64_t channel_key)
+ struct ltt_kernel_session *session,
+ uint64_t channel_key)
{
int ret;
struct lttcomm_consumer_msg lkm;
consumer = session->consumer;
/* Prep stream consumer message */
- consumer_init_streams_sent_comm_msg(&lkm,
- LTTNG_CONSUMER_STREAMS_SENT,
- channel_key, consumer->net_seq_index);
+ consumer_init_streams_sent_comm_msg(
+ &lkm, LTTNG_CONSUMER_STREAMS_SENT, channel_key, consumer->net_seq_index);
health_code_update();
* The consumer socket lock must be held by the caller.
*/
int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
- struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession,
- unsigned int monitor)
+ struct ltt_kernel_channel *channel,
+ struct ltt_kernel_session *ksession,
+ unsigned int monitor)
{
int ret = LTTNG_OK;
struct ltt_kernel_stream *stream;
LTTNG_ASSERT(ksession->consumer);
LTTNG_ASSERT(sock);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/* Bail out if consumer is disabled */
if (!ksession->consumer->enabled) {
goto error;
}
- DBG("Sending streams of channel %s to kernel consumer",
- channel->channel->name);
+ DBG("Sending streams of channel %s to kernel consumer", channel->channel->name);
if (!channel->sent_to_consumer) {
ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
}
/* Send streams */
- cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
+ cds_list_for_each_entry (stream, &channel->stream_list.head, list) {
if (!stream->fd || stream->sent_to_consumer) {
continue;
}
/* Add stream on the kernel consumer side. */
- ret = kernel_consumer_add_stream(sock, channel, stream,
- ksession);
+ ret = kernel_consumer_add_stream(sock, channel, stream, ksession);
if (ret < 0) {
goto error;
}
}
error:
- rcu_read_unlock();
return ret;
}
*
* The consumer socket lock must be held by the caller.
*/
-int kernel_consumer_send_session(struct consumer_socket *sock,
- struct ltt_kernel_session *session)
+int kernel_consumer_send_session(struct consumer_socket *sock, struct ltt_kernel_session *session)
{
int ret, monitor = 0;
struct ltt_kernel_channel *chan;
}
/* Send channel and streams of it */
- cds_list_for_each_entry(chan, &session->channel_list.head, list) {
- ret = kernel_consumer_send_channel_streams(sock, chan, session,
- monitor);
+ cds_list_for_each_entry (chan, &session->channel_list.head, list) {
+ ret = kernel_consumer_send_channel_streams(sock, chan, session, monitor);
if (ret < 0) {
goto error;
}
}
int kernel_consumer_destroy_channel(struct consumer_socket *socket,
- struct ltt_kernel_channel *channel)
+ struct ltt_kernel_channel *channel)
{
int ret;
struct lttcomm_consumer_msg msg;
}
int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
- struct ltt_kernel_metadata *metadata)
+ struct ltt_kernel_metadata *metadata)
{
int ret;
struct lttcomm_consumer_msg msg;