X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=12db236d08335c34221c7305527be81c049f97d1;hp=5f551e31dd5a05d6283e0aca2cbe01ca660096b4;hb=b178f53e90c376dd44b020535c32649edef8f80e;hpb=61ba6b6dc790245afa7fbec89a9811f6c7603b00 diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 5f551e31d..12db236d0 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -1,5 +1,6 @@ /* * Copyright (C) 2012 - David Goulet + * 2018 - Jérémie Galarneau * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -548,25 +549,26 @@ void consumer_output_put(struct consumer_output *obj) * * Should *NOT* be called with RCU read-side lock held. */ -struct consumer_output *consumer_copy_output(struct consumer_output *obj) +struct consumer_output *consumer_copy_output(struct consumer_output *src) { int ret; struct consumer_output *output; - assert(obj); + assert(src); - output = consumer_create_output(obj->type); + output = consumer_create_output(src->type); if (output == NULL) { goto end; } - output->enabled = obj->enabled; - output->net_seq_index = obj->net_seq_index; - memcpy(output->subdir, obj->subdir, sizeof(output->subdir)); - output->snapshot = obj->snapshot; - output->relay_major_version = obj->relay_major_version; - output->relay_minor_version = obj->relay_minor_version; - memcpy(&output->dst, &obj->dst, sizeof(output->dst)); - ret = consumer_copy_sockets(output, obj); + output->enabled = src->enabled; + output->net_seq_index = src->net_seq_index; + memcpy(output->domain_subdir, src->domain_subdir, + sizeof(output->domain_subdir)); + output->snapshot = src->snapshot; + output->relay_major_version = src->relay_major_version; + output->relay_minor_version = src->relay_minor_version; + memcpy(&output->dst, &src->dst, sizeof(output->dst)); + ret = consumer_copy_sockets(output, src); if (ret < 0) { goto error_put; } @@ -625,33 +627,32 @@ error: } /* - * Set network URI to the consumer output object. + * Set network URI to the consumer output. * * Return 0 on success. Return 1 if the URI were equal. Else, negative value on * error. */ -int consumer_set_network_uri(struct consumer_output *obj, +int consumer_set_network_uri(const struct ltt_session *session, + struct consumer_output *output, struct lttng_uri *uri) { int ret; - char tmp_path[PATH_MAX]; - char hostname[HOST_NAME_MAX]; struct lttng_uri *dst_uri = NULL; /* Code flow error safety net. */ - assert(obj); + assert(output); assert(uri); switch (uri->stype) { case LTTNG_STREAM_CONTROL: - dst_uri = &obj->dst.net.control; - obj->dst.net.control_isset = 1; + dst_uri = &output->dst.net.control; + output->dst.net.control_isset = 1; if (uri->port == 0) { /* Assign default port. */ uri->port = DEFAULT_NETWORK_CONTROL_PORT; } else { - if (obj->dst.net.data_isset && uri->port == - obj->dst.net.data.port) { + if (output->dst.net.data_isset && uri->port == + output->dst.net.data.port) { ret = -LTTNG_ERR_INVALID; goto error; } @@ -659,14 +660,14 @@ int consumer_set_network_uri(struct consumer_output *obj, DBG3("Consumer control URI set with port %d", uri->port); break; case LTTNG_STREAM_DATA: - dst_uri = &obj->dst.net.data; - obj->dst.net.data_isset = 1; + dst_uri = &output->dst.net.data; + output->dst.net.data_isset = 1; if (uri->port == 0) { /* Assign default port. */ uri->port = DEFAULT_NETWORK_DATA_PORT; } else { - if (obj->dst.net.control_isset && uri->port == - obj->dst.net.control.port) { + if (output->dst.net.control_isset && uri->port == + output->dst.net.control.port) { ret = -LTTNG_ERR_INVALID; goto error; } @@ -687,42 +688,89 @@ int consumer_set_network_uri(struct consumer_output *obj, } /* URIs were not equal, replacing it. */ - memset(dst_uri, 0, sizeof(struct lttng_uri)); memcpy(dst_uri, uri, sizeof(struct lttng_uri)); - obj->type = CONSUMER_DST_NET; - - /* Handle subdir and add hostname in front. */ - if (dst_uri->stype == LTTNG_STREAM_CONTROL) { - /* Get hostname to append it in the pathname */ - ret = gethostname(hostname, sizeof(hostname)); - if (ret < 0) { - PERROR("gethostname. Fallback on default localhost"); - strncpy(hostname, "localhost", sizeof(hostname)); - } - hostname[sizeof(hostname) - 1] = '\0'; + output->type = CONSUMER_DST_NET; + if (dst_uri->stype != LTTNG_STREAM_CONTROL) { + /* Only the control uri needs to contain the path. */ + goto end; + } - /* Setup consumer subdir if none present in the control URI */ - if (strlen(dst_uri->subdir) == 0) { - ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", - hostname, obj->subdir); - } else { - ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", - hostname, dst_uri->subdir); - } - if (ret < 0) { - PERROR("snprintf set consumer uri subdir"); - ret = -LTTNG_ERR_NOMEM; + /* + * If the user has specified a subdir as part of the control + * URL, the session's base output directory is: + * /RELAYD_OUTPUT_PATH/HOSTNAME/USER_SPECIFIED_DIR + * + * Hence, the "base_dir" from which all stream files and + * session rotation chunks are created takes the form + * /HOSTNAME/USER_SPECIFIED_DIR + * + * If the user has not specified an output directory as part of + * the control URL, the base output directory has the form: + * /RELAYD_OUTPUT_PATH/HOSTNAME/SESSION_NAME-CREATION_TIME + * + * Hence, the "base_dir" from which all stream files and + * session rotation chunks are created takes the form + * /HOSTNAME/SESSION_NAME-CREATION_TIME + * + * Note that automatically generated session names already + * contain the session's creation time. In that case, the + * creation time is omitted to prevent it from being duplicated + * in the final directory hierarchy. + */ + if (*uri->subdir) { + if (strstr(uri->subdir, "../")) { + ERR("Network URI subdirs are not allowed to walk up the path hierarchy"); + ret = -LTTNG_ERR_INVALID; goto error; } + ret = snprintf(output->dst.net.base_dir, + sizeof(output->dst.net.base_dir), + "/%s/%s/", session->hostname, uri->subdir); + } else { + if (session->has_auto_generated_name) { + ret = snprintf(output->dst.net.base_dir, + sizeof(output->dst.net.base_dir), + "/%s/%s/", session->hostname, + session->name); + } else { + char session_creation_datetime[16]; + size_t strftime_ret; + struct tm *timeinfo; - if (lttng_strncpy(obj->dst.net.base_dir, tmp_path, - sizeof(obj->dst.net.base_dir))) { - ret = -LTTNG_ERR_INVALID; - goto error; + timeinfo = localtime(&session->creation_time); + if (!timeinfo) { + ret = -LTTNG_ERR_FATAL; + goto error; + } + strftime_ret = strftime(session_creation_datetime, + sizeof(session_creation_datetime), + "%Y%m%d-%H%M%S", timeinfo); + if (strftime_ret == 0) { + ERR("Failed to format session creation timestamp while setting network URI"); + ret = -LTTNG_ERR_FATAL; + goto error; + } + ret = snprintf(output->dst.net.base_dir, + sizeof(output->dst.net.base_dir), + "/%s/%s-%s/", session->hostname, + session->name, + session_creation_datetime); } - DBG3("Consumer set network uri base_dir path %s", tmp_path); } + if (ret >= sizeof(output->dst.net.base_dir)) { + ret = -LTTNG_ERR_INVALID; + ERR("Truncation occurred while setting network output base directory"); + goto error; + } else if (ret == -1) { + ret = -LTTNG_ERR_INVALID; + PERROR("Error occurred while setting network output base directory"); + goto error; + } + + DBG3("Consumer set network uri base_dir path %s", + output->dst.net.base_dir); +end: return 0; equal: return 1; @@ -735,7 +783,8 @@ error: * * The consumer socket lock must be held by the caller. */ -int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd) +int consumer_send_fds(struct consumer_socket *sock, const int *fds, + size_t nb_fd) { int ret; @@ -833,7 +882,8 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint32_t ust_app_uid, int64_t blocking_timeout, const char *root_shm_path, - const char *shm_path) + const char *shm_path, + uint64_t trace_archive_id) { assert(msg); @@ -862,6 +912,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.monitor = monitor; msg->u.ask_channel.ust_app_uid = ust_app_uid; msg->u.ask_channel.blocking_timeout = blocking_timeout; + msg->u.ask_channel.trace_archive_id = trace_archive_id; memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid)); @@ -889,8 +940,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, /* * Init channel communication message structure. */ -void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, - enum lttng_consumer_command cmd, +void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t channel_key, uint64_t session_id, const char *pathname, @@ -913,7 +963,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); /* Send channel */ - msg->cmd_type = cmd; + msg->cmd_type = LTTNG_CONSUMER_ADD_CHANNEL; msg->u.channel.channel_key = channel_key; msg->u.channel.session_id = session_id; msg->u.channel.uid = uid; @@ -939,20 +989,21 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, /* * Init stream communication message structure. */ -void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, - enum lttng_consumer_command cmd, +void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t channel_key, uint64_t stream_key, - int cpu) + int32_t cpu, + uint64_t trace_archive_id) { assert(msg); memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); - msg->cmd_type = cmd; + msg->cmd_type = LTTNG_CONSUMER_ADD_STREAM; msg->u.stream.channel_key = channel_key; msg->u.stream.stream_key = stream_key; msg->u.stream.cpu = cpu; + msg->u.stream.trace_archive_id = trace_archive_id; } void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, @@ -973,7 +1024,7 @@ void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, */ int consumer_send_stream(struct consumer_socket *sock, struct consumer_output *dst, struct lttcomm_consumer_msg *msg, - int *fds, size_t nb_fd) + const int *fds, size_t nb_fd) { int ret; @@ -1064,87 +1115,57 @@ error: return ret; } -int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, - int pipe) +static +int consumer_send_pipe(struct consumer_socket *consumer_sock, + enum lttng_consumer_command cmd, int pipe) { int ret; struct lttcomm_consumer_msg msg; + const char *pipe_name; + const char *command_name; + + switch (cmd) { + case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE: + pipe_name = "channel monitor"; + command_name = "SET_CHANNEL_MONITOR_PIPE"; + break; + default: + ERR("Unexpected command received in %s (cmd = %d)", __func__, + (int) cmd); + abort(); + } /* Code flow error. Safety net. */ memset(&msg, 0, sizeof(msg)); - msg.cmd_type = LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE; + msg.cmd_type = cmd; pthread_mutex_lock(consumer_sock->lock); - DBG3("Sending set_channel_monitor_pipe command to consumer"); + DBG3("Sending %s command to consumer", command_name); ret = consumer_send_msg(consumer_sock, &msg); if (ret < 0) { goto error; } - DBG3("Sending channel monitoring pipe %d to consumer on socket %d", + DBG3("Sending %s pipe %d to consumer on socket %d", + pipe_name, pipe, *consumer_sock->fd_ptr); ret = consumer_send_fds(consumer_sock, &pipe, 1); if (ret < 0) { goto error; } - DBG2("Channel monitoring pipe successfully sent"); + DBG2("%s pipe successfully sent", pipe_name); error: pthread_mutex_unlock(consumer_sock->lock); return ret; } -/* - * Set consumer subdirectory using the session name and a generated datetime if - * needed. This is appended to the current subdirectory. - */ -int consumer_set_subdir(struct consumer_output *consumer, - const char *session_name) +int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock, + int pipe) { - int ret = 0; - unsigned int have_default_name = 0; - char datetime[16], tmp_path[PATH_MAX]; - time_t rawtime; - struct tm *timeinfo; - - assert(consumer); - assert(session_name); - - memset(tmp_path, 0, sizeof(tmp_path)); - - /* Flag if we have a default session. */ - if (strncmp(session_name, DEFAULT_SESSION_NAME "-", - strlen(DEFAULT_SESSION_NAME) + 1) == 0) { - have_default_name = 1; - } else { - /* Get date and time for session path */ - time(&rawtime); - timeinfo = localtime(&rawtime); - strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo); - } - - if (have_default_name) { - ret = snprintf(tmp_path, sizeof(tmp_path), - "%s/%s", consumer->subdir, session_name); - } else { - ret = snprintf(tmp_path, sizeof(tmp_path), - "%s/%s-%s/", consumer->subdir, session_name, datetime); - } - if (ret < 0) { - PERROR("snprintf session name date"); - goto error; - } - - if (lttng_strncpy(consumer->subdir, tmp_path, - sizeof(consumer->subdir))) { - ret = -EINVAL; - goto error; - } - DBG2("Consumer subdir set to %s", consumer->subdir); - -error: - return ret; + return consumer_send_pipe(consumer_sock, + LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, pipe); } /* @@ -1392,13 +1413,15 @@ end: /* * Ask the consumer to snapshot a specific channel using the key. * - * Return 0 on success or else a negative error. + * Returns LTTNG_OK on success or else an LTTng error code. */ -int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, - struct snapshot_output *output, int metadata, uid_t uid, gid_t gid, - const char *session_path, int wait, uint64_t nb_packets_per_stream) +enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket, + uint64_t key, struct snapshot_output *output, int metadata, + uid_t uid, gid_t gid, const char *session_path, int wait, + uint64_t nb_packets_per_stream, uint64_t trace_archive_id) { int ret; + enum lttng_error_code status = LTTNG_OK; struct lttcomm_consumer_msg msg; assert(socket); @@ -1412,6 +1435,7 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, msg.u.snapshot_channel.key = key; msg.u.snapshot_channel.nb_packets_per_stream = nb_packets_per_stream; msg.u.snapshot_channel.metadata = metadata; + msg.u.snapshot_channel.trace_archive_id = trace_archive_id; if (output->consumer->type == CONSUMER_DST_NET) { msg.u.snapshot_channel.relayd_id = output->consumer->net_seq_index; @@ -1420,12 +1444,22 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, sizeof(msg.u.snapshot_channel.pathname), "%s/%s/%s-%s-%" PRIu64 "%s", output->consumer->dst.net.base_dir, - output->consumer->subdir, + output->consumer->domain_subdir, output->name, output->datetime, output->nb_snapshot, session_path); if (ret < 0) { - ret = -LTTNG_ERR_NOMEM; + status = LTTNG_ERR_INVALID; + goto error; + } else if (ret >= sizeof(msg.u.snapshot_channel.pathname)) { + ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%i bytes required) with path \"%s/%s/%s-%s-%" PRIu64 "%s\"", + sizeof(msg.u.snapshot_channel.pathname), + ret, output->consumer->dst.net.base_dir, + output->consumer->domain_subdir, + output->name, output->datetime, + output->nb_snapshot, + session_path); + status = LTTNG_ERR_SNAPSHOT_FAIL; goto error; } } else { @@ -1437,9 +1471,18 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, output->nb_snapshot, session_path); if (ret < 0) { - ret = -LTTNG_ERR_NOMEM; + status = LTTNG_ERR_NOMEM; + goto error; + } else if (ret >= sizeof(msg.u.snapshot_channel.pathname)) { + ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%i bytes required) with path \"%s/%s-%s-%" PRIu64 "%s\"", + sizeof(msg.u.snapshot_channel.pathname), + ret, output->consumer->dst.session_root_path, + output->name, output->datetime, output->nb_snapshot, + session_path); + status = LTTNG_ERR_SNAPSHOT_FAIL; goto error; } + msg.u.snapshot_channel.relayd_id = (uint64_t) -1ULL; /* Create directory. Ignore if exist. */ @@ -1447,7 +1490,8 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, S_IRWXU | S_IRWXG, uid, gid); if (ret < 0) { if (errno != EEXIST) { - ERR("Trace directory creation error"); + status = LTTNG_ERR_CREATE_DIR_FAIL; + PERROR("Trace directory creation error"); goto error; } } @@ -1458,12 +1502,20 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, ret = consumer_send_msg(socket, &msg); pthread_mutex_unlock(socket->lock); if (ret < 0) { + switch (-ret) { + case LTTCOMM_CONSUMERD_CHAN_NOT_FOUND: + status = LTTNG_ERR_CHAN_NOT_FOUND; + break; + default: + status = LTTNG_ERR_SNAPSHOT_FAIL; + break; + } goto error; } error: health_code_update(); - return ret; + return status; } /* @@ -1579,3 +1631,266 @@ end: rcu_read_unlock(); return ret; } + +/* + * Ask the consumer to rotate a channel. + * domain_path contains "/kernel" for kernel or the complete path for UST + * (ex: /ust/uid/1000/64-bit); + * + * The new_chunk_id is the session->rotate_count that has been incremented + * when the rotation started. On the relay, this allows to keep track in which + * chunk each stream is currently writing to (for the rotate_pending operation). + */ +int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, + uid_t uid, gid_t gid, struct consumer_output *output, + const char *domain_path, bool is_metadata_channel, + uint64_t new_chunk_id) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(socket); + + DBG("Consumer rotate channel key %" PRIu64, key); + + pthread_mutex_lock(socket->lock); + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_ROTATE_CHANNEL; + msg.u.rotate_channel.key = key; + msg.u.rotate_channel.metadata = !!is_metadata_channel; + msg.u.rotate_channel.new_chunk_id = new_chunk_id; + + if (output->type == CONSUMER_DST_NET) { + msg.u.rotate_channel.relayd_id = output->net_seq_index; + ret = snprintf(msg.u.rotate_channel.pathname, + sizeof(msg.u.rotate_channel.pathname), "%s%s%s", + output->dst.net.base_dir, + output->chunk_path, domain_path); + if (ret < 0 || ret >= sizeof(msg.u.rotate_channel.pathname)) { + ERR("Failed to format channel path name when asking consumer to rotate channel"); + ret = -LTTNG_ERR_INVALID; + goto error; + } + } else { + msg.u.rotate_channel.relayd_id = (uint64_t) -1ULL; + ret = snprintf(msg.u.rotate_channel.pathname, + sizeof(msg.u.rotate_channel.pathname), "%s/%s%s", + output->dst.session_root_path, + output->chunk_path, domain_path); + if (ret < 0 || ret >= sizeof(msg.u.rotate_channel.pathname)) { + ERR("Failed to format channel path name when asking consumer to rotate channel"); + ret = -LTTNG_ERR_INVALID; + goto error; + } + } + + health_code_update(); + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + switch (-ret) { + case LTTCOMM_CONSUMERD_CHAN_NOT_FOUND: + ret = -LTTNG_ERR_CHAN_NOT_FOUND; + break; + default: + ret = -LTTNG_ERR_ROTATION_FAIL_CONSUMER; + break; + } + goto error; + } +error: + pthread_mutex_unlock(socket->lock); + health_code_update(); + return ret; +} + +int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id, + const struct consumer_output *output, const char *old_path, + const char *new_path, uid_t uid, gid_t gid) +{ + int ret; + struct lttcomm_consumer_msg msg; + size_t old_path_length, new_path_length; + + assert(socket); + assert(old_path); + assert(new_path); + + DBG("Consumer rotate rename session %" PRIu64 ", old path = \"%s\", new_path = \"%s\"", + session_id, old_path, new_path); + + old_path_length = strlen(old_path); + if (old_path_length >= sizeof(msg.u.rotate_rename.old_path)) { + ERR("consumer_rotate_rename: old path length (%zu bytes) exceeds the maximal length allowed by the consumer protocol (%zu bytes)", + old_path_length + 1, sizeof(msg.u.rotate_rename.old_path)); + ret = -LTTNG_ERR_INVALID; + goto error; + } + + new_path_length = strlen(new_path); + if (new_path_length >= sizeof(msg.u.rotate_rename.new_path)) { + ERR("consumer_rotate_rename: new path length (%zu bytes) exceeds the maximal length allowed by the consumer protocol (%zu bytes)", + new_path_length + 1, sizeof(msg.u.rotate_rename.new_path)); + ret = -LTTNG_ERR_INVALID; + goto error; + } + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_ROTATE_RENAME; + msg.u.rotate_rename.session_id = session_id; + msg.u.rotate_rename.uid = uid; + msg.u.rotate_rename.gid = gid; + strcpy(msg.u.rotate_rename.old_path, old_path); + strcpy(msg.u.rotate_rename.new_path, new_path); + + if (output->type == CONSUMER_DST_NET) { + msg.u.rotate_rename.relayd_id = output->net_seq_index; + } else { + msg.u.rotate_rename.relayd_id = -1ULL; + } + + health_code_update(); + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + ret = -LTTNG_ERR_ROTATE_RENAME_FAIL_CONSUMER; + goto error; + } + +error: + health_code_update(); + return ret; +} + +/* + * Ask the consumer if a rotation is locally pending. Must be called with the + * socket lock held. + * + * Return 1 if the rotation is still pending, 0 if finished, a negative value + * on error. + */ +int consumer_check_rotation_pending_local(struct consumer_socket *socket, + uint64_t session_id, uint64_t chunk_id) +{ + int ret; + struct lttcomm_consumer_msg msg; + uint32_t pending = 0; + + assert(socket); + + DBG("Asking consumer to locally check for pending rotation for session %" PRIu64 ", chunk id %" PRIu64, + session_id, chunk_id); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL; + msg.u.check_rotation_pending_local.session_id = session_id; + msg.u.check_rotation_pending_local.chunk_id = chunk_id; + + health_code_update(); + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + ret = -LTTNG_ERR_ROTATION_PENDING_LOCAL_FAIL_CONSUMER; + goto error; + } + + ret = consumer_socket_recv(socket, &pending, sizeof(pending)); + if (ret < 0) { + goto error; + } + + ret = pending; + +error: + health_code_update(); + return ret; +} + +/* + * Ask the consumer if a rotation is pending on the relayd. Must be called with + * the socket lock held. + * + * Return 1 if the rotation is still pending, 0 if finished, a negative value + * on error. + */ +int consumer_check_rotation_pending_relay(struct consumer_socket *socket, + const struct consumer_output *output, uint64_t session_id, + uint64_t chunk_id) +{ + int ret; + struct lttcomm_consumer_msg msg; + uint32_t pending = 0; + + assert(socket); + + DBG("Asking consumer to check for pending rotation on relay for session %" PRIu64 ", chunk id %" PRIu64, + session_id, chunk_id); + assert(output->type == CONSUMER_DST_NET); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY; + msg.u.check_rotation_pending_relay.session_id = session_id; + msg.u.check_rotation_pending_relay.relayd_id = output->net_seq_index; + msg.u.check_rotation_pending_relay.chunk_id = chunk_id; + + health_code_update(); + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + ret = -LTTNG_ERR_ROTATION_PENDING_RELAY_FAIL_CONSUMER; + goto error; + } + + ret = consumer_socket_recv(socket, &pending, sizeof(pending)); + if (ret < 0) { + goto error; + } + + ret = pending; + +error: + health_code_update(); + return ret; +} + +/* + * Ask the consumer to create a directory. + * + * Called with the consumer socket lock held. + */ +int consumer_mkdir(struct consumer_socket *socket, uint64_t session_id, + const struct consumer_output *output, const char *path, + uid_t uid, gid_t gid) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(socket); + + DBG("Consumer mkdir %s in session %" PRIu64, path, session_id); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_MKDIR; + msg.u.mkdir.session_id = session_id; + msg.u.mkdir.uid = uid; + msg.u.mkdir.gid = gid; + ret = snprintf(msg.u.mkdir.path, sizeof(msg.u.mkdir.path), "%s", path); + if (ret < 0 || ret >= sizeof(msg.u.mkdir.path)) { + ERR("Format path"); + ret = -LTTNG_ERR_INVALID; + goto error; + } + + if (output->type == CONSUMER_DST_NET) { + msg.u.mkdir.relayd_id = output->net_seq_index; + } else { + msg.u.mkdir.relayd_id = -1ULL; + } + + health_code_update(); + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + ret = -LTTNG_ERR_MKDIR_FAIL_CONSUMER; + goto error; + } + +error: + health_code_update(); + return ret; +}