X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=bb038a670b0a763afd41b1343cbbb1e919fbe1f7;hp=082eb7975efd0adb1c41bfbfee65c621dd34abeb;hb=309167d2a6f59d0c8cbf64eb23ba912cdea76a34;hpb=d620da680b1e5fc67bdacaf6150ca2201cad37b7 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 082eb7975..bb038a670 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -47,16 +47,17 @@ #include #include #include -#include #include #include #include +#include "cmd.h" +#include "utils.h" #include "lttng-relayd.h" /* command line options */ +char *opt_output_path; static int opt_daemon; -static char *opt_output_path; static struct lttng_uri *control_uri; static struct lttng_uri *data_uri; @@ -104,12 +105,12 @@ static void usage(void) { fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname); - fprintf(stderr, " -h, --help Display this usage.\n"); - fprintf(stderr, " -d, --daemonize Start as a daemon.\n"); - fprintf(stderr, " -C, --control-port Control port listening (URI)\n"); - fprintf(stderr, " -D, --data-port Data port listening (URI)\n"); - fprintf(stderr, " -o, --output Output path for traces (PATH)\n"); - fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n"); + fprintf(stderr, " -h, --help Display this usage.\n"); + fprintf(stderr, " -d, --daemonize Start as a daemon.\n"); + fprintf(stderr, " -C, --control-port URL Control port listening.\n"); + fprintf(stderr, " -D, --data-port URL Data port listening.\n"); + fprintf(stderr, " -o, --output PATH Output path for traces. Must use an absolute path.\n"); + fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n"); } static @@ -685,82 +686,6 @@ error: return NULL; } -/* - * config_get_default_path - * - * Returns the HOME directory path. Caller MUST NOT free(3) the return pointer. - */ -static -char *config_get_default_path(void) -{ - return getenv("HOME"); -} - -static -char *create_output_path_auto(char *path_name) -{ - int ret; - char *traces_path = NULL; - char *alloc_path = NULL; - char *default_path; - - default_path = config_get_default_path(); - if (default_path == NULL) { - ERR("Home path not found.\n \ - Please specify an output path using -o, --output PATH"); - goto exit; - } - alloc_path = strdup(default_path); - if (alloc_path == NULL) { - PERROR("Path allocation"); - goto exit; - } - ret = asprintf(&traces_path, "%s/" DEFAULT_TRACE_DIR_NAME - "/%s", alloc_path, path_name); - if (ret < 0) { - PERROR("asprintf trace dir name"); - goto exit; - } -exit: - free(alloc_path); - return traces_path; -} - -static -char *create_output_path_noauto(char *path_name) -{ - int ret; - char *traces_path = NULL; - char *full_path; - - full_path = utils_expand_path(opt_output_path); - if (!full_path) { - goto exit; - } - - ret = asprintf(&traces_path, "%s/%s", full_path, path_name); - if (ret < 0) { - PERROR("asprintf trace dir name"); - goto exit; - } -exit: - free(full_path); - return traces_path; -} - -/* - * create_output_path: create the output trace directory - */ -static -char *create_output_path(char *path_name) -{ - if (opt_output_path == NULL) { - return create_output_path_auto(path_name); - } else { - return create_output_path_noauto(path_name); - } -} - /* * Get stream from stream id. * Need to be called with RCU read-side lock held. @@ -794,6 +719,8 @@ void deferred_free_stream(struct rcu_head *head) { struct relay_stream *stream = caa_container_of(head, struct relay_stream, rcu_node); + free(stream->path_name); + free(stream->channel_name); free(stream); } @@ -895,10 +822,8 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, struct relay_command *cmd, struct lttng_ht *streams_ht) { struct relay_session *session = cmd->session; - struct lttcomm_relayd_add_stream stream_info; struct relay_stream *stream = NULL; struct lttcomm_relayd_status_stream reply; - char *path = NULL, *root_path = NULL; int ret, send_ret; if (!session || cmd->version_check_done == 0) { @@ -907,18 +832,6 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, goto end_no_session; } - ret = cmd->sock->ops->recvmsg(cmd->sock, &stream_info, - sizeof(struct lttcomm_relayd_add_stream), 0); - if (ret < sizeof(struct lttcomm_relayd_add_stream)) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", cmd->sock->fd); - } else { - ERR("Relay didn't receive valid add_stream struct size : %d", ret); - } - ret = -1; - goto end_no_session; - } stream = zmalloc(sizeof(struct relay_stream)); if (stream == NULL) { PERROR("relay stream zmalloc"); @@ -926,49 +839,55 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, goto end_no_session; } + switch (cmd->minor) { + case 1: /* LTTng sessiond 2.1 */ + ret = cmd_recv_stream_2_1(cmd, stream); + break; + case 2: /* LTTng sessiond 2.2 */ + default: + ret = cmd_recv_stream_2_2(cmd, stream); + break; + } + if (ret < 0) { + goto err_free_stream; + } + rcu_read_lock(); stream->stream_handle = ++last_relay_stream_id; stream->prev_seq = -1ULL; stream->session = session; - root_path = create_output_path(stream_info.pathname); - if (!root_path) { - ret = -1; - goto end; - } - ret = utils_mkdir_recursive(root_path, S_IRWXU | S_IRWXG); + ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG); if (ret < 0) { ERR("relay creating output directory"); goto end; } - ret = asprintf(&path, "%s/%s", root_path, stream_info.channel_name); - if (ret < 0) { - PERROR("asprintf stream path"); - path = NULL; - goto end; - } - - ret = open(path, O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + /* + * No need to use run_as API here because whatever we receives, the relayd + * uses its own credentials for the stream files. + */ + ret = utils_create_stream_file(stream->path_name, stream->channel_name, + stream->tracefile_size, 0, -1, -1, NULL); if (ret < 0) { - PERROR("Relay creating trace file"); + ERR("Create output file"); goto end; } - stream->fd = ret; - DBG("Tracefile %s created", path); + if (stream->tracefile_size) { + DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name); + } else { + DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name); + } lttng_ht_node_init_ulong(&stream->stream_n, (unsigned long) stream->stream_handle); lttng_ht_add_unique_ulong(streams_ht, &stream->stream_n); - DBG("Relay new stream added %s", stream_info.channel_name); + DBG("Relay new stream added %s", stream->channel_name); end: - free(path); - free(root_path); - reply.handle = htobe64(stream->stream_handle); /* send the session id to the client or a negative return code on error */ if (ret < 0) { @@ -989,6 +908,12 @@ end: end_no_session: return ret; + +err_free_stream: + free(stream->path_name); + free(stream->channel_name); + free(stream); + return ret; } /* @@ -1240,7 +1165,7 @@ end: */ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd) + struct relay_command *cmd, struct lttng_ht *streams_ht) { int ret; struct lttcomm_relayd_version reply, msg; @@ -1262,19 +1187,26 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, goto end; } - /* - * For now, we just ignore the received version but after 2.1 stable - * release, a check must be done to see if we either adapt to the other - * side version (which MUST be lower than us) or keep the latest data - * structure considering that the other side will adapt. - */ + reply.major = RELAYD_VERSION_COMM_MAJOR; + reply.minor = RELAYD_VERSION_COMM_MINOR; - ret = sscanf(VERSION, "%10u.%10u", &reply.major, &reply.minor); - if (ret < 2) { - ERR("Error in scanning version"); - ret = -1; + /* Major versions must be the same */ + if (reply.major != be32toh(msg.major)) { + DBG("Incompatible major versions (%u vs %u), deleting session", + reply.major, be32toh(msg.major)); + relay_delete_session(cmd, streams_ht); + ret = 0; goto end; } + + cmd->major = reply.major; + /* We adapt to the lowest compatible version */ + if (reply.minor <= be32toh(msg.minor)) { + cmd->minor = reply.minor; + } else { + cmd->minor = be32toh(msg.minor); + } + reply.major = htobe32(reply.major); reply.minor = htobe32(reply.minor); ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, @@ -1282,8 +1214,9 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, if (ret < 0) { ERR("Relay sending version"); } - DBG("Version check done (%u.%u)", be32toh(reply.major), - be32toh(reply.minor)); + + DBG("Version check done using protocol %u.%u", cmd->major, + cmd->minor); end: return ret; @@ -1593,7 +1526,7 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, ret = relay_recv_metadata(recv_hdr, cmd, streams_ht); break; case RELAYD_VERSION: - ret = relay_send_version(recv_hdr, cmd); + ret = relay_send_version(recv_hdr, cmd, streams_ht); break; case RELAYD_CLOSE_STREAM: ret = relay_close_stream(recv_hdr, cmd, streams_ht); @@ -1687,6 +1620,23 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) goto end_unlock; } + if (stream->tracefile_size > 0 && + (stream->tracefile_size_current + data_size) > + stream->tracefile_size) { + ret = utils_rotate_stream_file(stream->path_name, + stream->channel_name, stream->tracefile_size, + stream->tracefile_count, -1, -1, + stream->fd, &(stream->tracefile_count_current), + &stream->fd); + if (ret < 0) { + ERR("Rotating output file"); + goto end; + } + stream->fd = ret; + /* Reset current size because we just perform a stream rotation. */ + stream->tracefile_size_current = 0; + } + stream->tracefile_size_current += data_size; do { ret = write(stream->fd, data_buffer, data_size); } while (ret < 0 && errno == EINTR); @@ -2118,6 +2068,11 @@ int main(int argc, char **argv) /* Try to create directory if -o, --output is specified. */ if (opt_output_path) { + if (*opt_output_path != '/') { + ERR("Please specify an absolute path for -o, --output PATH"); + goto exit; + } + ret = utils_mkdir_recursive(opt_output_path, S_IRWXU | S_IRWXG); if (ret < 0) { ERR("Unable to create %s", opt_output_path); @@ -2156,6 +2111,9 @@ int main(int argc, char **argv) /* Set up max poll set size */ lttng_poll_set_max_size(); + /* Initialize communication library */ + lttcomm_init(); + /* Setup the dispatcher thread */ ret = pthread_create(&dispatcher_thread, NULL, relay_thread_dispatcher, (void *) NULL);