X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=d376c9ee3cc31ff7817d9a99146f88fd8e806c45;hp=c90436c574270e2716372853b4a4c86dbbc80578;hb=1f4962443f25c371e4b54e97f9eb867d67cbf88e;hpb=913a542b6eabb29468ab09a1503ed454427679f2 diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index c90436c57..d376c9ee3 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -38,6 +38,61 @@ #include "utils.h" #include "lttng-sessiond.h" +/* + * Return allocated full pathname of the session using the consumer trace path + * and subdir if available. + * + * The caller can safely free(3) the returned value. On error, NULL is + * returned. + */ +char *setup_channel_trace_path(struct consumer_output *consumer, + const char *session_path) +{ + int ret; + char *pathname; + + assert(consumer); + assert(session_path); + + health_code_update(); + + /* + * Allocate the string ourself to make sure we never exceed + * LTTNG_PATH_MAX. + */ + pathname = zmalloc(LTTNG_PATH_MAX); + if (!pathname) { + goto error; + } + + /* Get correct path name destination */ + if (consumer->type == CONSUMER_DST_NET && + consumer->relay_major_version == 2 && + consumer->relay_minor_version < 11) { + ret = snprintf(pathname, LTTNG_PATH_MAX, "%s%s/%s%s", + consumer->dst.net.base_dir, + consumer->chunk_path, consumer->domain_subdir, + session_path); + } else { + ret = snprintf(pathname, LTTNG_PATH_MAX, "%s%s", + consumer->domain_subdir, session_path); + } + DBG3("Consumer trace path relative to current trace chunk: \"%s\"", + pathname); + if (ret < 0) { + PERROR("Failed to format channel path"); + goto error; + } else if (ret >= LTTNG_PATH_MAX) { + ERR("Truncation occurred while formatting channel path"); + goto error; + } + + return pathname; +error: + free(pathname); + return NULL; +} + /* * Send a data payload using a given consumer socket of size len. * @@ -1075,8 +1130,9 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer, enum lttng_stream_type type, uint64_t session_id, const char *session_name, const char *hostname, - int session_live_timer, const uint64_t *current_chunk_id, - time_t session_creation_time) + const char *base_path, int session_live_timer, + const uint64_t *current_chunk_id, time_t session_creation_time, + bool session_name_contains_creation_time) { int ret; struct lttcomm_consumer_msg msg; @@ -1094,17 +1150,26 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, } if (type == LTTNG_STREAM_CONTROL) { + char output_path[LTTNG_PATH_MAX] = {}; + uint64_t relayd_session_id; + ret = relayd_create_session(rsock, - &msg.u.relayd_sock.relayd_session_id, - session_name, hostname, session_live_timer, + &relayd_session_id, + session_name, hostname, base_path, + session_live_timer, consumer->snapshot, session_id, sessiond_uuid, current_chunk_id, - session_creation_time); + session_creation_time, + session_name_contains_creation_time, + output_path); if (ret < 0) { /* Close the control socket. */ (void) relayd_close(rsock); goto error; } + msg.u.relayd_sock.relayd_session_id = relayd_session_id; + DBG("Created session on relay, output path reply: %s", + output_path); } msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET; @@ -1125,7 +1190,7 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, } DBG3("Sending relayd socket file descriptor to consumer"); - ret = consumer_send_fds(consumer_sock, &rsock->sock.fd, 1); + ret = consumer_send_fds(consumer_sock, ALIGNED_CONST_PTR(rsock->sock.fd), 1); if (ret < 0) { goto error; } @@ -1837,7 +1902,8 @@ error: */ int consumer_close_trace_chunk(struct consumer_socket *socket, uint64_t relayd_id, uint64_t session_id, - struct lttng_trace_chunk *chunk) + struct lttng_trace_chunk *chunk, + char *closed_trace_chunk_path) { int ret; enum lttng_trace_chunk_status chunk_status; @@ -1845,12 +1911,15 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, .cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK, .u.close_trace_chunk.session_id = session_id, }; + struct lttcomm_consumer_close_trace_chunk_reply reply; uint64_t chunk_id; time_t close_timestamp; enum lttng_trace_chunk_command_type close_command; const char *close_command_name = "none"; + struct lttng_dynamic_buffer path_reception_buffer; assert(socket); + lttng_dynamic_buffer_init(&path_reception_buffer); if (relayd_id != -1ULL) { LTTNG_OPTIONAL_SET( @@ -1901,13 +1970,51 @@ int consumer_close_trace_chunk(struct consumer_socket *socket, relayd_id, session_id, chunk_id, close_command_name); health_code_update(); - ret = consumer_send_msg(socket, &msg); + ret = consumer_socket_send(socket, &msg, sizeof(struct lttcomm_consumer_msg)); if (ret < 0) { ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER; goto error; } - + ret = consumer_socket_recv(socket, &reply, sizeof(reply)); + if (ret < 0) { + ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER; + goto error; + } + if (reply.path_length >= LTTNG_PATH_MAX) { + ERR("Invalid path returned by relay daemon: %" PRIu32 "bytes exceeds maximal allowed length of %d bytes", + reply.path_length, LTTNG_PATH_MAX); + ret = -LTTNG_ERR_INVALID_PROTOCOL; + goto error; + } + ret = lttng_dynamic_buffer_set_size(&path_reception_buffer, + reply.path_length); + if (ret) { + ERR("Failed to allocate reception buffer of path returned by the \"close trace chunk\" command"); + ret = -LTTNG_ERR_NOMEM; + goto error; + } + ret = consumer_socket_recv(socket, path_reception_buffer.data, + path_reception_buffer.size); + if (ret < 0) { + ERR("Communication error while receiving path of closed trace chunk"); + ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER; + goto error; + } + if (path_reception_buffer.data[path_reception_buffer.size - 1] != '\0') { + ERR("Invalid path returned by relay daemon: not null-terminated"); + ret = -LTTNG_ERR_INVALID_PROTOCOL; + goto error; + } + if (closed_trace_chunk_path) { + /* + * closed_trace_chunk_path is assumed to have a length >= + * LTTNG_PATH_MAX + */ + memcpy(closed_trace_chunk_path, path_reception_buffer.data, + path_reception_buffer.size); + } error: + lttng_dynamic_buffer_reset(&path_reception_buffer); health_code_update(); return ret; }