X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Frelayd%2Frelayd.c;h=363a0e7b6dc02f8214d92175f64f4d06d48aa5b8;hp=91cbf762f90989809aaafd17623bee095492f4f9;hb=c35f9726a22f1d93e14589688d830efccda196f3;hpb=7fd975c523ee6e0bb45dcb13b7308b8d9d6406ba diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index 91cbf762f..363a0e7b6 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -30,6 +30,7 @@ #include #include #include +#include #include "relayd.h" @@ -1121,82 +1122,97 @@ error: return ret; } -int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, - uint64_t new_chunk_id, uint64_t seq_num) +int relayd_rotate_streams(struct lttcomm_relayd_sock *sock, + unsigned int stream_count, uint64_t *new_chunk_id, + const struct relayd_stream_rotation_position *positions) { int ret; - struct lttcomm_relayd_rotate_stream *msg = NULL; - struct lttcomm_relayd_generic_reply reply; - size_t len; - int msg_len; - /* FIXME */ - char *new_pathname = NULL; + unsigned int i; + struct lttng_dynamic_buffer payload; + struct lttcomm_relayd_generic_reply reply = {}; + const struct lttcomm_relayd_rotate_streams msg = { + .stream_count = htobe32((uint32_t) stream_count), + .new_chunk_id = (typeof(msg.new_chunk_id)) { + .is_set = !!new_chunk_id, + .value = htobe64(new_chunk_id ? *new_chunk_id : 0), + }, + }; + char new_chunk_id_buf[MAX_INT_DEC_LEN(*new_chunk_id)] = {}; + const char *new_chunk_id_str; - /* Code flow error. Safety net. */ - assert(rsock); + lttng_dynamic_buffer_init(&payload); - DBG("Sending rotate stream id %" PRIu64 " command to relayd", stream_id); + /* Code flow error. Safety net. */ + assert(sock); - /* Account for the trailing NULL. */ - len = lttng_strnlen(new_pathname, LTTNG_PATH_MAX) + 1; - if (len > LTTNG_PATH_MAX) { - ERR("Path used in relayd rotate stream command exceeds the maximal allowed length"); - ret = -1; - goto error; + if (new_chunk_id) { + ret = snprintf(new_chunk_id_buf, sizeof(new_chunk_id_buf), + "%" PRIu64, *new_chunk_id); + if (ret == -1 || ret >= sizeof(new_chunk_id_buf)) { + new_chunk_id_str = "formatting error"; + } else { + new_chunk_id_str = new_chunk_id_buf; + } + } else { + new_chunk_id_str = "none"; } - msg_len = offsetof(struct lttcomm_relayd_rotate_stream, new_pathname) + len; - msg = zmalloc(msg_len); - if (!msg) { - PERROR("Failed to allocate relayd rotate stream command of %d bytes", - msg_len); - ret = -1; - goto error; - } + DBG("Preparing \"rotate streams\" command payload: new_chunk_id = %s, stream_count = %u", + new_chunk_id_str, stream_count); - if (lttng_strncpy(msg->new_pathname, new_pathname, len)) { - ret = -1; - ERR("Failed to copy relayd rotate stream command's new path name"); + ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg)); + if (ret) { + ERR("Failed to allocate \"rotate streams\" command payload"); goto error; } - msg->pathname_length = htobe32(len); - msg->stream_id = htobe64(stream_id); - msg->new_chunk_id = htobe64(new_chunk_id); - /* - * The seq_num is invalid for metadata streams, but it is ignored on - * the relay. - */ - msg->rotate_at_seq_num = htobe64(seq_num); + for (i = 0; i < stream_count; i++) { + const struct relayd_stream_rotation_position *position = + &positions[i]; + const struct lttcomm_relayd_stream_rotation_position comm_position = { + .stream_id = htobe64(position->stream_id), + .rotate_at_seq_num = htobe64( + position->rotate_at_seq_num), + }; + + DBG("Rotate stream %" PRIu64 "at sequence number %" PRIu64, + position->stream_id, + position->rotate_at_seq_num); + ret = lttng_dynamic_buffer_append(&payload, &comm_position, + sizeof(comm_position)); + if (ret) { + ERR("Failed to allocate \"rotate streams\" command payload"); + goto error; + } + } /* Send command. */ - ret = send_command(rsock, RELAYD_ROTATE_STREAM, (void *) msg, msg_len, 0); + ret = send_command(sock, RELAYD_ROTATE_STREAMS, payload.data, + payload.size, 0); if (ret < 0) { - ERR("Send rotate command"); + ERR("Failed to send \"rotate stream\" command"); goto error; } /* Receive response. */ - ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); + ret = recv_reply(sock, &reply, sizeof(reply)); if (ret < 0) { - ERR("Receive rotate reply"); + ERR("Failed to receive \"rotate streams\" command reply"); goto error; } reply.ret_code = be32toh(reply.ret_code); - - /* Return session id or negative ret code. */ if (reply.ret_code != LTTNG_OK) { ret = -1; - ERR("Relayd rotate stream replied error %d", reply.ret_code); + ERR("Relayd rotate streams replied error %d", reply.ret_code); } else { /* Success. */ ret = 0; - DBG("Relayd rotated stream id %" PRIu64 " successfully", stream_id); + DBG("Relayd rotated streams successfully"); } error: - free(msg); + lttng_dynamic_buffer_reset(&payload); return ret; } @@ -1359,3 +1375,43 @@ int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock, end: return ret; } + +int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock, + uint64_t chunk_id, bool *chunk_exists) +{ + int ret = 0; + struct lttcomm_relayd_trace_chunk_exists msg = {}; + struct lttcomm_relayd_trace_chunk_exists_reply reply = {}; + + msg = (typeof(msg)){ + .chunk_id = htobe64(chunk_id), + }; + + ret = send_command(sock, RELAYD_TRACE_CHUNK_EXISTS, &msg, sizeof(msg), + 0); + if (ret < 0) { + ERR("Failed to send trace chunk exists command to relay daemon"); + goto end; + } + + ret = recv_reply(sock, &reply, sizeof(reply)); + if (ret < 0) { + ERR("Failed to receive relay daemon trace chunk close command reply"); + goto end; + } + + reply.generic.ret_code = be32toh(reply.generic.ret_code); + if (reply.generic.ret_code != LTTNG_OK) { + ret = -1; + ERR("Relayd trace chunk close replied error %d", + reply.generic.ret_code); + } else { + ret = 0; + DBG("Relayd successfully checked trace chunk existence: chunk_id = %" PRIu64 + ", exists = %s", chunk_id, + reply.trace_chunk_exists ? "true" : "false"); + *chunk_exists = !!reply.trace_chunk_exists; + } +end: + return ret; +}