ed5e45607e3d9bf897f47fdba0817b4cd45f0052
[lttng-tools.git] / src / common / relayd / relayd.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <assert.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <sys/stat.h>
24 #include <inttypes.h>
25
26 #include <common/common.h>
27 #include <common/defaults.h>
28 #include <common/compat/endian.h>
29 #include <common/compat/string.h>
30 #include <common/sessiond-comm/relayd.h>
31 #include <common/index/ctf-index.h>
32 #include <common/trace-chunk.h>
33 #include <common/string-utils/format.h>
34
35 #include "relayd.h"
36
37 /*
38 * Send command. Fill up the header and append the data.
39 */
40 static int send_command(struct lttcomm_relayd_sock *rsock,
41 enum lttcomm_relayd_command cmd, const void *data, size_t size,
42 int flags)
43 {
44 int ret;
45 struct lttcomm_relayd_hdr header;
46 char *buf;
47 uint64_t buf_size = sizeof(header);
48
49 if (rsock->sock.fd < 0) {
50 return -ECONNRESET;
51 }
52
53 if (data) {
54 buf_size += size;
55 }
56
57 buf = zmalloc(buf_size);
58 if (buf == NULL) {
59 PERROR("zmalloc relayd send command buf");
60 ret = -1;
61 goto alloc_error;
62 }
63
64 memset(&header, 0, sizeof(header));
65 header.cmd = htobe32(cmd);
66 header.data_size = htobe64(size);
67
68 /* Zeroed for now since not used. */
69 header.cmd_version = 0;
70 header.circuit_id = 0;
71
72 /* Prepare buffer to send. */
73 memcpy(buf, &header, sizeof(header));
74 if (data) {
75 memcpy(buf + sizeof(header), data, size);
76 }
77
78 DBG3("Relayd sending command %d of size %" PRIu64, (int) cmd, buf_size);
79 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
80 if (ret < 0) {
81 PERROR("Failed to send command %d of size %" PRIu64,
82 (int) cmd, buf_size);
83 ret = -errno;
84 goto error;
85 }
86 error:
87 free(buf);
88 alloc_error:
89 return ret;
90 }
91
92 /*
93 * Receive reply data on socket. This MUST be call after send_command or else
94 * could result in unexpected behavior(s).
95 */
96 static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
97 {
98 int ret;
99
100 if (rsock->sock.fd < 0) {
101 return -ECONNRESET;
102 }
103
104 DBG3("Relayd waiting for reply of size %zu", size);
105
106 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
107 if (ret <= 0 || ret != size) {
108 if (ret == 0) {
109 /* Orderly shutdown. */
110 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
111 } else {
112 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
113 rsock->sock.fd, size, ret);
114 }
115 /* Always return -1 here and the caller can use errno. */
116 ret = -1;
117 goto error;
118 }
119
120 error:
121 return ret;
122 }
123
124 /*
125 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name,
126 * hostname, and base_path) have no length restriction on the sender side.
127 * Length for both payloads is stored in the msg struct. A new dynamic size
128 * payload size is introduced.
129 */
130 static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
131 const char *session_name, const char *hostname,
132 const char *base_path, int session_live_timer,
133 unsigned int snapshot, uint64_t sessiond_session_id,
134 const lttng_uuid sessiond_uuid, const uint64_t *current_chunk_id,
135 time_t creation_time, bool session_name_contains_creation_time)
136 {
137 int ret;
138 struct lttcomm_relayd_create_session_2_11 *msg = NULL;
139 size_t session_name_len;
140 size_t hostname_len;
141 size_t base_path_len;
142 size_t msg_length;
143 char *dst;
144
145 if (!base_path) {
146 base_path = "";
147 }
148 /* The three names are sent with a '\0' delimiter between them. */
149 session_name_len = strlen(session_name) + 1;
150 hostname_len = strlen(hostname) + 1;
151 base_path_len = base_path ? strlen(base_path) + 1 : 0;
152
153 msg_length = sizeof(*msg) + session_name_len + hostname_len + base_path_len;
154 msg = zmalloc(msg_length);
155 if (!msg) {
156 PERROR("zmalloc create_session_2_11 command message");
157 ret = -1;
158 goto error;
159 }
160
161 assert(session_name_len <= UINT32_MAX);
162 msg->session_name_len = htobe32(session_name_len);
163
164 assert(hostname_len <= UINT32_MAX);
165 msg->hostname_len = htobe32(hostname_len);
166
167 assert(base_path_len <= UINT32_MAX);
168 msg->base_path_len = htobe32(base_path_len);
169
170 dst = msg->names;
171 if (lttng_strncpy(dst, session_name, session_name_len)) {
172 ret = -1;
173 goto error;
174 }
175 dst += session_name_len;
176 if (lttng_strncpy(dst, hostname, hostname_len)) {
177 ret = -1;
178 goto error;
179 }
180 dst += hostname_len;
181 if (base_path && lttng_strncpy(dst, base_path, base_path_len)) {
182 ret = -1;
183 goto error;
184 }
185
186 msg->live_timer = htobe32(session_live_timer);
187 msg->snapshot = !!snapshot;
188
189 lttng_uuid_copy(msg->sessiond_uuid, sessiond_uuid);
190 msg->session_id = htobe64(sessiond_session_id);
191 msg->session_name_contains_creation_time = session_name_contains_creation_time;
192 if (current_chunk_id) {
193 LTTNG_OPTIONAL_SET(&msg->current_chunk_id,
194 htobe64(*current_chunk_id));
195 }
196
197 msg->creation_time = htobe64((uint64_t) creation_time);
198
199 /* Send command */
200 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
201 if (ret < 0) {
202 goto error;
203 }
204 error:
205 free(msg);
206 return ret;
207 }
208 /*
209 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
210 * support the live reading capability.
211 */
212 static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
213 const char *session_name, const char *hostname,
214 int session_live_timer, unsigned int snapshot)
215 {
216 int ret;
217 struct lttcomm_relayd_create_session_2_4 msg;
218
219 if (lttng_strncpy(msg.session_name, session_name,
220 sizeof(msg.session_name))) {
221 ret = -1;
222 goto error;
223 }
224 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
225 ret = -1;
226 goto error;
227 }
228 msg.live_timer = htobe32(session_live_timer);
229 msg.snapshot = htobe32(snapshot);
230
231 /* Send command */
232 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
233 if (ret < 0) {
234 goto error;
235 }
236
237 error:
238 return ret;
239 }
240
241 /*
242 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
243 */
244 static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock)
245 {
246 int ret;
247
248 /* Send command */
249 ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
250 if (ret < 0) {
251 goto error;
252 }
253
254 error:
255 return ret;
256 }
257
258 /*
259 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
260 * set session_id of the relayd if we have a successful reply from the relayd.
261 *
262 * On success, return 0 else a negative value which is either an errno error or
263 * a lttng error code from the relayd.
264 */
265 int relayd_create_session(struct lttcomm_relayd_sock *rsock,
266 uint64_t *relayd_session_id,
267 const char *session_name, const char *hostname,
268 const char *base_path, int session_live_timer,
269 unsigned int snapshot, uint64_t sessiond_session_id,
270 const lttng_uuid sessiond_uuid,
271 const uint64_t *current_chunk_id,
272 time_t creation_time, bool session_name_contains_creation_time)
273 {
274 int ret;
275 struct lttcomm_relayd_status_session reply;
276
277 assert(rsock);
278 assert(relayd_session_id);
279
280 DBG("Relayd create session");
281
282 if (rsock->minor < 4) {
283 /* From 2.1 to 2.3 */
284 ret = relayd_create_session_2_1(rsock);
285 } else if (rsock->minor >= 4 && rsock->minor < 11) {
286 /* From 2.4 to 2.10 */
287 ret = relayd_create_session_2_4(rsock, session_name,
288 hostname, session_live_timer, snapshot);
289 } else {
290 /* From 2.11 to ... */
291 ret = relayd_create_session_2_11(rsock, session_name,
292 hostname, base_path, session_live_timer, snapshot,
293 sessiond_session_id, sessiond_uuid,
294 current_chunk_id, creation_time,
295 session_name_contains_creation_time);
296 }
297
298 if (ret < 0) {
299 goto error;
300 }
301
302 /* Receive response */
303 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
304 if (ret < 0) {
305 goto error;
306 }
307
308 reply.session_id = be64toh(reply.session_id);
309 reply.ret_code = be32toh(reply.ret_code);
310
311 /* Return session id or negative ret code. */
312 if (reply.ret_code != LTTNG_OK) {
313 ret = -1;
314 ERR("Relayd create session replied error %d", reply.ret_code);
315 goto error;
316 } else {
317 ret = 0;
318 *relayd_session_id = reply.session_id;
319 }
320
321 DBG("Relayd session created with id %" PRIu64, reply.session_id);
322
323 error:
324 return ret;
325 }
326
327 static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
328 const char *channel_name, const char *pathname)
329 {
330 int ret;
331 struct lttcomm_relayd_add_stream msg;
332
333 memset(&msg, 0, sizeof(msg));
334 if (lttng_strncpy(msg.channel_name, channel_name,
335 sizeof(msg.channel_name))) {
336 ret = -1;
337 goto error;
338 }
339
340 if (lttng_strncpy(msg.pathname, pathname,
341 sizeof(msg.pathname))) {
342 ret = -1;
343 goto error;
344 }
345
346 /* Send command */
347 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
348 if (ret < 0) {
349 ret = -1;
350 goto error;
351 }
352 ret = 0;
353 error:
354 return ret;
355 }
356
357 static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
358 const char *channel_name, const char *pathname,
359 uint64_t tracefile_size, uint64_t tracefile_count)
360 {
361 int ret;
362 struct lttcomm_relayd_add_stream_2_2 msg;
363
364 memset(&msg, 0, sizeof(msg));
365 /* Compat with relayd 2.2 to 2.10 */
366 if (lttng_strncpy(msg.channel_name, channel_name,
367 sizeof(msg.channel_name))) {
368 ret = -1;
369 goto error;
370 }
371 if (lttng_strncpy(msg.pathname, pathname,
372 sizeof(msg.pathname))) {
373 ret = -1;
374 goto error;
375 }
376 msg.tracefile_size = htobe64(tracefile_size);
377 msg.tracefile_count = htobe64(tracefile_count);
378
379 /* Send command */
380 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
381 if (ret < 0) {
382 goto error;
383 }
384 ret = 0;
385 error:
386 return ret;
387 }
388
389 static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
390 const char *channel_name, const char *pathname,
391 uint64_t tracefile_size, uint64_t tracefile_count,
392 uint64_t trace_archive_id)
393 {
394 int ret;
395 struct lttcomm_relayd_add_stream_2_11 *msg = NULL;
396 size_t channel_name_len;
397 size_t pathname_len;
398 size_t msg_length;
399
400 /* The two names are sent with a '\0' delimiter between them. */
401 channel_name_len = strlen(channel_name) + 1;
402 pathname_len = strlen(pathname) + 1;
403
404 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
405 msg = zmalloc(msg_length);
406 if (!msg) {
407 PERROR("zmalloc add_stream_2_11 command message");
408 ret = -1;
409 goto error;
410 }
411
412 assert(channel_name_len <= UINT32_MAX);
413 msg->channel_name_len = htobe32(channel_name_len);
414
415 assert(pathname_len <= UINT32_MAX);
416 msg->pathname_len = htobe32(pathname_len);
417
418 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
419 ret = -1;
420 goto error;
421 }
422 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
423 ret = -1;
424 goto error;
425 }
426
427 msg->tracefile_size = htobe64(tracefile_size);
428 msg->tracefile_count = htobe64(tracefile_count);
429 msg->trace_chunk_id = htobe64(trace_archive_id);
430
431 /* Send command */
432 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
433 if (ret < 0) {
434 goto error;
435 }
436 ret = 0;
437 error:
438 free(msg);
439 return ret;
440 }
441
442 /*
443 * Add stream on the relayd and assign stream handle to the stream_id argument.
444 *
445 * On success return 0 else return ret_code negative value.
446 */
447 int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
448 const char *pathname, uint64_t *stream_id,
449 uint64_t tracefile_size, uint64_t tracefile_count,
450 struct lttng_trace_chunk *trace_chunk)
451 {
452 int ret;
453 struct lttcomm_relayd_status_stream reply;
454
455 /* Code flow error. Safety net. */
456 assert(rsock);
457 assert(channel_name);
458 assert(pathname);
459
460 DBG("Relayd adding stream for channel name %s", channel_name);
461
462 /* Compat with relayd 2.1 */
463 if (rsock->minor == 1) {
464 /* For 2.1 */
465 assert(!trace_chunk);
466 ret = relayd_add_stream_2_1(rsock, channel_name, pathname);
467
468 } else if (rsock->minor > 1 && rsock->minor < 11) {
469 /* From 2.2 to 2.10 */
470 assert(!trace_chunk);
471 ret = relayd_add_stream_2_2(rsock, channel_name, pathname,
472 tracefile_size, tracefile_count);
473 } else {
474 enum lttng_trace_chunk_status chunk_status;
475 uint64_t chunk_id;
476
477 assert(trace_chunk);
478 chunk_status = lttng_trace_chunk_get_id(trace_chunk,
479 &chunk_id);
480 assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
481
482 /* From 2.11 to ...*/
483 ret = relayd_add_stream_2_11(rsock, channel_name, pathname,
484 tracefile_size, tracefile_count,
485 chunk_id);
486 }
487
488 if (ret) {
489 ret = -1;
490 goto error;
491 }
492
493 /* Waiting for reply */
494 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
495 if (ret < 0) {
496 goto error;
497 }
498
499 /* Back to host bytes order. */
500 reply.handle = be64toh(reply.handle);
501 reply.ret_code = be32toh(reply.ret_code);
502
503 /* Return session id or negative ret code. */
504 if (reply.ret_code != LTTNG_OK) {
505 ret = -1;
506 ERR("Relayd add stream replied error %d", reply.ret_code);
507 } else {
508 /* Success */
509 ret = 0;
510 *stream_id = reply.handle;
511 }
512
513 DBG("Relayd stream added successfully with handle %" PRIu64,
514 reply.handle);
515
516 error:
517 return ret;
518 }
519
520 /*
521 * Inform the relay that all the streams for the current channel has been sent.
522 *
523 * On success return 0 else return ret_code negative value.
524 */
525 int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
526 {
527 int ret;
528 struct lttcomm_relayd_generic_reply reply;
529
530 /* Code flow error. Safety net. */
531 assert(rsock);
532
533 DBG("Relayd sending streams sent.");
534
535 /* This feature was introduced in 2.4, ignore it for earlier versions. */
536 if (rsock->minor < 4) {
537 ret = 0;
538 goto end;
539 }
540
541 /* Send command */
542 ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
543 if (ret < 0) {
544 goto error;
545 }
546
547 /* Waiting for reply */
548 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
549 if (ret < 0) {
550 goto error;
551 }
552
553 /* Back to host bytes order. */
554 reply.ret_code = be32toh(reply.ret_code);
555
556 /* Return session id or negative ret code. */
557 if (reply.ret_code != LTTNG_OK) {
558 ret = -1;
559 ERR("Relayd streams sent replied error %d", reply.ret_code);
560 goto error;
561 } else {
562 /* Success */
563 ret = 0;
564 }
565
566 DBG("Relayd streams sent success");
567
568 error:
569 end:
570 return ret;
571 }
572
573 /*
574 * Check version numbers on the relayd.
575 * If major versions are compatible, we assign minor_to_use to the
576 * minor version of the procotol we are going to use for this session.
577 *
578 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
579 * otherwise, or a negative value on network errors.
580 */
581 int relayd_version_check(struct lttcomm_relayd_sock *rsock)
582 {
583 int ret;
584 struct lttcomm_relayd_version msg;
585
586 /* Code flow error. Safety net. */
587 assert(rsock);
588
589 DBG("Relayd version check for major.minor %u.%u", rsock->major,
590 rsock->minor);
591
592 memset(&msg, 0, sizeof(msg));
593 /* Prepare network byte order before transmission. */
594 msg.major = htobe32(rsock->major);
595 msg.minor = htobe32(rsock->minor);
596
597 /* Send command */
598 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
599 if (ret < 0) {
600 goto error;
601 }
602
603 /* Receive response */
604 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
605 if (ret < 0) {
606 goto error;
607 }
608
609 /* Set back to host bytes order */
610 msg.major = be32toh(msg.major);
611 msg.minor = be32toh(msg.minor);
612
613 /*
614 * Only validate the major version. If the other side is higher,
615 * communication is not possible. Only major version equal can talk to each
616 * other. If the minor version differs, the lowest version is used by both
617 * sides.
618 */
619 if (msg.major != rsock->major) {
620 /* Not compatible */
621 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
622 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
623 msg.major, rsock->major);
624 goto error;
625 }
626
627 /*
628 * If the relayd's minor version is higher, it will adapt to our version so
629 * we can continue to use the latest relayd communication data structure.
630 * If the received minor version is higher, the relayd should adapt to us.
631 */
632 if (rsock->minor > msg.minor) {
633 rsock->minor = msg.minor;
634 }
635
636 /* Version number compatible */
637 DBG2("Relayd version is compatible, using protocol version %u.%u",
638 rsock->major, rsock->minor);
639 ret = 0;
640
641 error:
642 return ret;
643 }
644
645 /*
646 * Add stream on the relayd and assign stream handle to the stream_id argument.
647 *
648 * On success return 0 else return ret_code negative value.
649 */
650 int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
651 {
652 int ret;
653
654 /* Code flow error. Safety net. */
655 assert(rsock);
656
657 DBG("Relayd sending metadata of size %zu", len);
658
659 /* Send command */
660 ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
661 if (ret < 0) {
662 goto error;
663 }
664
665 DBG2("Relayd metadata added successfully");
666
667 /*
668 * After that call, the metadata data MUST be sent to the relayd so the
669 * receive size on the other end matches the len of the metadata packet
670 * header. This is why we don't wait for a reply here.
671 */
672
673 error:
674 return ret;
675 }
676
677 /*
678 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
679 */
680 int relayd_connect(struct lttcomm_relayd_sock *rsock)
681 {
682 /* Code flow error. Safety net. */
683 assert(rsock);
684
685 if (!rsock->sock.ops) {
686 /*
687 * Attempting a connect on a non-initialized socket.
688 */
689 return -ECONNRESET;
690 }
691
692 DBG3("Relayd connect ...");
693
694 return rsock->sock.ops->connect(&rsock->sock);
695 }
696
697 /*
698 * Close relayd socket with an allocated lttcomm_relayd_sock.
699 *
700 * If no socket operations are found, simply return 0 meaning that everything
701 * is fine. Without operations, the socket can not possibly be opened or used.
702 * This is possible if the socket was allocated but not created. However, the
703 * caller could simply use it to store a valid file descriptor for instance
704 * passed over a Unix socket and call this to cleanup but still without a valid
705 * ops pointer.
706 *
707 * Return the close returned value. On error, a negative value is usually
708 * returned back from close(2).
709 */
710 int relayd_close(struct lttcomm_relayd_sock *rsock)
711 {
712 int ret;
713
714 /* Code flow error. Safety net. */
715 assert(rsock);
716
717 /* An invalid fd is fine, return success. */
718 if (rsock->sock.fd < 0) {
719 ret = 0;
720 goto end;
721 }
722
723 DBG3("Relayd closing socket %d", rsock->sock.fd);
724
725 if (rsock->sock.ops) {
726 ret = rsock->sock.ops->close(&rsock->sock);
727 } else {
728 /* Default call if no specific ops found. */
729 ret = close(rsock->sock.fd);
730 if (ret < 0) {
731 PERROR("relayd_close default close");
732 }
733 }
734 rsock->sock.fd = -1;
735
736 end:
737 return ret;
738 }
739
740 /*
741 * Send data header structure to the relayd.
742 */
743 int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
744 struct lttcomm_relayd_data_hdr *hdr, size_t size)
745 {
746 int ret;
747
748 /* Code flow error. Safety net. */
749 assert(rsock);
750 assert(hdr);
751
752 if (rsock->sock.fd < 0) {
753 return -ECONNRESET;
754 }
755
756 DBG3("Relayd sending data header of size %zu", size);
757
758 /* Again, safety net */
759 if (size == 0) {
760 size = sizeof(struct lttcomm_relayd_data_hdr);
761 }
762
763 /* Only send data header. */
764 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
765 if (ret < 0) {
766 ret = -errno;
767 goto error;
768 }
769
770 /*
771 * The data MUST be sent right after that command for the receive on the
772 * other end to match the size in the header.
773 */
774
775 error:
776 return ret;
777 }
778
779 /*
780 * Send close stream command to the relayd.
781 */
782 int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
783 uint64_t last_net_seq_num)
784 {
785 int ret;
786 struct lttcomm_relayd_close_stream msg;
787 struct lttcomm_relayd_generic_reply reply;
788
789 /* Code flow error. Safety net. */
790 assert(rsock);
791
792 DBG("Relayd closing stream id %" PRIu64, stream_id);
793
794 memset(&msg, 0, sizeof(msg));
795 msg.stream_id = htobe64(stream_id);
796 msg.last_net_seq_num = htobe64(last_net_seq_num);
797
798 /* Send command */
799 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
800 if (ret < 0) {
801 goto error;
802 }
803
804 /* Receive response */
805 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
806 if (ret < 0) {
807 goto error;
808 }
809
810 reply.ret_code = be32toh(reply.ret_code);
811
812 /* Return session id or negative ret code. */
813 if (reply.ret_code != LTTNG_OK) {
814 ret = -1;
815 ERR("Relayd close stream replied error %d", reply.ret_code);
816 } else {
817 /* Success */
818 ret = 0;
819 }
820
821 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
822
823 error:
824 return ret;
825 }
826
827 /*
828 * Check for data availability for a given stream id.
829 *
830 * Return 0 if NOT pending, 1 if so and a negative value on error.
831 */
832 int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
833 uint64_t last_net_seq_num)
834 {
835 int ret;
836 struct lttcomm_relayd_data_pending msg;
837 struct lttcomm_relayd_generic_reply reply;
838
839 /* Code flow error. Safety net. */
840 assert(rsock);
841
842 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
843
844 memset(&msg, 0, sizeof(msg));
845 msg.stream_id = htobe64(stream_id);
846 msg.last_net_seq_num = htobe64(last_net_seq_num);
847
848 /* Send command */
849 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
850 sizeof(msg), 0);
851 if (ret < 0) {
852 goto error;
853 }
854
855 /* Receive response */
856 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
857 if (ret < 0) {
858 goto error;
859 }
860
861 reply.ret_code = be32toh(reply.ret_code);
862
863 /* Return session id or negative ret code. */
864 if (reply.ret_code >= LTTNG_OK) {
865 ERR("Relayd data pending replied error %d", reply.ret_code);
866 }
867
868 /* At this point, the ret code is either 1 or 0 */
869 ret = reply.ret_code;
870
871 DBG("Relayd data is %s pending for stream id %" PRIu64,
872 ret == 1 ? "" : "NOT", stream_id);
873
874 error:
875 return ret;
876 }
877
878 /*
879 * Check on the relayd side for a quiescent state on the control socket.
880 */
881 int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
882 uint64_t metadata_stream_id)
883 {
884 int ret;
885 struct lttcomm_relayd_quiescent_control msg;
886 struct lttcomm_relayd_generic_reply reply;
887
888 /* Code flow error. Safety net. */
889 assert(rsock);
890
891 DBG("Relayd checking quiescent control state");
892
893 memset(&msg, 0, sizeof(msg));
894 msg.stream_id = htobe64(metadata_stream_id);
895
896 /* Send command */
897 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
898 if (ret < 0) {
899 goto error;
900 }
901
902 /* Receive response */
903 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
904 if (ret < 0) {
905 goto error;
906 }
907
908 reply.ret_code = be32toh(reply.ret_code);
909
910 /* Return session id or negative ret code. */
911 if (reply.ret_code != LTTNG_OK) {
912 ret = -1;
913 ERR("Relayd quiescent control replied error %d", reply.ret_code);
914 goto error;
915 }
916
917 /* Control socket is quiescent */
918 return 0;
919
920 error:
921 return ret;
922 }
923
924 /*
925 * Begin a data pending command for a specific session id.
926 */
927 int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
928 {
929 int ret;
930 struct lttcomm_relayd_begin_data_pending msg;
931 struct lttcomm_relayd_generic_reply reply;
932
933 /* Code flow error. Safety net. */
934 assert(rsock);
935
936 DBG("Relayd begin data pending");
937
938 memset(&msg, 0, sizeof(msg));
939 msg.session_id = htobe64(id);
940
941 /* Send command */
942 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
943 if (ret < 0) {
944 goto error;
945 }
946
947 /* Receive response */
948 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
949 if (ret < 0) {
950 goto error;
951 }
952
953 reply.ret_code = be32toh(reply.ret_code);
954
955 /* Return session id or negative ret code. */
956 if (reply.ret_code != LTTNG_OK) {
957 ret = -1;
958 ERR("Relayd begin data pending replied error %d", reply.ret_code);
959 goto error;
960 }
961
962 return 0;
963
964 error:
965 return ret;
966 }
967
968 /*
969 * End a data pending command for a specific session id.
970 *
971 * Return 0 on success and set is_data_inflight to 0 if no data is being
972 * streamed or 1 if it is the case.
973 */
974 int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
975 unsigned int *is_data_inflight)
976 {
977 int ret, recv_ret;
978 struct lttcomm_relayd_end_data_pending msg;
979 struct lttcomm_relayd_generic_reply reply;
980
981 /* Code flow error. Safety net. */
982 assert(rsock);
983
984 DBG("Relayd end data pending");
985
986 memset(&msg, 0, sizeof(msg));
987 msg.session_id = htobe64(id);
988
989 /* Send command */
990 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
991 if (ret < 0) {
992 goto error;
993 }
994
995 /* Receive response */
996 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
997 if (ret < 0) {
998 goto error;
999 }
1000
1001 recv_ret = be32toh(reply.ret_code);
1002 if (recv_ret < 0) {
1003 ret = recv_ret;
1004 goto error;
1005 }
1006
1007 *is_data_inflight = recv_ret;
1008
1009 DBG("Relayd end data pending is data inflight: %d", recv_ret);
1010
1011 return 0;
1012
1013 error:
1014 return ret;
1015 }
1016
1017 /*
1018 * Send index to the relayd.
1019 */
1020 int relayd_send_index(struct lttcomm_relayd_sock *rsock,
1021 struct ctf_packet_index *index, uint64_t relay_stream_id,
1022 uint64_t net_seq_num)
1023 {
1024 int ret;
1025 struct lttcomm_relayd_index msg;
1026 struct lttcomm_relayd_generic_reply reply;
1027
1028 /* Code flow error. Safety net. */
1029 assert(rsock);
1030
1031 if (rsock->minor < 4) {
1032 DBG("Not sending indexes before protocol 2.4");
1033 ret = 0;
1034 goto error;
1035 }
1036
1037 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
1038
1039 memset(&msg, 0, sizeof(msg));
1040 msg.relay_stream_id = htobe64(relay_stream_id);
1041 msg.net_seq_num = htobe64(net_seq_num);
1042
1043 /* The index is already in big endian. */
1044 msg.packet_size = index->packet_size;
1045 msg.content_size = index->content_size;
1046 msg.timestamp_begin = index->timestamp_begin;
1047 msg.timestamp_end = index->timestamp_end;
1048 msg.events_discarded = index->events_discarded;
1049 msg.stream_id = index->stream_id;
1050
1051 if (rsock->minor >= 8) {
1052 msg.stream_instance_id = index->stream_instance_id;
1053 msg.packet_seq_num = index->packet_seq_num;
1054 }
1055
1056 /* Send command */
1057 ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
1058 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
1059 rsock->minor),
1060 lttng_to_index_minor(rsock->major, rsock->minor)),
1061 0);
1062 if (ret < 0) {
1063 goto error;
1064 }
1065
1066 /* Receive response */
1067 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1068 if (ret < 0) {
1069 goto error;
1070 }
1071
1072 reply.ret_code = be32toh(reply.ret_code);
1073
1074 /* Return session id or negative ret code. */
1075 if (reply.ret_code != LTTNG_OK) {
1076 ret = -1;
1077 ERR("Relayd send index replied error %d", reply.ret_code);
1078 } else {
1079 /* Success */
1080 ret = 0;
1081 }
1082
1083 error:
1084 return ret;
1085 }
1086
1087 /*
1088 * Ask the relay to reset the metadata trace file (regeneration).
1089 */
1090 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
1091 uint64_t stream_id, uint64_t version)
1092 {
1093 int ret;
1094 struct lttcomm_relayd_reset_metadata msg;
1095 struct lttcomm_relayd_generic_reply reply;
1096
1097 /* Code flow error. Safety net. */
1098 assert(rsock);
1099
1100 /* Should have been prevented by the sessiond. */
1101 if (rsock->minor < 8) {
1102 ERR("Metadata regeneration unsupported before 2.8");
1103 ret = -1;
1104 goto error;
1105 }
1106
1107 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1108
1109 memset(&msg, 0, sizeof(msg));
1110 msg.stream_id = htobe64(stream_id);
1111 msg.version = htobe64(version);
1112
1113 /* Send command */
1114 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1115 if (ret < 0) {
1116 goto error;
1117 }
1118
1119 /* Receive response */
1120 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1121 if (ret < 0) {
1122 goto error;
1123 }
1124
1125 reply.ret_code = be32toh(reply.ret_code);
1126
1127 /* Return session id or negative ret code. */
1128 if (reply.ret_code != LTTNG_OK) {
1129 ret = -1;
1130 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1131 } else {
1132 /* Success */
1133 ret = 0;
1134 }
1135
1136 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1137
1138 error:
1139 return ret;
1140 }
1141
1142 int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
1143 unsigned int stream_count, const uint64_t *new_chunk_id,
1144 const struct relayd_stream_rotation_position *positions)
1145 {
1146 int ret;
1147 unsigned int i;
1148 struct lttng_dynamic_buffer payload;
1149 struct lttcomm_relayd_generic_reply reply = {};
1150 const struct lttcomm_relayd_rotate_streams msg = {
1151 .stream_count = htobe32((uint32_t) stream_count),
1152 .new_chunk_id = (typeof(msg.new_chunk_id)) {
1153 .is_set = !!new_chunk_id,
1154 .value = htobe64(new_chunk_id ? *new_chunk_id : 0),
1155 },
1156 };
1157 char new_chunk_id_buf[MAX_INT_DEC_LEN(*new_chunk_id)] = {};
1158 const char *new_chunk_id_str;
1159
1160 lttng_dynamic_buffer_init(&payload);
1161
1162 /* Code flow error. Safety net. */
1163 assert(sock);
1164
1165 if (new_chunk_id) {
1166 ret = snprintf(new_chunk_id_buf, sizeof(new_chunk_id_buf),
1167 "%" PRIu64, *new_chunk_id);
1168 if (ret == -1 || ret >= sizeof(new_chunk_id_buf)) {
1169 new_chunk_id_str = "formatting error";
1170 } else {
1171 new_chunk_id_str = new_chunk_id_buf;
1172 }
1173 } else {
1174 new_chunk_id_str = "none";
1175 }
1176
1177 DBG("Preparing \"rotate streams\" command payload: new_chunk_id = %s, stream_count = %u",
1178 new_chunk_id_str, stream_count);
1179
1180 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1181 if (ret) {
1182 ERR("Failed to allocate \"rotate streams\" command payload");
1183 goto error;
1184 }
1185
1186 for (i = 0; i < stream_count; i++) {
1187 const struct relayd_stream_rotation_position *position =
1188 &positions[i];
1189 const struct lttcomm_relayd_stream_rotation_position comm_position = {
1190 .stream_id = htobe64(position->stream_id),
1191 .rotate_at_seq_num = htobe64(
1192 position->rotate_at_seq_num),
1193 };
1194
1195 DBG("Rotate stream %" PRIu64 "at sequence number %" PRIu64,
1196 position->stream_id,
1197 position->rotate_at_seq_num);
1198 ret = lttng_dynamic_buffer_append(&payload, &comm_position,
1199 sizeof(comm_position));
1200 if (ret) {
1201 ERR("Failed to allocate \"rotate streams\" command payload");
1202 goto error;
1203 }
1204 }
1205
1206 /* Send command. */
1207 ret = send_command(sock, RELAYD_ROTATE_STREAMS, payload.data,
1208 payload.size, 0);
1209 if (ret < 0) {
1210 ERR("Failed to send \"rotate stream\" command");
1211 goto error;
1212 }
1213
1214 /* Receive response. */
1215 ret = recv_reply(sock, &reply, sizeof(reply));
1216 if (ret < 0) {
1217 ERR("Failed to receive \"rotate streams\" command reply");
1218 goto error;
1219 }
1220
1221 reply.ret_code = be32toh(reply.ret_code);
1222 if (reply.ret_code != LTTNG_OK) {
1223 ret = -1;
1224 ERR("Relayd rotate streams replied error %d", reply.ret_code);
1225 } else {
1226 /* Success. */
1227 ret = 0;
1228 DBG("Relayd rotated streams successfully");
1229 }
1230
1231 error:
1232 lttng_dynamic_buffer_reset(&payload);
1233 return ret;
1234 }
1235
1236 int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
1237 struct lttng_trace_chunk *chunk)
1238 {
1239 int ret = 0;
1240 enum lttng_trace_chunk_status status;
1241 struct lttcomm_relayd_create_trace_chunk msg = {};
1242 struct lttcomm_relayd_generic_reply reply = {};
1243 struct lttng_dynamic_buffer payload;
1244 uint64_t chunk_id;
1245 time_t creation_timestamp;
1246 const char *chunk_name;
1247 size_t chunk_name_length;
1248 bool overridden_name;
1249
1250 lttng_dynamic_buffer_init(&payload);
1251
1252 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1253 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1254 ret = -1;
1255 goto end;
1256 }
1257
1258 status = lttng_trace_chunk_get_creation_timestamp(
1259 chunk, &creation_timestamp);
1260 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1261 ret = -1;
1262 goto end;
1263 }
1264
1265 status = lttng_trace_chunk_get_name(
1266 chunk, &chunk_name, &overridden_name);
1267 if (status != LTTNG_TRACE_CHUNK_STATUS_OK &&
1268 status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
1269 ret = -1;
1270 goto end;
1271 }
1272
1273 chunk_name_length = overridden_name ? (strlen(chunk_name) + 1) : 0;
1274 msg = (typeof(msg)){
1275 .chunk_id = htobe64(chunk_id),
1276 .creation_timestamp = htobe64((uint64_t) creation_timestamp),
1277 .override_name_length = htobe32((uint32_t) chunk_name_length),
1278 };
1279
1280 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1281 if (ret) {
1282 goto end;
1283 }
1284 if (chunk_name_length) {
1285 ret = lttng_dynamic_buffer_append(
1286 &payload, chunk_name, chunk_name_length);
1287 if (ret) {
1288 goto end;
1289 }
1290 }
1291
1292 ret = send_command(sock, RELAYD_CREATE_TRACE_CHUNK, payload.data,
1293 payload.size, 0);
1294 if (ret < 0) {
1295 ERR("Failed to send trace chunk creation command to relay daemon");
1296 goto end;
1297 }
1298
1299 ret = recv_reply(sock, &reply, sizeof(reply));
1300 if (ret < 0) {
1301 ERR("Failed to receive relay daemon trace chunk creation command reply");
1302 goto end;
1303 }
1304
1305 reply.ret_code = be32toh(reply.ret_code);
1306 if (reply.ret_code != LTTNG_OK) {
1307 ret = -1;
1308 ERR("Relayd trace chunk create replied error %d",
1309 reply.ret_code);
1310 } else {
1311 ret = 0;
1312 DBG("Relayd successfully created trace chunk: chunk_id = %" PRIu64,
1313 chunk_id);
1314 }
1315
1316 end:
1317 lttng_dynamic_buffer_reset(&payload);
1318 return ret;
1319 }
1320
1321 int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
1322 struct lttng_trace_chunk *chunk)
1323 {
1324 int ret = 0;
1325 enum lttng_trace_chunk_status status;
1326 struct lttcomm_relayd_close_trace_chunk msg = {};
1327 struct lttcomm_relayd_generic_reply reply = {};
1328 uint64_t chunk_id;
1329 time_t close_timestamp;
1330 LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
1331
1332 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1333 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1334 ERR("Failed to get trace chunk id");
1335 ret = -1;
1336 goto end;
1337 }
1338
1339 status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
1340 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1341 ERR("Failed to get trace chunk close timestamp");
1342 ret = -1;
1343 goto end;
1344 }
1345
1346 status = lttng_trace_chunk_get_close_command(chunk,
1347 &close_command.value);
1348 switch (status) {
1349 case LTTNG_TRACE_CHUNK_STATUS_OK:
1350 close_command.is_set = 1;
1351 break;
1352 case LTTNG_TRACE_CHUNK_STATUS_NONE:
1353 break;
1354 default:
1355 ERR("Failed to get trace chunk close command");
1356 ret = -1;
1357 goto end;
1358 }
1359
1360 msg = (typeof(msg)){
1361 .chunk_id = htobe64(chunk_id),
1362 .close_timestamp = htobe64((uint64_t) close_timestamp),
1363 .close_command = {
1364 .value = htobe32((uint32_t) close_command.value),
1365 .is_set = close_command.is_set,
1366 },
1367 };
1368
1369 ret = send_command(sock, RELAYD_CLOSE_TRACE_CHUNK, &msg, sizeof(msg),
1370 0);
1371 if (ret < 0) {
1372 ERR("Failed to send trace chunk close command to relay daemon");
1373 goto end;
1374 }
1375
1376 ret = recv_reply(sock, &reply, sizeof(reply));
1377 if (ret < 0) {
1378 ERR("Failed to receive relay daemon trace chunk close command reply");
1379 goto end;
1380 }
1381
1382 reply.ret_code = be32toh(reply.ret_code);
1383 if (reply.ret_code != LTTNG_OK) {
1384 ret = -1;
1385 ERR("Relayd trace chunk close replied error %d",
1386 reply.ret_code);
1387 } else {
1388 ret = 0;
1389 DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64,
1390 chunk_id);
1391 }
1392 end:
1393 return ret;
1394 }
1395
1396 int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
1397 uint64_t chunk_id, bool *chunk_exists)
1398 {
1399 int ret = 0;
1400 struct lttcomm_relayd_trace_chunk_exists msg = {};
1401 struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
1402
1403 msg = (typeof(msg)){
1404 .chunk_id = htobe64(chunk_id),
1405 };
1406
1407 ret = send_command(sock, RELAYD_TRACE_CHUNK_EXISTS, &msg, sizeof(msg),
1408 0);
1409 if (ret < 0) {
1410 ERR("Failed to send trace chunk exists command to relay daemon");
1411 goto end;
1412 }
1413
1414 ret = recv_reply(sock, &reply, sizeof(reply));
1415 if (ret < 0) {
1416 ERR("Failed to receive relay daemon trace chunk close command reply");
1417 goto end;
1418 }
1419
1420 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1421 if (reply.generic.ret_code != LTTNG_OK) {
1422 ret = -1;
1423 ERR("Relayd trace chunk close replied error %d",
1424 reply.generic.ret_code);
1425 } else {
1426 ret = 0;
1427 DBG("Relayd successfully checked trace chunk existence: chunk_id = %" PRIu64
1428 ", exists = %s", chunk_id,
1429 reply.trace_chunk_exists ? "true" : "false");
1430 *chunk_exists = !!reply.trace_chunk_exists;
1431 }
1432 end:
1433 return ret;
1434 }
This page took 0.097422 seconds and 4 git commands to generate.