Add an internal uuid formatting utility
[lttng-tools.git] / src / common / relayd / relayd.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <assert.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <sys/stat.h>
24 #include <inttypes.h>
25
26 #include <common/common.h>
27 #include <common/defaults.h>
28 #include <common/compat/endian.h>
29 #include <common/compat/string.h>
30 #include <common/sessiond-comm/relayd.h>
31 #include <common/index/ctf-index.h>
32
33 #include "relayd.h"
34
35 /*
36 * Send command. Fill up the header and append the data.
37 */
38 static int send_command(struct lttcomm_relayd_sock *rsock,
39 enum lttcomm_relayd_command cmd, const void *data, size_t size,
40 int flags)
41 {
42 int ret;
43 struct lttcomm_relayd_hdr header;
44 char *buf;
45 uint64_t buf_size = sizeof(header);
46
47 if (rsock->sock.fd < 0) {
48 return -ECONNRESET;
49 }
50
51 if (data) {
52 buf_size += size;
53 }
54
55 buf = zmalloc(buf_size);
56 if (buf == NULL) {
57 PERROR("zmalloc relayd send command buf");
58 ret = -1;
59 goto alloc_error;
60 }
61
62 memset(&header, 0, sizeof(header));
63 header.cmd = htobe32(cmd);
64 header.data_size = htobe64(size);
65
66 /* Zeroed for now since not used. */
67 header.cmd_version = 0;
68 header.circuit_id = 0;
69
70 /* Prepare buffer to send. */
71 memcpy(buf, &header, sizeof(header));
72 if (data) {
73 memcpy(buf + sizeof(header), data, size);
74 }
75
76 DBG3("Relayd sending command %d of size %" PRIu64, (int) cmd, buf_size);
77 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
78 if (ret < 0) {
79 PERROR("Failed to send command %d of size %" PRIu64,
80 (int) cmd, buf_size);
81 ret = -errno;
82 goto error;
83 }
84 error:
85 free(buf);
86 alloc_error:
87 return ret;
88 }
89
90 /*
91 * Receive reply data on socket. This MUST be call after send_command or else
92 * could result in unexpected behavior(s).
93 */
94 static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
95 {
96 int ret;
97
98 if (rsock->sock.fd < 0) {
99 return -ECONNRESET;
100 }
101
102 DBG3("Relayd waiting for reply of size %zu", size);
103
104 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
105 if (ret <= 0 || ret != size) {
106 if (ret == 0) {
107 /* Orderly shutdown. */
108 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
109 } else {
110 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
111 rsock->sock.fd, size, ret);
112 }
113 /* Always return -1 here and the caller can use errno. */
114 ret = -1;
115 goto error;
116 }
117
118 error:
119 return ret;
120 }
121
122 /*
123 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name & hostname)
124 * have no length restriction on the sender side.
125 * Length for both payloads is stored in the msg struct. A new dynamic size
126 * payload size is introduced.
127 */
128 static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
129 char *session_name, char *hostname,
130 int session_live_timer, unsigned int snapshot)
131 {
132 int ret;
133 struct lttcomm_relayd_create_session_2_11 *msg = NULL;
134 size_t session_name_len;
135 size_t hostname_len;
136 size_t msg_length;
137
138 /* The two names are sent with a '\0' delimiter between them. */
139 session_name_len = strlen(session_name) + 1;
140 hostname_len = strlen(hostname) + 1;
141
142 msg_length = sizeof(*msg) + session_name_len + hostname_len;
143 msg = zmalloc(msg_length);
144 if (!msg) {
145 PERROR("zmalloc create_session_2_11 command message");
146 ret = -1;
147 goto error;
148 }
149
150 assert(session_name_len <= UINT32_MAX);
151 msg->session_name_len = htobe32(session_name_len);
152
153 assert(hostname_len <= UINT32_MAX);
154 msg->hostname_len = htobe32(hostname_len);
155
156 if (lttng_strncpy(msg->names, session_name, session_name_len)) {
157 ret = -1;
158 goto error;
159 }
160 if (lttng_strncpy(msg->names + session_name_len, hostname, hostname_len)) {
161 ret = -1;
162 goto error;
163 }
164
165 msg->live_timer = htobe32(session_live_timer);
166 msg->snapshot = !!snapshot;
167
168 /* Send command */
169 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
170 if (ret < 0) {
171 goto error;
172 }
173 error:
174 free(msg);
175 return ret;
176 }
177 /*
178 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
179 * support the live reading capability.
180 */
181 static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
182 char *session_name, char *hostname, int session_live_timer,
183 unsigned int snapshot)
184 {
185 int ret;
186 struct lttcomm_relayd_create_session_2_4 msg;
187
188 if (lttng_strncpy(msg.session_name, session_name,
189 sizeof(msg.session_name))) {
190 ret = -1;
191 goto error;
192 }
193 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
194 ret = -1;
195 goto error;
196 }
197 msg.live_timer = htobe32(session_live_timer);
198 msg.snapshot = htobe32(snapshot);
199
200 /* Send command */
201 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
202 if (ret < 0) {
203 goto error;
204 }
205
206 error:
207 return ret;
208 }
209
210 /*
211 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
212 */
213 static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock)
214 {
215 int ret;
216
217 /* Send command */
218 ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
219 if (ret < 0) {
220 goto error;
221 }
222
223 error:
224 return ret;
225 }
226
227 /*
228 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
229 * set session_id of the relayd if we have a successful reply from the relayd.
230 *
231 * On success, return 0 else a negative value which is either an errno error or
232 * a lttng error code from the relayd.
233 */
234 int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_id,
235 char *session_name, char *hostname, int session_live_timer,
236 unsigned int snapshot)
237 {
238 int ret;
239 struct lttcomm_relayd_status_session reply;
240
241 assert(rsock);
242 assert(session_id);
243
244 DBG("Relayd create session");
245
246 if (rsock->minor < 4) {
247 /* From 2.1 to 2.3 */
248 ret = relayd_create_session_2_1(rsock);
249 } else if (rsock->minor >= 4 && rsock->minor < 11) {
250 /* From 2.4 to 2.10 */
251 ret = relayd_create_session_2_4(rsock, session_name,
252 hostname, session_live_timer, snapshot);
253 } else {
254 /* From 2.11 to ... */
255 ret = relayd_create_session_2_11(rsock, session_name,
256 hostname, session_live_timer, snapshot);
257 }
258
259 if (ret < 0) {
260 goto error;
261 }
262
263 /* Receive response */
264 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
265 if (ret < 0) {
266 goto error;
267 }
268
269 reply.session_id = be64toh(reply.session_id);
270 reply.ret_code = be32toh(reply.ret_code);
271
272 /* Return session id or negative ret code. */
273 if (reply.ret_code != LTTNG_OK) {
274 ret = -1;
275 ERR("Relayd create session replied error %d", reply.ret_code);
276 goto error;
277 } else {
278 ret = 0;
279 *session_id = reply.session_id;
280 }
281
282 DBG("Relayd session created with id %" PRIu64, reply.session_id);
283
284 error:
285 return ret;
286 }
287
288 static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
289 const char *channel_name, const char *pathname)
290 {
291 int ret;
292 struct lttcomm_relayd_add_stream msg;
293
294 memset(&msg, 0, sizeof(msg));
295 if (lttng_strncpy(msg.channel_name, channel_name,
296 sizeof(msg.channel_name))) {
297 ret = -1;
298 goto error;
299 }
300
301 if (lttng_strncpy(msg.pathname, pathname,
302 sizeof(msg.pathname))) {
303 ret = -1;
304 goto error;
305 }
306
307 /* Send command */
308 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
309 if (ret < 0) {
310 ret = -1;
311 goto error;
312 }
313 ret = 0;
314 error:
315 return ret;
316 }
317
318 static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
319 const char *channel_name, const char *pathname,
320 uint64_t tracefile_size, uint64_t tracefile_count)
321 {
322 int ret;
323 struct lttcomm_relayd_add_stream_2_2 msg;
324
325 memset(&msg, 0, sizeof(msg));
326 /* Compat with relayd 2.2 to 2.10 */
327 if (lttng_strncpy(msg.channel_name, channel_name,
328 sizeof(msg.channel_name))) {
329 ret = -1;
330 goto error;
331 }
332 if (lttng_strncpy(msg.pathname, pathname,
333 sizeof(msg.pathname))) {
334 ret = -1;
335 goto error;
336 }
337 msg.tracefile_size = htobe64(tracefile_size);
338 msg.tracefile_count = htobe64(tracefile_count);
339
340 /* Send command */
341 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
342 if (ret < 0) {
343 goto error;
344 }
345 ret = 0;
346 error:
347 return ret;
348 }
349
350 static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
351 const char *channel_name, const char *pathname,
352 uint64_t tracefile_size, uint64_t tracefile_count,
353 uint64_t trace_archive_id)
354 {
355 int ret;
356 struct lttcomm_relayd_add_stream_2_11 *msg = NULL;
357 size_t channel_name_len;
358 size_t pathname_len;
359 size_t msg_length;
360
361 /* The two names are sent with a '\0' delimiter between them. */
362 channel_name_len = strlen(channel_name) + 1;
363 pathname_len = strlen(pathname) + 1;
364
365 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
366 msg = zmalloc(msg_length);
367 if (!msg) {
368 PERROR("zmalloc add_stream_2_11 command message");
369 ret = -1;
370 goto error;
371 }
372
373 assert(channel_name_len <= UINT32_MAX);
374 msg->channel_name_len = htobe32(channel_name_len);
375
376 assert(pathname_len <= UINT32_MAX);
377 msg->pathname_len = htobe32(pathname_len);
378
379 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
380 ret = -1;
381 goto error;
382 }
383 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
384 ret = -1;
385 goto error;
386 }
387
388 msg->tracefile_size = htobe64(tracefile_size);
389 msg->tracefile_count = htobe64(tracefile_count);
390 msg->trace_archive_id = htobe64(trace_archive_id);
391
392 /* Send command */
393 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
394 if (ret < 0) {
395 goto error;
396 }
397 ret = 0;
398 error:
399 free(msg);
400 return ret;
401 }
402
403 /*
404 * Add stream on the relayd and assign stream handle to the stream_id argument.
405 *
406 * On success return 0 else return ret_code negative value.
407 */
408 int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
409 const char *pathname, uint64_t *stream_id,
410 uint64_t tracefile_size, uint64_t tracefile_count,
411 uint64_t trace_archive_id)
412 {
413 int ret;
414 struct lttcomm_relayd_status_stream reply;
415
416 /* Code flow error. Safety net. */
417 assert(rsock);
418 assert(channel_name);
419 assert(pathname);
420
421 DBG("Relayd adding stream for channel name %s", channel_name);
422
423 /* Compat with relayd 2.1 */
424 if (rsock->minor == 1) {
425 /* For 2.1 */
426 ret = relayd_add_stream_2_1(rsock, channel_name, pathname);
427
428 } else if (rsock->minor > 1 && rsock->minor < 11) {
429 /* From 2.2 to 2.10 */
430 ret = relayd_add_stream_2_2(rsock, channel_name, pathname,
431 tracefile_size, tracefile_count);
432 } else {
433 /* From 2.11 to ...*/
434 ret = relayd_add_stream_2_11(rsock, channel_name, pathname,
435 tracefile_size, tracefile_count,
436 trace_archive_id);
437 }
438
439 if (ret) {
440 ret = -1;
441 goto error;
442 }
443
444 /* Waiting for reply */
445 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
446 if (ret < 0) {
447 goto error;
448 }
449
450 /* Back to host bytes order. */
451 reply.handle = be64toh(reply.handle);
452 reply.ret_code = be32toh(reply.ret_code);
453
454 /* Return session id or negative ret code. */
455 if (reply.ret_code != LTTNG_OK) {
456 ret = -1;
457 ERR("Relayd add stream replied error %d", reply.ret_code);
458 } else {
459 /* Success */
460 ret = 0;
461 *stream_id = reply.handle;
462 }
463
464 DBG("Relayd stream added successfully with handle %" PRIu64,
465 reply.handle);
466
467 error:
468 return ret;
469 }
470
471 /*
472 * Inform the relay that all the streams for the current channel has been sent.
473 *
474 * On success return 0 else return ret_code negative value.
475 */
476 int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
477 {
478 int ret;
479 struct lttcomm_relayd_generic_reply reply;
480
481 /* Code flow error. Safety net. */
482 assert(rsock);
483
484 DBG("Relayd sending streams sent.");
485
486 /* This feature was introduced in 2.4, ignore it for earlier versions. */
487 if (rsock->minor < 4) {
488 ret = 0;
489 goto end;
490 }
491
492 /* Send command */
493 ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
494 if (ret < 0) {
495 goto error;
496 }
497
498 /* Waiting for reply */
499 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
500 if (ret < 0) {
501 goto error;
502 }
503
504 /* Back to host bytes order. */
505 reply.ret_code = be32toh(reply.ret_code);
506
507 /* Return session id or negative ret code. */
508 if (reply.ret_code != LTTNG_OK) {
509 ret = -1;
510 ERR("Relayd streams sent replied error %d", reply.ret_code);
511 goto error;
512 } else {
513 /* Success */
514 ret = 0;
515 }
516
517 DBG("Relayd streams sent success");
518
519 error:
520 end:
521 return ret;
522 }
523
524 /*
525 * Check version numbers on the relayd.
526 * If major versions are compatible, we assign minor_to_use to the
527 * minor version of the procotol we are going to use for this session.
528 *
529 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
530 * otherwise, or a negative value on network errors.
531 */
532 int relayd_version_check(struct lttcomm_relayd_sock *rsock)
533 {
534 int ret;
535 struct lttcomm_relayd_version msg;
536
537 /* Code flow error. Safety net. */
538 assert(rsock);
539
540 DBG("Relayd version check for major.minor %u.%u", rsock->major,
541 rsock->minor);
542
543 memset(&msg, 0, sizeof(msg));
544 /* Prepare network byte order before transmission. */
545 msg.major = htobe32(rsock->major);
546 msg.minor = htobe32(rsock->minor);
547
548 /* Send command */
549 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
550 if (ret < 0) {
551 goto error;
552 }
553
554 /* Receive response */
555 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
556 if (ret < 0) {
557 goto error;
558 }
559
560 /* Set back to host bytes order */
561 msg.major = be32toh(msg.major);
562 msg.minor = be32toh(msg.minor);
563
564 /*
565 * Only validate the major version. If the other side is higher,
566 * communication is not possible. Only major version equal can talk to each
567 * other. If the minor version differs, the lowest version is used by both
568 * sides.
569 */
570 if (msg.major != rsock->major) {
571 /* Not compatible */
572 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
573 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
574 msg.major, rsock->major);
575 goto error;
576 }
577
578 /*
579 * If the relayd's minor version is higher, it will adapt to our version so
580 * we can continue to use the latest relayd communication data structure.
581 * If the received minor version is higher, the relayd should adapt to us.
582 */
583 if (rsock->minor > msg.minor) {
584 rsock->minor = msg.minor;
585 }
586
587 /* Version number compatible */
588 DBG2("Relayd version is compatible, using protocol version %u.%u",
589 rsock->major, rsock->minor);
590 ret = 0;
591
592 error:
593 return ret;
594 }
595
596 /*
597 * Add stream on the relayd and assign stream handle to the stream_id argument.
598 *
599 * On success return 0 else return ret_code negative value.
600 */
601 int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
602 {
603 int ret;
604
605 /* Code flow error. Safety net. */
606 assert(rsock);
607
608 DBG("Relayd sending metadata of size %zu", len);
609
610 /* Send command */
611 ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
612 if (ret < 0) {
613 goto error;
614 }
615
616 DBG2("Relayd metadata added successfully");
617
618 /*
619 * After that call, the metadata data MUST be sent to the relayd so the
620 * receive size on the other end matches the len of the metadata packet
621 * header. This is why we don't wait for a reply here.
622 */
623
624 error:
625 return ret;
626 }
627
628 /*
629 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
630 */
631 int relayd_connect(struct lttcomm_relayd_sock *rsock)
632 {
633 /* Code flow error. Safety net. */
634 assert(rsock);
635
636 if (!rsock->sock.ops) {
637 /*
638 * Attempting a connect on a non-initialized socket.
639 */
640 return -ECONNRESET;
641 }
642
643 DBG3("Relayd connect ...");
644
645 return rsock->sock.ops->connect(&rsock->sock);
646 }
647
648 /*
649 * Close relayd socket with an allocated lttcomm_relayd_sock.
650 *
651 * If no socket operations are found, simply return 0 meaning that everything
652 * is fine. Without operations, the socket can not possibly be opened or used.
653 * This is possible if the socket was allocated but not created. However, the
654 * caller could simply use it to store a valid file descriptor for instance
655 * passed over a Unix socket and call this to cleanup but still without a valid
656 * ops pointer.
657 *
658 * Return the close returned value. On error, a negative value is usually
659 * returned back from close(2).
660 */
661 int relayd_close(struct lttcomm_relayd_sock *rsock)
662 {
663 int ret;
664
665 /* Code flow error. Safety net. */
666 assert(rsock);
667
668 /* An invalid fd is fine, return success. */
669 if (rsock->sock.fd < 0) {
670 ret = 0;
671 goto end;
672 }
673
674 DBG3("Relayd closing socket %d", rsock->sock.fd);
675
676 if (rsock->sock.ops) {
677 ret = rsock->sock.ops->close(&rsock->sock);
678 } else {
679 /* Default call if no specific ops found. */
680 ret = close(rsock->sock.fd);
681 if (ret < 0) {
682 PERROR("relayd_close default close");
683 }
684 }
685 rsock->sock.fd = -1;
686
687 end:
688 return ret;
689 }
690
691 /*
692 * Send data header structure to the relayd.
693 */
694 int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
695 struct lttcomm_relayd_data_hdr *hdr, size_t size)
696 {
697 int ret;
698
699 /* Code flow error. Safety net. */
700 assert(rsock);
701 assert(hdr);
702
703 if (rsock->sock.fd < 0) {
704 return -ECONNRESET;
705 }
706
707 DBG3("Relayd sending data header of size %zu", size);
708
709 /* Again, safety net */
710 if (size == 0) {
711 size = sizeof(struct lttcomm_relayd_data_hdr);
712 }
713
714 /* Only send data header. */
715 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
716 if (ret < 0) {
717 ret = -errno;
718 goto error;
719 }
720
721 /*
722 * The data MUST be sent right after that command for the receive on the
723 * other end to match the size in the header.
724 */
725
726 error:
727 return ret;
728 }
729
730 /*
731 * Send close stream command to the relayd.
732 */
733 int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
734 uint64_t last_net_seq_num)
735 {
736 int ret;
737 struct lttcomm_relayd_close_stream msg;
738 struct lttcomm_relayd_generic_reply reply;
739
740 /* Code flow error. Safety net. */
741 assert(rsock);
742
743 DBG("Relayd closing stream id %" PRIu64, stream_id);
744
745 memset(&msg, 0, sizeof(msg));
746 msg.stream_id = htobe64(stream_id);
747 msg.last_net_seq_num = htobe64(last_net_seq_num);
748
749 /* Send command */
750 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
751 if (ret < 0) {
752 goto error;
753 }
754
755 /* Receive response */
756 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
757 if (ret < 0) {
758 goto error;
759 }
760
761 reply.ret_code = be32toh(reply.ret_code);
762
763 /* Return session id or negative ret code. */
764 if (reply.ret_code != LTTNG_OK) {
765 ret = -1;
766 ERR("Relayd close stream replied error %d", reply.ret_code);
767 } else {
768 /* Success */
769 ret = 0;
770 }
771
772 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
773
774 error:
775 return ret;
776 }
777
778 /*
779 * Check for data availability for a given stream id.
780 *
781 * Return 0 if NOT pending, 1 if so and a negative value on error.
782 */
783 int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
784 uint64_t last_net_seq_num)
785 {
786 int ret;
787 struct lttcomm_relayd_data_pending msg;
788 struct lttcomm_relayd_generic_reply reply;
789
790 /* Code flow error. Safety net. */
791 assert(rsock);
792
793 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
794
795 memset(&msg, 0, sizeof(msg));
796 msg.stream_id = htobe64(stream_id);
797 msg.last_net_seq_num = htobe64(last_net_seq_num);
798
799 /* Send command */
800 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
801 sizeof(msg), 0);
802 if (ret < 0) {
803 goto error;
804 }
805
806 /* Receive response */
807 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
808 if (ret < 0) {
809 goto error;
810 }
811
812 reply.ret_code = be32toh(reply.ret_code);
813
814 /* Return session id or negative ret code. */
815 if (reply.ret_code >= LTTNG_OK) {
816 ERR("Relayd data pending replied error %d", reply.ret_code);
817 }
818
819 /* At this point, the ret code is either 1 or 0 */
820 ret = reply.ret_code;
821
822 DBG("Relayd data is %s pending for stream id %" PRIu64,
823 ret == 1 ? "" : "NOT", stream_id);
824
825 error:
826 return ret;
827 }
828
829 /*
830 * Check on the relayd side for a quiescent state on the control socket.
831 */
832 int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
833 uint64_t metadata_stream_id)
834 {
835 int ret;
836 struct lttcomm_relayd_quiescent_control msg;
837 struct lttcomm_relayd_generic_reply reply;
838
839 /* Code flow error. Safety net. */
840 assert(rsock);
841
842 DBG("Relayd checking quiescent control state");
843
844 memset(&msg, 0, sizeof(msg));
845 msg.stream_id = htobe64(metadata_stream_id);
846
847 /* Send command */
848 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
849 if (ret < 0) {
850 goto error;
851 }
852
853 /* Receive response */
854 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
855 if (ret < 0) {
856 goto error;
857 }
858
859 reply.ret_code = be32toh(reply.ret_code);
860
861 /* Return session id or negative ret code. */
862 if (reply.ret_code != LTTNG_OK) {
863 ret = -1;
864 ERR("Relayd quiescent control replied error %d", reply.ret_code);
865 goto error;
866 }
867
868 /* Control socket is quiescent */
869 return 0;
870
871 error:
872 return ret;
873 }
874
875 /*
876 * Begin a data pending command for a specific session id.
877 */
878 int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
879 {
880 int ret;
881 struct lttcomm_relayd_begin_data_pending msg;
882 struct lttcomm_relayd_generic_reply reply;
883
884 /* Code flow error. Safety net. */
885 assert(rsock);
886
887 DBG("Relayd begin data pending");
888
889 memset(&msg, 0, sizeof(msg));
890 msg.session_id = htobe64(id);
891
892 /* Send command */
893 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
894 if (ret < 0) {
895 goto error;
896 }
897
898 /* Receive response */
899 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
900 if (ret < 0) {
901 goto error;
902 }
903
904 reply.ret_code = be32toh(reply.ret_code);
905
906 /* Return session id or negative ret code. */
907 if (reply.ret_code != LTTNG_OK) {
908 ret = -1;
909 ERR("Relayd begin data pending replied error %d", reply.ret_code);
910 goto error;
911 }
912
913 return 0;
914
915 error:
916 return ret;
917 }
918
919 /*
920 * End a data pending command for a specific session id.
921 *
922 * Return 0 on success and set is_data_inflight to 0 if no data is being
923 * streamed or 1 if it is the case.
924 */
925 int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
926 unsigned int *is_data_inflight)
927 {
928 int ret, recv_ret;
929 struct lttcomm_relayd_end_data_pending msg;
930 struct lttcomm_relayd_generic_reply reply;
931
932 /* Code flow error. Safety net. */
933 assert(rsock);
934
935 DBG("Relayd end data pending");
936
937 memset(&msg, 0, sizeof(msg));
938 msg.session_id = htobe64(id);
939
940 /* Send command */
941 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
942 if (ret < 0) {
943 goto error;
944 }
945
946 /* Receive response */
947 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
948 if (ret < 0) {
949 goto error;
950 }
951
952 recv_ret = be32toh(reply.ret_code);
953 if (recv_ret < 0) {
954 ret = recv_ret;
955 goto error;
956 }
957
958 *is_data_inflight = recv_ret;
959
960 DBG("Relayd end data pending is data inflight: %d", recv_ret);
961
962 return 0;
963
964 error:
965 return ret;
966 }
967
968 /*
969 * Send index to the relayd.
970 */
971 int relayd_send_index(struct lttcomm_relayd_sock *rsock,
972 struct ctf_packet_index *index, uint64_t relay_stream_id,
973 uint64_t net_seq_num)
974 {
975 int ret;
976 struct lttcomm_relayd_index msg;
977 struct lttcomm_relayd_generic_reply reply;
978
979 /* Code flow error. Safety net. */
980 assert(rsock);
981
982 if (rsock->minor < 4) {
983 DBG("Not sending indexes before protocol 2.4");
984 ret = 0;
985 goto error;
986 }
987
988 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
989
990 memset(&msg, 0, sizeof(msg));
991 msg.relay_stream_id = htobe64(relay_stream_id);
992 msg.net_seq_num = htobe64(net_seq_num);
993
994 /* The index is already in big endian. */
995 msg.packet_size = index->packet_size;
996 msg.content_size = index->content_size;
997 msg.timestamp_begin = index->timestamp_begin;
998 msg.timestamp_end = index->timestamp_end;
999 msg.events_discarded = index->events_discarded;
1000 msg.stream_id = index->stream_id;
1001
1002 if (rsock->minor >= 8) {
1003 msg.stream_instance_id = index->stream_instance_id;
1004 msg.packet_seq_num = index->packet_seq_num;
1005 }
1006
1007 /* Send command */
1008 ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
1009 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
1010 rsock->minor),
1011 lttng_to_index_minor(rsock->major, rsock->minor)),
1012 0);
1013 if (ret < 0) {
1014 goto error;
1015 }
1016
1017 /* Receive response */
1018 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1019 if (ret < 0) {
1020 goto error;
1021 }
1022
1023 reply.ret_code = be32toh(reply.ret_code);
1024
1025 /* Return session id or negative ret code. */
1026 if (reply.ret_code != LTTNG_OK) {
1027 ret = -1;
1028 ERR("Relayd send index replied error %d", reply.ret_code);
1029 } else {
1030 /* Success */
1031 ret = 0;
1032 }
1033
1034 error:
1035 return ret;
1036 }
1037
1038 /*
1039 * Ask the relay to reset the metadata trace file (regeneration).
1040 */
1041 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
1042 uint64_t stream_id, uint64_t version)
1043 {
1044 int ret;
1045 struct lttcomm_relayd_reset_metadata msg;
1046 struct lttcomm_relayd_generic_reply reply;
1047
1048 /* Code flow error. Safety net. */
1049 assert(rsock);
1050
1051 /* Should have been prevented by the sessiond. */
1052 if (rsock->minor < 8) {
1053 ERR("Metadata regeneration unsupported before 2.8");
1054 ret = -1;
1055 goto error;
1056 }
1057
1058 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1059
1060 memset(&msg, 0, sizeof(msg));
1061 msg.stream_id = htobe64(stream_id);
1062 msg.version = htobe64(version);
1063
1064 /* Send command */
1065 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1066 if (ret < 0) {
1067 goto error;
1068 }
1069
1070 /* Receive response */
1071 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1072 if (ret < 0) {
1073 goto error;
1074 }
1075
1076 reply.ret_code = be32toh(reply.ret_code);
1077
1078 /* Return session id or negative ret code. */
1079 if (reply.ret_code != LTTNG_OK) {
1080 ret = -1;
1081 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1082 } else {
1083 /* Success */
1084 ret = 0;
1085 }
1086
1087 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1088
1089 error:
1090 return ret;
1091 }
1092
1093 int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
1094 const char *new_pathname, uint64_t new_chunk_id,
1095 uint64_t seq_num)
1096 {
1097 int ret;
1098 struct lttcomm_relayd_rotate_stream *msg = NULL;
1099 struct lttcomm_relayd_generic_reply reply;
1100 size_t len;
1101 int msg_len;
1102
1103 /* Code flow error. Safety net. */
1104 assert(rsock);
1105
1106 DBG("Sending rotate stream id %" PRIu64 " command to relayd", stream_id);
1107
1108 /* Account for the trailing NULL. */
1109 len = lttng_strnlen(new_pathname, LTTNG_PATH_MAX) + 1;
1110 if (len > LTTNG_PATH_MAX) {
1111 ERR("Path used in relayd rotate stream command exceeds the maximal allowed length");
1112 ret = -1;
1113 goto error;
1114 }
1115
1116 msg_len = offsetof(struct lttcomm_relayd_rotate_stream, new_pathname) + len;
1117 msg = zmalloc(msg_len);
1118 if (!msg) {
1119 PERROR("Failed to allocate relayd rotate stream command of %d bytes",
1120 msg_len);
1121 ret = -1;
1122 goto error;
1123 }
1124
1125 if (lttng_strncpy(msg->new_pathname, new_pathname, len)) {
1126 ret = -1;
1127 ERR("Failed to copy relayd rotate stream command's new path name");
1128 goto error;
1129 }
1130
1131 msg->pathname_length = htobe32(len);
1132 msg->stream_id = htobe64(stream_id);
1133 msg->new_chunk_id = htobe64(new_chunk_id);
1134 /*
1135 * The seq_num is invalid for metadata streams, but it is ignored on
1136 * the relay.
1137 */
1138 msg->rotate_at_seq_num = htobe64(seq_num);
1139
1140 /* Send command. */
1141 ret = send_command(rsock, RELAYD_ROTATE_STREAM, (void *) msg, msg_len, 0);
1142 if (ret < 0) {
1143 ERR("Send rotate command");
1144 goto error;
1145 }
1146
1147 /* Receive response. */
1148 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1149 if (ret < 0) {
1150 ERR("Receive rotate reply");
1151 goto error;
1152 }
1153
1154 reply.ret_code = be32toh(reply.ret_code);
1155
1156 /* Return session id or negative ret code. */
1157 if (reply.ret_code != LTTNG_OK) {
1158 ret = -1;
1159 ERR("Relayd rotate stream replied error %d", reply.ret_code);
1160 } else {
1161 /* Success. */
1162 ret = 0;
1163 DBG("Relayd rotated stream id %" PRIu64 " successfully", stream_id);
1164 }
1165
1166 error:
1167 free(msg);
1168 return ret;
1169 }
1170
1171 int relayd_rotate_rename(struct lttcomm_relayd_sock *rsock,
1172 const char *old_path, const char *new_path)
1173 {
1174 int ret;
1175 struct lttcomm_relayd_rotate_rename *msg = NULL;
1176 struct lttcomm_relayd_generic_reply reply;
1177 size_t old_path_length, new_path_length;
1178 size_t msg_length;
1179
1180 /* Code flow error. Safety net. */
1181 assert(rsock);
1182
1183 DBG("Relayd rename chunk %s to %s", old_path, new_path);
1184
1185 /* The two paths are sent with a '\0' delimiter between them. */
1186 old_path_length = strlen(old_path) + 1;
1187 new_path_length = strlen(new_path) + 1;
1188
1189 msg_length = sizeof(*msg) + old_path_length + new_path_length;
1190 msg = zmalloc(msg_length);
1191 if (!msg) {
1192 PERROR("zmalloc rotate-rename command message");
1193 ret = -1;
1194 goto error;
1195 }
1196
1197 assert(old_path_length <= UINT32_MAX);
1198 msg->old_path_length = htobe32(old_path_length);
1199
1200 assert(new_path_length <= UINT32_MAX);
1201 msg->new_path_length = htobe32(new_path_length);
1202
1203 strcpy(msg->paths, old_path);
1204 strcpy(msg->paths + old_path_length, new_path);
1205
1206 /* Send command */
1207 ret = send_command(rsock, RELAYD_ROTATE_RENAME, (const void *) msg,
1208 msg_length, 0);
1209 if (ret < 0) {
1210 goto error;
1211 }
1212
1213 /* Receive response */
1214 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1215 if (ret < 0) {
1216 goto error;
1217 }
1218
1219 reply.ret_code = be32toh(reply.ret_code);
1220
1221 /* Return session id or negative ret code. */
1222 if (reply.ret_code != LTTNG_OK) {
1223 ret = -1;
1224 ERR("Relayd rotate rename replied error %d", reply.ret_code);
1225 } else {
1226 /* Success */
1227 ret = 0;
1228 }
1229
1230 DBG("Relayd rotate rename completed successfully");
1231
1232 error:
1233 free(msg);
1234 return ret;
1235 }
1236
1237 int relayd_rotate_pending(struct lttcomm_relayd_sock *rsock, uint64_t chunk_id)
1238 {
1239 int ret;
1240 struct lttcomm_relayd_rotate_pending msg;
1241 struct lttcomm_relayd_rotate_pending_reply reply;
1242
1243 /* Code flow error. Safety net. */
1244 assert(rsock);
1245
1246 DBG("Querying relayd for rotate pending with chunk_id %" PRIu64,
1247 chunk_id);
1248
1249 memset(&msg, 0, sizeof(msg));
1250 msg.chunk_id = htobe64(chunk_id);
1251
1252 /* Send command */
1253 ret = send_command(rsock, RELAYD_ROTATE_PENDING, (void *) &msg,
1254 sizeof(msg), 0);
1255 if (ret < 0) {
1256 goto error;
1257 }
1258
1259 /* Receive response */
1260 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1261 if (ret < 0) {
1262 goto error;
1263 }
1264
1265 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1266
1267 /* Return session id or negative ret code. */
1268 if (reply.generic.ret_code != LTTNG_OK) {
1269 ret = -reply.generic.ret_code;
1270 ERR("Relayd rotate pending replied with error %d", ret);
1271 goto error;
1272 } else {
1273 /* No error, just rotate pending state */
1274 if (reply.is_pending == 0 || reply.is_pending == 1) {
1275 ret = reply.is_pending;
1276 DBG("Relayd rotate pending command completed successfully with result \"%s\"",
1277 ret ? "rotation pending" : "rotation NOT pending");
1278 } else {
1279 ret = -LTTNG_ERR_UNK;
1280 }
1281 }
1282
1283 error:
1284 return ret;
1285 }
1286
1287 int relayd_mkdir(struct lttcomm_relayd_sock *rsock, const char *path)
1288 {
1289 int ret;
1290 struct lttcomm_relayd_mkdir *msg;
1291 struct lttcomm_relayd_generic_reply reply;
1292 size_t len;
1293
1294 /* Code flow error. Safety net. */
1295 assert(rsock);
1296
1297 DBG("Relayd mkdir path %s", path);
1298
1299 len = strlen(path) + 1;
1300 msg = zmalloc(sizeof(msg->length) + len);
1301 if (!msg) {
1302 PERROR("Alloc mkdir msg");
1303 ret = -1;
1304 goto error;
1305 }
1306 msg->length = htobe32((uint32_t) len);
1307
1308 if (lttng_strncpy(msg->path, path, len)) {
1309 ret = -1;
1310 goto error;
1311 }
1312
1313 /* Send command */
1314 ret = send_command(rsock, RELAYD_MKDIR, (void *) msg,
1315 sizeof(msg->length) + len, 0);
1316 if (ret < 0) {
1317 goto error;
1318 }
1319
1320 /* Receive response */
1321 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1322 if (ret < 0) {
1323 goto error;
1324 }
1325
1326 reply.ret_code = be32toh(reply.ret_code);
1327
1328 /* Return session id or negative ret code. */
1329 if (reply.ret_code != LTTNG_OK) {
1330 ret = -1;
1331 ERR("Relayd mkdir replied error %d", reply.ret_code);
1332 } else {
1333 /* Success */
1334 ret = 0;
1335 }
1336
1337 DBG("Relayd mkdir completed successfully");
1338
1339 error:
1340 free(msg);
1341 return ret;
1342 }
This page took 0.056314 seconds and 4 git commands to generate.