From: David Goulet Date: Mon, 23 Jul 2012 15:03:08 +0000 (-0400) Subject: Code cleanup in the ust/kernel consumer file X-Git-Tag: v2.1.0-rc1~64 X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=commitdiff_plain;h=37278a1e7efe00011260569fa90909601e8c5184 Code cleanup in the ust/kernel consumer file Generalize the send relayd socket function call for both UST and kernel. Refactor code in ust-consumer.c which only moves code around and bring no changes to the behavior. Signed-off-by: David Goulet --- diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 56d938114..72450d50c 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -331,3 +331,54 @@ int consumer_send_stream(int sock, struct consumer_output *dst, error: return ret; } + +/* + * Send relayd socket to consumer associated with a session name. + * + * On success return positive value. On error, negative value. + */ +int consumer_send_relayd_socket(int consumer_sock, + struct lttcomm_sock *sock, struct consumer_output *consumer, + enum lttng_stream_type type) +{ + int ret; + struct lttcomm_consumer_msg msg; + + /* Code flow error. Safety net. */ + assert(sock); + assert(consumer); + + /* Bail out if consumer is disabled */ + if (!consumer->enabled) { + ret = LTTCOMM_OK; + goto error; + } + + msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET; + /* + * Assign network consumer output index using the temporary consumer since + * this call should only be made from within a set_consumer_uri() function + * call in the session daemon. + */ + msg.u.relayd_sock.net_index = consumer->net_seq_index; + msg.u.relayd_sock.type = type; + memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock)); + + DBG3("Sending relayd sock info to consumer"); + ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg)); + if (ret < 0) { + PERROR("send consumer relayd socket info"); + goto error; + } + + DBG3("Sending relayd socket file descriptor to consumer"); + ret = consumer_send_fds(consumer_sock, &sock->fd, 1); + if (ret < 0) { + goto error; + } + + DBG2("Consumer relayd socket sent"); + +error: + return ret; +} diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index fff32de0f..ab36a685c 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -108,6 +108,10 @@ int consumer_send_fds(int sock, int *fds, size_t nb_fd); int consumer_send_stream(int sock, struct consumer_output *dst, struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd); int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg); +int consumer_send_relayd_socket(int consumer_sock, + struct lttcomm_sock *sock, struct consumer_output *consumer, + enum lttng_stream_type type); + void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_consumer_command cmd, int channel_key, diff --git a/src/bin/lttng-sessiond/kernel-consumer.c b/src/bin/lttng-sessiond/kernel-consumer.c index 5d690548c..4c037a9eb 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.c +++ b/src/bin/lttng-sessiond/kernel-consumer.c @@ -269,54 +269,3 @@ int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session) error: return ret; } - -/* - * Send relayd socket to consumer associated with a session name. - * - * On success return positive value. On error, negative value. - */ -int kernel_consumer_send_relayd_socket(int consumer_sock, - struct lttcomm_sock *sock, struct consumer_output *consumer, - enum lttng_stream_type type) -{ - int ret; - struct lttcomm_consumer_msg msg; - - /* Code flow error. Safety net. */ - assert(sock); - assert(consumer); - - /* Bail out if consumer is disabled */ - if (!consumer->enabled) { - ret = LTTCOMM_OK; - goto error; - } - - msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET; - /* - * Assign network consumer output index using the temporary consumer since - * this call should only be made from within a set_consumer_uri() function - * call in the session daemon. - */ - msg.u.relayd_sock.net_index = consumer->net_seq_index; - msg.u.relayd_sock.type = type; - memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock)); - - DBG2("Sending relayd sock info to consumer"); - ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg)); - if (ret < 0) { - PERROR("send consumer relayd socket info"); - goto error; - } - - DBG2("Sending relayd socket file descriptor to consumer"); - ret = consumer_send_fds(consumer_sock, &sock->fd, 1); - if (ret < 0) { - goto error; - } - - DBG("Kernel consumer relayd socket sent"); - -error: - return ret; -} diff --git a/src/bin/lttng-sessiond/kernel-consumer.h b/src/bin/lttng-sessiond/kernel-consumer.h index 02e3ec008..8aba019e6 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.h +++ b/src/bin/lttng-sessiond/kernel-consumer.h @@ -26,10 +26,6 @@ int kernel_consumer_send_channel_stream(int sock, int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session); -int kernel_consumer_send_relayd_socket(int consumer_sock, - struct lttcomm_sock *sock, struct consumer_output *consumer, - enum lttng_stream_type type); - int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream, struct ltt_kernel_session *session); diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 3c16d50a0..85b960ee3 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -1957,25 +1957,12 @@ static int send_socket_relayd_consumer(int domain, struct ltt_session *session, session->net_handle = 1; } - switch (domain) { - case LTTNG_DOMAIN_KERNEL: - /* Send relayd socket to consumer. */ - ret = kernel_consumer_send_relayd_socket(consumer_fd, sock, - consumer, relayd_uri->stype); - if (ret < 0) { - ret = LTTCOMM_ENABLE_CONSUMER_FAIL; - goto close_sock; - } - break; - case LTTNG_DOMAIN_UST: - /* Send relayd socket to consumer. */ - ret = ust_consumer_send_relayd_socket(consumer_fd, sock, - consumer, relayd_uri->stype); - if (ret < 0) { - ret = LTTCOMM_ENABLE_CONSUMER_FAIL; - goto close_sock; - } - break; + /* Send relayd socket to consumer. */ + ret = consumer_send_relayd_socket(consumer_fd, sock, + consumer, relayd_uri->stype); + if (ret < 0) { + ret = LTTCOMM_ENABLE_CONSUMER_FAIL; + goto close_sock; } ret = LTTCOMM_OK; diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c index fa5f071f0..5b909ab15 100644 --- a/src/bin/lttng-sessiond/ust-consumer.c +++ b/src/bin/lttng-sessiond/ust-consumer.c @@ -30,28 +30,31 @@ #include "ust-consumer.h" /* - * Send all stream fds of UST channel to the consumer. + * Send a single channel to the consumer using command ADD_CHANNEL. */ -static int send_channel_streams(int sock, - struct ust_app_channel *uchan, const char *path, - uid_t uid, gid_t gid, struct consumer_output *consumer) +static int send_channel(int sock, struct ust_app_channel *uchan) { int ret, fd; - char tmp_path[PATH_MAX]; - const char *pathname; - struct lttcomm_consumer_msg lum; - struct ltt_ust_stream *stream, *tmp; + struct lttcomm_consumer_msg msg; - DBG("Sending streams of channel %s to UST consumer", uchan->name); + /* Safety net */ + assert(uchan); + + if (sock < 0) { + ret = -EINVAL; + goto error; + } - consumer_init_channel_comm_msg(&lum, + DBG2("Sending channel %s to UST consumer", uchan->name); + + consumer_init_channel_comm_msg(&msg, LTTNG_CONSUMER_ADD_CHANNEL, uchan->obj->shm_fd, uchan->attr.subbuf_size, uchan->obj->memory_map_size, uchan->name); - ret = consumer_send_channel(sock, &lum); + ret = consumer_send_channel(sock, &msg); if (ret < 0) { goto error; } @@ -62,11 +65,79 @@ static int send_channel_streams(int sock, goto error; } +error: + return ret; +} + +/* + * Send a single stream to the consumer using ADD_STREAM command. + */ +static int send_channel_stream(int sock, struct ust_app_channel *uchan, + struct ust_app_session *usess, struct ltt_ust_stream *stream, + struct consumer_output *consumer, const char *pathname) +{ + int ret, fds[2]; + struct lttcomm_consumer_msg msg; + + /* Safety net */ + assert(uchan); + assert(usess); + assert(stream); + assert(consumer); + + DBG2("Sending stream %d of channel %s to kernel consumer", + stream->obj->shm_fd, uchan->name); + + consumer_init_stream_comm_msg(&msg, + LTTNG_CONSUMER_ADD_STREAM, + uchan->obj->shm_fd, + stream->obj->shm_fd, + LTTNG_CONSUMER_ACTIVE_STREAM, + DEFAULT_UST_CHANNEL_OUTPUT, + stream->obj->memory_map_size, + usess->uid, + usess->gid, + consumer->net_seq_index, + 0, /* Metadata flag unset */ + stream->name, + pathname); + + /* Send stream and file descriptor */ + fds[0] = stream->obj->shm_fd; + fds[1] = stream->obj->wait_fd; + ret = consumer_send_stream(sock, consumer, &msg, fds, 2); + if (ret < 0) { + goto error; + } + +error: + return ret; +} + +/* + * Send all stream fds of UST channel to the consumer. + */ +int ust_consumer_send_channel_streams(int sock, + struct ust_app_channel *uchan, struct ust_app_session *usess, + struct consumer_output *consumer) +{ + int ret; + char tmp_path[PATH_MAX]; + const char *pathname; + struct ltt_ust_stream *stream, *tmp; + + DBG("Sending streams of channel %s to UST consumer", uchan->name); + + ret = send_channel(sock, uchan); + if (ret < 0) { + goto error; + } + /* Get the right path name destination */ if (consumer->type == CONSUMER_DST_LOCAL) { /* Set application path to the destination path */ ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", - consumer->dst.trace_path, path); + consumer->dst.trace_path, usess->path); if (ret < 0) { PERROR("snprintf stream path"); goto error; @@ -75,7 +146,7 @@ static int send_channel_streams(int sock, DBG3("UST local consumer tracefile path: %s", pathname); } else { ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", - consumer->subdir, path); + consumer->subdir, usess->path); if (ret < 0) { PERROR("snprintf stream path"); goto error; @@ -85,30 +156,12 @@ static int send_channel_streams(int sock, } cds_list_for_each_entry_safe(stream, tmp, &uchan->streams.head, list) { - int fds[2]; - if (!stream->obj->shm_fd) { continue; } - consumer_init_stream_comm_msg(&lum, - LTTNG_CONSUMER_ADD_STREAM, - uchan->obj->shm_fd, - stream->obj->shm_fd, - LTTNG_CONSUMER_ACTIVE_STREAM, - DEFAULT_UST_CHANNEL_OUTPUT, - stream->obj->memory_map_size, - uid, - gid, - consumer->net_seq_index, - 0, /* Metadata flag unset */ - stream->name, + ret = send_channel_stream(sock, uchan, usess, stream, consumer, pathname); - - /* Send stream and file descriptor */ - fds[0] = stream->obj->shm_fd; - fds[1] = stream->obj->wait_fd; - ret = consumer_send_stream(sock, consumer, &lum, fds, 2); if (ret < 0) { goto error; } @@ -123,99 +176,130 @@ error: } /* - * Send all stream fds of the UST session to the consumer. + * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM. */ -int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess, +int ust_consumer_send_metadata(int sock, struct ust_app_session *usess, struct consumer_output *consumer) { - int ret = 0; - int sock = consumer_fd; + int ret, fd, fds[2]; char tmp_path[PATH_MAX]; const char *pathname; - struct lttng_ht_iter iter; - struct lttcomm_consumer_msg lum; - struct ust_app_channel *ua_chan; + struct lttcomm_consumer_msg msg; - DBG("Sending metadata stream fd"); + /* Safety net */ + assert(usess); + assert(consumer); - if (consumer_fd < 0) { - ERR("Consumer has negative file descriptor"); + if (sock < 0) { + ERR("Consumer socket is negative (%d)", sock); return -EINVAL; } - if (usess->metadata->obj->shm_fd != 0) { - int fd; - int fds[2]; + if (usess->metadata->obj->shm_fd == 0) { + ERR("Metadata obj shm_fd is 0"); + ret = -1; + goto error; + } - consumer_init_channel_comm_msg(&lum, - LTTNG_CONSUMER_ADD_CHANNEL, - usess->metadata->obj->shm_fd, - usess->metadata->attr.subbuf_size, - usess->metadata->obj->memory_map_size, - "metadata"); + DBG("UST consumer sending metadata stream fd"); - ret = consumer_send_channel(sock, &lum); - if (ret < 0) { - goto error; - } + consumer_init_channel_comm_msg(&msg, + LTTNG_CONSUMER_ADD_CHANNEL, + usess->metadata->obj->shm_fd, + usess->metadata->attr.subbuf_size, + usess->metadata->obj->memory_map_size, + "metadata"); + + ret = consumer_send_channel(sock, &msg); + if (ret < 0) { + goto error; + } + + /* Sending metadata shared memory fd */ + fd = usess->metadata->obj->shm_fd; + ret = consumer_send_fds(sock, &fd, 1); + if (ret < 0) { + goto error; + } - fd = usess->metadata->obj->shm_fd; - ret = consumer_send_fds(sock, &fd, 1); + /* Get correct path name destination */ + if (consumer->type == CONSUMER_DST_LOCAL) { + /* Set application path to the destination path */ + ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", + consumer->dst.trace_path, usess->path); if (ret < 0) { + PERROR("snprintf stream path"); goto error; } + pathname = tmp_path; - /* Get correct path name destination */ - if (consumer->type == CONSUMER_DST_LOCAL) { - /* Set application path to the destination path */ - ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", - consumer->dst.trace_path, usess->path); - if (ret < 0) { - PERROR("snprintf stream path"); - goto error; - } - pathname = tmp_path; - - /* Create directory */ - ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, - usess->uid, usess->gid); - if (ret < 0) { - if (ret != -EEXIST) { - ERR("Trace directory creation error"); - goto error; - } - } - } else { - ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", - consumer->subdir, usess->path); - if (ret < 0) { - PERROR("snprintf metadata path"); + /* Create directory */ + ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, + usess->uid, usess->gid); + if (ret < 0) { + if (ret != -EEXIST) { + ERR("Trace directory creation error"); goto error; } - pathname = tmp_path; } - - consumer_init_stream_comm_msg(&lum, - LTTNG_CONSUMER_ADD_STREAM, - usess->metadata->obj->shm_fd, - usess->metadata->stream_obj->shm_fd, - LTTNG_CONSUMER_ACTIVE_STREAM, - DEFAULT_UST_CHANNEL_OUTPUT, - usess->metadata->stream_obj->memory_map_size, - usess->uid, - usess->gid, - consumer->net_seq_index, - 1, /* Flag metadata set */ - "metadata", - pathname); - - /* Send stream and file descriptor */ - fds[0] = usess->metadata->stream_obj->shm_fd; - fds[1] = usess->metadata->stream_obj->wait_fd; - ret = consumer_send_stream(sock, consumer, &lum, fds, 2); + } else { + ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", + consumer->subdir, usess->path); if (ret < 0) { + PERROR("snprintf metadata path"); goto error; } + pathname = tmp_path; + } + + consumer_init_stream_comm_msg(&msg, + LTTNG_CONSUMER_ADD_STREAM, + usess->metadata->obj->shm_fd, + usess->metadata->stream_obj->shm_fd, + LTTNG_CONSUMER_ACTIVE_STREAM, + DEFAULT_UST_CHANNEL_OUTPUT, + usess->metadata->stream_obj->memory_map_size, + usess->uid, + usess->gid, + consumer->net_seq_index, + 1, /* Flag metadata set */ + "metadata", + pathname); + + /* Send stream and file descriptor */ + fds[0] = usess->metadata->stream_obj->shm_fd; + fds[1] = usess->metadata->stream_obj->wait_fd; + ret = consumer_send_stream(sock, consumer, &msg, fds, 2); + if (ret < 0) { + goto error; + } + +error: + return ret; +} + +/* + * Send all stream fds of the UST session to the consumer. + */ +int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess, + struct consumer_output *consumer) +{ + int ret = 0; + int sock = consumer_fd; + struct lttng_ht_iter iter; + struct ust_app_channel *ua_chan; + + DBG("Sending metadata stream fd"); + + if (consumer_fd < 0) { + ERR("Consumer has negative file descriptor"); + return -EINVAL; + } + + /* Sending metadata information to the consumer */ + ret = ust_consumer_send_metadata(consumer_fd, usess, consumer); + if (ret < 0) { + goto error; } /* Send each channel fd streams of session */ @@ -230,8 +314,7 @@ int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess, continue; } - ret = send_channel_streams(sock, ua_chan, usess->path, usess->uid, - usess->gid, consumer); + ret = ust_consumer_send_channel_streams(sock, ua_chan, usess, consumer); if (ret < 0) { rcu_read_unlock(); goto error; @@ -246,42 +329,3 @@ int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess, error: return ret; } - -/* - * Send relayd socket to consumer associated with a session name. - * - * On success return positive value. On error, negative value. - */ -int ust_consumer_send_relayd_socket(int consumer_sock, - struct lttcomm_sock *sock, struct consumer_output *consumer, - enum lttng_stream_type type) -{ - int ret; - struct lttcomm_consumer_msg msg; - - /* Code flow error. Safety net. */ - assert(sock); - - msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET; - msg.u.relayd_sock.net_index = consumer->net_seq_index; - msg.u.relayd_sock.type = type; - memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock)); - - DBG2("Sending relayd sock info to consumer"); - ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg)); - if (ret < 0) { - PERROR("send consumer relayd socket info"); - goto error; - } - - DBG2("Sending relayd socket file descriptor to consumer"); - ret = consumer_send_fds(consumer_sock, &sock->fd, 1); - if (ret < 0) { - goto error; - } - - DBG("UST consumer relayd socket sent"); - -error: - return ret; -} diff --git a/src/bin/lttng-sessiond/ust-consumer.h b/src/bin/lttng-sessiond/ust-consumer.h index dcc51dcee..ac57e64ed 100644 --- a/src/bin/lttng-sessiond/ust-consumer.h +++ b/src/bin/lttng-sessiond/ust-consumer.h @@ -18,15 +18,17 @@ #ifndef _UST_CONSUMER_H #define _UST_CONSUMER_H -#include - #include "consumer.h" #include "ust-app.h" int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess, struct consumer_output *consumer); -int ust_consumer_send_relayd_socket(int consumer_sock, - struct lttcomm_sock *sock, struct consumer_output *consumer, - enum lttng_stream_type type); + +int ust_consumer_send_metadata(int sock, struct ust_app_session *usess, + struct consumer_output *consumer); + +int ust_consumer_send_channel_streams(int sock, + struct ust_app_channel *uchan, struct ust_app_session *usess, + struct consumer_output *consumer); #endif /* _UST_CONSUMER_H */