/*
- * Copyright (C) 2011 David Goulet <david.goulet@polymtl.ca>
+ * Copyright (C) 2011 EfficiOS Inc.
*
* SPDX-License-Identifier: GPL-2.0-only
*
*/
#define _LGPL_SOURCE
+#include "buffer-registry.hpp"
+#include "consumer.hpp"
+#include "health-sessiond.hpp"
+#include "lttng-sessiond.hpp"
+#include "lttng-ust-error.hpp"
+#include "session.hpp"
+#include "ust-consumer.hpp"
+
+#include <common/common.hpp>
+#include <common/compat/errno.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/defaults.hpp>
+
+#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
-#include <inttypes.h>
-
-#include <common/compat/errno.h>
-#include <common/common.h>
-#include <common/consumer/consumer.h>
-#include <common/defaults.h>
-#include "consumer.h"
-#include "health-sessiond.h"
-#include "ust-consumer.h"
-#include "lttng-ust-error.h"
-#include "buffer-registry.h"
-#include "session.h"
-#include "lttng-sessiond.h"
+namespace lsu = lttng::sessiond::ust;
/*
* Send a single channel to the consumer using command ASK_CHANNEL_CREATION.
* Consumer socket lock MUST be acquired before calling this.
*/
static int ask_channel_creation(struct ust_app_session *ua_sess,
- struct ust_app_channel *ua_chan,
- struct consumer_output *consumer,
- struct consumer_socket *socket,
- struct ust_registry_session *registry,
- struct lttng_trace_chunk *trace_chunk)
+ struct ust_app_channel *ua_chan,
+ struct consumer_output *consumer,
+ struct consumer_socket *socket,
+ lsu::registry_session *registry,
+ struct lttng_trace_chunk *trace_chunk)
{
int ret, output;
uint32_t chan_id;
uint64_t key, chan_reg_key;
- char *pathname = NULL;
+ char *pathname = nullptr;
struct lttcomm_consumer_msg msg;
- struct ust_registry_channel *ust_reg_chan;
char shm_path[PATH_MAX] = "";
char root_shm_path[PATH_MAX] = "";
bool is_local_trace;
is_local_trace = consumer->net_seq_index == -1ULL;
/* Format the channel's path (relative to the current trace chunk). */
- pathname = setup_channel_trace_path(consumer, ua_sess->path,
- &consumer_path_offset);
+ pathname = setup_channel_trace_path(consumer, ua_sess->path, &consumer_path_offset);
if (!pathname) {
ret = -1;
goto error;
enum lttng_trace_chunk_status chunk_status;
char *pathname_index;
- ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR,
- pathname);
+ ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR, pathname);
if (ret < 0) {
ERR("Failed to format channel index directory");
ret = -1;
* Create the index subdirectory which will take care
* of implicitly creating the channel's path.
*/
- chunk_status = lttng_trace_chunk_create_subdirectory(
- trace_chunk, pathname_index);
+ chunk_status = lttng_trace_chunk_create_subdirectory(trace_chunk, pathname_index);
free(pathname_index);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret = -1;
* those buffer files.
*/
} else {
- ust_reg_chan = ust_registry_channel_find(registry, chan_reg_key);
- LTTNG_ASSERT(ust_reg_chan);
- chan_id = ust_reg_chan->chan_id;
+ {
+ auto locked_registry = registry->lock();
+ auto& ust_reg_chan = registry->channel(chan_reg_key);
+
+ chan_id = ust_reg_chan.id;
+ }
+
if (ua_sess->shm_path[0]) {
strncpy(shm_path, ua_sess->shm_path, sizeof(shm_path));
shm_path[sizeof(shm_path) - 1] = '\0';
- strncat(shm_path, "/",
- sizeof(shm_path) - strlen(shm_path) - 1);
- strncat(shm_path, ua_chan->name,
- sizeof(shm_path) - strlen(shm_path) - 1);
- strncat(shm_path, "_",
- sizeof(shm_path) - strlen(shm_path) - 1);
+ strncat(shm_path, "/", sizeof(shm_path) - strlen(shm_path) - 1);
+ strncat(shm_path, ua_chan->name, sizeof(shm_path) - strlen(shm_path) - 1);
+ strncat(shm_path, "_", sizeof(shm_path) - strlen(shm_path) - 1);
}
strncpy(root_shm_path, ua_sess->root_shm_path, sizeof(root_shm_path));
root_shm_path[sizeof(root_shm_path) - 1] = '\0';
}
consumer_init_ask_channel_comm_msg(&msg,
- ua_chan->attr.subbuf_size,
- ua_chan->attr.num_subbuf,
- ua_chan->attr.overwrite,
- ua_chan->attr.switch_timer_interval,
- ua_chan->attr.read_timer_interval,
- ua_sess->live_timer_interval,
- ua_sess->live_timer_interval != 0,
- ua_chan->monitor_timer_interval,
- output,
- (int) ua_chan->attr.type,
- ua_sess->tracing_id,
- &pathname[consumer_path_offset],
- ua_chan->name,
- consumer->net_seq_index,
- ua_chan->key,
- registry->uuid,
- chan_id,
- ua_chan->tracefile_size,
- ua_chan->tracefile_count,
- ua_sess->id,
- ua_sess->output_traces,
- lttng_credentials_get_uid(&ua_sess->real_credentials),
- ua_chan->attr.blocking_timeout,
- root_shm_path, shm_path,
- trace_chunk,
- &ua_sess->effective_credentials);
+ ua_chan->attr.subbuf_size,
+ ua_chan->attr.num_subbuf,
+ ua_chan->attr.overwrite,
+ ua_chan->attr.switch_timer_interval,
+ ua_chan->attr.read_timer_interval,
+ ua_sess->live_timer_interval,
+ ua_sess->live_timer_interval != 0,
+ ua_chan->monitor_timer_interval,
+ output,
+ (int) ua_chan->attr.type,
+ ua_sess->tracing_id,
+ &pathname[consumer_path_offset],
+ ua_chan->name,
+ consumer->net_seq_index,
+ ua_chan->key,
+ registry->uuid,
+ chan_id,
+ ua_chan->tracefile_size,
+ ua_chan->tracefile_count,
+ ua_sess->id,
+ ua_sess->output_traces,
+ lttng_credentials_get_uid(&ua_sess->real_credentials),
+ ua_chan->attr.blocking_timeout,
+ root_shm_path,
+ shm_path,
+ trace_chunk,
+ &ua_sess->effective_credentials);
health_code_update();
goto error;
}
- ret = consumer_recv_status_channel(socket, &key,
- &ua_chan->expected_stream_count);
+ ret = consumer_recv_status_channel(socket, &key, &ua_chan->expected_stream_count);
if (ret < 0) {
goto error;
}
LTTNG_ASSERT(ua_chan->expected_stream_count > 0);
}
- DBG2("UST ask channel %" PRIu64 " successfully done with %u stream(s)", key,
- ua_chan->expected_stream_count);
+ DBG2("UST ask channel %" PRIu64 " successfully done with %u stream(s)",
+ key,
+ ua_chan->expected_stream_count);
error:
free(pathname);
* Returns 0 on success else a negative value.
*/
int ust_consumer_ask_channel(struct ust_app_session *ua_sess,
- struct ust_app_channel *ua_chan,
- struct consumer_output *consumer,
- struct consumer_socket *socket,
- struct ust_registry_session *registry,
- struct lttng_trace_chunk * trace_chunk)
+ struct ust_app_channel *ua_chan,
+ struct consumer_output *consumer,
+ struct consumer_socket *socket,
+ lsu::registry_session *registry,
+ struct lttng_trace_chunk *trace_chunk)
{
int ret;
}
pthread_mutex_lock(socket->lock);
- ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry,
- trace_chunk);
+ ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry, trace_chunk);
pthread_mutex_unlock(socket->lock);
if (ret < 0) {
ERR("ask_channel_creation consumer command failed");
*
* Return 0 on success else a negative value.
*/
-int ust_consumer_get_channel(struct consumer_socket *socket,
- struct ust_app_channel *ua_chan)
+int ust_consumer_get_channel(struct consumer_socket *socket, struct ust_app_channel *ua_chan)
{
int ret;
struct lttcomm_consumer_msg msg;
if (ret < 0) {
if (ret != -EPIPE) {
ERR("Error recv channel from consumer %d with ret %d",
- *socket->fd_ptr, ret);
+ *socket->fd_ptr,
+ ret);
} else {
DBG3("UST app recv channel from consumer. Consumer is dead.");
}
}
/* Next, get all streams. */
- while (1) {
+ while (true) {
struct ust_app_stream *stream;
/* Create UST stream */
stream = ust_app_alloc_stream();
- if (stream == NULL) {
+ if (stream == nullptr) {
ret = -ENOMEM;
goto error;
}
}
if (ret != -EPIPE) {
ERR("Recv stream from consumer %d with ret %d",
- *socket->fd_ptr, ret);
+ *socket->fd_ptr,
+ ret);
} else {
DBG3("UST app recv stream from consumer. Consumer is dead.");
}
*
* Return 0 on success else a negative value.
*/
-int ust_consumer_destroy_channel(struct consumer_socket *socket,
- struct ust_app_channel *ua_chan)
+int ust_consumer_destroy_channel(struct consumer_socket *socket, struct ust_app_channel *ua_chan)
{
int ret;
struct lttcomm_consumer_msg msg;
* On success return 0 else a negative value.
*/
int ust_consumer_send_stream_to_ust(struct ust_app *app,
- struct ust_app_channel *channel, struct ust_app_stream *stream)
+ struct ust_app_channel *channel,
+ struct ust_app_stream *stream)
{
int ret;
if (ret < 0) {
if (ret == -EPIPE || ret == -LTTNG_UST_ERR_EXITING) {
DBG3("UST app send stream to ust failed. Application is dead. (pid: %d, sock: %d).",
- app->pid, app->sock);
+ app->pid,
+ app->sock);
} else if (ret == -EAGAIN) {
WARN("UST app send stream to ust failed. Communication time out (pid: %d, sock: %d).",
- app->pid, app->sock);
+ app->pid,
+ app->sock);
} else {
ERR("UST app send stream, handle %d, to ust failed with ret %d (pid: %d, sock: %d).",
- stream->obj->handle, ret, app->pid,
- app->sock);
+ stream->obj->handle,
+ ret,
+ app->pid,
+ app->sock);
}
goto error;
}
* On success return 0 else a negative value.
*/
int ust_consumer_send_channel_to_ust(struct ust_app *app,
- struct ust_app_session *ua_sess, struct ust_app_channel *channel)
+ struct ust_app_session *ua_sess,
+ struct ust_app_channel *channel)
{
int ret;
LTTNG_ASSERT(channel->obj);
DBG2("UST app send channel to sock %d pid %d (name: %s, key: %" PRIu64 ")",
- app->sock, app->pid, channel->name, channel->tracing_channel_id);
+ app->sock,
+ app->pid,
+ channel->name,
+ channel->tracing_channel_id);
/* Send stream to application. */
pthread_mutex_lock(&app->sock_lock);
if (ret < 0) {
if (ret == -EPIPE || ret == -LTTNG_UST_ERR_EXITING) {
DBG3("UST app send channel to ust failed. Application is dead (pid: %d, sock: %d).",
- app->pid, app->sock);
+ app->pid,
+ app->sock);
} else if (ret == -EAGAIN) {
WARN("UST app send channel to ust failed. Communication timeout (pid: %d, sock: %d).",
- app->pid, app->sock);
+ app->pid,
+ app->sock);
} else {
ERR("UST app send channel %s, to ust failed with ret %d (pid: %d, sock: %d).",
- channel->name, ret, app->pid,
- app->sock);
+ channel->name,
+ ret,
+ app->pid,
+ app->sock);
}
goto error;
}
ssize_t ret_push;
struct lttcomm_metadata_request_msg request;
struct buffer_reg_uid *reg_uid;
- struct ust_registry_session *ust_reg;
+ lsu::registry_session *ust_reg;
struct lttcomm_consumer_msg msg;
LTTNG_ASSERT(socket);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
health_code_update();
/* Wait for a metadata request */
}
DBG("Metadata request received for session %" PRIu64 ", key %" PRIu64,
- request.session_id, request.key);
+ request.session_id,
+ request.key);
- reg_uid = buffer_reg_uid_find(request.session_id,
- request.bits_per_long, request.uid);
+ reg_uid = buffer_reg_uid_find(request.session_id, request.bits_per_long, request.uid);
if (reg_uid) {
ust_reg = reg_uid->registry->reg.ust;
} else {
- struct buffer_reg_pid *reg_pid =
- buffer_reg_pid_find(request.session_id_per_pid);
+ struct buffer_reg_pid *reg_pid = buffer_reg_pid_find(request.session_id_per_pid);
if (!reg_pid) {
DBG("PID registry not found for session id %" PRIu64,
- request.session_id_per_pid);
+ request.session_id_per_pid);
memset(&msg, 0, sizeof(msg));
msg.cmd_type = LTTNG_ERR_UND;
}
LTTNG_ASSERT(ust_reg);
- pthread_mutex_lock(&ust_reg->lock);
- ret_push = ust_app_push_metadata(ust_reg, socket, 1);
- pthread_mutex_unlock(&ust_reg->lock);
+ {
+ auto locked_ust_reg = ust_reg->lock();
+ ret_push = ust_app_push_metadata(locked_ust_reg, socket, 1);
+ }
if (ret_push == -EPIPE) {
DBG("Application or relay closed while pushing metadata");
} else if (ret_push < 0) {
ret = 0;
end:
- rcu_read_unlock();
return ret;
}