X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fust-consumer.c;h=5b909ab1594609257f1026f7d246c39f43bf8d9c;hp=fa5f071f07aa6896105e51a092827d298b75daf5;hb=37278a1e7efe00011260569fa90909601e8c5184;hpb=7717e3612c10d85507abc47fbde262d39edfb8b4 diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c index fa5f071f0..5b909ab15 100644 --- a/src/bin/lttng-sessiond/ust-consumer.c +++ b/src/bin/lttng-sessiond/ust-consumer.c @@ -30,28 +30,31 @@ #include "ust-consumer.h" /* - * Send all stream fds of UST channel to the consumer. + * Send a single channel to the consumer using command ADD_CHANNEL. */ -static int send_channel_streams(int sock, - struct ust_app_channel *uchan, const char *path, - uid_t uid, gid_t gid, struct consumer_output *consumer) +static int send_channel(int sock, struct ust_app_channel *uchan) { int ret, fd; - char tmp_path[PATH_MAX]; - const char *pathname; - struct lttcomm_consumer_msg lum; - struct ltt_ust_stream *stream, *tmp; + struct lttcomm_consumer_msg msg; - DBG("Sending streams of channel %s to UST consumer", uchan->name); + /* Safety net */ + assert(uchan); + + if (sock < 0) { + ret = -EINVAL; + goto error; + } - consumer_init_channel_comm_msg(&lum, + DBG2("Sending channel %s to UST consumer", uchan->name); + + consumer_init_channel_comm_msg(&msg, LTTNG_CONSUMER_ADD_CHANNEL, uchan->obj->shm_fd, uchan->attr.subbuf_size, uchan->obj->memory_map_size, uchan->name); - ret = consumer_send_channel(sock, &lum); + ret = consumer_send_channel(sock, &msg); if (ret < 0) { goto error; } @@ -62,11 +65,79 @@ static int send_channel_streams(int sock, goto error; } +error: + return ret; +} + +/* + * Send a single stream to the consumer using ADD_STREAM command. + */ +static int send_channel_stream(int sock, struct ust_app_channel *uchan, + struct ust_app_session *usess, struct ltt_ust_stream *stream, + struct consumer_output *consumer, const char *pathname) +{ + int ret, fds[2]; + struct lttcomm_consumer_msg msg; + + /* Safety net */ + assert(uchan); + assert(usess); + assert(stream); + assert(consumer); + + DBG2("Sending stream %d of channel %s to kernel consumer", + stream->obj->shm_fd, uchan->name); + + consumer_init_stream_comm_msg(&msg, + LTTNG_CONSUMER_ADD_STREAM, + uchan->obj->shm_fd, + stream->obj->shm_fd, + LTTNG_CONSUMER_ACTIVE_STREAM, + DEFAULT_UST_CHANNEL_OUTPUT, + stream->obj->memory_map_size, + usess->uid, + usess->gid, + consumer->net_seq_index, + 0, /* Metadata flag unset */ + stream->name, + pathname); + + /* Send stream and file descriptor */ + fds[0] = stream->obj->shm_fd; + fds[1] = stream->obj->wait_fd; + ret = consumer_send_stream(sock, consumer, &msg, fds, 2); + if (ret < 0) { + goto error; + } + +error: + return ret; +} + +/* + * Send all stream fds of UST channel to the consumer. + */ +int ust_consumer_send_channel_streams(int sock, + struct ust_app_channel *uchan, struct ust_app_session *usess, + struct consumer_output *consumer) +{ + int ret; + char tmp_path[PATH_MAX]; + const char *pathname; + struct ltt_ust_stream *stream, *tmp; + + DBG("Sending streams of channel %s to UST consumer", uchan->name); + + ret = send_channel(sock, uchan); + if (ret < 0) { + goto error; + } + /* Get the right path name destination */ if (consumer->type == CONSUMER_DST_LOCAL) { /* Set application path to the destination path */ ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", - consumer->dst.trace_path, path); + consumer->dst.trace_path, usess->path); if (ret < 0) { PERROR("snprintf stream path"); goto error; @@ -75,7 +146,7 @@ static int send_channel_streams(int sock, DBG3("UST local consumer tracefile path: %s", pathname); } else { ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", - consumer->subdir, path); + consumer->subdir, usess->path); if (ret < 0) { PERROR("snprintf stream path"); goto error; @@ -85,30 +156,12 @@ static int send_channel_streams(int sock, } cds_list_for_each_entry_safe(stream, tmp, &uchan->streams.head, list) { - int fds[2]; - if (!stream->obj->shm_fd) { continue; } - consumer_init_stream_comm_msg(&lum, - LTTNG_CONSUMER_ADD_STREAM, - uchan->obj->shm_fd, - stream->obj->shm_fd, - LTTNG_CONSUMER_ACTIVE_STREAM, - DEFAULT_UST_CHANNEL_OUTPUT, - stream->obj->memory_map_size, - uid, - gid, - consumer->net_seq_index, - 0, /* Metadata flag unset */ - stream->name, + ret = send_channel_stream(sock, uchan, usess, stream, consumer, pathname); - - /* Send stream and file descriptor */ - fds[0] = stream->obj->shm_fd; - fds[1] = stream->obj->wait_fd; - ret = consumer_send_stream(sock, consumer, &lum, fds, 2); if (ret < 0) { goto error; } @@ -123,99 +176,130 @@ error: } /* - * Send all stream fds of the UST session to the consumer. + * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM. */ -int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess, +int ust_consumer_send_metadata(int sock, struct ust_app_session *usess, struct consumer_output *consumer) { - int ret = 0; - int sock = consumer_fd; + int ret, fd, fds[2]; char tmp_path[PATH_MAX]; const char *pathname; - struct lttng_ht_iter iter; - struct lttcomm_consumer_msg lum; - struct ust_app_channel *ua_chan; + struct lttcomm_consumer_msg msg; - DBG("Sending metadata stream fd"); + /* Safety net */ + assert(usess); + assert(consumer); - if (consumer_fd < 0) { - ERR("Consumer has negative file descriptor"); + if (sock < 0) { + ERR("Consumer socket is negative (%d)", sock); return -EINVAL; } - if (usess->metadata->obj->shm_fd != 0) { - int fd; - int fds[2]; + if (usess->metadata->obj->shm_fd == 0) { + ERR("Metadata obj shm_fd is 0"); + ret = -1; + goto error; + } - consumer_init_channel_comm_msg(&lum, - LTTNG_CONSUMER_ADD_CHANNEL, - usess->metadata->obj->shm_fd, - usess->metadata->attr.subbuf_size, - usess->metadata->obj->memory_map_size, - "metadata"); + DBG("UST consumer sending metadata stream fd"); - ret = consumer_send_channel(sock, &lum); - if (ret < 0) { - goto error; - } + consumer_init_channel_comm_msg(&msg, + LTTNG_CONSUMER_ADD_CHANNEL, + usess->metadata->obj->shm_fd, + usess->metadata->attr.subbuf_size, + usess->metadata->obj->memory_map_size, + "metadata"); + + ret = consumer_send_channel(sock, &msg); + if (ret < 0) { + goto error; + } + + /* Sending metadata shared memory fd */ + fd = usess->metadata->obj->shm_fd; + ret = consumer_send_fds(sock, &fd, 1); + if (ret < 0) { + goto error; + } - fd = usess->metadata->obj->shm_fd; - ret = consumer_send_fds(sock, &fd, 1); + /* Get correct path name destination */ + if (consumer->type == CONSUMER_DST_LOCAL) { + /* Set application path to the destination path */ + ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", + consumer->dst.trace_path, usess->path); if (ret < 0) { + PERROR("snprintf stream path"); goto error; } + pathname = tmp_path; - /* Get correct path name destination */ - if (consumer->type == CONSUMER_DST_LOCAL) { - /* Set application path to the destination path */ - ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", - consumer->dst.trace_path, usess->path); - if (ret < 0) { - PERROR("snprintf stream path"); - goto error; - } - pathname = tmp_path; - - /* Create directory */ - ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, - usess->uid, usess->gid); - if (ret < 0) { - if (ret != -EEXIST) { - ERR("Trace directory creation error"); - goto error; - } - } - } else { - ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", - consumer->subdir, usess->path); - if (ret < 0) { - PERROR("snprintf metadata path"); + /* Create directory */ + ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, + usess->uid, usess->gid); + if (ret < 0) { + if (ret != -EEXIST) { + ERR("Trace directory creation error"); goto error; } - pathname = tmp_path; } - - consumer_init_stream_comm_msg(&lum, - LTTNG_CONSUMER_ADD_STREAM, - usess->metadata->obj->shm_fd, - usess->metadata->stream_obj->shm_fd, - LTTNG_CONSUMER_ACTIVE_STREAM, - DEFAULT_UST_CHANNEL_OUTPUT, - usess->metadata->stream_obj->memory_map_size, - usess->uid, - usess->gid, - consumer->net_seq_index, - 1, /* Flag metadata set */ - "metadata", - pathname); - - /* Send stream and file descriptor */ - fds[0] = usess->metadata->stream_obj->shm_fd; - fds[1] = usess->metadata->stream_obj->wait_fd; - ret = consumer_send_stream(sock, consumer, &lum, fds, 2); + } else { + ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", + consumer->subdir, usess->path); if (ret < 0) { + PERROR("snprintf metadata path"); goto error; } + pathname = tmp_path; + } + + consumer_init_stream_comm_msg(&msg, + LTTNG_CONSUMER_ADD_STREAM, + usess->metadata->obj->shm_fd, + usess->metadata->stream_obj->shm_fd, + LTTNG_CONSUMER_ACTIVE_STREAM, + DEFAULT_UST_CHANNEL_OUTPUT, + usess->metadata->stream_obj->memory_map_size, + usess->uid, + usess->gid, + consumer->net_seq_index, + 1, /* Flag metadata set */ + "metadata", + pathname); + + /* Send stream and file descriptor */ + fds[0] = usess->metadata->stream_obj->shm_fd; + fds[1] = usess->metadata->stream_obj->wait_fd; + ret = consumer_send_stream(sock, consumer, &msg, fds, 2); + if (ret < 0) { + goto error; + } + +error: + return ret; +} + +/* + * Send all stream fds of the UST session to the consumer. + */ +int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess, + struct consumer_output *consumer) +{ + int ret = 0; + int sock = consumer_fd; + struct lttng_ht_iter iter; + struct ust_app_channel *ua_chan; + + DBG("Sending metadata stream fd"); + + if (consumer_fd < 0) { + ERR("Consumer has negative file descriptor"); + return -EINVAL; + } + + /* Sending metadata information to the consumer */ + ret = ust_consumer_send_metadata(consumer_fd, usess, consumer); + if (ret < 0) { + goto error; } /* Send each channel fd streams of session */ @@ -230,8 +314,7 @@ int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess, continue; } - ret = send_channel_streams(sock, ua_chan, usess->path, usess->uid, - usess->gid, consumer); + ret = ust_consumer_send_channel_streams(sock, ua_chan, usess, consumer); if (ret < 0) { rcu_read_unlock(); goto error; @@ -246,42 +329,3 @@ int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess, error: return ret; } - -/* - * Send relayd socket to consumer associated with a session name. - * - * On success return positive value. On error, negative value. - */ -int ust_consumer_send_relayd_socket(int consumer_sock, - struct lttcomm_sock *sock, struct consumer_output *consumer, - enum lttng_stream_type type) -{ - int ret; - struct lttcomm_consumer_msg msg; - - /* Code flow error. Safety net. */ - assert(sock); - - msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET; - msg.u.relayd_sock.net_index = consumer->net_seq_index; - msg.u.relayd_sock.type = type; - memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock)); - - DBG2("Sending relayd sock info to consumer"); - ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg)); - if (ret < 0) { - PERROR("send consumer relayd socket info"); - goto error; - } - - DBG2("Sending relayd socket file descriptor to consumer"); - ret = consumer_send_fds(consumer_sock, &sock->fd, 1); - if (ret < 0) { - goto error; - } - - DBG("UST consumer relayd socket sent"); - -error: - return ret; -}