clang-tidy: add Chrome-inspired checks
[lttng-tools.git] / src / common / relayd / relayd.cpp
CommitLineData
00e2e675 1/*
ab5be9fa 2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
00e2e675 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
00e2e675 5 *
00e2e675
DG
6 */
7
6c1c0768 8#define _LGPL_SOURCE
28ab034a 9#include "relayd.hpp"
00e2e675 10
c9e313bc 11#include <common/common.hpp>
c9e313bc
SM
12#include <common/compat/endian.hpp>
13#include <common/compat/string.hpp>
28ab034a 14#include <common/defaults.hpp>
c9e313bc 15#include <common/index/ctf-index.hpp>
28ab034a 16#include <common/sessiond-comm/relayd.hpp>
c9e313bc 17#include <common/string-utils/format.hpp>
28ab034a 18#include <common/trace-chunk.hpp>
00e2e675 19
28ab034a
JG
20#include <inttypes.h>
21#include <stdio.h>
22#include <stdlib.h>
23#include <string.h>
24#include <sys/stat.h>
00e2e675 25
28ab034a 26static bool relayd_supports_chunks(const struct lttcomm_relayd_sock *sock)
070b6a86
MD
27{
28 if (sock->major > 2) {
29 return true;
30 } else if (sock->major == 2 && sock->minor >= 11) {
31 return true;
32 }
33 return false;
34}
35
28ab034a 36static bool relayd_supports_get_configuration(const struct lttcomm_relayd_sock *sock)
8614e600
MD
37{
38 if (sock->major > 2) {
39 return true;
40 } else if (sock->major == 2 && sock->minor >= 12) {
41 return true;
42 }
43 return false;
44}
45
00e2e675
DG
46/*
47 * Send command. Fill up the header and append the data.
48 */
6151a90f 49static int send_command(struct lttcomm_relayd_sock *rsock,
28ab034a
JG
50 enum lttcomm_relayd_command cmd,
51 const void *data,
52 size_t size,
53 int flags)
00e2e675
DG
54{
55 int ret;
56 struct lttcomm_relayd_hdr header;
57 char *buf;
58 uint64_t buf_size = sizeof(header);
59
f96e4545
MD
60 if (rsock->sock.fd < 0) {
61 return -ECONNRESET;
62 }
63
00e2e675
DG
64 if (data) {
65 buf_size += size;
66 }
67
64803277 68 buf = calloc<char>(buf_size);
cd9adb8b 69 if (buf == nullptr) {
00e2e675
DG
70 PERROR("zmalloc relayd send command buf");
71 ret = -1;
72 goto alloc_error;
73 }
74
53efb85a 75 memset(&header, 0, sizeof(header));
00e2e675
DG
76 header.cmd = htobe32(cmd);
77 header.data_size = htobe64(size);
78
79 /* Zeroed for now since not used. */
80 header.cmd_version = 0;
81 header.circuit_id = 0;
82
83 /* Prepare buffer to send. */
84 memcpy(buf, &header, sizeof(header));
85 if (data) {
86 memcpy(buf + sizeof(header), data, size);
87 }
88
00e71031 89 DBG3("Relayd sending command %s of size %" PRIu64,
28ab034a
JG
90 lttcomm_relayd_command_str(cmd),
91 buf_size);
6151a90f 92 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
00e2e675 93 if (ret < 0) {
00e71031 94 PERROR("Failed to send command %s of size %" PRIu64,
28ab034a
JG
95 lttcomm_relayd_command_str(cmd),
96 buf_size);
97 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
8994307f 98 ret = -errno;
00e2e675
DG
99 goto error;
100 }
00e2e675
DG
101error:
102 free(buf);
103alloc_error:
104 return ret;
105}
106
107/*
108 * Receive reply data on socket. This MUST be call after send_command or else
109 * could result in unexpected behavior(s).
110 */
6151a90f 111static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
00e2e675
DG
112{
113 int ret;
114
f96e4545
MD
115 if (rsock->sock.fd < 0) {
116 return -ECONNRESET;
117 }
118
8fd623e0 119 DBG3("Relayd waiting for reply of size %zu", size);
00e2e675 120
6151a90f 121 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
20275fe8
DG
122 if (ret <= 0 || ret != size) {
123 if (ret == 0) {
124 /* Orderly shutdown. */
6151a90f 125 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
20275fe8 126 } else {
8fd623e0 127 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
28ab034a
JG
128 rsock->sock.fd,
129 size,
130 ret);
20275fe8
DG
131 }
132 /* Always return -1 here and the caller can use errno. */
133 ret = -1;
00e2e675
DG
134 goto error;
135 }
136
137error:
138 return ret;
139}
140
d3e2ba59 141/*
6fa5fe7c
MD
142 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name,
143 * hostname, and base_path) have no length restriction on the sender side.
f86f6389
JR
144 * Length for both payloads is stored in the msg struct. A new dynamic size
145 * payload size is introduced.
146 */
147static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
28ab034a
JG
148 const char *session_name,
149 const char *hostname,
150 const char *base_path,
151 int session_live_timer,
152 unsigned int snapshot,
153 uint64_t sessiond_session_id,
154 const lttng_uuid& sessiond_uuid,
155 const uint64_t *current_chunk_id,
156 time_t creation_time,
157 bool session_name_contains_creation_time,
158 struct lttcomm_relayd_create_session_reply_2_11 *reply,
159 char *output_path)
f86f6389
JR
160{
161 int ret;
cd9adb8b 162 struct lttcomm_relayd_create_session_2_11 *msg = nullptr;
f86f6389
JR
163 size_t session_name_len;
164 size_t hostname_len;
6fa5fe7c 165 size_t base_path_len;
f86f6389 166 size_t msg_length;
6fa5fe7c 167 char *dst;
f86f6389 168
46ef2188
MD
169 if (!base_path) {
170 base_path = "";
171 }
172 /* The three names are sent with a '\0' delimiter between them. */
f86f6389
JR
173 session_name_len = strlen(session_name) + 1;
174 hostname_len = strlen(hostname) + 1;
17e736a5 175 base_path_len = strlen(base_path) + 1;
f86f6389 176
6fa5fe7c 177 msg_length = sizeof(*msg) + session_name_len + hostname_len + base_path_len;
64803277 178 msg = zmalloc<lttcomm_relayd_create_session_2_11>(msg_length);
f86f6389
JR
179 if (!msg) {
180 PERROR("zmalloc create_session_2_11 command message");
181 ret = -1;
182 goto error;
183 }
184
a0377dfe 185 LTTNG_ASSERT(session_name_len <= UINT32_MAX);
f86f6389
JR
186 msg->session_name_len = htobe32(session_name_len);
187
a0377dfe 188 LTTNG_ASSERT(hostname_len <= UINT32_MAX);
f86f6389
JR
189 msg->hostname_len = htobe32(hostname_len);
190
a0377dfe 191 LTTNG_ASSERT(base_path_len <= UINT32_MAX);
6fa5fe7c
MD
192 msg->base_path_len = htobe32(base_path_len);
193
194 dst = msg->names;
195 if (lttng_strncpy(dst, session_name, session_name_len)) {
196 ret = -1;
197 goto error;
198 }
199 dst += session_name_len;
200 if (lttng_strncpy(dst, hostname, hostname_len)) {
f86f6389
JR
201 ret = -1;
202 goto error;
203 }
6fa5fe7c 204 dst += hostname_len;
36f9f13b 205 if (lttng_strncpy(dst, base_path, base_path_len)) {
f86f6389
JR
206 ret = -1;
207 goto error;
208 }
209
210 msg->live_timer = htobe32(session_live_timer);
211 msg->snapshot = !!snapshot;
212
328c2fe7 213 std::copy(sessiond_uuid.begin(), sessiond_uuid.end(), msg->sessiond_uuid);
658f12fa 214 msg->session_id = htobe64(sessiond_session_id);
46ef2188 215 msg->session_name_contains_creation_time = session_name_contains_creation_time;
f39bd140 216 if (current_chunk_id) {
28ab034a 217 LTTNG_OPTIONAL_SET(&msg->current_chunk_id, htobe64(*current_chunk_id));
f39bd140
JG
218 }
219
db1da059
JG
220 msg->creation_time = htobe64((uint64_t) creation_time);
221
f86f6389
JR
222 /* Send command */
223 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
224 if (ret < 0) {
225 goto error;
226 }
ecd1a12f
MD
227 /* Receive response */
228 ret = recv_reply(rsock, reply, sizeof(*reply));
229 if (ret < 0) {
230 goto error;
231 }
232 reply->generic.session_id = be64toh(reply->generic.session_id);
233 reply->generic.ret_code = be32toh(reply->generic.ret_code);
234 reply->output_path_length = be32toh(reply->output_path_length);
235 if (reply->output_path_length >= LTTNG_PATH_MAX) {
28ab034a
JG
236 ERR("Invalid session output path length in reply (%" PRIu32
237 " bytes) exceeds maximal allowed length (%d bytes)",
238 reply->output_path_length,
239 LTTNG_PATH_MAX);
ecd1a12f
MD
240 ret = -1;
241 goto error;
242 }
243 ret = recv_reply(rsock, output_path, reply->output_path_length);
244 if (ret < 0) {
245 goto error;
246 }
f86f6389
JR
247error:
248 free(msg);
249 return ret;
250}
251/*
252 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
d3e2ba59
JD
253 * support the live reading capability.
254 */
255static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
28ab034a
JG
256 const char *session_name,
257 const char *hostname,
258 int session_live_timer,
259 unsigned int snapshot,
260 struct lttcomm_relayd_status_session *reply)
d3e2ba59
JD
261{
262 int ret;
263 struct lttcomm_relayd_create_session_2_4 msg;
264
28ab034a 265 if (lttng_strncpy(msg.session_name, session_name, sizeof(msg.session_name))) {
246777db
MD
266 ret = -1;
267 goto error;
268 }
3a13ffd5 269 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
246777db
MD
270 ret = -1;
271 goto error;
272 }
d3e2ba59 273 msg.live_timer = htobe32(session_live_timer);
7d2f7452 274 msg.snapshot = htobe32(snapshot);
d3e2ba59
JD
275
276 /* Send command */
277 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
278 if (ret < 0) {
279 goto error;
280 }
281
ecd1a12f
MD
282 /* Receive response */
283 ret = recv_reply(rsock, reply, sizeof(*reply));
284 if (ret < 0) {
285 goto error;
286 }
287 reply->session_id = be64toh(reply->session_id);
288 reply->ret_code = be32toh(reply->ret_code);
d3e2ba59
JD
289error:
290 return ret;
291}
292
293/*
294 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
295 */
ecd1a12f 296static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock,
28ab034a 297 struct lttcomm_relayd_status_session *reply)
d3e2ba59
JD
298{
299 int ret;
300
301 /* Send command */
cd9adb8b 302 ret = send_command(rsock, RELAYD_CREATE_SESSION, nullptr, 0, 0);
d3e2ba59
JD
303 if (ret < 0) {
304 goto error;
305 }
306
ecd1a12f
MD
307 /* Receive response */
308 ret = recv_reply(rsock, reply, sizeof(*reply));
309 if (ret < 0) {
310 goto error;
311 }
312 reply->session_id = be64toh(reply->session_id);
313 reply->ret_code = be32toh(reply->ret_code);
d3e2ba59
JD
314error:
315 return ret;
316}
317
c5b6f4f0
DG
318/*
319 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
320 * set session_id of the relayd if we have a successful reply from the relayd.
321 *
20275fe8
DG
322 * On success, return 0 else a negative value which is either an errno error or
323 * a lttng error code from the relayd.
c5b6f4f0 324 */
fb9a95c4 325int relayd_create_session(struct lttcomm_relayd_sock *rsock,
28ab034a
JG
326 uint64_t *relayd_session_id,
327 const char *session_name,
328 const char *hostname,
329 const char *base_path,
330 int session_live_timer,
331 unsigned int snapshot,
332 uint64_t sessiond_session_id,
333 const lttng_uuid& sessiond_uuid,
334 const uint64_t *current_chunk_id,
335 time_t creation_time,
336 bool session_name_contains_creation_time,
337 char *output_path)
c5b6f4f0
DG
338{
339 int ret;
ecd1a12f 340 struct lttcomm_relayd_create_session_reply_2_11 reply = {};
c5b6f4f0 341
a0377dfe
FD
342 LTTNG_ASSERT(rsock);
343 LTTNG_ASSERT(relayd_session_id);
c5b6f4f0
DG
344
345 DBG("Relayd create session");
346
f86f6389
JR
347 if (rsock->minor < 4) {
348 /* From 2.1 to 2.3 */
ecd1a12f 349 ret = relayd_create_session_2_1(rsock, &reply.generic);
f86f6389
JR
350 } else if (rsock->minor >= 4 && rsock->minor < 11) {
351 /* From 2.4 to 2.10 */
28ab034a
JG
352 ret = relayd_create_session_2_4(
353 rsock, session_name, hostname, session_live_timer, snapshot, &reply.generic);
f86f6389
JR
354 } else {
355 /* From 2.11 to ... */
28ab034a
JG
356 ret = relayd_create_session_2_11(rsock,
357 session_name,
358 hostname,
359 base_path,
360 session_live_timer,
361 snapshot,
362 sessiond_session_id,
363 sessiond_uuid,
364 current_chunk_id,
365 creation_time,
366 session_name_contains_creation_time,
367 &reply,
368 output_path);
c5b6f4f0
DG
369 }
370
c5b6f4f0
DG
371 if (ret < 0) {
372 goto error;
373 }
374
c5b6f4f0 375 /* Return session id or negative ret code. */
ecd1a12f 376 if (reply.generic.ret_code != LTTNG_OK) {
bb63afd9 377 ret = -1;
28ab034a 378 ERR("Relayd create session replied error %d", reply.generic.ret_code);
c5b6f4f0
DG
379 goto error;
380 } else {
381 ret = 0;
ecd1a12f 382 *relayd_session_id = reply.generic.session_id;
c5b6f4f0
DG
383 }
384
ecd1a12f 385 DBG("Relayd session created with id %" PRIu64, reply.generic.session_id);
c5b6f4f0
DG
386
387error:
388 return ret;
389}
390
2f21a469 391static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
28ab034a
JG
392 const char *channel_name,
393 const char *pathname)
2f21a469
JR
394{
395 int ret;
396 struct lttcomm_relayd_add_stream msg;
397
398 memset(&msg, 0, sizeof(msg));
28ab034a 399 if (lttng_strncpy(msg.channel_name, channel_name, sizeof(msg.channel_name))) {
2f21a469
JR
400 ret = -1;
401 goto error;
402 }
403
28ab034a 404 if (lttng_strncpy(msg.pathname, pathname, sizeof(msg.pathname))) {
2f21a469
JR
405 ret = -1;
406 goto error;
407 }
408
409 /* Send command */
410 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
411 if (ret < 0) {
412 ret = -1;
413 goto error;
414 }
415 ret = 0;
416error:
417 return ret;
418}
419
420static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
28ab034a
JG
421 const char *channel_name,
422 const char *pathname,
423 uint64_t tracefile_size,
424 uint64_t tracefile_count)
2f21a469
JR
425{
426 int ret;
427 struct lttcomm_relayd_add_stream_2_2 msg;
428
429 memset(&msg, 0, sizeof(msg));
430 /* Compat with relayd 2.2 to 2.10 */
28ab034a 431 if (lttng_strncpy(msg.channel_name, channel_name, sizeof(msg.channel_name))) {
2f21a469
JR
432 ret = -1;
433 goto error;
434 }
28ab034a 435 if (lttng_strncpy(msg.pathname, pathname, sizeof(msg.pathname))) {
2f21a469
JR
436 ret = -1;
437 goto error;
438 }
439 msg.tracefile_size = htobe64(tracefile_size);
440 msg.tracefile_count = htobe64(tracefile_count);
441
442 /* Send command */
443 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
444 if (ret < 0) {
445 goto error;
446 }
447 ret = 0;
448error:
449 return ret;
450}
451
452static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
28ab034a
JG
453 const char *channel_name,
454 const char *pathname,
455 uint64_t tracefile_size,
456 uint64_t tracefile_count,
457 uint64_t trace_archive_id)
2f21a469
JR
458{
459 int ret;
cd9adb8b 460 struct lttcomm_relayd_add_stream_2_11 *msg = nullptr;
2f21a469
JR
461 size_t channel_name_len;
462 size_t pathname_len;
463 size_t msg_length;
464
465 /* The two names are sent with a '\0' delimiter between them. */
466 channel_name_len = strlen(channel_name) + 1;
467 pathname_len = strlen(pathname) + 1;
468
469 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
64803277 470 msg = zmalloc<lttcomm_relayd_add_stream_2_11>(msg_length);
2f21a469
JR
471 if (!msg) {
472 PERROR("zmalloc add_stream_2_11 command message");
473 ret = -1;
474 goto error;
475 }
476
a0377dfe 477 LTTNG_ASSERT(channel_name_len <= UINT32_MAX);
2f21a469
JR
478 msg->channel_name_len = htobe32(channel_name_len);
479
a0377dfe 480 LTTNG_ASSERT(pathname_len <= UINT32_MAX);
2f21a469
JR
481 msg->pathname_len = htobe32(pathname_len);
482
483 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
484 ret = -1;
485 goto error;
486 }
487 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
488 ret = -1;
489 goto error;
490 }
491
492 msg->tracefile_size = htobe64(tracefile_size);
493 msg->tracefile_count = htobe64(tracefile_count);
348a81dc 494 msg->trace_chunk_id = htobe64(trace_archive_id);
2f21a469
JR
495
496 /* Send command */
497 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
498 if (ret < 0) {
499 goto error;
500 }
501 ret = 0;
502error:
503 free(msg);
504 return ret;
505}
506
00e2e675
DG
507/*
508 * Add stream on the relayd and assign stream handle to the stream_id argument.
509 *
070b6a86
MD
510 * Chunks are not supported by relayd prior to 2.11, but are used to
511 * internally between session daemon and consumer daemon to keep track
512 * of the channel and stream output path.
513 *
00e2e675
DG
514 * On success return 0 else return ret_code negative value.
515 */
28ab034a
JG
516int relayd_add_stream(struct lttcomm_relayd_sock *rsock,
517 const char *channel_name,
518 const char *domain_name,
519 const char *_pathname,
520 uint64_t *stream_id,
521 uint64_t tracefile_size,
522 uint64_t tracefile_count,
523 struct lttng_trace_chunk *trace_chunk)
00e2e675
DG
524{
525 int ret;
00e2e675 526 struct lttcomm_relayd_status_stream reply;
5da88b0f 527 char pathname[RELAYD_COMM_LTTNG_PATH_MAX];
00e2e675
DG
528
529 /* Code flow error. Safety net. */
a0377dfe
FD
530 LTTNG_ASSERT(rsock);
531 LTTNG_ASSERT(channel_name);
532 LTTNG_ASSERT(domain_name);
533 LTTNG_ASSERT(_pathname);
534 LTTNG_ASSERT(trace_chunk);
00e2e675
DG
535
536 DBG("Relayd adding stream for channel name %s", channel_name);
537
0f907de1
JD
538 /* Compat with relayd 2.1 */
539 if (rsock->minor == 1) {
2f21a469 540 /* For 2.1 */
5fab2976 541 ret = relayd_add_stream_2_1(rsock, channel_name, _pathname);
55caead7 542
2f21a469
JR
543 } else if (rsock->minor > 1 && rsock->minor < 11) {
544 /* From 2.2 to 2.10 */
28ab034a
JG
545 ret = relayd_add_stream_2_2(
546 rsock, channel_name, _pathname, tracefile_size, tracefile_count);
0f907de1 547 } else {
5fab2976 548 const char *separator;
d2956687
JG
549 enum lttng_trace_chunk_status chunk_status;
550 uint64_t chunk_id;
551
5fab2976
JR
552 if (_pathname[0] == '\0') {
553 separator = "";
554 } else {
555 separator = "/";
556 }
557
28ab034a
JG
558 ret = snprintf(pathname,
559 RELAYD_COMM_LTTNG_PATH_MAX,
560 "%s%s%s",
561 domain_name,
562 separator,
563 _pathname);
5fab2976
JR
564 if (ret <= 0 || ret >= RELAYD_COMM_LTTNG_PATH_MAX) {
565 ERR("Failed to format stream path: %s",
28ab034a 566 ret <= 0 ? "formatting error" : "path exceeds maximal allowed length");
5fab2976
JR
567 ret = -1;
568 goto error;
569 }
570
28ab034a 571 chunk_status = lttng_trace_chunk_get_id(trace_chunk, &chunk_id);
a0377dfe 572 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d2956687 573
2f21a469 574 /* From 2.11 to ...*/
28ab034a
JG
575 ret = relayd_add_stream_2_11(
576 rsock, channel_name, pathname, tracefile_size, tracefile_count, chunk_id);
2f21a469 577 }
0f907de1 578
2f21a469
JR
579 if (ret) {
580 ret = -1;
581 goto error;
00e2e675
DG
582 }
583
633d0084 584 /* Waiting for reply */
6151a90f 585 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
00e2e675
DG
586 if (ret < 0) {
587 goto error;
588 }
589
590 /* Back to host bytes order. */
591 reply.handle = be64toh(reply.handle);
592 reply.ret_code = be32toh(reply.ret_code);
593
594 /* Return session id or negative ret code. */
f73fabfd 595 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
596 ret = -1;
597 ERR("Relayd add stream replied error %d", reply.ret_code);
00e2e675
DG
598 } else {
599 /* Success */
600 ret = 0;
601 *stream_id = reply.handle;
602 }
603
28ab034a 604 DBG("Relayd stream added successfully with handle %" PRIu64, reply.handle);
00e2e675
DG
605
606error:
607 return ret;
608}
609
a4baae1b
JD
610/*
611 * Inform the relay that all the streams for the current channel has been sent.
612 *
613 * On success return 0 else return ret_code negative value.
614 */
615int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
616{
617 int ret;
618 struct lttcomm_relayd_generic_reply reply;
619
620 /* Code flow error. Safety net. */
a0377dfe 621 LTTNG_ASSERT(rsock);
a4baae1b
JD
622
623 DBG("Relayd sending streams sent.");
624
625 /* This feature was introduced in 2.4, ignore it for earlier versions. */
626 if (rsock->minor < 4) {
627 ret = 0;
628 goto end;
629 }
630
631 /* Send command */
cd9adb8b 632 ret = send_command(rsock, RELAYD_STREAMS_SENT, nullptr, 0, 0);
a4baae1b
JD
633 if (ret < 0) {
634 goto error;
635 }
636
637 /* Waiting for reply */
638 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
639 if (ret < 0) {
640 goto error;
641 }
642
643 /* Back to host bytes order. */
644 reply.ret_code = be32toh(reply.ret_code);
645
646 /* Return session id or negative ret code. */
647 if (reply.ret_code != LTTNG_OK) {
648 ret = -1;
649 ERR("Relayd streams sent replied error %d", reply.ret_code);
650 goto error;
651 } else {
652 /* Success */
653 ret = 0;
654 }
655
656 DBG("Relayd streams sent success");
657
658error:
659end:
660 return ret;
661}
662
00e2e675
DG
663/*
664 * Check version numbers on the relayd.
d4519fa3
JD
665 * If major versions are compatible, we assign minor_to_use to the
666 * minor version of the procotol we are going to use for this session.
00e2e675 667 *
67d5aa28
JD
668 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
669 * otherwise, or a negative value on network errors.
00e2e675 670 */
6151a90f 671int relayd_version_check(struct lttcomm_relayd_sock *rsock)
00e2e675
DG
672{
673 int ret;
092b6259 674 struct lttcomm_relayd_version msg;
00e2e675
DG
675
676 /* Code flow error. Safety net. */
a0377dfe 677 LTTNG_ASSERT(rsock);
00e2e675 678
28ab034a 679 DBG("Relayd version check for major.minor %u.%u", rsock->major, rsock->minor);
00e2e675 680
53efb85a 681 memset(&msg, 0, sizeof(msg));
092b6259 682 /* Prepare network byte order before transmission. */
6151a90f
JD
683 msg.major = htobe32(rsock->major);
684 msg.minor = htobe32(rsock->minor);
092b6259 685
00e2e675 686 /* Send command */
6151a90f 687 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
00e2e675
DG
688 if (ret < 0) {
689 goto error;
690 }
691
20275fe8 692 /* Receive response */
6151a90f 693 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
00e2e675
DG
694 if (ret < 0) {
695 goto error;
696 }
697
698 /* Set back to host bytes order */
092b6259
DG
699 msg.major = be32toh(msg.major);
700 msg.minor = be32toh(msg.minor);
701
702 /*
703 * Only validate the major version. If the other side is higher,
704 * communication is not possible. Only major version equal can talk to each
705 * other. If the minor version differs, the lowest version is used by both
706 * sides.
092b6259 707 */
6151a90f 708 if (msg.major != rsock->major) {
d4519fa3 709 /* Not compatible */
67d5aa28 710 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
d4519fa3 711 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
28ab034a
JG
712 msg.major,
713 rsock->major);
092b6259 714 goto error;
00e2e675
DG
715 }
716
092b6259 717 /*
6151a90f
JD
718 * If the relayd's minor version is higher, it will adapt to our version so
719 * we can continue to use the latest relayd communication data structure.
720 * If the received minor version is higher, the relayd should adapt to us.
092b6259 721 */
6151a90f
JD
722 if (rsock->minor > msg.minor) {
723 rsock->minor = msg.minor;
d4519fa3 724 }
092b6259 725
d4519fa3
JD
726 /* Version number compatible */
727 DBG2("Relayd version is compatible, using protocol version %u.%u",
28ab034a
JG
728 rsock->major,
729 rsock->minor);
d4519fa3 730 ret = 0;
00e2e675
DG
731
732error:
733 return ret;
734}
735
00e2e675
DG
736/*
737 * Add stream on the relayd and assign stream handle to the stream_id argument.
738 *
739 * On success return 0 else return ret_code negative value.
740 */
6151a90f 741int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
00e2e675
DG
742{
743 int ret;
744
745 /* Code flow error. Safety net. */
a0377dfe 746 LTTNG_ASSERT(rsock);
00e2e675 747
77c7c900 748 DBG("Relayd sending metadata of size %zu", len);
00e2e675
DG
749
750 /* Send command */
cd9adb8b 751 ret = send_command(rsock, RELAYD_SEND_METADATA, nullptr, len, 0);
00e2e675
DG
752 if (ret < 0) {
753 goto error;
754 }
755
756 DBG2("Relayd metadata added successfully");
757
758 /*
759 * After that call, the metadata data MUST be sent to the relayd so the
760 * receive size on the other end matches the len of the metadata packet
633d0084 761 * header. This is why we don't wait for a reply here.
00e2e675
DG
762 */
763
764error:
765 return ret;
766}
767
768/*
6151a90f 769 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
00e2e675 770 */
6151a90f 771int relayd_connect(struct lttcomm_relayd_sock *rsock)
00e2e675
DG
772{
773 /* Code flow error. Safety net. */
a0377dfe 774 LTTNG_ASSERT(rsock);
00e2e675 775
f96e4545
MD
776 if (!rsock->sock.ops) {
777 /*
778 * Attempting a connect on a non-initialized socket.
779 */
780 return -ECONNRESET;
781 }
782
00e2e675
DG
783 DBG3("Relayd connect ...");
784
6151a90f 785 return rsock->sock.ops->connect(&rsock->sock);
00e2e675
DG
786}
787
788/*
6151a90f 789 * Close relayd socket with an allocated lttcomm_relayd_sock.
ffe60014
DG
790 *
791 * If no socket operations are found, simply return 0 meaning that everything
792 * is fine. Without operations, the socket can not possibly be opened or used.
793 * This is possible if the socket was allocated but not created. However, the
794 * caller could simply use it to store a valid file descriptor for instance
795 * passed over a Unix socket and call this to cleanup but still without a valid
796 * ops pointer.
797 *
798 * Return the close returned value. On error, a negative value is usually
799 * returned back from close(2).
00e2e675 800 */
6151a90f 801int relayd_close(struct lttcomm_relayd_sock *rsock)
00e2e675 802{
ffe60014
DG
803 int ret;
804
00e2e675 805 /* Code flow error. Safety net. */
a0377dfe 806 LTTNG_ASSERT(rsock);
00e2e675 807
ffe60014 808 /* An invalid fd is fine, return success. */
6151a90f 809 if (rsock->sock.fd < 0) {
ffe60014
DG
810 ret = 0;
811 goto end;
812 }
813
6151a90f 814 DBG3("Relayd closing socket %d", rsock->sock.fd);
00e2e675 815
6151a90f
JD
816 if (rsock->sock.ops) {
817 ret = rsock->sock.ops->close(&rsock->sock);
ffe60014
DG
818 } else {
819 /* Default call if no specific ops found. */
6151a90f 820 ret = close(rsock->sock.fd);
ffe60014
DG
821 if (ret < 0) {
822 PERROR("relayd_close default close");
823 }
824 }
f96e4545 825 rsock->sock.fd = -1;
ffe60014
DG
826
827end:
828 return ret;
00e2e675
DG
829}
830
831/*
832 * Send data header structure to the relayd.
833 */
6151a90f 834int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
28ab034a
JG
835 struct lttcomm_relayd_data_hdr *hdr,
836 size_t size)
00e2e675
DG
837{
838 int ret;
839
840 /* Code flow error. Safety net. */
a0377dfe
FD
841 LTTNG_ASSERT(rsock);
842 LTTNG_ASSERT(hdr);
00e2e675 843
f96e4545
MD
844 if (rsock->sock.fd < 0) {
845 return -ECONNRESET;
846 }
847
8fd623e0 848 DBG3("Relayd sending data header of size %zu", size);
00e2e675
DG
849
850 /* Again, safety net */
851 if (size == 0) {
852 size = sizeof(struct lttcomm_relayd_data_hdr);
853 }
854
855 /* Only send data header. */
6151a90f 856 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
00e2e675 857 if (ret < 0) {
8994307f 858 ret = -errno;
00e2e675
DG
859 goto error;
860 }
861
862 /*
863 * The data MUST be sent right after that command for the receive on the
864 * other end to match the size in the header.
865 */
866
867error:
868 return ret;
869}
173af62f
DG
870
871/*
872 * Send close stream command to the relayd.
873 */
28ab034a
JG
874int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock,
875 uint64_t stream_id,
876 uint64_t last_net_seq_num)
173af62f
DG
877{
878 int ret;
879 struct lttcomm_relayd_close_stream msg;
880 struct lttcomm_relayd_generic_reply reply;
881
882 /* Code flow error. Safety net. */
a0377dfe 883 LTTNG_ASSERT(rsock);
173af62f 884
77c7c900 885 DBG("Relayd closing stream id %" PRIu64, stream_id);
173af62f 886
53efb85a 887 memset(&msg, 0, sizeof(msg));
173af62f
DG
888 msg.stream_id = htobe64(stream_id);
889 msg.last_net_seq_num = htobe64(last_net_seq_num);
890
891 /* Send command */
6151a90f 892 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
173af62f
DG
893 if (ret < 0) {
894 goto error;
895 }
896
20275fe8 897 /* Receive response */
6151a90f 898 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
173af62f
DG
899 if (ret < 0) {
900 goto error;
901 }
902
903 reply.ret_code = be32toh(reply.ret_code);
904
905 /* Return session id or negative ret code. */
f73fabfd 906 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
907 ret = -1;
908 ERR("Relayd close stream replied error %d", reply.ret_code);
173af62f
DG
909 } else {
910 /* Success */
911 ret = 0;
912 }
913
77c7c900 914 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
173af62f
DG
915
916error:
917 return ret;
918}
c8f59ee5
DG
919
920/*
921 * Check for data availability for a given stream id.
922 *
6d805429 923 * Return 0 if NOT pending, 1 if so and a negative value on error.
c8f59ee5 924 */
28ab034a
JG
925int relayd_data_pending(struct lttcomm_relayd_sock *rsock,
926 uint64_t stream_id,
927 uint64_t last_net_seq_num)
c8f59ee5
DG
928{
929 int ret;
6d805429 930 struct lttcomm_relayd_data_pending msg;
c8f59ee5
DG
931 struct lttcomm_relayd_generic_reply reply;
932
933 /* Code flow error. Safety net. */
a0377dfe 934 LTTNG_ASSERT(rsock);
c8f59ee5 935
6d805429 936 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
c8f59ee5 937
53efb85a 938 memset(&msg, 0, sizeof(msg));
c8f59ee5
DG
939 msg.stream_id = htobe64(stream_id);
940 msg.last_net_seq_num = htobe64(last_net_seq_num);
941
942 /* Send command */
28ab034a 943 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg, sizeof(msg), 0);
c8f59ee5
DG
944 if (ret < 0) {
945 goto error;
946 }
947
20275fe8 948 /* Receive response */
6151a90f 949 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c8f59ee5
DG
950 if (ret < 0) {
951 goto error;
952 }
953
954 reply.ret_code = be32toh(reply.ret_code);
955
956 /* Return session id or negative ret code. */
957 if (reply.ret_code >= LTTNG_OK) {
bb63afd9 958 ERR("Relayd data pending replied error %d", reply.ret_code);
c8f59ee5
DG
959 }
960
961 /* At this point, the ret code is either 1 or 0 */
962 ret = reply.ret_code;
963
28ab034a 964 DBG("Relayd data is %s pending for stream id %" PRIu64, ret == 1 ? "" : "NOT", stream_id);
c8f59ee5
DG
965
966error:
967 return ret;
968}
969
970/*
971 * Check on the relayd side for a quiescent state on the control socket.
972 */
28ab034a 973int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock, uint64_t metadata_stream_id)
c8f59ee5
DG
974{
975 int ret;
ad7051c0 976 struct lttcomm_relayd_quiescent_control msg;
c8f59ee5
DG
977 struct lttcomm_relayd_generic_reply reply;
978
979 /* Code flow error. Safety net. */
a0377dfe 980 LTTNG_ASSERT(rsock);
c8f59ee5
DG
981
982 DBG("Relayd checking quiescent control state");
983
53efb85a 984 memset(&msg, 0, sizeof(msg));
ad7051c0
DG
985 msg.stream_id = htobe64(metadata_stream_id);
986
c8f59ee5 987 /* Send command */
6151a90f 988 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
c8f59ee5
DG
989 if (ret < 0) {
990 goto error;
991 }
992
20275fe8 993 /* Receive response */
6151a90f 994 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c8f59ee5
DG
995 if (ret < 0) {
996 goto error;
997 }
998
999 reply.ret_code = be32toh(reply.ret_code);
1000
1001 /* Return session id or negative ret code. */
1002 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
1003 ret = -1;
1004 ERR("Relayd quiescent control replied error %d", reply.ret_code);
c8f59ee5
DG
1005 goto error;
1006 }
1007
1008 /* Control socket is quiescent */
6d805429 1009 return 0;
c8f59ee5
DG
1010
1011error:
1012 return ret;
1013}
f7079f67
DG
1014
1015/*
1016 * Begin a data pending command for a specific session id.
1017 */
6151a90f 1018int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
f7079f67
DG
1019{
1020 int ret;
1021 struct lttcomm_relayd_begin_data_pending msg;
1022 struct lttcomm_relayd_generic_reply reply;
1023
1024 /* Code flow error. Safety net. */
a0377dfe 1025 LTTNG_ASSERT(rsock);
f7079f67
DG
1026
1027 DBG("Relayd begin data pending");
1028
53efb85a 1029 memset(&msg, 0, sizeof(msg));
f7079f67
DG
1030 msg.session_id = htobe64(id);
1031
1032 /* Send command */
6151a90f 1033 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
f7079f67
DG
1034 if (ret < 0) {
1035 goto error;
1036 }
1037
20275fe8 1038 /* Receive response */
6151a90f 1039 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
f7079f67
DG
1040 if (ret < 0) {
1041 goto error;
1042 }
1043
1044 reply.ret_code = be32toh(reply.ret_code);
1045
1046 /* Return session id or negative ret code. */
1047 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
1048 ret = -1;
1049 ERR("Relayd begin data pending replied error %d", reply.ret_code);
f7079f67
DG
1050 goto error;
1051 }
1052
1053 return 0;
1054
1055error:
1056 return ret;
1057}
1058
1059/*
1060 * End a data pending command for a specific session id.
1061 *
1062 * Return 0 on success and set is_data_inflight to 0 if no data is being
1063 * streamed or 1 if it is the case.
1064 */
28ab034a
JG
1065int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock,
1066 uint64_t id,
1067 unsigned int *is_data_inflight)
f7079f67 1068{
af6c30b5 1069 int ret, recv_ret;
f7079f67
DG
1070 struct lttcomm_relayd_end_data_pending msg;
1071 struct lttcomm_relayd_generic_reply reply;
1072
1073 /* Code flow error. Safety net. */
a0377dfe 1074 LTTNG_ASSERT(rsock);
f7079f67
DG
1075
1076 DBG("Relayd end data pending");
1077
53efb85a 1078 memset(&msg, 0, sizeof(msg));
f7079f67
DG
1079 msg.session_id = htobe64(id);
1080
1081 /* Send command */
6151a90f 1082 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
f7079f67
DG
1083 if (ret < 0) {
1084 goto error;
1085 }
1086
20275fe8 1087 /* Receive response */
6151a90f 1088 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
f7079f67
DG
1089 if (ret < 0) {
1090 goto error;
1091 }
1092
af6c30b5
DG
1093 recv_ret = be32toh(reply.ret_code);
1094 if (recv_ret < 0) {
1095 ret = recv_ret;
f7079f67
DG
1096 goto error;
1097 }
1098
af6c30b5 1099 *is_data_inflight = recv_ret;
f7079f67 1100
af6c30b5 1101 DBG("Relayd end data pending is data inflight: %d", recv_ret);
f7079f67
DG
1102
1103 return 0;
1104
1105error:
1106 return ret;
1107}
1c20f0e2
JD
1108
1109/*
1110 * Send index to the relayd.
1111 */
1112int relayd_send_index(struct lttcomm_relayd_sock *rsock,
28ab034a
JG
1113 struct ctf_packet_index *index,
1114 uint64_t relay_stream_id,
1115 uint64_t net_seq_num)
1c20f0e2
JD
1116{
1117 int ret;
1118 struct lttcomm_relayd_index msg;
1119 struct lttcomm_relayd_generic_reply reply;
1120
1121 /* Code flow error. Safety net. */
a0377dfe 1122 LTTNG_ASSERT(rsock);
1c20f0e2
JD
1123
1124 if (rsock->minor < 4) {
1125 DBG("Not sending indexes before protocol 2.4");
1126 ret = 0;
1127 goto error;
1128 }
1129
1130 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
1131
53efb85a 1132 memset(&msg, 0, sizeof(msg));
1c20f0e2
JD
1133 msg.relay_stream_id = htobe64(relay_stream_id);
1134 msg.net_seq_num = htobe64(net_seq_num);
1135
1136 /* The index is already in big endian. */
1137 msg.packet_size = index->packet_size;
1138 msg.content_size = index->content_size;
1139 msg.timestamp_begin = index->timestamp_begin;
1140 msg.timestamp_end = index->timestamp_end;
1141 msg.events_discarded = index->events_discarded;
1142 msg.stream_id = index->stream_id;
1143
234cd636
JD
1144 if (rsock->minor >= 8) {
1145 msg.stream_instance_id = index->stream_instance_id;
1146 msg.packet_seq_num = index->packet_seq_num;
1147 }
1148
1c20f0e2 1149 /* Send command */
28ab034a
JG
1150 ret = send_command(
1151 rsock,
1152 RELAYD_SEND_INDEX,
1153 &msg,
1154 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major, rsock->minor),
1155 lttng_to_index_minor(rsock->major, rsock->minor)),
1156 0);
1c20f0e2
JD
1157 if (ret < 0) {
1158 goto error;
1159 }
1160
1161 /* Receive response */
1162 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1163 if (ret < 0) {
1164 goto error;
1165 }
1166
1167 reply.ret_code = be32toh(reply.ret_code);
1168
1169 /* Return session id or negative ret code. */
1170 if (reply.ret_code != LTTNG_OK) {
1171 ret = -1;
1172 ERR("Relayd send index replied error %d", reply.ret_code);
1173 } else {
1174 /* Success */
1175 ret = 0;
1176 }
1177
1178error:
1179 return ret;
1180}
93ec662e
JD
1181
1182/*
1183 * Ask the relay to reset the metadata trace file (regeneration).
1184 */
28ab034a 1185int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, uint64_t version)
93ec662e
JD
1186{
1187 int ret;
1188 struct lttcomm_relayd_reset_metadata msg;
1189 struct lttcomm_relayd_generic_reply reply;
1190
1191 /* Code flow error. Safety net. */
a0377dfe 1192 LTTNG_ASSERT(rsock);
93ec662e
JD
1193
1194 /* Should have been prevented by the sessiond. */
1195 if (rsock->minor < 8) {
1196 ERR("Metadata regeneration unsupported before 2.8");
1197 ret = -1;
1198 goto error;
1199 }
1200
1201 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1202
1203 memset(&msg, 0, sizeof(msg));
1204 msg.stream_id = htobe64(stream_id);
1205 msg.version = htobe64(version);
1206
1207 /* Send command */
1208 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1209 if (ret < 0) {
1210 goto error;
1211 }
1212
1213 /* Receive response */
1214 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1215 if (ret < 0) {
1216 goto error;
1217 }
1218
1219 reply.ret_code = be32toh(reply.ret_code);
1220
1221 /* Return session id or negative ret code. */
1222 if (reply.ret_code != LTTNG_OK) {
1223 ret = -1;
1224 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1225 } else {
1226 /* Success */
1227 ret = 0;
1228 }
1229
1230 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1231
1232error:
1233 return ret;
1234}
a1ae2ea5 1235
c35f9726 1236int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
28ab034a
JG
1237 unsigned int stream_count,
1238 const uint64_t *new_chunk_id,
1239 const struct relayd_stream_rotation_position *positions)
d73bf3d7
JD
1240{
1241 int ret;
c35f9726
JG
1242 unsigned int i;
1243 struct lttng_dynamic_buffer payload;
1244 struct lttcomm_relayd_generic_reply reply = {};
6d15ee45 1245 struct lttcomm_relayd_rotate_streams msg;
c35f9726
JG
1246 char new_chunk_id_buf[MAX_INT_DEC_LEN(*new_chunk_id)] = {};
1247 const char *new_chunk_id_str;
d73bf3d7 1248
6d15ee45
JG
1249 msg.stream_count = htobe32((uint32_t) stream_count);
1250 msg.new_chunk_id = (typeof(msg.new_chunk_id)){
1251 .is_set = !!new_chunk_id,
1252 .value = htobe64(new_chunk_id ? *new_chunk_id : 0),
1253 };
1254
070b6a86
MD
1255 if (!relayd_supports_chunks(sock)) {
1256 DBG("Refusing to rotate remote streams: relayd does not support chunks");
1257 return 0;
1258 }
1259
c35f9726 1260 lttng_dynamic_buffer_init(&payload);
d73bf3d7 1261
c35f9726 1262 /* Code flow error. Safety net. */
a0377dfe 1263 LTTNG_ASSERT(sock);
d73bf3d7 1264
c35f9726 1265 if (new_chunk_id) {
28ab034a
JG
1266 ret = snprintf(
1267 new_chunk_id_buf, sizeof(new_chunk_id_buf), "%" PRIu64, *new_chunk_id);
c35f9726
JG
1268 if (ret == -1 || ret >= sizeof(new_chunk_id_buf)) {
1269 new_chunk_id_str = "formatting error";
1270 } else {
1271 new_chunk_id_str = new_chunk_id_buf;
1272 }
1273 } else {
1274 new_chunk_id_str = "none";
d73bf3d7
JD
1275 }
1276
c35f9726 1277 DBG("Preparing \"rotate streams\" command payload: new_chunk_id = %s, stream_count = %u",
28ab034a
JG
1278 new_chunk_id_str,
1279 stream_count);
d73bf3d7 1280
c35f9726
JG
1281 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1282 if (ret) {
1283 ERR("Failed to allocate \"rotate streams\" command payload");
d73bf3d7
JD
1284 goto error;
1285 }
1286
c35f9726 1287 for (i = 0; i < stream_count; i++) {
28ab034a 1288 const struct relayd_stream_rotation_position *position = &positions[i];
c35f9726
JG
1289 const struct lttcomm_relayd_stream_rotation_position comm_position = {
1290 .stream_id = htobe64(position->stream_id),
28ab034a 1291 .rotate_at_seq_num = htobe64(position->rotate_at_seq_num),
c35f9726
JG
1292 };
1293
a269202e 1294 DBG("Rotate stream %" PRIu64 " at sequence number %" PRIu64,
28ab034a
JG
1295 position->stream_id,
1296 position->rotate_at_seq_num);
1297 ret = lttng_dynamic_buffer_append(&payload, &comm_position, sizeof(comm_position));
c35f9726
JG
1298 if (ret) {
1299 ERR("Failed to allocate \"rotate streams\" command payload");
1300 goto error;
1301 }
1302 }
d73bf3d7
JD
1303
1304 /* Send command. */
28ab034a 1305 ret = send_command(sock, RELAYD_ROTATE_STREAMS, payload.data, payload.size, 0);
d73bf3d7 1306 if (ret < 0) {
c35f9726 1307 ERR("Failed to send \"rotate stream\" command");
d73bf3d7
JD
1308 goto error;
1309 }
1310
1311 /* Receive response. */
c35f9726 1312 ret = recv_reply(sock, &reply, sizeof(reply));
d73bf3d7 1313 if (ret < 0) {
c35f9726 1314 ERR("Failed to receive \"rotate streams\" command reply");
d73bf3d7
JD
1315 goto error;
1316 }
1317
1318 reply.ret_code = be32toh(reply.ret_code);
d73bf3d7
JD
1319 if (reply.ret_code != LTTNG_OK) {
1320 ret = -1;
c35f9726 1321 ERR("Relayd rotate streams replied error %d", reply.ret_code);
d73bf3d7
JD
1322 } else {
1323 /* Success. */
1324 ret = 0;
c35f9726 1325 DBG("Relayd rotated streams successfully");
d73bf3d7
JD
1326 }
1327
1328error:
c35f9726 1329 lttng_dynamic_buffer_reset(&payload);
d73bf3d7
JD
1330 return ret;
1331}
e5add6d0 1332
28ab034a 1333int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock, struct lttng_trace_chunk *chunk)
e5add6d0
JG
1334{
1335 int ret = 0;
1336 enum lttng_trace_chunk_status status;
1337 struct lttcomm_relayd_create_trace_chunk msg = {};
1338 struct lttcomm_relayd_generic_reply reply = {};
1339 struct lttng_dynamic_buffer payload;
1340 uint64_t chunk_id;
1341 time_t creation_timestamp;
1342 const char *chunk_name;
1343 size_t chunk_name_length;
913a542b 1344 bool overridden_name;
e5add6d0
JG
1345
1346 lttng_dynamic_buffer_init(&payload);
1347
070b6a86
MD
1348 if (!relayd_supports_chunks(sock)) {
1349 DBG("Refusing to create remote trace chunk: relayd does not support chunks");
1350 goto end;
1351 }
1352
e5add6d0
JG
1353 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1354 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1355 ret = -1;
1356 goto end;
1357 }
1358
28ab034a 1359 status = lttng_trace_chunk_get_creation_timestamp(chunk, &creation_timestamp);
e5add6d0
JG
1360 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1361 ret = -1;
1362 goto end;
1363 }
1364
28ab034a
JG
1365 status = lttng_trace_chunk_get_name(chunk, &chunk_name, &overridden_name);
1366 if (status != LTTNG_TRACE_CHUNK_STATUS_OK && status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
e5add6d0
JG
1367 ret = -1;
1368 goto end;
1369 }
1370
913a542b 1371 chunk_name_length = overridden_name ? (strlen(chunk_name) + 1) : 0;
c15e2d3d
JG
1372 msg.chunk_id = htobe64(chunk_id);
1373 msg.creation_timestamp = htobe64((uint64_t) creation_timestamp);
1374 msg.override_name_length = htobe32((uint32_t) chunk_name_length);
e5add6d0
JG
1375
1376 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1377 if (ret) {
1378 goto end;
1379 }
1380 if (chunk_name_length) {
28ab034a 1381 ret = lttng_dynamic_buffer_append(&payload, chunk_name, chunk_name_length);
e5add6d0
JG
1382 if (ret) {
1383 goto end;
1384 }
1385 }
1386
28ab034a 1387 ret = send_command(sock, RELAYD_CREATE_TRACE_CHUNK, payload.data, payload.size, 0);
e5add6d0
JG
1388 if (ret < 0) {
1389 ERR("Failed to send trace chunk creation command to relay daemon");
1390 goto end;
1391 }
1392
1393 ret = recv_reply(sock, &reply, sizeof(reply));
1394 if (ret < 0) {
1395 ERR("Failed to receive relay daemon trace chunk creation command reply");
1396 goto end;
1397 }
1398
1399 reply.ret_code = be32toh(reply.ret_code);
1400 if (reply.ret_code != LTTNG_OK) {
1401 ret = -1;
28ab034a 1402 ERR("Relayd trace chunk create replied error %d", reply.ret_code);
e5add6d0
JG
1403 } else {
1404 ret = 0;
28ab034a 1405 DBG("Relayd successfully created trace chunk: chunk_id = %" PRIu64, chunk_id);
e5add6d0
JG
1406 }
1407
1408end:
1409 lttng_dynamic_buffer_reset(&payload);
1410 return ret;
1411}
bbc4768c
JG
1412
1413int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
28ab034a
JG
1414 struct lttng_trace_chunk *chunk,
1415 char *path)
bbc4768c
JG
1416{
1417 int ret = 0;
1418 enum lttng_trace_chunk_status status;
1419 struct lttcomm_relayd_close_trace_chunk msg = {};
ecd1a12f 1420 struct lttcomm_relayd_close_trace_chunk_reply reply = {};
bbc4768c
JG
1421 uint64_t chunk_id;
1422 time_t close_timestamp;
1423 LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
1424
070b6a86
MD
1425 if (!relayd_supports_chunks(sock)) {
1426 DBG("Refusing to close remote trace chunk: relayd does not support chunks");
1427 goto end;
1428 }
1429
bbc4768c
JG
1430 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1431 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1432 ERR("Failed to get trace chunk id");
1433 ret = -1;
1434 goto end;
1435 }
1436
1437 status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
1438 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1439 ERR("Failed to get trace chunk close timestamp");
1440 ret = -1;
1441 goto end;
1442 }
1443
28ab034a 1444 status = lttng_trace_chunk_get_close_command(chunk, &close_command.value);
bbc4768c
JG
1445 switch (status) {
1446 case LTTNG_TRACE_CHUNK_STATUS_OK:
1447 close_command.is_set = 1;
1448 break;
1449 case LTTNG_TRACE_CHUNK_STATUS_NONE:
1450 break;
1451 default:
1452 ERR("Failed to get trace chunk close command");
1453 ret = -1;
1454 goto end;
1455 }
1456
1457 msg = (typeof(msg)){
1458 .chunk_id = htobe64(chunk_id),
1459 .close_timestamp = htobe64((uint64_t) close_timestamp),
1460 .close_command = {
bbc4768c 1461 .is_set = close_command.is_set,
55caead7 1462 .value = htobe32((uint32_t) close_command.value),
bbc4768c
JG
1463 },
1464 };
1465
28ab034a 1466 ret = send_command(sock, RELAYD_CLOSE_TRACE_CHUNK, &msg, sizeof(msg), 0);
bbc4768c
JG
1467 if (ret < 0) {
1468 ERR("Failed to send trace chunk close command to relay daemon");
1469 goto end;
1470 }
1471
1472 ret = recv_reply(sock, &reply, sizeof(reply));
1473 if (ret < 0) {
1474 ERR("Failed to receive relay daemon trace chunk close command reply");
1475 goto end;
1476 }
1477
ecd1a12f
MD
1478 reply.path_length = be32toh(reply.path_length);
1479 if (reply.path_length >= LTTNG_PATH_MAX) {
1480 ERR("Chunk path too long");
1481 ret = -1;
1482 goto end;
1483 }
1484
1485 ret = recv_reply(sock, path, reply.path_length);
1486 if (ret < 0) {
1487 ERR("Failed to receive relay daemon trace chunk close command reply");
1488 goto end;
1489 }
1490 if (path[reply.path_length - 1] != '\0') {
1491 ERR("Invalid trace chunk path returned by relay daemon (not null-terminated)");
1492 ret = -1;
1493 goto end;
1494 }
1495
1496 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1497 if (reply.generic.ret_code != LTTNG_OK) {
bbc4768c 1498 ret = -1;
28ab034a 1499 ERR("Relayd trace chunk close replied error %d", reply.generic.ret_code);
bbc4768c
JG
1500 } else {
1501 ret = 0;
28ab034a 1502 DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64, chunk_id);
bbc4768c
JG
1503 }
1504end:
1505 return ret;
1506}
c35f9726
JG
1507
1508int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
28ab034a
JG
1509 uint64_t chunk_id,
1510 bool *chunk_exists)
c35f9726
JG
1511{
1512 int ret = 0;
1513 struct lttcomm_relayd_trace_chunk_exists msg = {};
1514 struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
1515
070b6a86
MD
1516 if (!relayd_supports_chunks(sock)) {
1517 DBG("Refusing to check for trace chunk existence: relayd does not support chunks");
caa15afd
JR
1518 /* The chunk will never exist */
1519 *chunk_exists = false;
070b6a86
MD
1520 goto end;
1521 }
1522
c35f9726 1523 msg = (typeof(msg)){
28ab034a 1524 .chunk_id = htobe64(chunk_id),
c35f9726
JG
1525 };
1526
28ab034a 1527 ret = send_command(sock, RELAYD_TRACE_CHUNK_EXISTS, &msg, sizeof(msg), 0);
c35f9726
JG
1528 if (ret < 0) {
1529 ERR("Failed to send trace chunk exists command to relay daemon");
1530 goto end;
1531 }
1532
1533 ret = recv_reply(sock, &reply, sizeof(reply));
1534 if (ret < 0) {
1535 ERR("Failed to receive relay daemon trace chunk close command reply");
1536 goto end;
1537 }
1538
1539 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1540 if (reply.generic.ret_code != LTTNG_OK) {
1541 ret = -1;
28ab034a 1542 ERR("Relayd trace chunk close replied error %d", reply.generic.ret_code);
c35f9726
JG
1543 } else {
1544 ret = 0;
1545 DBG("Relayd successfully checked trace chunk existence: chunk_id = %" PRIu64
28ab034a
JG
1546 ", exists = %s",
1547 chunk_id,
1548 reply.trace_chunk_exists ? "true" : "false");
c35f9726
JG
1549 *chunk_exists = !!reply.trace_chunk_exists;
1550 }
1551end:
1552 return ret;
1553}
8614e600
MD
1554
1555int relayd_get_configuration(struct lttcomm_relayd_sock *sock,
28ab034a
JG
1556 uint64_t query_flags,
1557 uint64_t *result_flags)
8614e600
MD
1558{
1559 int ret = 0;
28ab034a 1560 struct lttcomm_relayd_get_configuration msg = (typeof(msg)){
8614e600
MD
1561 .query_flags = htobe64(query_flags),
1562 };
1563 struct lttcomm_relayd_get_configuration_reply reply = {};
1564
1565 if (!relayd_supports_get_configuration(sock)) {
1566 DBG("Refusing to get relayd configuration (unsupported by relayd)");
1567 if (query_flags) {
1568 ret = -1;
1569 goto end;
1570 }
1571 *result_flags = 0;
1572 goto end;
1573 }
1574
28ab034a 1575 ret = send_command(sock, RELAYD_GET_CONFIGURATION, &msg, sizeof(msg), 0);
8614e600
MD
1576 if (ret < 0) {
1577 ERR("Failed to send get configuration command to relay daemon");
1578 goto end;
1579 }
1580
1581 ret = recv_reply(sock, &reply, sizeof(reply));
1582 if (ret < 0) {
1583 ERR("Failed to receive relay daemon get configuration command reply");
1584 goto end;
1585 }
1586
1587 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1588 if (reply.generic.ret_code != LTTNG_OK) {
1589 ret = -1;
28ab034a 1590 ERR("Relayd get configuration replied error %d", reply.generic.ret_code);
8614e600 1591 } else {
28ab034a 1592 reply.relayd_configuration_flags = be64toh(reply.relayd_configuration_flags);
8614e600
MD
1593 ret = 0;
1594 DBG("Relayd successfully got configuration: query_flags = %" PRIu64
28ab034a
JG
1595 ", results_flags = %" PRIu64,
1596 query_flags,
1597 reply.relayd_configuration_flags);
8614e600
MD
1598 *result_flags = reply.relayd_configuration_flags;
1599 }
1600end:
1601 return ret;
1602}
This page took 0.147225 seconds and 4 git commands to generate.