X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=dc945020bf49853edf2e3369c48c4c9096a2acaa;hp=99b22210d3ffff07538900b465a32e46008ba8e4;hb=69ebf37e42e67cbd8dca80e9f5f074e88770af2d;hpb=fb9a95c4d6242bd8336b638c90a7d8f846125659 diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 99b22210d..dc945020b 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -1,19 +1,9 @@ /* - * Copyright (C) 2012 - David Goulet - * 2018 - Jérémie Galarneau + * Copyright (C) 2012 David Goulet + * Copyright (C) 2018 Jérémie Galarneau * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License, version 2 only, as - * published by the Free Software Foundation. + * SPDX-License-Identifier: GPL-2.0-only * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along with - * this program; if not, write to the Free Software Foundation, Inc., 51 - * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #define _LGPL_SOURCE @@ -30,6 +20,7 @@ #include #include #include +#include #include "consumer.h" #include "health-sessiond.h" @@ -37,6 +28,63 @@ #include "utils.h" #include "lttng-sessiond.h" +/* + * Return allocated full pathname of the session using the consumer trace path + * and subdir if available. + * + * The caller can safely free(3) the returned value. On error, NULL is + * returned. + */ +char *setup_channel_trace_path(struct consumer_output *consumer, + const char *session_path, size_t *consumer_path_offset) +{ + int ret; + char *pathname; + + assert(consumer); + assert(session_path); + + health_code_update(); + + /* + * Allocate the string ourself to make sure we never exceed + * LTTNG_PATH_MAX. + */ + pathname = zmalloc(LTTNG_PATH_MAX); + if (!pathname) { + goto error; + } + + /* Get correct path name destination */ + if (consumer->type == CONSUMER_DST_NET && + consumer->relay_major_version == 2 && + consumer->relay_minor_version < 11) { + ret = snprintf(pathname, LTTNG_PATH_MAX, "%s%s/%s/%s", + consumer->dst.net.base_dir, + consumer->chunk_path, consumer->domain_subdir, + session_path); + *consumer_path_offset = 0; + } else { + ret = snprintf(pathname, LTTNG_PATH_MAX, "%s/%s", + consumer->domain_subdir, session_path); + *consumer_path_offset = strlen(consumer->domain_subdir) + 1; + } + DBG3("Consumer trace path relative to current trace chunk: \"%s\"", + pathname); + if (ret < 0) { + PERROR("Failed to format channel path"); + goto error; + } else if (ret >= LTTNG_PATH_MAX) { + ERR("Truncation occurred while formatting channel path"); + goto error; + } + + return pathname; +error: + free(pathname); + return NULL; +} + /* * Send a data payload using a given consumer socket of size len. * @@ -314,7 +362,7 @@ error: * object reference is not needed anymore. */ struct consumer_socket *consumer_find_socket_by_bitness(int bits, - struct consumer_output *consumer) + const struct consumer_output *consumer) { int consumer_fd; struct consumer_socket *socket = NULL; @@ -347,7 +395,7 @@ end: * returned consumer_socket. */ struct consumer_socket *consumer_find_socket(int key, - struct consumer_output *consumer) + const struct consumer_output *consumer) { struct lttng_ht_iter iter; struct lttng_ht_node_ulong *node; @@ -404,7 +452,7 @@ void consumer_add_socket(struct consumer_socket *sock, } /* - * Delte consumer socket to consumer output object. Read side lock must be + * Delete consumer socket to consumer output object. Read side lock must be * acquired before calling this function. */ void consumer_del_socket(struct consumer_socket *sock, @@ -568,6 +616,7 @@ struct consumer_output *consumer_copy_output(struct consumer_output *src) output->snapshot = src->snapshot; output->relay_major_version = src->relay_major_version; output->relay_minor_version = src->relay_minor_version; + output->relay_allows_clear = src->relay_allows_clear; memcpy(&output->dst, &src->dst, sizeof(output->dst)); ret = consumer_copy_sockets(output, src); if (ret < 0) { @@ -870,8 +919,6 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t session_id, const char *pathname, const char *name, - uid_t uid, - gid_t gid, uint64_t relayd_id, uint64_t key, unsigned char *uuid, @@ -884,12 +931,26 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, int64_t blocking_timeout, const char *root_shm_path, const char *shm_path, - uint64_t trace_archive_id) + struct lttng_trace_chunk *trace_chunk, + const struct lttng_credentials *buffer_credentials) { assert(msg); - /* Zeroed structure */ + /* Zeroed structure */ memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); + msg->u.ask_channel.buffer_credentials.uid = UINT32_MAX; + msg->u.ask_channel.buffer_credentials.gid = UINT32_MAX; + + if (trace_chunk) { + uint64_t chunk_id; + enum lttng_trace_chunk_status chunk_status; + + chunk_status = lttng_trace_chunk_get_id(trace_chunk, &chunk_id); + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + LTTNG_OPTIONAL_SET(&msg->u.ask_channel.chunk_id, chunk_id); + } + msg->u.ask_channel.buffer_credentials.uid = buffer_credentials->uid; + msg->u.ask_channel.buffer_credentials.gid = buffer_credentials->gid; msg->cmd_type = LTTNG_CONSUMER_ASK_CHANNEL_CREATION; msg->u.ask_channel.subbuf_size = subbuf_size; @@ -903,8 +964,6 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.type = type; msg->u.ask_channel.session_id = session_id; msg->u.ask_channel.session_id_per_pid = session_id_per_pid; - msg->u.ask_channel.uid = uid; - msg->u.ask_channel.gid = gid; msg->u.ask_channel.relayd_id = relayd_id; msg->u.ask_channel.key = key; msg->u.ask_channel.chan_id = chan_id; @@ -913,7 +972,6 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.monitor = monitor; msg->u.ask_channel.ust_app_uid = ust_app_uid; msg->u.ask_channel.blocking_timeout = blocking_timeout; - msg->u.ask_channel.trace_archive_id = trace_archive_id; memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid)); @@ -956,19 +1014,27 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t tracefile_count, unsigned int monitor, unsigned int live_timer_interval, - unsigned int monitor_timer_interval) + unsigned int monitor_timer_interval, + struct lttng_trace_chunk *trace_chunk) { assert(msg); /* Zeroed structure */ memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); + if (trace_chunk) { + uint64_t chunk_id; + enum lttng_trace_chunk_status chunk_status; + + chunk_status = lttng_trace_chunk_get_id(trace_chunk, &chunk_id); + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + LTTNG_OPTIONAL_SET(&msg->u.channel.chunk_id, chunk_id); + } + /* Send channel */ msg->cmd_type = LTTNG_CONSUMER_ADD_CHANNEL; msg->u.channel.channel_key = channel_key; msg->u.channel.session_id = session_id; - msg->u.channel.uid = uid; - msg->u.channel.gid = gid; msg->u.channel.relayd_id = relayd_id; msg->u.channel.nb_init_streams = nb_init_streams; msg->u.channel.output = output; @@ -993,8 +1059,7 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t channel_key, uint64_t stream_key, - int32_t cpu, - uint64_t trace_archive_id) + int32_t cpu) { assert(msg); @@ -1004,7 +1069,6 @@ void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.stream.channel_key = channel_key; msg->u.stream.stream_key = stream_key; msg->u.stream.cpu = cpu; - msg->u.stream.trace_archive_id = trace_archive_id; } void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, @@ -1059,7 +1123,9 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer, enum lttng_stream_type type, uint64_t session_id, const char *session_name, const char *hostname, - int session_live_timer) + const char *base_path, int session_live_timer, + const uint64_t *current_chunk_id, time_t session_creation_time, + bool session_name_contains_creation_time) { int ret; struct lttcomm_consumer_msg msg; @@ -1077,16 +1143,26 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, } if (type == LTTNG_STREAM_CONTROL) { + char output_path[LTTNG_PATH_MAX] = {}; + uint64_t relayd_session_id; + ret = relayd_create_session(rsock, - &msg.u.relayd_sock.relayd_session_id, - session_name, hostname, session_live_timer, + &relayd_session_id, + session_name, hostname, base_path, + session_live_timer, consumer->snapshot, session_id, - sessiond_uuid); + sessiond_uuid, current_chunk_id, + session_creation_time, + session_name_contains_creation_time, + output_path); if (ret < 0) { /* Close the control socket. */ (void) relayd_close(rsock); goto error; } + msg.u.relayd_sock.relayd_session_id = relayd_session_id; + DBG("Created session on relay, output path reply: %s", + output_path); } msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET; @@ -1107,7 +1183,7 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, } DBG3("Sending relayd socket file descriptor to consumer"); - ret = consumer_send_fds(consumer_sock, &rsock->sock.fd, 1); + ret = consumer_send_fds(consumer_sock, ALIGNED_CONST_PTR(rsock->sock.fd), 1); if (ret < 0) { goto error; } @@ -1419,9 +1495,9 @@ end: * Returns LTTNG_OK on success or else an LTTng error code. */ enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket, - uint64_t key, const struct snapshot_output *output, int metadata, - uid_t uid, gid_t gid, const char *session_path, int wait, - uint64_t nb_packets_per_stream, uint64_t trace_archive_id) + uint64_t key, const struct consumer_output *output, int metadata, + uid_t uid, gid_t gid, const char *channel_path, int wait, + uint64_t nb_packets_per_stream) { int ret; enum lttng_error_code status = LTTNG_OK; @@ -1429,7 +1505,6 @@ enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket, assert(socket); assert(output); - assert(output->consumer); DBG("Consumer snapshot channel key %" PRIu64, key); @@ -1438,66 +1513,24 @@ enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket, msg.u.snapshot_channel.key = key; msg.u.snapshot_channel.nb_packets_per_stream = nb_packets_per_stream; msg.u.snapshot_channel.metadata = metadata; - msg.u.snapshot_channel.trace_archive_id = trace_archive_id; - if (output->consumer->type == CONSUMER_DST_NET) { - msg.u.snapshot_channel.relayd_id = output->consumer->net_seq_index; + if (output->type == CONSUMER_DST_NET) { + msg.u.snapshot_channel.relayd_id = + output->net_seq_index; msg.u.snapshot_channel.use_relayd = 1; - ret = snprintf(msg.u.snapshot_channel.pathname, - sizeof(msg.u.snapshot_channel.pathname), - "%s/%s/%s-%s-%" PRIu64 "%s", - output->consumer->dst.net.base_dir, - output->consumer->domain_subdir, - output->name, output->datetime, - output->nb_snapshot, - session_path); - if (ret < 0) { - status = LTTNG_ERR_INVALID; - goto error; - } else if (ret >= sizeof(msg.u.snapshot_channel.pathname)) { - ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%i bytes required) with path \"%s/%s/%s-%s-%" PRIu64 "%s\"", - sizeof(msg.u.snapshot_channel.pathname), - ret, output->consumer->dst.net.base_dir, - output->consumer->domain_subdir, - output->name, output->datetime, - output->nb_snapshot, - session_path); - status = LTTNG_ERR_SNAPSHOT_FAIL; - goto error; - } } else { - ret = snprintf(msg.u.snapshot_channel.pathname, - sizeof(msg.u.snapshot_channel.pathname), - "%s/%s-%s-%" PRIu64 "%s", - output->consumer->dst.session_root_path, - output->name, output->datetime, - output->nb_snapshot, - session_path); - if (ret < 0) { - status = LTTNG_ERR_NOMEM; - goto error; - } else if (ret >= sizeof(msg.u.snapshot_channel.pathname)) { - ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%i bytes required) with path \"%s/%s-%s-%" PRIu64 "%s\"", - sizeof(msg.u.snapshot_channel.pathname), - ret, output->consumer->dst.session_root_path, - output->name, output->datetime, output->nb_snapshot, - session_path); - status = LTTNG_ERR_SNAPSHOT_FAIL; - goto error; - } - msg.u.snapshot_channel.relayd_id = (uint64_t) -1ULL; - - /* Create directory. Ignore if exist. */ - ret = run_as_mkdir_recursive(msg.u.snapshot_channel.pathname, - S_IRWXU | S_IRWXG, uid, gid); - if (ret < 0) { - if (errno != EEXIST) { - status = LTTNG_ERR_CREATE_DIR_FAIL; - PERROR("Trace directory creation error"); - goto error; - } - } + } + ret = lttng_strncpy(msg.u.snapshot_channel.pathname, + channel_path, + sizeof(msg.u.snapshot_channel.pathname)); + if (ret < 0) { + ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%zu bytes required) with path \"%s\"", + sizeof(msg.u.snapshot_channel.pathname), + strlen(channel_path), + channel_path); + status = LTTNG_ERR_SNAPSHOT_FAIL; + goto error; } health_code_update(); @@ -1637,8 +1670,6 @@ end: /* * Ask the consumer to rotate a channel. - * domain_path contains "/kernel" for kernel or the complete path for UST - * (ex: /ust/uid/1000/64-bit); * * The new_chunk_id is the session->rotate_count that has been incremented * when the rotation started. On the relay, this allows to keep track in which @@ -1646,8 +1677,7 @@ end: */ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, uid_t uid, gid_t gid, struct consumer_output *output, - const char *domain_path, bool is_metadata_channel, - uint64_t new_chunk_id) + bool is_metadata_channel) { int ret; struct lttcomm_consumer_msg msg; @@ -1661,30 +1691,11 @@ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, msg.cmd_type = LTTNG_CONSUMER_ROTATE_CHANNEL; msg.u.rotate_channel.key = key; msg.u.rotate_channel.metadata = !!is_metadata_channel; - msg.u.rotate_channel.new_chunk_id = new_chunk_id; if (output->type == CONSUMER_DST_NET) { msg.u.rotate_channel.relayd_id = output->net_seq_index; - ret = snprintf(msg.u.rotate_channel.pathname, - sizeof(msg.u.rotate_channel.pathname), "%s%s%s", - output->dst.net.base_dir, - output->chunk_path, domain_path); - if (ret < 0 || ret >= sizeof(msg.u.rotate_channel.pathname)) { - ERR("Failed to format channel path name when asking consumer to rotate channel"); - ret = -LTTNG_ERR_INVALID; - goto error; - } } else { msg.u.rotate_channel.relayd_id = (uint64_t) -1ULL; - ret = snprintf(msg.u.rotate_channel.pathname, - sizeof(msg.u.rotate_channel.pathname), "%s/%s%s", - output->dst.session_root_path, - output->chunk_path, domain_path); - if (ret < 0 || ret >= sizeof(msg.u.rotate_channel.pathname)) { - ERR("Failed to format channel path name when asking consumer to rotate channel"); - ret = -LTTNG_ERR_INVALID; - goto error; - } } health_code_update(); @@ -1706,217 +1717,414 @@ error: return ret; } -int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id, - const struct consumer_output *output, const char *old_path, - const char *new_path, uid_t uid, gid_t gid) +int consumer_clear_channel(struct consumer_socket *socket, uint64_t key) { int ret; struct lttcomm_consumer_msg msg; - size_t old_path_length, new_path_length; assert(socket); - assert(old_path); - assert(new_path); - DBG("Consumer rotate rename session %" PRIu64 ", old path = \"%s\", new_path = \"%s\"", - session_id, old_path, new_path); - - old_path_length = strlen(old_path); - if (old_path_length >= sizeof(msg.u.rotate_rename.old_path)) { - ERR("consumer_rotate_rename: old path length (%zu bytes) exceeds the maximal length allowed by the consumer protocol (%zu bytes)", - old_path_length + 1, sizeof(msg.u.rotate_rename.old_path)); - ret = -LTTNG_ERR_INVALID; - goto error; - } - - new_path_length = strlen(new_path); - if (new_path_length >= sizeof(msg.u.rotate_rename.new_path)) { - ERR("consumer_rotate_rename: new path length (%zu bytes) exceeds the maximal length allowed by the consumer protocol (%zu bytes)", - new_path_length + 1, sizeof(msg.u.rotate_rename.new_path)); - ret = -LTTNG_ERR_INVALID; - goto error; - } + DBG("Consumer clear channel %" PRIu64, key); memset(&msg, 0, sizeof(msg)); - msg.cmd_type = LTTNG_CONSUMER_ROTATE_RENAME; - msg.u.rotate_rename.session_id = session_id; - msg.u.rotate_rename.uid = uid; - msg.u.rotate_rename.gid = gid; - strcpy(msg.u.rotate_rename.old_path, old_path); - strcpy(msg.u.rotate_rename.new_path, new_path); - - if (output->type == CONSUMER_DST_NET) { - msg.u.rotate_rename.relayd_id = output->net_seq_index; - } else { - msg.u.rotate_rename.relayd_id = -1ULL; - } + msg.cmd_type = LTTNG_CONSUMER_CLEAR_CHANNEL; + msg.u.clear_channel.key = key; health_code_update(); + + pthread_mutex_lock(socket->lock); ret = consumer_send_msg(socket, &msg); if (ret < 0) { - ret = -LTTNG_ERR_ROTATE_RENAME_FAIL_CONSUMER; - goto error; + goto error_socket; } -error: +error_socket: + pthread_mutex_unlock(socket->lock); + health_code_update(); return ret; } -/* - * Ask the consumer if a rotation is locally pending. Must be called with the - * socket lock held. - * - * Return 1 if the rotation is still pending, 0 if finished, a negative value - * on error. - */ -int consumer_check_rotation_pending_local(struct consumer_socket *socket, - uint64_t session_id, uint64_t chunk_id) +int consumer_init(struct consumer_socket *socket, + const lttng_uuid sessiond_uuid) { int ret; - struct lttcomm_consumer_msg msg; - uint32_t pending = 0; + struct lttcomm_consumer_msg msg = { + .cmd_type = LTTNG_CONSUMER_INIT, + }; assert(socket); - DBG("Asking consumer to locally check for pending rotation for session %" PRIu64 ", chunk id %" PRIu64, - session_id, chunk_id); - - memset(&msg, 0, sizeof(msg)); - msg.cmd_type = LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL; - msg.u.check_rotation_pending_local.session_id = session_id; - msg.u.check_rotation_pending_local.chunk_id = chunk_id; + DBG("Sending consumer initialization command"); + lttng_uuid_copy(msg.u.init.sessiond_uuid, sessiond_uuid); health_code_update(); ret = consumer_send_msg(socket, &msg); if (ret < 0) { - ret = -LTTNG_ERR_ROTATION_PENDING_LOCAL_FAIL_CONSUMER; goto error; } - ret = consumer_socket_recv(socket, &pending, sizeof(pending)); - if (ret < 0) { - goto error; - } - - ret = pending; - error: health_code_update(); return ret; } /* - * Ask the consumer if a rotation is pending on the relayd. Must be called with - * the socket lock held. + * Ask the consumer to create a new chunk for a given session. * - * Return 1 if the rotation is still pending, 0 if finished, a negative value - * on error. + * Called with the consumer socket lock held. */ -int consumer_check_rotation_pending_relay(struct consumer_socket *socket, - const struct consumer_output *output, uint64_t session_id, - uint64_t chunk_id) +int consumer_create_trace_chunk(struct consumer_socket *socket, + uint64_t relayd_id, uint64_t session_id, + struct lttng_trace_chunk *chunk, + const char *domain_subdir) { int ret; - struct lttcomm_consumer_msg msg; - uint32_t pending = 0; + enum lttng_trace_chunk_status chunk_status; + struct lttng_credentials chunk_credentials; + const struct lttng_directory_handle *chunk_directory_handle = NULL; + struct lttng_directory_handle *domain_handle = NULL; + int domain_dirfd; + const char *chunk_name; + bool chunk_name_overridden; + uint64_t chunk_id; + time_t creation_timestamp; + char creation_timestamp_buffer[ISO8601_STR_LEN]; + const char *creation_timestamp_str = "(none)"; + const bool chunk_has_local_output = relayd_id == -1ULL; + enum lttng_trace_chunk_status tc_status; + struct lttcomm_consumer_msg msg = { + .cmd_type = LTTNG_CONSUMER_CREATE_TRACE_CHUNK, + .u.create_trace_chunk.session_id = session_id, + }; assert(socket); + assert(chunk); - DBG("Asking consumer to check for pending rotation on relay for session %" PRIu64 ", chunk id %" PRIu64, - session_id, chunk_id); - assert(output->type == CONSUMER_DST_NET); + if (relayd_id != -1ULL) { + LTTNG_OPTIONAL_SET(&msg.u.create_trace_chunk.relayd_id, + relayd_id); + } - memset(&msg, 0, sizeof(msg)); - msg.cmd_type = LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY; - msg.u.check_rotation_pending_relay.session_id = session_id; - msg.u.check_rotation_pending_relay.relayd_id = output->net_seq_index; - msg.u.check_rotation_pending_relay.chunk_id = chunk_id; + chunk_status = lttng_trace_chunk_get_name(chunk, &chunk_name, + &chunk_name_overridden); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK && + chunk_status != LTTNG_TRACE_CHUNK_STATUS_NONE) { + ERR("Failed to get name of trace chunk"); + ret = -LTTNG_ERR_FATAL; + goto error; + } + if (chunk_name_overridden) { + ret = lttng_strncpy(msg.u.create_trace_chunk.override_name, + chunk_name, + sizeof(msg.u.create_trace_chunk.override_name)); + if (ret) { + ERR("Trace chunk name \"%s\" exceeds the maximal length allowed by the consumer protocol", + chunk_name); + ret = -LTTNG_ERR_FATAL; + goto error; + } + } - health_code_update(); - ret = consumer_send_msg(socket, &msg); - if (ret < 0) { - ret = -LTTNG_ERR_ROTATION_PENDING_RELAY_FAIL_CONSUMER; + chunk_status = lttng_trace_chunk_get_creation_timestamp(chunk, + &creation_timestamp); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -LTTNG_ERR_FATAL; goto error; } + msg.u.create_trace_chunk.creation_timestamp = + (uint64_t) creation_timestamp; + /* Only used for logging purposes. */ + ret = time_to_iso8601_str(creation_timestamp, + creation_timestamp_buffer, + sizeof(creation_timestamp_buffer)); + creation_timestamp_str = !ret ? creation_timestamp_buffer : + "(formatting error)"; + + chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + /* + * Anonymous trace chunks should never be transmitted + * to remote peers (consumerd and relayd). They are used + * internally for backward-compatibility purposes. + */ + ret = -LTTNG_ERR_FATAL; + goto error; + } + msg.u.create_trace_chunk.chunk_id = chunk_id; - ret = consumer_socket_recv(socket, &pending, sizeof(pending)); + if (chunk_has_local_output) { + chunk_status = lttng_trace_chunk_borrow_chunk_directory_handle( + chunk, &chunk_directory_handle); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + ret = -LTTNG_ERR_FATAL; + goto error; + } + chunk_status = lttng_trace_chunk_get_credentials( + chunk, &chunk_credentials); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + /* + * Not associating credentials to a sessiond chunk is a + * fatal internal error. + */ + ret = -LTTNG_ERR_FATAL; + goto error; + } + tc_status = lttng_trace_chunk_create_subdirectory( + chunk, domain_subdir); + if (tc_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + PERROR("Failed to create chunk domain output directory \"%s\"", + domain_subdir); + ret = -LTTNG_ERR_FATAL; + goto error; + } + domain_handle = lttng_directory_handle_create_from_handle( + domain_subdir, + chunk_directory_handle); + if (!domain_handle) { + ret = -LTTNG_ERR_FATAL; + goto error; + } + + /* + * This will only compile on platforms that support + * dirfd (POSIX.2008). This is fine as the session daemon + * is only built for such platforms. + * + * The ownership of the chunk directory handle's is maintained + * by the trace chunk. + */ + domain_dirfd = lttng_directory_handle_get_dirfd( + domain_handle); + assert(domain_dirfd >= 0); + + msg.u.create_trace_chunk.credentials.value.uid = + chunk_credentials.uid; + msg.u.create_trace_chunk.credentials.value.gid = + chunk_credentials.gid; + msg.u.create_trace_chunk.credentials.is_set = 1; + } + + DBG("Sending consumer create trace chunk command: relayd_id = %" PRId64 + ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 + ", creation_timestamp = %s", + relayd_id, session_id, chunk_id, + creation_timestamp_str); + health_code_update(); + ret = consumer_send_msg(socket, &msg); + health_code_update(); if (ret < 0) { + ERR("Trace chunk creation error on consumer"); + ret = -LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER; goto error; } - ret = pending; - + if (chunk_has_local_output) { + DBG("Sending trace chunk domain directory fd to consumer"); + health_code_update(); + ret = consumer_send_fds(socket, &domain_dirfd, 1); + health_code_update(); + if (ret < 0) { + ERR("Trace chunk creation error on consumer"); + ret = -LTTNG_ERR_CREATE_TRACE_CHUNK_FAIL_CONSUMER; + goto error; + } + } error: - health_code_update(); + lttng_directory_handle_put(domain_handle); return ret; } /* - * Ask the consumer to create a directory. + * Ask the consumer to close a trace chunk for a given session. * * Called with the consumer socket lock held. */ -int consumer_mkdir(struct consumer_socket *socket, uint64_t session_id, - const struct consumer_output *output, const char *path, - uid_t uid, gid_t gid) +int consumer_close_trace_chunk(struct consumer_socket *socket, + uint64_t relayd_id, uint64_t session_id, + struct lttng_trace_chunk *chunk, + char *closed_trace_chunk_path) { int ret; - struct lttcomm_consumer_msg msg; + enum lttng_trace_chunk_status chunk_status; + struct lttcomm_consumer_msg msg = { + .cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK, + .u.close_trace_chunk.session_id = session_id, + }; + struct lttcomm_consumer_close_trace_chunk_reply reply; + uint64_t chunk_id; + time_t close_timestamp; + enum lttng_trace_chunk_command_type close_command; + const char *close_command_name = "none"; + struct lttng_dynamic_buffer path_reception_buffer; assert(socket); + lttng_dynamic_buffer_init(&path_reception_buffer); - DBG("Consumer mkdir %s in session %" PRIu64, path, session_id); + if (relayd_id != -1ULL) { + LTTNG_OPTIONAL_SET( + &msg.u.close_trace_chunk.relayd_id, relayd_id); + } - memset(&msg, 0, sizeof(msg)); - msg.cmd_type = LTTNG_CONSUMER_MKDIR; - msg.u.mkdir.session_id = session_id; - msg.u.mkdir.uid = uid; - msg.u.mkdir.gid = gid; - ret = snprintf(msg.u.mkdir.path, sizeof(msg.u.mkdir.path), "%s", path); - if (ret < 0 || ret >= sizeof(msg.u.mkdir.path)) { - ERR("Format path"); - ret = -LTTNG_ERR_INVALID; + chunk_status = lttng_trace_chunk_get_close_command( + chunk, &close_command); + switch (chunk_status) { + case LTTNG_TRACE_CHUNK_STATUS_OK: + LTTNG_OPTIONAL_SET(&msg.u.close_trace_chunk.close_command, + (uint32_t) close_command); + break; + case LTTNG_TRACE_CHUNK_STATUS_NONE: + break; + default: + ERR("Failed to get trace chunk close command"); + ret = -1; goto error; } - if (output->type == CONSUMER_DST_NET) { - msg.u.mkdir.relayd_id = output->net_seq_index; - } else { - msg.u.mkdir.relayd_id = -1ULL; + chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id); + /* + * Anonymous trace chunks should never be transmitted to remote peers + * (consumerd and relayd). They are used internally for + * backward-compatibility purposes. + */ + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + msg.u.close_trace_chunk.chunk_id = chunk_id; + + chunk_status = lttng_trace_chunk_get_close_timestamp(chunk, + &close_timestamp); + /* + * A trace chunk should be closed locally before being closed remotely. + * Otherwise, the close timestamp would never be transmitted to the + * peers. + */ + assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); + msg.u.close_trace_chunk.close_timestamp = (uint64_t) close_timestamp; + + if (msg.u.close_trace_chunk.close_command.is_set) { + close_command_name = lttng_trace_chunk_command_type_get_name( + close_command); } + DBG("Sending consumer close trace chunk command: relayd_id = %" PRId64 + ", session_id = %" PRIu64 ", chunk_id = %" PRIu64 + ", close command = \"%s\"", + relayd_id, session_id, chunk_id, close_command_name); health_code_update(); - ret = consumer_send_msg(socket, &msg); + ret = consumer_socket_send(socket, &msg, sizeof(struct lttcomm_consumer_msg)); if (ret < 0) { - ret = -LTTNG_ERR_MKDIR_FAIL_CONSUMER; + ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER; goto error; } - + ret = consumer_socket_recv(socket, &reply, sizeof(reply)); + if (ret < 0) { + ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER; + goto error; + } + if (reply.path_length >= LTTNG_PATH_MAX) { + ERR("Invalid path returned by relay daemon: %" PRIu32 "bytes exceeds maximal allowed length of %d bytes", + reply.path_length, LTTNG_PATH_MAX); + ret = -LTTNG_ERR_INVALID_PROTOCOL; + goto error; + } + ret = lttng_dynamic_buffer_set_size(&path_reception_buffer, + reply.path_length); + if (ret) { + ERR("Failed to allocate reception buffer of path returned by the \"close trace chunk\" command"); + ret = -LTTNG_ERR_NOMEM; + goto error; + } + ret = consumer_socket_recv(socket, path_reception_buffer.data, + path_reception_buffer.size); + if (ret < 0) { + ERR("Communication error while receiving path of closed trace chunk"); + ret = -LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER; + goto error; + } + if (path_reception_buffer.data[path_reception_buffer.size - 1] != '\0') { + ERR("Invalid path returned by relay daemon: not null-terminated"); + ret = -LTTNG_ERR_INVALID_PROTOCOL; + goto error; + } + if (closed_trace_chunk_path) { + /* + * closed_trace_chunk_path is assumed to have a length >= + * LTTNG_PATH_MAX + */ + memcpy(closed_trace_chunk_path, path_reception_buffer.data, + path_reception_buffer.size); + } error: + lttng_dynamic_buffer_reset(&path_reception_buffer); health_code_update(); return ret; } -int consumer_init(struct consumer_socket *socket, - const lttng_uuid sessiond_uuid) +/* + * Ask the consumer if a trace chunk exists. + * + * Called with the consumer socket lock held. + * Returns 0 on success, or a negative value on error. + */ +int consumer_trace_chunk_exists(struct consumer_socket *socket, + uint64_t relayd_id, uint64_t session_id, + struct lttng_trace_chunk *chunk, + enum consumer_trace_chunk_exists_status *result) { int ret; + enum lttng_trace_chunk_status chunk_status; struct lttcomm_consumer_msg msg = { - .cmd_type = LTTNG_CONSUMER_INIT, + .cmd_type = LTTNG_CONSUMER_TRACE_CHUNK_EXISTS, + .u.trace_chunk_exists.session_id = session_id, }; + uint64_t chunk_id; + const char *consumer_reply_str; assert(socket); - DBG("Sending consumer initialization command"); - lttng_uuid_copy(msg.u.init.sessiond_uuid, sessiond_uuid); + if (relayd_id != -1ULL) { + LTTNG_OPTIONAL_SET(&msg.u.trace_chunk_exists.relayd_id, + relayd_id); + } + + chunk_status = lttng_trace_chunk_get_id(chunk, &chunk_id); + if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { + /* + * Anonymous trace chunks should never be transmitted + * to remote peers (consumerd and relayd). They are used + * internally for backward-compatibility purposes. + */ + ret = -LTTNG_ERR_FATAL; + goto error; + } + msg.u.trace_chunk_exists.chunk_id = chunk_id; + + DBG("Sending consumer trace chunk exists command: relayd_id = %" PRId64 + ", session_id = %" PRIu64 + ", chunk_id = %" PRIu64, relayd_id, session_id, chunk_id); health_code_update(); ret = consumer_send_msg(socket, &msg); - if (ret < 0) { + switch (-ret) { + case LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK: + consumer_reply_str = "unknown trace chunk"; + *result = CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK; + break; + case LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL: + consumer_reply_str = "trace chunk exists locally"; + *result = CONSUMER_TRACE_CHUNK_EXISTS_STATUS_EXISTS_LOCAL; + break; + case LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE: + consumer_reply_str = "trace chunk exists on remote peer"; + *result = CONSUMER_TRACE_CHUNK_EXISTS_STATUS_EXISTS_REMOTE; + break; + default: + ERR("Consumer returned an error from TRACE_CHUNK_EXISTS command"); + ret = -1; goto error; } - + DBG("Consumer reply to TRACE_CHUNK_EXISTS command: %s", + consumer_reply_str); + ret = 0; error: health_code_update(); return ret;