Clean-up: consumer.hpp: coding style indentation fix
[lttng-tools.git] / src / common / relayd / relayd.cpp
1 /*
2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include "relayd.hpp"
10
11 #include <common/common.hpp>
12 #include <common/compat/endian.hpp>
13 #include <common/compat/string.hpp>
14 #include <common/defaults.hpp>
15 #include <common/index/ctf-index.hpp>
16 #include <common/sessiond-comm/relayd.hpp>
17 #include <common/string-utils/format.hpp>
18 #include <common/trace-chunk.hpp>
19
20 #include <inttypes.h>
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <sys/stat.h>
25
26 static bool relayd_supports_chunks(const struct lttcomm_relayd_sock *sock)
27 {
28 if (sock->major > 2) {
29 return true;
30 } else if (sock->major == 2 && sock->minor >= 11) {
31 return true;
32 }
33 return false;
34 }
35
36 static bool relayd_supports_get_configuration(const struct lttcomm_relayd_sock *sock)
37 {
38 if (sock->major > 2) {
39 return true;
40 } else if (sock->major == 2 && sock->minor >= 12) {
41 return true;
42 }
43 return false;
44 }
45
46 /*
47 * Send command. Fill up the header and append the data.
48 */
49 static int send_command(struct lttcomm_relayd_sock *rsock,
50 enum lttcomm_relayd_command cmd,
51 const void *data,
52 size_t size,
53 int flags)
54 {
55 int ret;
56 struct lttcomm_relayd_hdr header;
57 char *buf;
58 uint64_t buf_size = sizeof(header);
59
60 if (rsock->sock.fd < 0) {
61 return -ECONNRESET;
62 }
63
64 if (data) {
65 buf_size += size;
66 }
67
68 buf = calloc<char>(buf_size);
69 if (buf == nullptr) {
70 PERROR("zmalloc relayd send command buf");
71 ret = -1;
72 goto alloc_error;
73 }
74
75 memset(&header, 0, sizeof(header));
76 header.cmd = htobe32(cmd);
77 header.data_size = htobe64(size);
78
79 /* Zeroed for now since not used. */
80 header.cmd_version = 0;
81 header.circuit_id = 0;
82
83 /* Prepare buffer to send. */
84 memcpy(buf, &header, sizeof(header));
85 if (data) {
86 memcpy(buf + sizeof(header), data, size);
87 }
88
89 DBG3("Relayd sending command %s of size %" PRIu64,
90 lttcomm_relayd_command_str(cmd),
91 buf_size);
92 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
93 if (ret < 0) {
94 PERROR("Failed to send command %s of size %" PRIu64,
95 lttcomm_relayd_command_str(cmd),
96 buf_size);
97 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
98 ret = -errno;
99 goto error;
100 }
101 error:
102 free(buf);
103 alloc_error:
104 return ret;
105 }
106
107 /*
108 * Receive reply data on socket. This MUST be call after send_command or else
109 * could result in unexpected behavior(s).
110 */
111 static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
112 {
113 int ret;
114
115 if (rsock->sock.fd < 0) {
116 return -ECONNRESET;
117 }
118
119 DBG3("Relayd waiting for reply of size %zu", size);
120
121 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
122 if (ret <= 0 || ret != size) {
123 if (ret == 0) {
124 /* Orderly shutdown. */
125 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
126 } else {
127 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
128 rsock->sock.fd,
129 size,
130 ret);
131 }
132 /* Always return -1 here and the caller can use errno. */
133 ret = -1;
134 goto error;
135 }
136
137 error:
138 return ret;
139 }
140
141 /*
142 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name,
143 * hostname, and base_path) have no length restriction on the sender side.
144 * Length for both payloads is stored in the msg struct. A new dynamic size
145 * payload size is introduced.
146 */
147 static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
148 const char *session_name,
149 const char *hostname,
150 const char *base_path,
151 int session_live_timer,
152 unsigned int snapshot,
153 uint64_t sessiond_session_id,
154 const lttng_uuid& sessiond_uuid,
155 const uint64_t *current_chunk_id,
156 time_t creation_time,
157 bool session_name_contains_creation_time,
158 struct lttcomm_relayd_create_session_reply_2_11 *reply,
159 char *output_path)
160 {
161 int ret;
162 struct lttcomm_relayd_create_session_2_11 *msg = nullptr;
163 size_t session_name_len;
164 size_t hostname_len;
165 size_t base_path_len;
166 size_t msg_length;
167 char *dst;
168
169 if (!base_path) {
170 base_path = "";
171 }
172 /* The three names are sent with a '\0' delimiter between them. */
173 session_name_len = strlen(session_name) + 1;
174 hostname_len = strlen(hostname) + 1;
175 base_path_len = strlen(base_path) + 1;
176
177 msg_length = sizeof(*msg) + session_name_len + hostname_len + base_path_len;
178 msg = zmalloc<lttcomm_relayd_create_session_2_11>(msg_length);
179 if (!msg) {
180 PERROR("zmalloc create_session_2_11 command message");
181 ret = -1;
182 goto error;
183 }
184
185 LTTNG_ASSERT(session_name_len <= UINT32_MAX);
186 msg->session_name_len = htobe32(session_name_len);
187
188 LTTNG_ASSERT(hostname_len <= UINT32_MAX);
189 msg->hostname_len = htobe32(hostname_len);
190
191 LTTNG_ASSERT(base_path_len <= UINT32_MAX);
192 msg->base_path_len = htobe32(base_path_len);
193
194 dst = msg->names;
195 if (lttng_strncpy(dst, session_name, session_name_len)) {
196 ret = -1;
197 goto error;
198 }
199 dst += session_name_len;
200 if (lttng_strncpy(dst, hostname, hostname_len)) {
201 ret = -1;
202 goto error;
203 }
204 dst += hostname_len;
205 if (lttng_strncpy(dst, base_path, base_path_len)) {
206 ret = -1;
207 goto error;
208 }
209
210 msg->live_timer = htobe32(session_live_timer);
211 msg->snapshot = !!snapshot;
212
213 std::copy(sessiond_uuid.begin(), sessiond_uuid.end(), msg->sessiond_uuid);
214 msg->session_id = htobe64(sessiond_session_id);
215 msg->session_name_contains_creation_time = session_name_contains_creation_time;
216 if (current_chunk_id) {
217 LTTNG_OPTIONAL_SET(&msg->current_chunk_id, htobe64(*current_chunk_id));
218 }
219
220 msg->creation_time = htobe64((uint64_t) creation_time);
221
222 /* Send command */
223 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
224 if (ret < 0) {
225 goto error;
226 }
227 /* Receive response */
228 ret = recv_reply(rsock, reply, sizeof(*reply));
229 if (ret < 0) {
230 goto error;
231 }
232 reply->generic.session_id = be64toh(reply->generic.session_id);
233 reply->generic.ret_code = be32toh(reply->generic.ret_code);
234 reply->output_path_length = be32toh(reply->output_path_length);
235 if (reply->output_path_length >= LTTNG_PATH_MAX) {
236 ERR("Invalid session output path length in reply (%" PRIu32
237 " bytes) exceeds maximal allowed length (%d bytes)",
238 reply->output_path_length,
239 LTTNG_PATH_MAX);
240 ret = -1;
241 goto error;
242 }
243 ret = recv_reply(rsock, output_path, reply->output_path_length);
244 if (ret < 0) {
245 goto error;
246 }
247 error:
248 free(msg);
249 return ret;
250 }
251 /*
252 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
253 * support the live reading capability.
254 */
255 static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
256 const char *session_name,
257 const char *hostname,
258 int session_live_timer,
259 unsigned int snapshot,
260 struct lttcomm_relayd_status_session *reply)
261 {
262 int ret;
263 struct lttcomm_relayd_create_session_2_4 msg;
264
265 if (lttng_strncpy(msg.session_name, session_name, sizeof(msg.session_name))) {
266 ret = -1;
267 goto error;
268 }
269 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
270 ret = -1;
271 goto error;
272 }
273 msg.live_timer = htobe32(session_live_timer);
274 msg.snapshot = htobe32(snapshot);
275
276 /* Send command */
277 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
278 if (ret < 0) {
279 goto error;
280 }
281
282 /* Receive response */
283 ret = recv_reply(rsock, reply, sizeof(*reply));
284 if (ret < 0) {
285 goto error;
286 }
287 reply->session_id = be64toh(reply->session_id);
288 reply->ret_code = be32toh(reply->ret_code);
289 error:
290 return ret;
291 }
292
293 /*
294 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
295 */
296 static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock,
297 struct lttcomm_relayd_status_session *reply)
298 {
299 int ret;
300
301 /* Send command */
302 ret = send_command(rsock, RELAYD_CREATE_SESSION, nullptr, 0, 0);
303 if (ret < 0) {
304 goto error;
305 }
306
307 /* Receive response */
308 ret = recv_reply(rsock, reply, sizeof(*reply));
309 if (ret < 0) {
310 goto error;
311 }
312 reply->session_id = be64toh(reply->session_id);
313 reply->ret_code = be32toh(reply->ret_code);
314 error:
315 return ret;
316 }
317
318 /*
319 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
320 * set session_id of the relayd if we have a successful reply from the relayd.
321 *
322 * On success, return 0 else a negative value which is either an errno error or
323 * a lttng error code from the relayd.
324 */
325 int relayd_create_session(struct lttcomm_relayd_sock *rsock,
326 uint64_t *relayd_session_id,
327 const char *session_name,
328 const char *hostname,
329 const char *base_path,
330 int session_live_timer,
331 unsigned int snapshot,
332 uint64_t sessiond_session_id,
333 const lttng_uuid& sessiond_uuid,
334 const uint64_t *current_chunk_id,
335 time_t creation_time,
336 bool session_name_contains_creation_time,
337 char *output_path)
338 {
339 int ret;
340 struct lttcomm_relayd_create_session_reply_2_11 reply = {};
341
342 LTTNG_ASSERT(rsock);
343 LTTNG_ASSERT(relayd_session_id);
344
345 DBG("Relayd create session");
346
347 if (rsock->minor < 4) {
348 /* From 2.1 to 2.3 */
349 ret = relayd_create_session_2_1(rsock, &reply.generic);
350 } else if (rsock->minor >= 4 && rsock->minor < 11) {
351 /* From 2.4 to 2.10 */
352 ret = relayd_create_session_2_4(
353 rsock, session_name, hostname, session_live_timer, snapshot, &reply.generic);
354 } else {
355 /* From 2.11 to ... */
356 ret = relayd_create_session_2_11(rsock,
357 session_name,
358 hostname,
359 base_path,
360 session_live_timer,
361 snapshot,
362 sessiond_session_id,
363 sessiond_uuid,
364 current_chunk_id,
365 creation_time,
366 session_name_contains_creation_time,
367 &reply,
368 output_path);
369 }
370
371 if (ret < 0) {
372 goto error;
373 }
374
375 /* Return session id or negative ret code. */
376 if (reply.generic.ret_code != LTTNG_OK) {
377 ret = -1;
378 ERR("Relayd create session replied error %d", reply.generic.ret_code);
379 goto error;
380 } else {
381 ret = 0;
382 *relayd_session_id = reply.generic.session_id;
383 }
384
385 DBG("Relayd session created with id %" PRIu64, reply.generic.session_id);
386
387 error:
388 return ret;
389 }
390
391 static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
392 const char *channel_name,
393 const char *pathname)
394 {
395 int ret;
396 struct lttcomm_relayd_add_stream msg;
397
398 memset(&msg, 0, sizeof(msg));
399 if (lttng_strncpy(msg.channel_name, channel_name, sizeof(msg.channel_name))) {
400 ret = -1;
401 goto error;
402 }
403
404 if (lttng_strncpy(msg.pathname, pathname, sizeof(msg.pathname))) {
405 ret = -1;
406 goto error;
407 }
408
409 /* Send command */
410 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
411 if (ret < 0) {
412 ret = -1;
413 goto error;
414 }
415 ret = 0;
416 error:
417 return ret;
418 }
419
420 static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
421 const char *channel_name,
422 const char *pathname,
423 uint64_t tracefile_size,
424 uint64_t tracefile_count)
425 {
426 int ret;
427 struct lttcomm_relayd_add_stream_2_2 msg;
428
429 memset(&msg, 0, sizeof(msg));
430 /* Compat with relayd 2.2 to 2.10 */
431 if (lttng_strncpy(msg.channel_name, channel_name, sizeof(msg.channel_name))) {
432 ret = -1;
433 goto error;
434 }
435 if (lttng_strncpy(msg.pathname, pathname, sizeof(msg.pathname))) {
436 ret = -1;
437 goto error;
438 }
439 msg.tracefile_size = htobe64(tracefile_size);
440 msg.tracefile_count = htobe64(tracefile_count);
441
442 /* Send command */
443 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
444 if (ret < 0) {
445 goto error;
446 }
447 ret = 0;
448 error:
449 return ret;
450 }
451
452 static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
453 const char *channel_name,
454 const char *pathname,
455 uint64_t tracefile_size,
456 uint64_t tracefile_count,
457 uint64_t trace_archive_id)
458 {
459 int ret;
460 struct lttcomm_relayd_add_stream_2_11 *msg = nullptr;
461 size_t channel_name_len;
462 size_t pathname_len;
463 size_t msg_length;
464
465 /* The two names are sent with a '\0' delimiter between them. */
466 channel_name_len = strlen(channel_name) + 1;
467 pathname_len = strlen(pathname) + 1;
468
469 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
470 msg = zmalloc<lttcomm_relayd_add_stream_2_11>(msg_length);
471 if (!msg) {
472 PERROR("zmalloc add_stream_2_11 command message");
473 ret = -1;
474 goto error;
475 }
476
477 LTTNG_ASSERT(channel_name_len <= UINT32_MAX);
478 msg->channel_name_len = htobe32(channel_name_len);
479
480 LTTNG_ASSERT(pathname_len <= UINT32_MAX);
481 msg->pathname_len = htobe32(pathname_len);
482
483 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
484 ret = -1;
485 goto error;
486 }
487 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
488 ret = -1;
489 goto error;
490 }
491
492 msg->tracefile_size = htobe64(tracefile_size);
493 msg->tracefile_count = htobe64(tracefile_count);
494 msg->trace_chunk_id = htobe64(trace_archive_id);
495
496 /* Send command */
497 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
498 if (ret < 0) {
499 goto error;
500 }
501 ret = 0;
502 error:
503 free(msg);
504 return ret;
505 }
506
507 /*
508 * Add stream on the relayd and assign stream handle to the stream_id argument.
509 *
510 * Chunks are not supported by relayd prior to 2.11, but are used to
511 * internally between session daemon and consumer daemon to keep track
512 * of the channel and stream output path.
513 *
514 * On success return 0 else return ret_code negative value.
515 */
516 int relayd_add_stream(struct lttcomm_relayd_sock *rsock,
517 const char *channel_name,
518 const char *domain_name,
519 const char *_pathname,
520 uint64_t *stream_id,
521 uint64_t tracefile_size,
522 uint64_t tracefile_count,
523 struct lttng_trace_chunk *trace_chunk)
524 {
525 int ret;
526 struct lttcomm_relayd_status_stream reply;
527 char pathname[RELAYD_COMM_LTTNG_PATH_MAX];
528
529 /* Code flow error. Safety net. */
530 LTTNG_ASSERT(rsock);
531 LTTNG_ASSERT(channel_name);
532 LTTNG_ASSERT(domain_name);
533 LTTNG_ASSERT(_pathname);
534 LTTNG_ASSERT(trace_chunk);
535
536 DBG("Relayd adding stream for channel name %s", channel_name);
537
538 /* Compat with relayd 2.1 */
539 if (rsock->minor == 1) {
540 /* For 2.1 */
541 ret = relayd_add_stream_2_1(rsock, channel_name, _pathname);
542
543 } else if (rsock->minor > 1 && rsock->minor < 11) {
544 /* From 2.2 to 2.10 */
545 ret = relayd_add_stream_2_2(
546 rsock, channel_name, _pathname, tracefile_size, tracefile_count);
547 } else {
548 const char *separator;
549 enum lttng_trace_chunk_status chunk_status;
550 uint64_t chunk_id;
551
552 if (_pathname[0] == '\0') {
553 separator = "";
554 } else {
555 separator = "/";
556 }
557
558 ret = snprintf(pathname,
559 RELAYD_COMM_LTTNG_PATH_MAX,
560 "%s%s%s",
561 domain_name,
562 separator,
563 _pathname);
564 if (ret <= 0 || ret >= RELAYD_COMM_LTTNG_PATH_MAX) {
565 ERR("Failed to format stream path: %s",
566 ret <= 0 ? "formatting error" : "path exceeds maximal allowed length");
567 ret = -1;
568 goto error;
569 }
570
571 chunk_status = lttng_trace_chunk_get_id(trace_chunk, &chunk_id);
572 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
573
574 /* From 2.11 to ...*/
575 ret = relayd_add_stream_2_11(
576 rsock, channel_name, pathname, tracefile_size, tracefile_count, chunk_id);
577 }
578
579 if (ret) {
580 ret = -1;
581 goto error;
582 }
583
584 /* Waiting for reply */
585 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
586 if (ret < 0) {
587 goto error;
588 }
589
590 /* Back to host bytes order. */
591 reply.handle = be64toh(reply.handle);
592 reply.ret_code = be32toh(reply.ret_code);
593
594 /* Return session id or negative ret code. */
595 if (reply.ret_code != LTTNG_OK) {
596 ret = -1;
597 ERR("Relayd add stream replied error %d", reply.ret_code);
598 } else {
599 /* Success */
600 ret = 0;
601 *stream_id = reply.handle;
602 }
603
604 DBG("Relayd stream added successfully with handle %" PRIu64, reply.handle);
605
606 error:
607 return ret;
608 }
609
610 /*
611 * Inform the relay that all the streams for the current channel has been sent.
612 *
613 * On success return 0 else return ret_code negative value.
614 */
615 int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
616 {
617 int ret;
618 struct lttcomm_relayd_generic_reply reply;
619
620 /* Code flow error. Safety net. */
621 LTTNG_ASSERT(rsock);
622
623 DBG("Relayd sending streams sent.");
624
625 /* This feature was introduced in 2.4, ignore it for earlier versions. */
626 if (rsock->minor < 4) {
627 ret = 0;
628 goto end;
629 }
630
631 /* Send command */
632 ret = send_command(rsock, RELAYD_STREAMS_SENT, nullptr, 0, 0);
633 if (ret < 0) {
634 goto error;
635 }
636
637 /* Waiting for reply */
638 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
639 if (ret < 0) {
640 goto error;
641 }
642
643 /* Back to host bytes order. */
644 reply.ret_code = be32toh(reply.ret_code);
645
646 /* Return session id or negative ret code. */
647 if (reply.ret_code != LTTNG_OK) {
648 ret = -1;
649 ERR("Relayd streams sent replied error %d", reply.ret_code);
650 goto error;
651 } else {
652 /* Success */
653 ret = 0;
654 }
655
656 DBG("Relayd streams sent success");
657
658 error:
659 end:
660 return ret;
661 }
662
663 /*
664 * Check version numbers on the relayd.
665 * If major versions are compatible, we assign minor_to_use to the
666 * minor version of the procotol we are going to use for this session.
667 *
668 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
669 * otherwise, or a negative value on network errors.
670 */
671 int relayd_version_check(struct lttcomm_relayd_sock *rsock)
672 {
673 int ret;
674 struct lttcomm_relayd_version msg;
675
676 /* Code flow error. Safety net. */
677 LTTNG_ASSERT(rsock);
678
679 DBG("Relayd version check for major.minor %u.%u", rsock->major, rsock->minor);
680
681 memset(&msg, 0, sizeof(msg));
682 /* Prepare network byte order before transmission. */
683 msg.major = htobe32(rsock->major);
684 msg.minor = htobe32(rsock->minor);
685
686 /* Send command */
687 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
688 if (ret < 0) {
689 goto error;
690 }
691
692 /* Receive response */
693 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
694 if (ret < 0) {
695 goto error;
696 }
697
698 /* Set back to host bytes order */
699 msg.major = be32toh(msg.major);
700 msg.minor = be32toh(msg.minor);
701
702 /*
703 * Only validate the major version. If the other side is higher,
704 * communication is not possible. Only major version equal can talk to each
705 * other. If the minor version differs, the lowest version is used by both
706 * sides.
707 */
708 if (msg.major != rsock->major) {
709 /* Not compatible */
710 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
711 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
712 msg.major,
713 rsock->major);
714 goto error;
715 }
716
717 /*
718 * If the relayd's minor version is higher, it will adapt to our version so
719 * we can continue to use the latest relayd communication data structure.
720 * If the received minor version is higher, the relayd should adapt to us.
721 */
722 if (rsock->minor > msg.minor) {
723 rsock->minor = msg.minor;
724 }
725
726 /* Version number compatible */
727 DBG2("Relayd version is compatible, using protocol version %u.%u",
728 rsock->major,
729 rsock->minor);
730 ret = 0;
731
732 error:
733 return ret;
734 }
735
736 /*
737 * Add stream on the relayd and assign stream handle to the stream_id argument.
738 *
739 * On success return 0 else return ret_code negative value.
740 */
741 int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
742 {
743 int ret;
744
745 /* Code flow error. Safety net. */
746 LTTNG_ASSERT(rsock);
747
748 DBG("Relayd sending metadata of size %zu", len);
749
750 /* Send command */
751 ret = send_command(rsock, RELAYD_SEND_METADATA, nullptr, len, 0);
752 if (ret < 0) {
753 goto error;
754 }
755
756 DBG2("Relayd metadata added successfully");
757
758 /*
759 * After that call, the metadata data MUST be sent to the relayd so the
760 * receive size on the other end matches the len of the metadata packet
761 * header. This is why we don't wait for a reply here.
762 */
763
764 error:
765 return ret;
766 }
767
768 /*
769 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
770 */
771 int relayd_connect(struct lttcomm_relayd_sock *rsock)
772 {
773 /* Code flow error. Safety net. */
774 LTTNG_ASSERT(rsock);
775
776 if (!rsock->sock.ops) {
777 /*
778 * Attempting a connect on a non-initialized socket.
779 */
780 return -ECONNRESET;
781 }
782
783 DBG3("Relayd connect ...");
784
785 return rsock->sock.ops->connect(&rsock->sock);
786 }
787
788 /*
789 * Close relayd socket with an allocated lttcomm_relayd_sock.
790 *
791 * If no socket operations are found, simply return 0 meaning that everything
792 * is fine. Without operations, the socket can not possibly be opened or used.
793 * This is possible if the socket was allocated but not created. However, the
794 * caller could simply use it to store a valid file descriptor for instance
795 * passed over a Unix socket and call this to cleanup but still without a valid
796 * ops pointer.
797 *
798 * Return the close returned value. On error, a negative value is usually
799 * returned back from close(2).
800 */
801 int relayd_close(struct lttcomm_relayd_sock *rsock)
802 {
803 int ret;
804
805 /* Code flow error. Safety net. */
806 LTTNG_ASSERT(rsock);
807
808 /* An invalid fd is fine, return success. */
809 if (rsock->sock.fd < 0) {
810 ret = 0;
811 goto end;
812 }
813
814 DBG3("Relayd closing socket %d", rsock->sock.fd);
815
816 if (rsock->sock.ops) {
817 ret = rsock->sock.ops->close(&rsock->sock);
818 } else {
819 /* Default call if no specific ops found. */
820 ret = close(rsock->sock.fd);
821 if (ret < 0) {
822 PERROR("relayd_close default close");
823 }
824 }
825 rsock->sock.fd = -1;
826
827 end:
828 return ret;
829 }
830
831 /*
832 * Send data header structure to the relayd.
833 */
834 int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
835 struct lttcomm_relayd_data_hdr *hdr,
836 size_t size)
837 {
838 int ret;
839
840 /* Code flow error. Safety net. */
841 LTTNG_ASSERT(rsock);
842 LTTNG_ASSERT(hdr);
843
844 if (rsock->sock.fd < 0) {
845 return -ECONNRESET;
846 }
847
848 DBG3("Relayd sending data header of size %zu", size);
849
850 /* Again, safety net */
851 if (size == 0) {
852 size = sizeof(struct lttcomm_relayd_data_hdr);
853 }
854
855 /* Only send data header. */
856 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
857 if (ret < 0) {
858 ret = -errno;
859 goto error;
860 }
861
862 /*
863 * The data MUST be sent right after that command for the receive on the
864 * other end to match the size in the header.
865 */
866
867 error:
868 return ret;
869 }
870
871 /*
872 * Send close stream command to the relayd.
873 */
874 int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock,
875 uint64_t stream_id,
876 uint64_t last_net_seq_num)
877 {
878 int ret;
879 struct lttcomm_relayd_close_stream msg;
880 struct lttcomm_relayd_generic_reply reply;
881
882 /* Code flow error. Safety net. */
883 LTTNG_ASSERT(rsock);
884
885 DBG("Relayd closing stream id %" PRIu64, stream_id);
886
887 memset(&msg, 0, sizeof(msg));
888 msg.stream_id = htobe64(stream_id);
889 msg.last_net_seq_num = htobe64(last_net_seq_num);
890
891 /* Send command */
892 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
893 if (ret < 0) {
894 goto error;
895 }
896
897 /* Receive response */
898 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
899 if (ret < 0) {
900 goto error;
901 }
902
903 reply.ret_code = be32toh(reply.ret_code);
904
905 /* Return session id or negative ret code. */
906 if (reply.ret_code != LTTNG_OK) {
907 ret = -1;
908 ERR("Relayd close stream replied error %d", reply.ret_code);
909 } else {
910 /* Success */
911 ret = 0;
912 }
913
914 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
915
916 error:
917 return ret;
918 }
919
920 /*
921 * Check for data availability for a given stream id.
922 *
923 * Return 0 if NOT pending, 1 if so and a negative value on error.
924 */
925 int relayd_data_pending(struct lttcomm_relayd_sock *rsock,
926 uint64_t stream_id,
927 uint64_t last_net_seq_num)
928 {
929 int ret;
930 struct lttcomm_relayd_data_pending msg;
931 struct lttcomm_relayd_generic_reply reply;
932
933 /* Code flow error. Safety net. */
934 LTTNG_ASSERT(rsock);
935
936 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
937
938 memset(&msg, 0, sizeof(msg));
939 msg.stream_id = htobe64(stream_id);
940 msg.last_net_seq_num = htobe64(last_net_seq_num);
941
942 /* Send command */
943 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg, sizeof(msg), 0);
944 if (ret < 0) {
945 goto error;
946 }
947
948 /* Receive response */
949 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
950 if (ret < 0) {
951 goto error;
952 }
953
954 reply.ret_code = be32toh(reply.ret_code);
955
956 /* Return session id or negative ret code. */
957 if (reply.ret_code >= LTTNG_OK) {
958 ERR("Relayd data pending replied error %d", reply.ret_code);
959 }
960
961 /* At this point, the ret code is either 1 or 0 */
962 ret = reply.ret_code;
963
964 DBG("Relayd data is %s pending for stream id %" PRIu64, ret == 1 ? "" : "NOT", stream_id);
965
966 error:
967 return ret;
968 }
969
970 /*
971 * Check on the relayd side for a quiescent state on the control socket.
972 */
973 int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock, uint64_t metadata_stream_id)
974 {
975 int ret;
976 struct lttcomm_relayd_quiescent_control msg;
977 struct lttcomm_relayd_generic_reply reply;
978
979 /* Code flow error. Safety net. */
980 LTTNG_ASSERT(rsock);
981
982 DBG("Relayd checking quiescent control state");
983
984 memset(&msg, 0, sizeof(msg));
985 msg.stream_id = htobe64(metadata_stream_id);
986
987 /* Send command */
988 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
989 if (ret < 0) {
990 goto error;
991 }
992
993 /* Receive response */
994 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
995 if (ret < 0) {
996 goto error;
997 }
998
999 reply.ret_code = be32toh(reply.ret_code);
1000
1001 /* Return session id or negative ret code. */
1002 if (reply.ret_code != LTTNG_OK) {
1003 ret = -1;
1004 ERR("Relayd quiescent control replied error %d", reply.ret_code);
1005 goto error;
1006 }
1007
1008 /* Control socket is quiescent */
1009 return 0;
1010
1011 error:
1012 return ret;
1013 }
1014
1015 /*
1016 * Begin a data pending command for a specific session id.
1017 */
1018 int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
1019 {
1020 int ret;
1021 struct lttcomm_relayd_begin_data_pending msg;
1022 struct lttcomm_relayd_generic_reply reply;
1023
1024 /* Code flow error. Safety net. */
1025 LTTNG_ASSERT(rsock);
1026
1027 DBG("Relayd begin data pending");
1028
1029 memset(&msg, 0, sizeof(msg));
1030 msg.session_id = htobe64(id);
1031
1032 /* Send command */
1033 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
1034 if (ret < 0) {
1035 goto error;
1036 }
1037
1038 /* Receive response */
1039 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1040 if (ret < 0) {
1041 goto error;
1042 }
1043
1044 reply.ret_code = be32toh(reply.ret_code);
1045
1046 /* Return session id or negative ret code. */
1047 if (reply.ret_code != LTTNG_OK) {
1048 ret = -1;
1049 ERR("Relayd begin data pending replied error %d", reply.ret_code);
1050 goto error;
1051 }
1052
1053 return 0;
1054
1055 error:
1056 return ret;
1057 }
1058
1059 /*
1060 * End a data pending command for a specific session id.
1061 *
1062 * Return 0 on success and set is_data_inflight to 0 if no data is being
1063 * streamed or 1 if it is the case.
1064 */
1065 int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock,
1066 uint64_t id,
1067 unsigned int *is_data_inflight)
1068 {
1069 int ret, recv_ret;
1070 struct lttcomm_relayd_end_data_pending msg;
1071 struct lttcomm_relayd_generic_reply reply;
1072
1073 /* Code flow error. Safety net. */
1074 LTTNG_ASSERT(rsock);
1075
1076 DBG("Relayd end data pending");
1077
1078 memset(&msg, 0, sizeof(msg));
1079 msg.session_id = htobe64(id);
1080
1081 /* Send command */
1082 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
1083 if (ret < 0) {
1084 goto error;
1085 }
1086
1087 /* Receive response */
1088 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1089 if (ret < 0) {
1090 goto error;
1091 }
1092
1093 recv_ret = be32toh(reply.ret_code);
1094 if (recv_ret < 0) {
1095 ret = recv_ret;
1096 goto error;
1097 }
1098
1099 *is_data_inflight = recv_ret;
1100
1101 DBG("Relayd end data pending is data inflight: %d", recv_ret);
1102
1103 return 0;
1104
1105 error:
1106 return ret;
1107 }
1108
1109 /*
1110 * Send index to the relayd.
1111 */
1112 int relayd_send_index(struct lttcomm_relayd_sock *rsock,
1113 struct ctf_packet_index *index,
1114 uint64_t relay_stream_id,
1115 uint64_t net_seq_num)
1116 {
1117 int ret;
1118 struct lttcomm_relayd_index msg;
1119 struct lttcomm_relayd_generic_reply reply;
1120
1121 /* Code flow error. Safety net. */
1122 LTTNG_ASSERT(rsock);
1123
1124 if (rsock->minor < 4) {
1125 DBG("Not sending indexes before protocol 2.4");
1126 ret = 0;
1127 goto error;
1128 }
1129
1130 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
1131
1132 memset(&msg, 0, sizeof(msg));
1133 msg.relay_stream_id = htobe64(relay_stream_id);
1134 msg.net_seq_num = htobe64(net_seq_num);
1135
1136 /* The index is already in big endian. */
1137 msg.packet_size = index->packet_size;
1138 msg.content_size = index->content_size;
1139 msg.timestamp_begin = index->timestamp_begin;
1140 msg.timestamp_end = index->timestamp_end;
1141 msg.events_discarded = index->events_discarded;
1142 msg.stream_id = index->stream_id;
1143
1144 if (rsock->minor >= 8) {
1145 msg.stream_instance_id = index->stream_instance_id;
1146 msg.packet_seq_num = index->packet_seq_num;
1147 }
1148
1149 /* Send command */
1150 ret = send_command(
1151 rsock,
1152 RELAYD_SEND_INDEX,
1153 &msg,
1154 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major, rsock->minor),
1155 lttng_to_index_minor(rsock->major, rsock->minor)),
1156 0);
1157 if (ret < 0) {
1158 goto error;
1159 }
1160
1161 /* Receive response */
1162 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1163 if (ret < 0) {
1164 goto error;
1165 }
1166
1167 reply.ret_code = be32toh(reply.ret_code);
1168
1169 /* Return session id or negative ret code. */
1170 if (reply.ret_code != LTTNG_OK) {
1171 ret = -1;
1172 ERR("Relayd send index replied error %d", reply.ret_code);
1173 } else {
1174 /* Success */
1175 ret = 0;
1176 }
1177
1178 error:
1179 return ret;
1180 }
1181
1182 /*
1183 * Ask the relay to reset the metadata trace file (regeneration).
1184 */
1185 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, uint64_t version)
1186 {
1187 int ret;
1188 struct lttcomm_relayd_reset_metadata msg;
1189 struct lttcomm_relayd_generic_reply reply;
1190
1191 /* Code flow error. Safety net. */
1192 LTTNG_ASSERT(rsock);
1193
1194 /* Should have been prevented by the sessiond. */
1195 if (rsock->minor < 8) {
1196 ERR("Metadata regeneration unsupported before 2.8");
1197 ret = -1;
1198 goto error;
1199 }
1200
1201 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1202
1203 memset(&msg, 0, sizeof(msg));
1204 msg.stream_id = htobe64(stream_id);
1205 msg.version = htobe64(version);
1206
1207 /* Send command */
1208 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1209 if (ret < 0) {
1210 goto error;
1211 }
1212
1213 /* Receive response */
1214 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1215 if (ret < 0) {
1216 goto error;
1217 }
1218
1219 reply.ret_code = be32toh(reply.ret_code);
1220
1221 /* Return session id or negative ret code. */
1222 if (reply.ret_code != LTTNG_OK) {
1223 ret = -1;
1224 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1225 } else {
1226 /* Success */
1227 ret = 0;
1228 }
1229
1230 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1231
1232 error:
1233 return ret;
1234 }
1235
1236 int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
1237 unsigned int stream_count,
1238 const uint64_t *new_chunk_id,
1239 const struct relayd_stream_rotation_position *positions)
1240 {
1241 int ret;
1242 unsigned int i;
1243 struct lttng_dynamic_buffer payload;
1244 struct lttcomm_relayd_generic_reply reply = {};
1245 struct lttcomm_relayd_rotate_streams msg;
1246 char new_chunk_id_buf[MAX_INT_DEC_LEN(*new_chunk_id)] = {};
1247 const char *new_chunk_id_str;
1248
1249 msg.stream_count = htobe32((uint32_t) stream_count);
1250 msg.new_chunk_id = (typeof(msg.new_chunk_id)){
1251 .is_set = !!new_chunk_id,
1252 .value = htobe64(new_chunk_id ? *new_chunk_id : 0),
1253 };
1254
1255 if (!relayd_supports_chunks(sock)) {
1256 DBG("Refusing to rotate remote streams: relayd does not support chunks");
1257 return 0;
1258 }
1259
1260 lttng_dynamic_buffer_init(&payload);
1261
1262 /* Code flow error. Safety net. */
1263 LTTNG_ASSERT(sock);
1264
1265 if (new_chunk_id) {
1266 ret = snprintf(
1267 new_chunk_id_buf, sizeof(new_chunk_id_buf), "%" PRIu64, *new_chunk_id);
1268 if (ret == -1 || ret >= sizeof(new_chunk_id_buf)) {
1269 new_chunk_id_str = "formatting error";
1270 } else {
1271 new_chunk_id_str = new_chunk_id_buf;
1272 }
1273 } else {
1274 new_chunk_id_str = "none";
1275 }
1276
1277 DBG("Preparing \"rotate streams\" command payload: new_chunk_id = %s, stream_count = %u",
1278 new_chunk_id_str,
1279 stream_count);
1280
1281 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1282 if (ret) {
1283 ERR("Failed to allocate \"rotate streams\" command payload");
1284 goto error;
1285 }
1286
1287 for (i = 0; i < stream_count; i++) {
1288 const struct relayd_stream_rotation_position *position = &positions[i];
1289 const struct lttcomm_relayd_stream_rotation_position comm_position = {
1290 .stream_id = htobe64(position->stream_id),
1291 .rotate_at_seq_num = htobe64(position->rotate_at_seq_num),
1292 };
1293
1294 DBG("Rotate stream %" PRIu64 " at sequence number %" PRIu64,
1295 position->stream_id,
1296 position->rotate_at_seq_num);
1297 ret = lttng_dynamic_buffer_append(&payload, &comm_position, sizeof(comm_position));
1298 if (ret) {
1299 ERR("Failed to allocate \"rotate streams\" command payload");
1300 goto error;
1301 }
1302 }
1303
1304 /* Send command. */
1305 ret = send_command(sock, RELAYD_ROTATE_STREAMS, payload.data, payload.size, 0);
1306 if (ret < 0) {
1307 ERR("Failed to send \"rotate stream\" command");
1308 goto error;
1309 }
1310
1311 /* Receive response. */
1312 ret = recv_reply(sock, &reply, sizeof(reply));
1313 if (ret < 0) {
1314 ERR("Failed to receive \"rotate streams\" command reply");
1315 goto error;
1316 }
1317
1318 reply.ret_code = be32toh(reply.ret_code);
1319 if (reply.ret_code != LTTNG_OK) {
1320 ret = -1;
1321 ERR("Relayd rotate streams replied error %d", reply.ret_code);
1322 } else {
1323 /* Success. */
1324 ret = 0;
1325 DBG("Relayd rotated streams successfully");
1326 }
1327
1328 error:
1329 lttng_dynamic_buffer_reset(&payload);
1330 return ret;
1331 }
1332
1333 int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock, struct lttng_trace_chunk *chunk)
1334 {
1335 int ret = 0;
1336 enum lttng_trace_chunk_status status;
1337 struct lttcomm_relayd_create_trace_chunk msg = {};
1338 struct lttcomm_relayd_generic_reply reply = {};
1339 struct lttng_dynamic_buffer payload;
1340 uint64_t chunk_id;
1341 time_t creation_timestamp;
1342 const char *chunk_name;
1343 size_t chunk_name_length;
1344 bool overridden_name;
1345
1346 lttng_dynamic_buffer_init(&payload);
1347
1348 if (!relayd_supports_chunks(sock)) {
1349 DBG("Refusing to create remote trace chunk: relayd does not support chunks");
1350 goto end;
1351 }
1352
1353 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1354 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1355 ret = -1;
1356 goto end;
1357 }
1358
1359 status = lttng_trace_chunk_get_creation_timestamp(chunk, &creation_timestamp);
1360 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1361 ret = -1;
1362 goto end;
1363 }
1364
1365 status = lttng_trace_chunk_get_name(chunk, &chunk_name, &overridden_name);
1366 if (status != LTTNG_TRACE_CHUNK_STATUS_OK && status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
1367 ret = -1;
1368 goto end;
1369 }
1370
1371 chunk_name_length = overridden_name ? (strlen(chunk_name) + 1) : 0;
1372 msg.chunk_id = htobe64(chunk_id);
1373 msg.creation_timestamp = htobe64((uint64_t) creation_timestamp);
1374 msg.override_name_length = htobe32((uint32_t) chunk_name_length);
1375
1376 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1377 if (ret) {
1378 goto end;
1379 }
1380 if (chunk_name_length) {
1381 ret = lttng_dynamic_buffer_append(&payload, chunk_name, chunk_name_length);
1382 if (ret) {
1383 goto end;
1384 }
1385 }
1386
1387 ret = send_command(sock, RELAYD_CREATE_TRACE_CHUNK, payload.data, payload.size, 0);
1388 if (ret < 0) {
1389 ERR("Failed to send trace chunk creation command to relay daemon");
1390 goto end;
1391 }
1392
1393 ret = recv_reply(sock, &reply, sizeof(reply));
1394 if (ret < 0) {
1395 ERR("Failed to receive relay daemon trace chunk creation command reply");
1396 goto end;
1397 }
1398
1399 reply.ret_code = be32toh(reply.ret_code);
1400 if (reply.ret_code != LTTNG_OK) {
1401 ret = -1;
1402 ERR("Relayd trace chunk create replied error %d", reply.ret_code);
1403 } else {
1404 ret = 0;
1405 DBG("Relayd successfully created trace chunk: chunk_id = %" PRIu64, chunk_id);
1406 }
1407
1408 end:
1409 lttng_dynamic_buffer_reset(&payload);
1410 return ret;
1411 }
1412
1413 int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
1414 struct lttng_trace_chunk *chunk,
1415 char *path)
1416 {
1417 int ret = 0;
1418 enum lttng_trace_chunk_status status;
1419 struct lttcomm_relayd_close_trace_chunk msg = {};
1420 struct lttcomm_relayd_close_trace_chunk_reply reply = {};
1421 uint64_t chunk_id;
1422 time_t close_timestamp;
1423 LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
1424
1425 if (!relayd_supports_chunks(sock)) {
1426 DBG("Refusing to close remote trace chunk: relayd does not support chunks");
1427 goto end;
1428 }
1429
1430 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1431 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1432 ERR("Failed to get trace chunk id");
1433 ret = -1;
1434 goto end;
1435 }
1436
1437 status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
1438 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1439 ERR("Failed to get trace chunk close timestamp");
1440 ret = -1;
1441 goto end;
1442 }
1443
1444 status = lttng_trace_chunk_get_close_command(chunk, &close_command.value);
1445 switch (status) {
1446 case LTTNG_TRACE_CHUNK_STATUS_OK:
1447 close_command.is_set = 1;
1448 break;
1449 case LTTNG_TRACE_CHUNK_STATUS_NONE:
1450 break;
1451 default:
1452 ERR("Failed to get trace chunk close command");
1453 ret = -1;
1454 goto end;
1455 }
1456
1457 msg = (typeof(msg)){
1458 .chunk_id = htobe64(chunk_id),
1459 .close_timestamp = htobe64((uint64_t) close_timestamp),
1460 .close_command = {
1461 .is_set = close_command.is_set,
1462 .value = htobe32((uint32_t) close_command.value),
1463 },
1464 };
1465
1466 ret = send_command(sock, RELAYD_CLOSE_TRACE_CHUNK, &msg, sizeof(msg), 0);
1467 if (ret < 0) {
1468 ERR("Failed to send trace chunk close command to relay daemon");
1469 goto end;
1470 }
1471
1472 ret = recv_reply(sock, &reply, sizeof(reply));
1473 if (ret < 0) {
1474 ERR("Failed to receive relay daemon trace chunk close command reply");
1475 goto end;
1476 }
1477
1478 reply.path_length = be32toh(reply.path_length);
1479 if (reply.path_length >= LTTNG_PATH_MAX) {
1480 ERR("Chunk path too long");
1481 ret = -1;
1482 goto end;
1483 }
1484
1485 ret = recv_reply(sock, path, reply.path_length);
1486 if (ret < 0) {
1487 ERR("Failed to receive relay daemon trace chunk close command reply");
1488 goto end;
1489 }
1490 if (path[reply.path_length - 1] != '\0') {
1491 ERR("Invalid trace chunk path returned by relay daemon (not null-terminated)");
1492 ret = -1;
1493 goto end;
1494 }
1495
1496 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1497 if (reply.generic.ret_code != LTTNG_OK) {
1498 ret = -1;
1499 ERR("Relayd trace chunk close replied error %d", reply.generic.ret_code);
1500 } else {
1501 ret = 0;
1502 DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64, chunk_id);
1503 }
1504 end:
1505 return ret;
1506 }
1507
1508 int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
1509 uint64_t chunk_id,
1510 bool *chunk_exists)
1511 {
1512 int ret = 0;
1513 struct lttcomm_relayd_trace_chunk_exists msg = {};
1514 struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
1515
1516 if (!relayd_supports_chunks(sock)) {
1517 DBG("Refusing to check for trace chunk existence: relayd does not support chunks");
1518 /* The chunk will never exist */
1519 *chunk_exists = false;
1520 goto end;
1521 }
1522
1523 msg = (typeof(msg)){
1524 .chunk_id = htobe64(chunk_id),
1525 };
1526
1527 ret = send_command(sock, RELAYD_TRACE_CHUNK_EXISTS, &msg, sizeof(msg), 0);
1528 if (ret < 0) {
1529 ERR("Failed to send trace chunk exists command to relay daemon");
1530 goto end;
1531 }
1532
1533 ret = recv_reply(sock, &reply, sizeof(reply));
1534 if (ret < 0) {
1535 ERR("Failed to receive relay daemon trace chunk close command reply");
1536 goto end;
1537 }
1538
1539 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1540 if (reply.generic.ret_code != LTTNG_OK) {
1541 ret = -1;
1542 ERR("Relayd trace chunk close replied error %d", reply.generic.ret_code);
1543 } else {
1544 ret = 0;
1545 DBG("Relayd successfully checked trace chunk existence: chunk_id = %" PRIu64
1546 ", exists = %s",
1547 chunk_id,
1548 reply.trace_chunk_exists ? "true" : "false");
1549 *chunk_exists = !!reply.trace_chunk_exists;
1550 }
1551 end:
1552 return ret;
1553 }
1554
1555 int relayd_get_configuration(struct lttcomm_relayd_sock *sock,
1556 uint64_t query_flags,
1557 uint64_t *result_flags)
1558 {
1559 int ret = 0;
1560 struct lttcomm_relayd_get_configuration msg = (typeof(msg)){
1561 .query_flags = htobe64(query_flags),
1562 };
1563 struct lttcomm_relayd_get_configuration_reply reply = {};
1564
1565 if (!relayd_supports_get_configuration(sock)) {
1566 DBG("Refusing to get relayd configuration (unsupported by relayd)");
1567 if (query_flags) {
1568 ret = -1;
1569 goto end;
1570 }
1571 *result_flags = 0;
1572 goto end;
1573 }
1574
1575 ret = send_command(sock, RELAYD_GET_CONFIGURATION, &msg, sizeof(msg), 0);
1576 if (ret < 0) {
1577 ERR("Failed to send get configuration command to relay daemon");
1578 goto end;
1579 }
1580
1581 ret = recv_reply(sock, &reply, sizeof(reply));
1582 if (ret < 0) {
1583 ERR("Failed to receive relay daemon get configuration command reply");
1584 goto end;
1585 }
1586
1587 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1588 if (reply.generic.ret_code != LTTNG_OK) {
1589 ret = -1;
1590 ERR("Relayd get configuration replied error %d", reply.generic.ret_code);
1591 } else {
1592 reply.relayd_configuration_flags = be64toh(reply.relayd_configuration_flags);
1593 ret = 0;
1594 DBG("Relayd successfully got configuration: query_flags = %" PRIu64
1595 ", results_flags = %" PRIu64,
1596 query_flags,
1597 reply.relayd_configuration_flags);
1598 *result_flags = reply.relayd_configuration_flags;
1599 }
1600 end:
1601 return ret;
1602 }
This page took 0.118106 seconds and 4 git commands to generate.