Cleanup: remove duplicated code in snapshot record command
[lttng-tools.git] / src / common / relayd / relayd.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <assert.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <sys/stat.h>
24 #include <inttypes.h>
25
26 #include <common/common.h>
27 #include <common/defaults.h>
28 #include <common/compat/endian.h>
29 #include <common/compat/string.h>
30 #include <common/sessiond-comm/relayd.h>
31 #include <common/index/ctf-index.h>
32
33 #include "relayd.h"
34
35 /*
36 * Send command. Fill up the header and append the data.
37 */
38 static int send_command(struct lttcomm_relayd_sock *rsock,
39 enum lttcomm_relayd_command cmd, const void *data, size_t size,
40 int flags)
41 {
42 int ret;
43 struct lttcomm_relayd_hdr header;
44 char *buf;
45 uint64_t buf_size = sizeof(header);
46
47 if (rsock->sock.fd < 0) {
48 return -ECONNRESET;
49 }
50
51 if (data) {
52 buf_size += size;
53 }
54
55 buf = zmalloc(buf_size);
56 if (buf == NULL) {
57 PERROR("zmalloc relayd send command buf");
58 ret = -1;
59 goto alloc_error;
60 }
61
62 memset(&header, 0, sizeof(header));
63 header.cmd = htobe32(cmd);
64 header.data_size = htobe64(size);
65
66 /* Zeroed for now since not used. */
67 header.cmd_version = 0;
68 header.circuit_id = 0;
69
70 /* Prepare buffer to send. */
71 memcpy(buf, &header, sizeof(header));
72 if (data) {
73 memcpy(buf + sizeof(header), data, size);
74 }
75
76 DBG3("Relayd sending command %d of size %" PRIu64, (int) cmd, buf_size);
77 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
78 if (ret < 0) {
79 PERROR("Failed to send command %d of size %" PRIu64,
80 (int) cmd, buf_size);
81 ret = -errno;
82 goto error;
83 }
84 error:
85 free(buf);
86 alloc_error:
87 return ret;
88 }
89
90 /*
91 * Receive reply data on socket. This MUST be call after send_command or else
92 * could result in unexpected behavior(s).
93 */
94 static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
95 {
96 int ret;
97
98 if (rsock->sock.fd < 0) {
99 return -ECONNRESET;
100 }
101
102 DBG3("Relayd waiting for reply of size %zu", size);
103
104 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
105 if (ret <= 0 || ret != size) {
106 if (ret == 0) {
107 /* Orderly shutdown. */
108 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
109 } else {
110 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
111 rsock->sock.fd, size, ret);
112 }
113 /* Always return -1 here and the caller can use errno. */
114 ret = -1;
115 goto error;
116 }
117
118 error:
119 return ret;
120 }
121
122 /*
123 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name & hostname)
124 * have no length restriction on the sender side.
125 * Length for both payloads is stored in the msg struct. A new dynamic size
126 * payload size is introduced.
127 */
128 static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
129 const char *session_name, const char *hostname,
130 int session_live_timer, unsigned int snapshot,
131 uint64_t sessiond_session_id, const lttng_uuid sessiond_uuid)
132 {
133 int ret;
134 struct lttcomm_relayd_create_session_2_11 *msg = NULL;
135 size_t session_name_len;
136 size_t hostname_len;
137 size_t msg_length;
138
139 /* The two names are sent with a '\0' delimiter between them. */
140 session_name_len = strlen(session_name) + 1;
141 hostname_len = strlen(hostname) + 1;
142
143 msg_length = sizeof(*msg) + session_name_len + hostname_len;
144 msg = zmalloc(msg_length);
145 if (!msg) {
146 PERROR("zmalloc create_session_2_11 command message");
147 ret = -1;
148 goto error;
149 }
150
151 assert(session_name_len <= UINT32_MAX);
152 msg->session_name_len = htobe32(session_name_len);
153
154 assert(hostname_len <= UINT32_MAX);
155 msg->hostname_len = htobe32(hostname_len);
156
157 if (lttng_strncpy(msg->names, session_name, session_name_len)) {
158 ret = -1;
159 goto error;
160 }
161 if (lttng_strncpy(msg->names + session_name_len, hostname, hostname_len)) {
162 ret = -1;
163 goto error;
164 }
165
166 msg->live_timer = htobe32(session_live_timer);
167 msg->snapshot = !!snapshot;
168
169 lttng_uuid_copy(msg->sessiond_uuid, sessiond_uuid);
170 msg->session_id = htobe64(sessiond_session_id);
171
172 /* Send command */
173 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
174 if (ret < 0) {
175 goto error;
176 }
177 error:
178 free(msg);
179 return ret;
180 }
181 /*
182 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
183 * support the live reading capability.
184 */
185 static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
186 const char *session_name, const char *hostname,
187 int session_live_timer, unsigned int snapshot)
188 {
189 int ret;
190 struct lttcomm_relayd_create_session_2_4 msg;
191
192 if (lttng_strncpy(msg.session_name, session_name,
193 sizeof(msg.session_name))) {
194 ret = -1;
195 goto error;
196 }
197 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
198 ret = -1;
199 goto error;
200 }
201 msg.live_timer = htobe32(session_live_timer);
202 msg.snapshot = htobe32(snapshot);
203
204 /* Send command */
205 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
206 if (ret < 0) {
207 goto error;
208 }
209
210 error:
211 return ret;
212 }
213
214 /*
215 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
216 */
217 static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock)
218 {
219 int ret;
220
221 /* Send command */
222 ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
223 if (ret < 0) {
224 goto error;
225 }
226
227 error:
228 return ret;
229 }
230
231 /*
232 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
233 * set session_id of the relayd if we have a successful reply from the relayd.
234 *
235 * On success, return 0 else a negative value which is either an errno error or
236 * a lttng error code from the relayd.
237 */
238 int relayd_create_session(struct lttcomm_relayd_sock *rsock,
239 uint64_t *relayd_session_id,
240 const char *session_name, const char *hostname,
241 int session_live_timer,
242 unsigned int snapshot, uint64_t sessiond_session_id,
243 const lttng_uuid sessiond_uuid)
244 {
245 int ret;
246 struct lttcomm_relayd_status_session reply;
247
248 assert(rsock);
249 assert(relayd_session_id);
250
251 DBG("Relayd create session");
252
253 if (rsock->minor < 4) {
254 /* From 2.1 to 2.3 */
255 ret = relayd_create_session_2_1(rsock);
256 } else if (rsock->minor >= 4 && rsock->minor < 11) {
257 /* From 2.4 to 2.10 */
258 ret = relayd_create_session_2_4(rsock, session_name,
259 hostname, session_live_timer, snapshot);
260 } else {
261 /* From 2.11 to ... */
262 ret = relayd_create_session_2_11(rsock, session_name,
263 hostname, session_live_timer, snapshot,
264 sessiond_session_id, sessiond_uuid);
265 }
266
267 if (ret < 0) {
268 goto error;
269 }
270
271 /* Receive response */
272 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
273 if (ret < 0) {
274 goto error;
275 }
276
277 reply.session_id = be64toh(reply.session_id);
278 reply.ret_code = be32toh(reply.ret_code);
279
280 /* Return session id or negative ret code. */
281 if (reply.ret_code != LTTNG_OK) {
282 ret = -1;
283 ERR("Relayd create session replied error %d", reply.ret_code);
284 goto error;
285 } else {
286 ret = 0;
287 *relayd_session_id = reply.session_id;
288 }
289
290 DBG("Relayd session created with id %" PRIu64, reply.session_id);
291
292 error:
293 return ret;
294 }
295
296 static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
297 const char *channel_name, const char *pathname)
298 {
299 int ret;
300 struct lttcomm_relayd_add_stream msg;
301
302 memset(&msg, 0, sizeof(msg));
303 if (lttng_strncpy(msg.channel_name, channel_name,
304 sizeof(msg.channel_name))) {
305 ret = -1;
306 goto error;
307 }
308
309 if (lttng_strncpy(msg.pathname, pathname,
310 sizeof(msg.pathname))) {
311 ret = -1;
312 goto error;
313 }
314
315 /* Send command */
316 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
317 if (ret < 0) {
318 ret = -1;
319 goto error;
320 }
321 ret = 0;
322 error:
323 return ret;
324 }
325
326 static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
327 const char *channel_name, const char *pathname,
328 uint64_t tracefile_size, uint64_t tracefile_count)
329 {
330 int ret;
331 struct lttcomm_relayd_add_stream_2_2 msg;
332
333 memset(&msg, 0, sizeof(msg));
334 /* Compat with relayd 2.2 to 2.10 */
335 if (lttng_strncpy(msg.channel_name, channel_name,
336 sizeof(msg.channel_name))) {
337 ret = -1;
338 goto error;
339 }
340 if (lttng_strncpy(msg.pathname, pathname,
341 sizeof(msg.pathname))) {
342 ret = -1;
343 goto error;
344 }
345 msg.tracefile_size = htobe64(tracefile_size);
346 msg.tracefile_count = htobe64(tracefile_count);
347
348 /* Send command */
349 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
350 if (ret < 0) {
351 goto error;
352 }
353 ret = 0;
354 error:
355 return ret;
356 }
357
358 static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
359 const char *channel_name, const char *pathname,
360 uint64_t tracefile_size, uint64_t tracefile_count,
361 uint64_t trace_archive_id)
362 {
363 int ret;
364 struct lttcomm_relayd_add_stream_2_11 *msg = NULL;
365 size_t channel_name_len;
366 size_t pathname_len;
367 size_t msg_length;
368
369 /* The two names are sent with a '\0' delimiter between them. */
370 channel_name_len = strlen(channel_name) + 1;
371 pathname_len = strlen(pathname) + 1;
372
373 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
374 msg = zmalloc(msg_length);
375 if (!msg) {
376 PERROR("zmalloc add_stream_2_11 command message");
377 ret = -1;
378 goto error;
379 }
380
381 assert(channel_name_len <= UINT32_MAX);
382 msg->channel_name_len = htobe32(channel_name_len);
383
384 assert(pathname_len <= UINT32_MAX);
385 msg->pathname_len = htobe32(pathname_len);
386
387 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
388 ret = -1;
389 goto error;
390 }
391 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
392 ret = -1;
393 goto error;
394 }
395
396 msg->tracefile_size = htobe64(tracefile_size);
397 msg->tracefile_count = htobe64(tracefile_count);
398 msg->trace_archive_id = htobe64(trace_archive_id);
399
400 /* Send command */
401 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
402 if (ret < 0) {
403 goto error;
404 }
405 ret = 0;
406 error:
407 free(msg);
408 return ret;
409 }
410
411 /*
412 * Add stream on the relayd and assign stream handle to the stream_id argument.
413 *
414 * On success return 0 else return ret_code negative value.
415 */
416 int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
417 const char *pathname, uint64_t *stream_id,
418 uint64_t tracefile_size, uint64_t tracefile_count,
419 uint64_t trace_archive_id)
420 {
421 int ret;
422 struct lttcomm_relayd_status_stream reply;
423
424 /* Code flow error. Safety net. */
425 assert(rsock);
426 assert(channel_name);
427 assert(pathname);
428
429 DBG("Relayd adding stream for channel name %s", channel_name);
430
431 /* Compat with relayd 2.1 */
432 if (rsock->minor == 1) {
433 /* For 2.1 */
434 ret = relayd_add_stream_2_1(rsock, channel_name, pathname);
435
436 } else if (rsock->minor > 1 && rsock->minor < 11) {
437 /* From 2.2 to 2.10 */
438 ret = relayd_add_stream_2_2(rsock, channel_name, pathname,
439 tracefile_size, tracefile_count);
440 } else {
441 /* From 2.11 to ...*/
442 ret = relayd_add_stream_2_11(rsock, channel_name, pathname,
443 tracefile_size, tracefile_count,
444 trace_archive_id);
445 }
446
447 if (ret) {
448 ret = -1;
449 goto error;
450 }
451
452 /* Waiting for reply */
453 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
454 if (ret < 0) {
455 goto error;
456 }
457
458 /* Back to host bytes order. */
459 reply.handle = be64toh(reply.handle);
460 reply.ret_code = be32toh(reply.ret_code);
461
462 /* Return session id or negative ret code. */
463 if (reply.ret_code != LTTNG_OK) {
464 ret = -1;
465 ERR("Relayd add stream replied error %d", reply.ret_code);
466 } else {
467 /* Success */
468 ret = 0;
469 *stream_id = reply.handle;
470 }
471
472 DBG("Relayd stream added successfully with handle %" PRIu64,
473 reply.handle);
474
475 error:
476 return ret;
477 }
478
479 /*
480 * Inform the relay that all the streams for the current channel has been sent.
481 *
482 * On success return 0 else return ret_code negative value.
483 */
484 int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
485 {
486 int ret;
487 struct lttcomm_relayd_generic_reply reply;
488
489 /* Code flow error. Safety net. */
490 assert(rsock);
491
492 DBG("Relayd sending streams sent.");
493
494 /* This feature was introduced in 2.4, ignore it for earlier versions. */
495 if (rsock->minor < 4) {
496 ret = 0;
497 goto end;
498 }
499
500 /* Send command */
501 ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
502 if (ret < 0) {
503 goto error;
504 }
505
506 /* Waiting for reply */
507 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
508 if (ret < 0) {
509 goto error;
510 }
511
512 /* Back to host bytes order. */
513 reply.ret_code = be32toh(reply.ret_code);
514
515 /* Return session id or negative ret code. */
516 if (reply.ret_code != LTTNG_OK) {
517 ret = -1;
518 ERR("Relayd streams sent replied error %d", reply.ret_code);
519 goto error;
520 } else {
521 /* Success */
522 ret = 0;
523 }
524
525 DBG("Relayd streams sent success");
526
527 error:
528 end:
529 return ret;
530 }
531
532 /*
533 * Check version numbers on the relayd.
534 * If major versions are compatible, we assign minor_to_use to the
535 * minor version of the procotol we are going to use for this session.
536 *
537 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
538 * otherwise, or a negative value on network errors.
539 */
540 int relayd_version_check(struct lttcomm_relayd_sock *rsock)
541 {
542 int ret;
543 struct lttcomm_relayd_version msg;
544
545 /* Code flow error. Safety net. */
546 assert(rsock);
547
548 DBG("Relayd version check for major.minor %u.%u", rsock->major,
549 rsock->minor);
550
551 memset(&msg, 0, sizeof(msg));
552 /* Prepare network byte order before transmission. */
553 msg.major = htobe32(rsock->major);
554 msg.minor = htobe32(rsock->minor);
555
556 /* Send command */
557 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
558 if (ret < 0) {
559 goto error;
560 }
561
562 /* Receive response */
563 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
564 if (ret < 0) {
565 goto error;
566 }
567
568 /* Set back to host bytes order */
569 msg.major = be32toh(msg.major);
570 msg.minor = be32toh(msg.minor);
571
572 /*
573 * Only validate the major version. If the other side is higher,
574 * communication is not possible. Only major version equal can talk to each
575 * other. If the minor version differs, the lowest version is used by both
576 * sides.
577 */
578 if (msg.major != rsock->major) {
579 /* Not compatible */
580 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
581 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
582 msg.major, rsock->major);
583 goto error;
584 }
585
586 /*
587 * If the relayd's minor version is higher, it will adapt to our version so
588 * we can continue to use the latest relayd communication data structure.
589 * If the received minor version is higher, the relayd should adapt to us.
590 */
591 if (rsock->minor > msg.minor) {
592 rsock->minor = msg.minor;
593 }
594
595 /* Version number compatible */
596 DBG2("Relayd version is compatible, using protocol version %u.%u",
597 rsock->major, rsock->minor);
598 ret = 0;
599
600 error:
601 return ret;
602 }
603
604 /*
605 * Add stream on the relayd and assign stream handle to the stream_id argument.
606 *
607 * On success return 0 else return ret_code negative value.
608 */
609 int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
610 {
611 int ret;
612
613 /* Code flow error. Safety net. */
614 assert(rsock);
615
616 DBG("Relayd sending metadata of size %zu", len);
617
618 /* Send command */
619 ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
620 if (ret < 0) {
621 goto error;
622 }
623
624 DBG2("Relayd metadata added successfully");
625
626 /*
627 * After that call, the metadata data MUST be sent to the relayd so the
628 * receive size on the other end matches the len of the metadata packet
629 * header. This is why we don't wait for a reply here.
630 */
631
632 error:
633 return ret;
634 }
635
636 /*
637 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
638 */
639 int relayd_connect(struct lttcomm_relayd_sock *rsock)
640 {
641 /* Code flow error. Safety net. */
642 assert(rsock);
643
644 if (!rsock->sock.ops) {
645 /*
646 * Attempting a connect on a non-initialized socket.
647 */
648 return -ECONNRESET;
649 }
650
651 DBG3("Relayd connect ...");
652
653 return rsock->sock.ops->connect(&rsock->sock);
654 }
655
656 /*
657 * Close relayd socket with an allocated lttcomm_relayd_sock.
658 *
659 * If no socket operations are found, simply return 0 meaning that everything
660 * is fine. Without operations, the socket can not possibly be opened or used.
661 * This is possible if the socket was allocated but not created. However, the
662 * caller could simply use it to store a valid file descriptor for instance
663 * passed over a Unix socket and call this to cleanup but still without a valid
664 * ops pointer.
665 *
666 * Return the close returned value. On error, a negative value is usually
667 * returned back from close(2).
668 */
669 int relayd_close(struct lttcomm_relayd_sock *rsock)
670 {
671 int ret;
672
673 /* Code flow error. Safety net. */
674 assert(rsock);
675
676 /* An invalid fd is fine, return success. */
677 if (rsock->sock.fd < 0) {
678 ret = 0;
679 goto end;
680 }
681
682 DBG3("Relayd closing socket %d", rsock->sock.fd);
683
684 if (rsock->sock.ops) {
685 ret = rsock->sock.ops->close(&rsock->sock);
686 } else {
687 /* Default call if no specific ops found. */
688 ret = close(rsock->sock.fd);
689 if (ret < 0) {
690 PERROR("relayd_close default close");
691 }
692 }
693 rsock->sock.fd = -1;
694
695 end:
696 return ret;
697 }
698
699 /*
700 * Send data header structure to the relayd.
701 */
702 int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
703 struct lttcomm_relayd_data_hdr *hdr, size_t size)
704 {
705 int ret;
706
707 /* Code flow error. Safety net. */
708 assert(rsock);
709 assert(hdr);
710
711 if (rsock->sock.fd < 0) {
712 return -ECONNRESET;
713 }
714
715 DBG3("Relayd sending data header of size %zu", size);
716
717 /* Again, safety net */
718 if (size == 0) {
719 size = sizeof(struct lttcomm_relayd_data_hdr);
720 }
721
722 /* Only send data header. */
723 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
724 if (ret < 0) {
725 ret = -errno;
726 goto error;
727 }
728
729 /*
730 * The data MUST be sent right after that command for the receive on the
731 * other end to match the size in the header.
732 */
733
734 error:
735 return ret;
736 }
737
738 /*
739 * Send close stream command to the relayd.
740 */
741 int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
742 uint64_t last_net_seq_num)
743 {
744 int ret;
745 struct lttcomm_relayd_close_stream msg;
746 struct lttcomm_relayd_generic_reply reply;
747
748 /* Code flow error. Safety net. */
749 assert(rsock);
750
751 DBG("Relayd closing stream id %" PRIu64, stream_id);
752
753 memset(&msg, 0, sizeof(msg));
754 msg.stream_id = htobe64(stream_id);
755 msg.last_net_seq_num = htobe64(last_net_seq_num);
756
757 /* Send command */
758 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
759 if (ret < 0) {
760 goto error;
761 }
762
763 /* Receive response */
764 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
765 if (ret < 0) {
766 goto error;
767 }
768
769 reply.ret_code = be32toh(reply.ret_code);
770
771 /* Return session id or negative ret code. */
772 if (reply.ret_code != LTTNG_OK) {
773 ret = -1;
774 ERR("Relayd close stream replied error %d", reply.ret_code);
775 } else {
776 /* Success */
777 ret = 0;
778 }
779
780 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
781
782 error:
783 return ret;
784 }
785
786 /*
787 * Check for data availability for a given stream id.
788 *
789 * Return 0 if NOT pending, 1 if so and a negative value on error.
790 */
791 int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
792 uint64_t last_net_seq_num)
793 {
794 int ret;
795 struct lttcomm_relayd_data_pending msg;
796 struct lttcomm_relayd_generic_reply reply;
797
798 /* Code flow error. Safety net. */
799 assert(rsock);
800
801 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
802
803 memset(&msg, 0, sizeof(msg));
804 msg.stream_id = htobe64(stream_id);
805 msg.last_net_seq_num = htobe64(last_net_seq_num);
806
807 /* Send command */
808 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
809 sizeof(msg), 0);
810 if (ret < 0) {
811 goto error;
812 }
813
814 /* Receive response */
815 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
816 if (ret < 0) {
817 goto error;
818 }
819
820 reply.ret_code = be32toh(reply.ret_code);
821
822 /* Return session id or negative ret code. */
823 if (reply.ret_code >= LTTNG_OK) {
824 ERR("Relayd data pending replied error %d", reply.ret_code);
825 }
826
827 /* At this point, the ret code is either 1 or 0 */
828 ret = reply.ret_code;
829
830 DBG("Relayd data is %s pending for stream id %" PRIu64,
831 ret == 1 ? "" : "NOT", stream_id);
832
833 error:
834 return ret;
835 }
836
837 /*
838 * Check on the relayd side for a quiescent state on the control socket.
839 */
840 int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
841 uint64_t metadata_stream_id)
842 {
843 int ret;
844 struct lttcomm_relayd_quiescent_control msg;
845 struct lttcomm_relayd_generic_reply reply;
846
847 /* Code flow error. Safety net. */
848 assert(rsock);
849
850 DBG("Relayd checking quiescent control state");
851
852 memset(&msg, 0, sizeof(msg));
853 msg.stream_id = htobe64(metadata_stream_id);
854
855 /* Send command */
856 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
857 if (ret < 0) {
858 goto error;
859 }
860
861 /* Receive response */
862 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
863 if (ret < 0) {
864 goto error;
865 }
866
867 reply.ret_code = be32toh(reply.ret_code);
868
869 /* Return session id or negative ret code. */
870 if (reply.ret_code != LTTNG_OK) {
871 ret = -1;
872 ERR("Relayd quiescent control replied error %d", reply.ret_code);
873 goto error;
874 }
875
876 /* Control socket is quiescent */
877 return 0;
878
879 error:
880 return ret;
881 }
882
883 /*
884 * Begin a data pending command for a specific session id.
885 */
886 int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
887 {
888 int ret;
889 struct lttcomm_relayd_begin_data_pending msg;
890 struct lttcomm_relayd_generic_reply reply;
891
892 /* Code flow error. Safety net. */
893 assert(rsock);
894
895 DBG("Relayd begin data pending");
896
897 memset(&msg, 0, sizeof(msg));
898 msg.session_id = htobe64(id);
899
900 /* Send command */
901 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
902 if (ret < 0) {
903 goto error;
904 }
905
906 /* Receive response */
907 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
908 if (ret < 0) {
909 goto error;
910 }
911
912 reply.ret_code = be32toh(reply.ret_code);
913
914 /* Return session id or negative ret code. */
915 if (reply.ret_code != LTTNG_OK) {
916 ret = -1;
917 ERR("Relayd begin data pending replied error %d", reply.ret_code);
918 goto error;
919 }
920
921 return 0;
922
923 error:
924 return ret;
925 }
926
927 /*
928 * End a data pending command for a specific session id.
929 *
930 * Return 0 on success and set is_data_inflight to 0 if no data is being
931 * streamed or 1 if it is the case.
932 */
933 int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
934 unsigned int *is_data_inflight)
935 {
936 int ret, recv_ret;
937 struct lttcomm_relayd_end_data_pending msg;
938 struct lttcomm_relayd_generic_reply reply;
939
940 /* Code flow error. Safety net. */
941 assert(rsock);
942
943 DBG("Relayd end data pending");
944
945 memset(&msg, 0, sizeof(msg));
946 msg.session_id = htobe64(id);
947
948 /* Send command */
949 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
950 if (ret < 0) {
951 goto error;
952 }
953
954 /* Receive response */
955 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
956 if (ret < 0) {
957 goto error;
958 }
959
960 recv_ret = be32toh(reply.ret_code);
961 if (recv_ret < 0) {
962 ret = recv_ret;
963 goto error;
964 }
965
966 *is_data_inflight = recv_ret;
967
968 DBG("Relayd end data pending is data inflight: %d", recv_ret);
969
970 return 0;
971
972 error:
973 return ret;
974 }
975
976 /*
977 * Send index to the relayd.
978 */
979 int relayd_send_index(struct lttcomm_relayd_sock *rsock,
980 struct ctf_packet_index *index, uint64_t relay_stream_id,
981 uint64_t net_seq_num)
982 {
983 int ret;
984 struct lttcomm_relayd_index msg;
985 struct lttcomm_relayd_generic_reply reply;
986
987 /* Code flow error. Safety net. */
988 assert(rsock);
989
990 if (rsock->minor < 4) {
991 DBG("Not sending indexes before protocol 2.4");
992 ret = 0;
993 goto error;
994 }
995
996 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
997
998 memset(&msg, 0, sizeof(msg));
999 msg.relay_stream_id = htobe64(relay_stream_id);
1000 msg.net_seq_num = htobe64(net_seq_num);
1001
1002 /* The index is already in big endian. */
1003 msg.packet_size = index->packet_size;
1004 msg.content_size = index->content_size;
1005 msg.timestamp_begin = index->timestamp_begin;
1006 msg.timestamp_end = index->timestamp_end;
1007 msg.events_discarded = index->events_discarded;
1008 msg.stream_id = index->stream_id;
1009
1010 if (rsock->minor >= 8) {
1011 msg.stream_instance_id = index->stream_instance_id;
1012 msg.packet_seq_num = index->packet_seq_num;
1013 }
1014
1015 /* Send command */
1016 ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
1017 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
1018 rsock->minor),
1019 lttng_to_index_minor(rsock->major, rsock->minor)),
1020 0);
1021 if (ret < 0) {
1022 goto error;
1023 }
1024
1025 /* Receive response */
1026 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1027 if (ret < 0) {
1028 goto error;
1029 }
1030
1031 reply.ret_code = be32toh(reply.ret_code);
1032
1033 /* Return session id or negative ret code. */
1034 if (reply.ret_code != LTTNG_OK) {
1035 ret = -1;
1036 ERR("Relayd send index replied error %d", reply.ret_code);
1037 } else {
1038 /* Success */
1039 ret = 0;
1040 }
1041
1042 error:
1043 return ret;
1044 }
1045
1046 /*
1047 * Ask the relay to reset the metadata trace file (regeneration).
1048 */
1049 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
1050 uint64_t stream_id, uint64_t version)
1051 {
1052 int ret;
1053 struct lttcomm_relayd_reset_metadata msg;
1054 struct lttcomm_relayd_generic_reply reply;
1055
1056 /* Code flow error. Safety net. */
1057 assert(rsock);
1058
1059 /* Should have been prevented by the sessiond. */
1060 if (rsock->minor < 8) {
1061 ERR("Metadata regeneration unsupported before 2.8");
1062 ret = -1;
1063 goto error;
1064 }
1065
1066 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1067
1068 memset(&msg, 0, sizeof(msg));
1069 msg.stream_id = htobe64(stream_id);
1070 msg.version = htobe64(version);
1071
1072 /* Send command */
1073 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1074 if (ret < 0) {
1075 goto error;
1076 }
1077
1078 /* Receive response */
1079 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1080 if (ret < 0) {
1081 goto error;
1082 }
1083
1084 reply.ret_code = be32toh(reply.ret_code);
1085
1086 /* Return session id or negative ret code. */
1087 if (reply.ret_code != LTTNG_OK) {
1088 ret = -1;
1089 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1090 } else {
1091 /* Success */
1092 ret = 0;
1093 }
1094
1095 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1096
1097 error:
1098 return ret;
1099 }
1100
1101 int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
1102 const char *new_pathname, uint64_t new_chunk_id,
1103 uint64_t seq_num)
1104 {
1105 int ret;
1106 struct lttcomm_relayd_rotate_stream *msg = NULL;
1107 struct lttcomm_relayd_generic_reply reply;
1108 size_t len;
1109 int msg_len;
1110
1111 /* Code flow error. Safety net. */
1112 assert(rsock);
1113
1114 DBG("Sending rotate stream id %" PRIu64 " command to relayd", stream_id);
1115
1116 /* Account for the trailing NULL. */
1117 len = lttng_strnlen(new_pathname, LTTNG_PATH_MAX) + 1;
1118 if (len > LTTNG_PATH_MAX) {
1119 ERR("Path used in relayd rotate stream command exceeds the maximal allowed length");
1120 ret = -1;
1121 goto error;
1122 }
1123
1124 msg_len = offsetof(struct lttcomm_relayd_rotate_stream, new_pathname) + len;
1125 msg = zmalloc(msg_len);
1126 if (!msg) {
1127 PERROR("Failed to allocate relayd rotate stream command of %d bytes",
1128 msg_len);
1129 ret = -1;
1130 goto error;
1131 }
1132
1133 if (lttng_strncpy(msg->new_pathname, new_pathname, len)) {
1134 ret = -1;
1135 ERR("Failed to copy relayd rotate stream command's new path name");
1136 goto error;
1137 }
1138
1139 msg->pathname_length = htobe32(len);
1140 msg->stream_id = htobe64(stream_id);
1141 msg->new_chunk_id = htobe64(new_chunk_id);
1142 /*
1143 * The seq_num is invalid for metadata streams, but it is ignored on
1144 * the relay.
1145 */
1146 msg->rotate_at_seq_num = htobe64(seq_num);
1147
1148 /* Send command. */
1149 ret = send_command(rsock, RELAYD_ROTATE_STREAM, (void *) msg, msg_len, 0);
1150 if (ret < 0) {
1151 ERR("Send rotate command");
1152 goto error;
1153 }
1154
1155 /* Receive response. */
1156 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1157 if (ret < 0) {
1158 ERR("Receive rotate reply");
1159 goto error;
1160 }
1161
1162 reply.ret_code = be32toh(reply.ret_code);
1163
1164 /* Return session id or negative ret code. */
1165 if (reply.ret_code != LTTNG_OK) {
1166 ret = -1;
1167 ERR("Relayd rotate stream replied error %d", reply.ret_code);
1168 } else {
1169 /* Success. */
1170 ret = 0;
1171 DBG("Relayd rotated stream id %" PRIu64 " successfully", stream_id);
1172 }
1173
1174 error:
1175 free(msg);
1176 return ret;
1177 }
1178
1179 int relayd_rotate_rename(struct lttcomm_relayd_sock *rsock,
1180 const char *old_path, const char *new_path)
1181 {
1182 int ret;
1183 struct lttcomm_relayd_rotate_rename *msg = NULL;
1184 struct lttcomm_relayd_generic_reply reply;
1185 size_t old_path_length, new_path_length;
1186 size_t msg_length;
1187
1188 /* Code flow error. Safety net. */
1189 assert(rsock);
1190
1191 DBG("Relayd rename chunk %s to %s", old_path, new_path);
1192
1193 /* The two paths are sent with a '\0' delimiter between them. */
1194 old_path_length = strlen(old_path) + 1;
1195 new_path_length = strlen(new_path) + 1;
1196
1197 msg_length = sizeof(*msg) + old_path_length + new_path_length;
1198 msg = zmalloc(msg_length);
1199 if (!msg) {
1200 PERROR("zmalloc rotate-rename command message");
1201 ret = -1;
1202 goto error;
1203 }
1204
1205 assert(old_path_length <= UINT32_MAX);
1206 msg->old_path_length = htobe32(old_path_length);
1207
1208 assert(new_path_length <= UINT32_MAX);
1209 msg->new_path_length = htobe32(new_path_length);
1210
1211 strcpy(msg->paths, old_path);
1212 strcpy(msg->paths + old_path_length, new_path);
1213
1214 /* Send command */
1215 ret = send_command(rsock, RELAYD_ROTATE_RENAME, (const void *) msg,
1216 msg_length, 0);
1217 if (ret < 0) {
1218 goto error;
1219 }
1220
1221 /* Receive response */
1222 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1223 if (ret < 0) {
1224 goto error;
1225 }
1226
1227 reply.ret_code = be32toh(reply.ret_code);
1228
1229 /* Return session id or negative ret code. */
1230 if (reply.ret_code != LTTNG_OK) {
1231 ret = -1;
1232 ERR("Relayd rotate rename replied error %d", reply.ret_code);
1233 } else {
1234 /* Success */
1235 ret = 0;
1236 }
1237
1238 DBG("Relayd rotate rename completed successfully");
1239
1240 error:
1241 free(msg);
1242 return ret;
1243 }
1244
1245 int relayd_rotate_pending(struct lttcomm_relayd_sock *rsock, uint64_t chunk_id)
1246 {
1247 int ret;
1248 struct lttcomm_relayd_rotate_pending msg;
1249 struct lttcomm_relayd_rotate_pending_reply reply;
1250
1251 /* Code flow error. Safety net. */
1252 assert(rsock);
1253
1254 DBG("Querying relayd for rotate pending with chunk_id %" PRIu64,
1255 chunk_id);
1256
1257 memset(&msg, 0, sizeof(msg));
1258 msg.chunk_id = htobe64(chunk_id);
1259
1260 /* Send command */
1261 ret = send_command(rsock, RELAYD_ROTATE_PENDING, (void *) &msg,
1262 sizeof(msg), 0);
1263 if (ret < 0) {
1264 goto error;
1265 }
1266
1267 /* Receive response */
1268 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1269 if (ret < 0) {
1270 goto error;
1271 }
1272
1273 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1274
1275 /* Return session id or negative ret code. */
1276 if (reply.generic.ret_code != LTTNG_OK) {
1277 ret = -reply.generic.ret_code;
1278 ERR("Relayd rotate pending replied with error %d", ret);
1279 goto error;
1280 } else {
1281 /* No error, just rotate pending state */
1282 if (reply.is_pending == 0 || reply.is_pending == 1) {
1283 ret = reply.is_pending;
1284 DBG("Relayd rotate pending command completed successfully with result \"%s\"",
1285 ret ? "rotation pending" : "rotation NOT pending");
1286 } else {
1287 ret = -LTTNG_ERR_UNK;
1288 }
1289 }
1290
1291 error:
1292 return ret;
1293 }
1294
1295 int relayd_mkdir(struct lttcomm_relayd_sock *rsock, const char *path)
1296 {
1297 int ret;
1298 struct lttcomm_relayd_mkdir *msg;
1299 struct lttcomm_relayd_generic_reply reply;
1300 size_t len;
1301
1302 /* Code flow error. Safety net. */
1303 assert(rsock);
1304
1305 DBG("Relayd mkdir path %s", path);
1306
1307 len = strlen(path) + 1;
1308 msg = zmalloc(sizeof(msg->length) + len);
1309 if (!msg) {
1310 PERROR("Alloc mkdir msg");
1311 ret = -1;
1312 goto error;
1313 }
1314 msg->length = htobe32((uint32_t) len);
1315
1316 if (lttng_strncpy(msg->path, path, len)) {
1317 ret = -1;
1318 goto error;
1319 }
1320
1321 /* Send command */
1322 ret = send_command(rsock, RELAYD_MKDIR, (void *) msg,
1323 sizeof(msg->length) + len, 0);
1324 if (ret < 0) {
1325 goto error;
1326 }
1327
1328 /* Receive response */
1329 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1330 if (ret < 0) {
1331 goto error;
1332 }
1333
1334 reply.ret_code = be32toh(reply.ret_code);
1335
1336 /* Return session id or negative ret code. */
1337 if (reply.ret_code != LTTNG_OK) {
1338 ret = -1;
1339 ERR("Relayd mkdir replied error %d", reply.ret_code);
1340 } else {
1341 /* Success */
1342 ret = 0;
1343 }
1344
1345 DBG("Relayd mkdir completed successfully");
1346
1347 error:
1348 free(msg);
1349 return ret;
1350 }
This page took 0.092716 seconds and 5 git commands to generate.