X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fust-consumer.cpp;fp=src%2Fbin%2Flttng-sessiond%2Fust-consumer.cpp;h=6b646fb941c767b1e4bdbd546619625f6c1186f3;hp=ef61cfc7e0f00271e5babb82b2c0f7e19118d0de;hb=28ab034a2c3582d07d3423d2d746731f87d3969f;hpb=52e345b9ac912d033c2a2c25a170a01cf209839d diff --git a/src/bin/lttng-sessiond/ust-consumer.cpp b/src/bin/lttng-sessiond/ust-consumer.cpp index ef61cfc7e..6b646fb94 100644 --- a/src/bin/lttng-sessiond/ust-consumer.cpp +++ b/src/bin/lttng-sessiond/ust-consumer.cpp @@ -6,24 +6,24 @@ */ #define _LGPL_SOURCE -#include -#include -#include -#include -#include +#include "buffer-registry.hpp" +#include "consumer.hpp" +#include "health-sessiond.hpp" +#include "lttng-sessiond.hpp" +#include "lttng-ust-error.hpp" +#include "session.hpp" +#include "ust-consumer.hpp" -#include #include +#include #include #include -#include "consumer.hpp" -#include "health-sessiond.hpp" -#include "ust-consumer.hpp" -#include "lttng-ust-error.hpp" -#include "buffer-registry.hpp" -#include "session.hpp" -#include "lttng-sessiond.hpp" +#include +#include +#include +#include +#include namespace lsu = lttng::sessiond::ust; @@ -33,11 +33,11 @@ namespace lsu = lttng::sessiond::ust; * Consumer socket lock MUST be acquired before calling this. */ static int ask_channel_creation(struct ust_app_session *ua_sess, - struct ust_app_channel *ua_chan, - struct consumer_output *consumer, - struct consumer_socket *socket, - lsu::registry_session *registry, - struct lttng_trace_chunk *trace_chunk) + struct ust_app_channel *ua_chan, + struct consumer_output *consumer, + struct consumer_socket *socket, + lsu::registry_session *registry, + struct lttng_trace_chunk *trace_chunk) { int ret, output; uint32_t chan_id; @@ -59,8 +59,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, is_local_trace = consumer->net_seq_index == -1ULL; /* Format the channel's path (relative to the current trace chunk). */ - pathname = setup_channel_trace_path(consumer, ua_sess->path, - &consumer_path_offset); + pathname = setup_channel_trace_path(consumer, ua_sess->path, &consumer_path_offset); if (!pathname) { ret = -1; goto error; @@ -70,8 +69,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, enum lttng_trace_chunk_status chunk_status; char *pathname_index; - ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR, - pathname); + ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR, pathname); if (ret < 0) { ERR("Failed to format channel index directory"); ret = -1; @@ -82,8 +80,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, * Create the index subdirectory which will take care * of implicitly creating the channel's path. */ - chunk_status = lttng_trace_chunk_create_subdirectory( - trace_chunk, pathname_index); + chunk_status = lttng_trace_chunk_create_subdirectory(trace_chunk, pathname_index); free(pathname_index); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ret = -1; @@ -116,12 +113,9 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, if (ua_sess->shm_path[0]) { strncpy(shm_path, ua_sess->shm_path, sizeof(shm_path)); shm_path[sizeof(shm_path) - 1] = '\0'; - strncat(shm_path, "/", - sizeof(shm_path) - strlen(shm_path) - 1); - strncat(shm_path, ua_chan->name, - sizeof(shm_path) - strlen(shm_path) - 1); - strncat(shm_path, "_", - sizeof(shm_path) - strlen(shm_path) - 1); + strncat(shm_path, "/", sizeof(shm_path) - strlen(shm_path) - 1); + strncat(shm_path, ua_chan->name, sizeof(shm_path) - strlen(shm_path) - 1); + strncat(shm_path, "_", sizeof(shm_path) - strlen(shm_path) - 1); } strncpy(root_shm_path, ua_sess->root_shm_path, sizeof(root_shm_path)); root_shm_path[sizeof(root_shm_path) - 1] = '\0'; @@ -135,32 +129,33 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, } consumer_init_ask_channel_comm_msg(&msg, - ua_chan->attr.subbuf_size, - ua_chan->attr.num_subbuf, - ua_chan->attr.overwrite, - ua_chan->attr.switch_timer_interval, - ua_chan->attr.read_timer_interval, - ua_sess->live_timer_interval, - ua_sess->live_timer_interval != 0, - ua_chan->monitor_timer_interval, - output, - (int) ua_chan->attr.type, - ua_sess->tracing_id, - &pathname[consumer_path_offset], - ua_chan->name, - consumer->net_seq_index, - ua_chan->key, - registry->uuid, - chan_id, - ua_chan->tracefile_size, - ua_chan->tracefile_count, - ua_sess->id, - ua_sess->output_traces, - lttng_credentials_get_uid(&ua_sess->real_credentials), - ua_chan->attr.blocking_timeout, - root_shm_path, shm_path, - trace_chunk, - &ua_sess->effective_credentials); + ua_chan->attr.subbuf_size, + ua_chan->attr.num_subbuf, + ua_chan->attr.overwrite, + ua_chan->attr.switch_timer_interval, + ua_chan->attr.read_timer_interval, + ua_sess->live_timer_interval, + ua_sess->live_timer_interval != 0, + ua_chan->monitor_timer_interval, + output, + (int) ua_chan->attr.type, + ua_sess->tracing_id, + &pathname[consumer_path_offset], + ua_chan->name, + consumer->net_seq_index, + ua_chan->key, + registry->uuid, + chan_id, + ua_chan->tracefile_size, + ua_chan->tracefile_count, + ua_sess->id, + ua_sess->output_traces, + lttng_credentials_get_uid(&ua_sess->real_credentials), + ua_chan->attr.blocking_timeout, + root_shm_path, + shm_path, + trace_chunk, + &ua_sess->effective_credentials); health_code_update(); @@ -169,8 +164,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, goto error; } - ret = consumer_recv_status_channel(socket, &key, - &ua_chan->expected_stream_count); + ret = consumer_recv_status_channel(socket, &key, &ua_chan->expected_stream_count); if (ret < 0) { goto error; } @@ -181,8 +175,9 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, LTTNG_ASSERT(ua_chan->expected_stream_count > 0); } - DBG2("UST ask channel %" PRIu64 " successfully done with %u stream(s)", key, - ua_chan->expected_stream_count); + DBG2("UST ask channel %" PRIu64 " successfully done with %u stream(s)", + key, + ua_chan->expected_stream_count); error: free(pathname); @@ -198,11 +193,11 @@ error: * Returns 0 on success else a negative value. */ int ust_consumer_ask_channel(struct ust_app_session *ua_sess, - struct ust_app_channel *ua_chan, - struct consumer_output *consumer, - struct consumer_socket *socket, - lsu::registry_session *registry, - struct lttng_trace_chunk * trace_chunk) + struct ust_app_channel *ua_chan, + struct consumer_output *consumer, + struct consumer_socket *socket, + lsu::registry_session *registry, + struct lttng_trace_chunk *trace_chunk) { int ret; @@ -219,8 +214,7 @@ int ust_consumer_ask_channel(struct ust_app_session *ua_sess, } pthread_mutex_lock(socket->lock); - ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry, - trace_chunk); + ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry, trace_chunk); pthread_mutex_unlock(socket->lock); if (ret < 0) { ERR("ask_channel_creation consumer command failed"); @@ -237,8 +231,7 @@ error: * * Return 0 on success else a negative value. */ -int ust_consumer_get_channel(struct consumer_socket *socket, - struct ust_app_channel *ua_chan) +int ust_consumer_get_channel(struct consumer_socket *socket, struct ust_app_channel *ua_chan) { int ret; struct lttcomm_consumer_msg msg; @@ -264,7 +257,8 @@ int ust_consumer_get_channel(struct consumer_socket *socket, if (ret < 0) { if (ret != -EPIPE) { ERR("Error recv channel from consumer %d with ret %d", - *socket->fd_ptr, ret); + *socket->fd_ptr, + ret); } else { DBG3("UST app recv channel from consumer. Consumer is dead."); } @@ -292,7 +286,8 @@ int ust_consumer_get_channel(struct consumer_socket *socket, } if (ret != -EPIPE) { ERR("Recv stream from consumer %d with ret %d", - *socket->fd_ptr, ret); + *socket->fd_ptr, + ret); } else { DBG3("UST app recv stream from consumer. Consumer is dead."); } @@ -331,8 +326,7 @@ error: * * Return 0 on success else a negative value. */ -int ust_consumer_destroy_channel(struct consumer_socket *socket, - struct ust_app_channel *ua_chan) +int ust_consumer_destroy_channel(struct consumer_socket *socket, struct ust_app_channel *ua_chan) { int ret; struct lttcomm_consumer_msg msg; @@ -364,7 +358,8 @@ error: * On success return 0 else a negative value. */ int ust_consumer_send_stream_to_ust(struct ust_app *app, - struct ust_app_channel *channel, struct ust_app_stream *stream) + struct ust_app_channel *channel, + struct ust_app_stream *stream) { int ret; @@ -381,14 +376,18 @@ int ust_consumer_send_stream_to_ust(struct ust_app *app, if (ret < 0) { if (ret == -EPIPE || ret == -LTTNG_UST_ERR_EXITING) { DBG3("UST app send stream to ust failed. Application is dead. (pid: %d, sock: %d).", - app->pid, app->sock); + app->pid, + app->sock); } else if (ret == -EAGAIN) { WARN("UST app send stream to ust failed. Communication time out (pid: %d, sock: %d).", - app->pid, app->sock); + app->pid, + app->sock); } else { ERR("UST app send stream, handle %d, to ust failed with ret %d (pid: %d, sock: %d).", - stream->obj->handle, ret, app->pid, - app->sock); + stream->obj->handle, + ret, + app->pid, + app->sock); } goto error; } @@ -404,7 +403,8 @@ error: * On success return 0 else a negative value. */ int ust_consumer_send_channel_to_ust(struct ust_app *app, - struct ust_app_session *ua_sess, struct ust_app_channel *channel) + struct ust_app_session *ua_sess, + struct ust_app_channel *channel) { int ret; @@ -414,7 +414,10 @@ int ust_consumer_send_channel_to_ust(struct ust_app *app, LTTNG_ASSERT(channel->obj); DBG2("UST app send channel to sock %d pid %d (name: %s, key: %" PRIu64 ")", - app->sock, app->pid, channel->name, channel->tracing_channel_id); + app->sock, + app->pid, + channel->name, + channel->tracing_channel_id); /* Send stream to application. */ pthread_mutex_lock(&app->sock_lock); @@ -423,14 +426,18 @@ int ust_consumer_send_channel_to_ust(struct ust_app *app, if (ret < 0) { if (ret == -EPIPE || ret == -LTTNG_UST_ERR_EXITING) { DBG3("UST app send channel to ust failed. Application is dead (pid: %d, sock: %d).", - app->pid, app->sock); + app->pid, + app->sock); } else if (ret == -EAGAIN) { WARN("UST app send channel to ust failed. Communication timeout (pid: %d, sock: %d).", - app->pid, app->sock); + app->pid, + app->sock); } else { ERR("UST app send channel %s, to ust failed with ret %d (pid: %d, sock: %d).", - channel->name, ret, app->pid, - app->sock); + channel->name, + ret, + app->pid, + app->sock); } goto error; } @@ -467,18 +474,17 @@ int ust_consumer_metadata_request(struct consumer_socket *socket) } DBG("Metadata request received for session %" PRIu64 ", key %" PRIu64, - request.session_id, request.key); + request.session_id, + request.key); - reg_uid = buffer_reg_uid_find(request.session_id, - request.bits_per_long, request.uid); + reg_uid = buffer_reg_uid_find(request.session_id, request.bits_per_long, request.uid); if (reg_uid) { ust_reg = reg_uid->registry->reg.ust; } else { - struct buffer_reg_pid *reg_pid = - buffer_reg_pid_find(request.session_id_per_pid); + struct buffer_reg_pid *reg_pid = buffer_reg_pid_find(request.session_id_per_pid); if (!reg_pid) { DBG("PID registry not found for session id %" PRIu64, - request.session_id_per_pid); + request.session_id_per_pid); memset(&msg, 0, sizeof(msg)); msg.cmd_type = LTTNG_ERR_UND;