51de7b82f4a2216f780b15e13c2c9ec899a1b73e
[lttng-tools.git] / src / common / relayd / relayd.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <assert.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <sys/stat.h>
24 #include <inttypes.h>
25
26 #include <common/common.h>
27 #include <common/defaults.h>
28 #include <common/compat/endian.h>
29 #include <common/compat/string.h>
30 #include <common/sessiond-comm/relayd.h>
31 #include <common/index/ctf-index.h>
32 #include <common/trace-chunk.h>
33 #include <common/string-utils/format.h>
34
35 #include "relayd.h"
36
37 /*
38 * Send command. Fill up the header and append the data.
39 */
40 static int send_command(struct lttcomm_relayd_sock *rsock,
41 enum lttcomm_relayd_command cmd, const void *data, size_t size,
42 int flags)
43 {
44 int ret;
45 struct lttcomm_relayd_hdr header;
46 char *buf;
47 uint64_t buf_size = sizeof(header);
48
49 if (rsock->sock.fd < 0) {
50 return -ECONNRESET;
51 }
52
53 if (data) {
54 buf_size += size;
55 }
56
57 buf = zmalloc(buf_size);
58 if (buf == NULL) {
59 PERROR("zmalloc relayd send command buf");
60 ret = -1;
61 goto alloc_error;
62 }
63
64 memset(&header, 0, sizeof(header));
65 header.cmd = htobe32(cmd);
66 header.data_size = htobe64(size);
67
68 /* Zeroed for now since not used. */
69 header.cmd_version = 0;
70 header.circuit_id = 0;
71
72 /* Prepare buffer to send. */
73 memcpy(buf, &header, sizeof(header));
74 if (data) {
75 memcpy(buf + sizeof(header), data, size);
76 }
77
78 DBG3("Relayd sending command %d of size %" PRIu64, (int) cmd, buf_size);
79 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
80 if (ret < 0) {
81 PERROR("Failed to send command %d of size %" PRIu64,
82 (int) cmd, buf_size);
83 ret = -errno;
84 goto error;
85 }
86 error:
87 free(buf);
88 alloc_error:
89 return ret;
90 }
91
92 /*
93 * Receive reply data on socket. This MUST be call after send_command or else
94 * could result in unexpected behavior(s).
95 */
96 static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
97 {
98 int ret;
99
100 if (rsock->sock.fd < 0) {
101 return -ECONNRESET;
102 }
103
104 DBG3("Relayd waiting for reply of size %zu", size);
105
106 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
107 if (ret <= 0 || ret != size) {
108 if (ret == 0) {
109 /* Orderly shutdown. */
110 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
111 } else {
112 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
113 rsock->sock.fd, size, ret);
114 }
115 /* Always return -1 here and the caller can use errno. */
116 ret = -1;
117 goto error;
118 }
119
120 error:
121 return ret;
122 }
123
124 /*
125 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name,
126 * hostname, and base_path) have no length restriction on the sender side.
127 * Length for both payloads is stored in the msg struct. A new dynamic size
128 * payload size is introduced.
129 */
130 static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
131 const char *session_name, const char *hostname,
132 const char *base_path, int session_live_timer,
133 unsigned int snapshot, uint64_t sessiond_session_id,
134 const lttng_uuid sessiond_uuid, const uint64_t *current_chunk_id,
135 time_t creation_time)
136 {
137 int ret;
138 struct lttcomm_relayd_create_session_2_11 *msg = NULL;
139 size_t session_name_len;
140 size_t hostname_len;
141 size_t base_path_len;
142 size_t msg_length;
143 char *dst;
144
145 /* The two names are sent with a '\0' delimiter between them. */
146 session_name_len = strlen(session_name) + 1;
147 hostname_len = strlen(hostname) + 1;
148 base_path_len = base_path ? strlen(base_path) + 1 : 0;
149
150 msg_length = sizeof(*msg) + session_name_len + hostname_len + base_path_len;
151 msg = zmalloc(msg_length);
152 if (!msg) {
153 PERROR("zmalloc create_session_2_11 command message");
154 ret = -1;
155 goto error;
156 }
157
158 assert(session_name_len <= UINT32_MAX);
159 msg->session_name_len = htobe32(session_name_len);
160
161 assert(hostname_len <= UINT32_MAX);
162 msg->hostname_len = htobe32(hostname_len);
163
164 assert(base_path_len <= UINT32_MAX);
165 msg->base_path_len = htobe32(base_path_len);
166
167 dst = msg->names;
168 if (lttng_strncpy(dst, session_name, session_name_len)) {
169 ret = -1;
170 goto error;
171 }
172 dst += session_name_len;
173 if (lttng_strncpy(dst, hostname, hostname_len)) {
174 ret = -1;
175 goto error;
176 }
177 dst += hostname_len;
178 if (base_path && lttng_strncpy(dst, base_path, base_path_len)) {
179 ret = -1;
180 goto error;
181 }
182
183 msg->live_timer = htobe32(session_live_timer);
184 msg->snapshot = !!snapshot;
185
186 lttng_uuid_copy(msg->sessiond_uuid, sessiond_uuid);
187 msg->session_id = htobe64(sessiond_session_id);
188
189 if (current_chunk_id) {
190 LTTNG_OPTIONAL_SET(&msg->current_chunk_id,
191 htobe64(*current_chunk_id));
192 }
193
194 msg->creation_time = htobe64((uint64_t) creation_time);
195
196 /* Send command */
197 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
198 if (ret < 0) {
199 goto error;
200 }
201 error:
202 free(msg);
203 return ret;
204 }
205 /*
206 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
207 * support the live reading capability.
208 */
209 static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
210 const char *session_name, const char *hostname,
211 int session_live_timer, unsigned int snapshot)
212 {
213 int ret;
214 struct lttcomm_relayd_create_session_2_4 msg;
215
216 if (lttng_strncpy(msg.session_name, session_name,
217 sizeof(msg.session_name))) {
218 ret = -1;
219 goto error;
220 }
221 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
222 ret = -1;
223 goto error;
224 }
225 msg.live_timer = htobe32(session_live_timer);
226 msg.snapshot = htobe32(snapshot);
227
228 /* Send command */
229 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
230 if (ret < 0) {
231 goto error;
232 }
233
234 error:
235 return ret;
236 }
237
238 /*
239 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
240 */
241 static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock)
242 {
243 int ret;
244
245 /* Send command */
246 ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
247 if (ret < 0) {
248 goto error;
249 }
250
251 error:
252 return ret;
253 }
254
255 /*
256 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
257 * set session_id of the relayd if we have a successful reply from the relayd.
258 *
259 * On success, return 0 else a negative value which is either an errno error or
260 * a lttng error code from the relayd.
261 */
262 int relayd_create_session(struct lttcomm_relayd_sock *rsock,
263 uint64_t *relayd_session_id,
264 const char *session_name, const char *hostname,
265 const char *base_path, int session_live_timer,
266 unsigned int snapshot, uint64_t sessiond_session_id,
267 const lttng_uuid sessiond_uuid,
268 const uint64_t *current_chunk_id,
269 time_t creation_time)
270 {
271 int ret;
272 struct lttcomm_relayd_status_session reply;
273
274 assert(rsock);
275 assert(relayd_session_id);
276
277 DBG("Relayd create session");
278
279 if (rsock->minor < 4) {
280 /* From 2.1 to 2.3 */
281 ret = relayd_create_session_2_1(rsock);
282 } else if (rsock->minor >= 4 && rsock->minor < 11) {
283 /* From 2.4 to 2.10 */
284 ret = relayd_create_session_2_4(rsock, session_name,
285 hostname, session_live_timer, snapshot);
286 } else {
287 /* From 2.11 to ... */
288 ret = relayd_create_session_2_11(rsock, session_name,
289 hostname, base_path, session_live_timer, snapshot,
290 sessiond_session_id, sessiond_uuid,
291 current_chunk_id, creation_time);
292 }
293
294 if (ret < 0) {
295 goto error;
296 }
297
298 /* Receive response */
299 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
300 if (ret < 0) {
301 goto error;
302 }
303
304 reply.session_id = be64toh(reply.session_id);
305 reply.ret_code = be32toh(reply.ret_code);
306
307 /* Return session id or negative ret code. */
308 if (reply.ret_code != LTTNG_OK) {
309 ret = -1;
310 ERR("Relayd create session replied error %d", reply.ret_code);
311 goto error;
312 } else {
313 ret = 0;
314 *relayd_session_id = reply.session_id;
315 }
316
317 DBG("Relayd session created with id %" PRIu64, reply.session_id);
318
319 error:
320 return ret;
321 }
322
323 static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
324 const char *channel_name, const char *pathname)
325 {
326 int ret;
327 struct lttcomm_relayd_add_stream msg;
328
329 memset(&msg, 0, sizeof(msg));
330 if (lttng_strncpy(msg.channel_name, channel_name,
331 sizeof(msg.channel_name))) {
332 ret = -1;
333 goto error;
334 }
335
336 if (lttng_strncpy(msg.pathname, pathname,
337 sizeof(msg.pathname))) {
338 ret = -1;
339 goto error;
340 }
341
342 /* Send command */
343 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
344 if (ret < 0) {
345 ret = -1;
346 goto error;
347 }
348 ret = 0;
349 error:
350 return ret;
351 }
352
353 static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
354 const char *channel_name, const char *pathname,
355 uint64_t tracefile_size, uint64_t tracefile_count)
356 {
357 int ret;
358 struct lttcomm_relayd_add_stream_2_2 msg;
359
360 memset(&msg, 0, sizeof(msg));
361 /* Compat with relayd 2.2 to 2.10 */
362 if (lttng_strncpy(msg.channel_name, channel_name,
363 sizeof(msg.channel_name))) {
364 ret = -1;
365 goto error;
366 }
367 if (lttng_strncpy(msg.pathname, pathname,
368 sizeof(msg.pathname))) {
369 ret = -1;
370 goto error;
371 }
372 msg.tracefile_size = htobe64(tracefile_size);
373 msg.tracefile_count = htobe64(tracefile_count);
374
375 /* Send command */
376 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
377 if (ret < 0) {
378 goto error;
379 }
380 ret = 0;
381 error:
382 return ret;
383 }
384
385 static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
386 const char *channel_name, const char *pathname,
387 uint64_t tracefile_size, uint64_t tracefile_count,
388 uint64_t trace_archive_id)
389 {
390 int ret;
391 struct lttcomm_relayd_add_stream_2_11 *msg = NULL;
392 size_t channel_name_len;
393 size_t pathname_len;
394 size_t msg_length;
395
396 /* The two names are sent with a '\0' delimiter between them. */
397 channel_name_len = strlen(channel_name) + 1;
398 pathname_len = strlen(pathname) + 1;
399
400 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
401 msg = zmalloc(msg_length);
402 if (!msg) {
403 PERROR("zmalloc add_stream_2_11 command message");
404 ret = -1;
405 goto error;
406 }
407
408 assert(channel_name_len <= UINT32_MAX);
409 msg->channel_name_len = htobe32(channel_name_len);
410
411 assert(pathname_len <= UINT32_MAX);
412 msg->pathname_len = htobe32(pathname_len);
413
414 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
415 ret = -1;
416 goto error;
417 }
418 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
419 ret = -1;
420 goto error;
421 }
422
423 msg->tracefile_size = htobe64(tracefile_size);
424 msg->tracefile_count = htobe64(tracefile_count);
425 msg->trace_chunk_id = htobe64(trace_archive_id);
426
427 /* Send command */
428 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
429 if (ret < 0) {
430 goto error;
431 }
432 ret = 0;
433 error:
434 free(msg);
435 return ret;
436 }
437
438 /*
439 * Add stream on the relayd and assign stream handle to the stream_id argument.
440 *
441 * On success return 0 else return ret_code negative value.
442 */
443 int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
444 const char *pathname, uint64_t *stream_id,
445 uint64_t tracefile_size, uint64_t tracefile_count,
446 struct lttng_trace_chunk *trace_chunk)
447 {
448 int ret;
449 struct lttcomm_relayd_status_stream reply;
450
451 /* Code flow error. Safety net. */
452 assert(rsock);
453 assert(channel_name);
454 assert(pathname);
455
456 DBG("Relayd adding stream for channel name %s", channel_name);
457
458 /* Compat with relayd 2.1 */
459 if (rsock->minor == 1) {
460 /* For 2.1 */
461 assert(!trace_chunk);
462 ret = relayd_add_stream_2_1(rsock, channel_name, pathname);
463
464 } else if (rsock->minor > 1 && rsock->minor < 11) {
465 /* From 2.2 to 2.10 */
466 assert(!trace_chunk);
467 ret = relayd_add_stream_2_2(rsock, channel_name, pathname,
468 tracefile_size, tracefile_count);
469 } else {
470 enum lttng_trace_chunk_status chunk_status;
471 uint64_t chunk_id;
472
473 assert(trace_chunk);
474 chunk_status = lttng_trace_chunk_get_id(trace_chunk,
475 &chunk_id);
476 assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
477
478 /* From 2.11 to ...*/
479 ret = relayd_add_stream_2_11(rsock, channel_name, pathname,
480 tracefile_size, tracefile_count,
481 chunk_id);
482 }
483
484 if (ret) {
485 ret = -1;
486 goto error;
487 }
488
489 /* Waiting for reply */
490 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
491 if (ret < 0) {
492 goto error;
493 }
494
495 /* Back to host bytes order. */
496 reply.handle = be64toh(reply.handle);
497 reply.ret_code = be32toh(reply.ret_code);
498
499 /* Return session id or negative ret code. */
500 if (reply.ret_code != LTTNG_OK) {
501 ret = -1;
502 ERR("Relayd add stream replied error %d", reply.ret_code);
503 } else {
504 /* Success */
505 ret = 0;
506 *stream_id = reply.handle;
507 }
508
509 DBG("Relayd stream added successfully with handle %" PRIu64,
510 reply.handle);
511
512 error:
513 return ret;
514 }
515
516 /*
517 * Inform the relay that all the streams for the current channel has been sent.
518 *
519 * On success return 0 else return ret_code negative value.
520 */
521 int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
522 {
523 int ret;
524 struct lttcomm_relayd_generic_reply reply;
525
526 /* Code flow error. Safety net. */
527 assert(rsock);
528
529 DBG("Relayd sending streams sent.");
530
531 /* This feature was introduced in 2.4, ignore it for earlier versions. */
532 if (rsock->minor < 4) {
533 ret = 0;
534 goto end;
535 }
536
537 /* Send command */
538 ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
539 if (ret < 0) {
540 goto error;
541 }
542
543 /* Waiting for reply */
544 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
545 if (ret < 0) {
546 goto error;
547 }
548
549 /* Back to host bytes order. */
550 reply.ret_code = be32toh(reply.ret_code);
551
552 /* Return session id or negative ret code. */
553 if (reply.ret_code != LTTNG_OK) {
554 ret = -1;
555 ERR("Relayd streams sent replied error %d", reply.ret_code);
556 goto error;
557 } else {
558 /* Success */
559 ret = 0;
560 }
561
562 DBG("Relayd streams sent success");
563
564 error:
565 end:
566 return ret;
567 }
568
569 /*
570 * Check version numbers on the relayd.
571 * If major versions are compatible, we assign minor_to_use to the
572 * minor version of the procotol we are going to use for this session.
573 *
574 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
575 * otherwise, or a negative value on network errors.
576 */
577 int relayd_version_check(struct lttcomm_relayd_sock *rsock)
578 {
579 int ret;
580 struct lttcomm_relayd_version msg;
581
582 /* Code flow error. Safety net. */
583 assert(rsock);
584
585 DBG("Relayd version check for major.minor %u.%u", rsock->major,
586 rsock->minor);
587
588 memset(&msg, 0, sizeof(msg));
589 /* Prepare network byte order before transmission. */
590 msg.major = htobe32(rsock->major);
591 msg.minor = htobe32(rsock->minor);
592
593 /* Send command */
594 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
595 if (ret < 0) {
596 goto error;
597 }
598
599 /* Receive response */
600 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
601 if (ret < 0) {
602 goto error;
603 }
604
605 /* Set back to host bytes order */
606 msg.major = be32toh(msg.major);
607 msg.minor = be32toh(msg.minor);
608
609 /*
610 * Only validate the major version. If the other side is higher,
611 * communication is not possible. Only major version equal can talk to each
612 * other. If the minor version differs, the lowest version is used by both
613 * sides.
614 */
615 if (msg.major != rsock->major) {
616 /* Not compatible */
617 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
618 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
619 msg.major, rsock->major);
620 goto error;
621 }
622
623 /*
624 * If the relayd's minor version is higher, it will adapt to our version so
625 * we can continue to use the latest relayd communication data structure.
626 * If the received minor version is higher, the relayd should adapt to us.
627 */
628 if (rsock->minor > msg.minor) {
629 rsock->minor = msg.minor;
630 }
631
632 /* Version number compatible */
633 DBG2("Relayd version is compatible, using protocol version %u.%u",
634 rsock->major, rsock->minor);
635 ret = 0;
636
637 error:
638 return ret;
639 }
640
641 /*
642 * Add stream on the relayd and assign stream handle to the stream_id argument.
643 *
644 * On success return 0 else return ret_code negative value.
645 */
646 int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
647 {
648 int ret;
649
650 /* Code flow error. Safety net. */
651 assert(rsock);
652
653 DBG("Relayd sending metadata of size %zu", len);
654
655 /* Send command */
656 ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
657 if (ret < 0) {
658 goto error;
659 }
660
661 DBG2("Relayd metadata added successfully");
662
663 /*
664 * After that call, the metadata data MUST be sent to the relayd so the
665 * receive size on the other end matches the len of the metadata packet
666 * header. This is why we don't wait for a reply here.
667 */
668
669 error:
670 return ret;
671 }
672
673 /*
674 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
675 */
676 int relayd_connect(struct lttcomm_relayd_sock *rsock)
677 {
678 /* Code flow error. Safety net. */
679 assert(rsock);
680
681 if (!rsock->sock.ops) {
682 /*
683 * Attempting a connect on a non-initialized socket.
684 */
685 return -ECONNRESET;
686 }
687
688 DBG3("Relayd connect ...");
689
690 return rsock->sock.ops->connect(&rsock->sock);
691 }
692
693 /*
694 * Close relayd socket with an allocated lttcomm_relayd_sock.
695 *
696 * If no socket operations are found, simply return 0 meaning that everything
697 * is fine. Without operations, the socket can not possibly be opened or used.
698 * This is possible if the socket was allocated but not created. However, the
699 * caller could simply use it to store a valid file descriptor for instance
700 * passed over a Unix socket and call this to cleanup but still without a valid
701 * ops pointer.
702 *
703 * Return the close returned value. On error, a negative value is usually
704 * returned back from close(2).
705 */
706 int relayd_close(struct lttcomm_relayd_sock *rsock)
707 {
708 int ret;
709
710 /* Code flow error. Safety net. */
711 assert(rsock);
712
713 /* An invalid fd is fine, return success. */
714 if (rsock->sock.fd < 0) {
715 ret = 0;
716 goto end;
717 }
718
719 DBG3("Relayd closing socket %d", rsock->sock.fd);
720
721 if (rsock->sock.ops) {
722 ret = rsock->sock.ops->close(&rsock->sock);
723 } else {
724 /* Default call if no specific ops found. */
725 ret = close(rsock->sock.fd);
726 if (ret < 0) {
727 PERROR("relayd_close default close");
728 }
729 }
730 rsock->sock.fd = -1;
731
732 end:
733 return ret;
734 }
735
736 /*
737 * Send data header structure to the relayd.
738 */
739 int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
740 struct lttcomm_relayd_data_hdr *hdr, size_t size)
741 {
742 int ret;
743
744 /* Code flow error. Safety net. */
745 assert(rsock);
746 assert(hdr);
747
748 if (rsock->sock.fd < 0) {
749 return -ECONNRESET;
750 }
751
752 DBG3("Relayd sending data header of size %zu", size);
753
754 /* Again, safety net */
755 if (size == 0) {
756 size = sizeof(struct lttcomm_relayd_data_hdr);
757 }
758
759 /* Only send data header. */
760 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
761 if (ret < 0) {
762 ret = -errno;
763 goto error;
764 }
765
766 /*
767 * The data MUST be sent right after that command for the receive on the
768 * other end to match the size in the header.
769 */
770
771 error:
772 return ret;
773 }
774
775 /*
776 * Send close stream command to the relayd.
777 */
778 int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
779 uint64_t last_net_seq_num)
780 {
781 int ret;
782 struct lttcomm_relayd_close_stream msg;
783 struct lttcomm_relayd_generic_reply reply;
784
785 /* Code flow error. Safety net. */
786 assert(rsock);
787
788 DBG("Relayd closing stream id %" PRIu64, stream_id);
789
790 memset(&msg, 0, sizeof(msg));
791 msg.stream_id = htobe64(stream_id);
792 msg.last_net_seq_num = htobe64(last_net_seq_num);
793
794 /* Send command */
795 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
796 if (ret < 0) {
797 goto error;
798 }
799
800 /* Receive response */
801 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
802 if (ret < 0) {
803 goto error;
804 }
805
806 reply.ret_code = be32toh(reply.ret_code);
807
808 /* Return session id or negative ret code. */
809 if (reply.ret_code != LTTNG_OK) {
810 ret = -1;
811 ERR("Relayd close stream replied error %d", reply.ret_code);
812 } else {
813 /* Success */
814 ret = 0;
815 }
816
817 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
818
819 error:
820 return ret;
821 }
822
823 /*
824 * Check for data availability for a given stream id.
825 *
826 * Return 0 if NOT pending, 1 if so and a negative value on error.
827 */
828 int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
829 uint64_t last_net_seq_num)
830 {
831 int ret;
832 struct lttcomm_relayd_data_pending msg;
833 struct lttcomm_relayd_generic_reply reply;
834
835 /* Code flow error. Safety net. */
836 assert(rsock);
837
838 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
839
840 memset(&msg, 0, sizeof(msg));
841 msg.stream_id = htobe64(stream_id);
842 msg.last_net_seq_num = htobe64(last_net_seq_num);
843
844 /* Send command */
845 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
846 sizeof(msg), 0);
847 if (ret < 0) {
848 goto error;
849 }
850
851 /* Receive response */
852 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
853 if (ret < 0) {
854 goto error;
855 }
856
857 reply.ret_code = be32toh(reply.ret_code);
858
859 /* Return session id or negative ret code. */
860 if (reply.ret_code >= LTTNG_OK) {
861 ERR("Relayd data pending replied error %d", reply.ret_code);
862 }
863
864 /* At this point, the ret code is either 1 or 0 */
865 ret = reply.ret_code;
866
867 DBG("Relayd data is %s pending for stream id %" PRIu64,
868 ret == 1 ? "" : "NOT", stream_id);
869
870 error:
871 return ret;
872 }
873
874 /*
875 * Check on the relayd side for a quiescent state on the control socket.
876 */
877 int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
878 uint64_t metadata_stream_id)
879 {
880 int ret;
881 struct lttcomm_relayd_quiescent_control msg;
882 struct lttcomm_relayd_generic_reply reply;
883
884 /* Code flow error. Safety net. */
885 assert(rsock);
886
887 DBG("Relayd checking quiescent control state");
888
889 memset(&msg, 0, sizeof(msg));
890 msg.stream_id = htobe64(metadata_stream_id);
891
892 /* Send command */
893 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
894 if (ret < 0) {
895 goto error;
896 }
897
898 /* Receive response */
899 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
900 if (ret < 0) {
901 goto error;
902 }
903
904 reply.ret_code = be32toh(reply.ret_code);
905
906 /* Return session id or negative ret code. */
907 if (reply.ret_code != LTTNG_OK) {
908 ret = -1;
909 ERR("Relayd quiescent control replied error %d", reply.ret_code);
910 goto error;
911 }
912
913 /* Control socket is quiescent */
914 return 0;
915
916 error:
917 return ret;
918 }
919
920 /*
921 * Begin a data pending command for a specific session id.
922 */
923 int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
924 {
925 int ret;
926 struct lttcomm_relayd_begin_data_pending msg;
927 struct lttcomm_relayd_generic_reply reply;
928
929 /* Code flow error. Safety net. */
930 assert(rsock);
931
932 DBG("Relayd begin data pending");
933
934 memset(&msg, 0, sizeof(msg));
935 msg.session_id = htobe64(id);
936
937 /* Send command */
938 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
939 if (ret < 0) {
940 goto error;
941 }
942
943 /* Receive response */
944 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
945 if (ret < 0) {
946 goto error;
947 }
948
949 reply.ret_code = be32toh(reply.ret_code);
950
951 /* Return session id or negative ret code. */
952 if (reply.ret_code != LTTNG_OK) {
953 ret = -1;
954 ERR("Relayd begin data pending replied error %d", reply.ret_code);
955 goto error;
956 }
957
958 return 0;
959
960 error:
961 return ret;
962 }
963
964 /*
965 * End a data pending command for a specific session id.
966 *
967 * Return 0 on success and set is_data_inflight to 0 if no data is being
968 * streamed or 1 if it is the case.
969 */
970 int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
971 unsigned int *is_data_inflight)
972 {
973 int ret, recv_ret;
974 struct lttcomm_relayd_end_data_pending msg;
975 struct lttcomm_relayd_generic_reply reply;
976
977 /* Code flow error. Safety net. */
978 assert(rsock);
979
980 DBG("Relayd end data pending");
981
982 memset(&msg, 0, sizeof(msg));
983 msg.session_id = htobe64(id);
984
985 /* Send command */
986 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
987 if (ret < 0) {
988 goto error;
989 }
990
991 /* Receive response */
992 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
993 if (ret < 0) {
994 goto error;
995 }
996
997 recv_ret = be32toh(reply.ret_code);
998 if (recv_ret < 0) {
999 ret = recv_ret;
1000 goto error;
1001 }
1002
1003 *is_data_inflight = recv_ret;
1004
1005 DBG("Relayd end data pending is data inflight: %d", recv_ret);
1006
1007 return 0;
1008
1009 error:
1010 return ret;
1011 }
1012
1013 /*
1014 * Send index to the relayd.
1015 */
1016 int relayd_send_index(struct lttcomm_relayd_sock *rsock,
1017 struct ctf_packet_index *index, uint64_t relay_stream_id,
1018 uint64_t net_seq_num)
1019 {
1020 int ret;
1021 struct lttcomm_relayd_index msg;
1022 struct lttcomm_relayd_generic_reply reply;
1023
1024 /* Code flow error. Safety net. */
1025 assert(rsock);
1026
1027 if (rsock->minor < 4) {
1028 DBG("Not sending indexes before protocol 2.4");
1029 ret = 0;
1030 goto error;
1031 }
1032
1033 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
1034
1035 memset(&msg, 0, sizeof(msg));
1036 msg.relay_stream_id = htobe64(relay_stream_id);
1037 msg.net_seq_num = htobe64(net_seq_num);
1038
1039 /* The index is already in big endian. */
1040 msg.packet_size = index->packet_size;
1041 msg.content_size = index->content_size;
1042 msg.timestamp_begin = index->timestamp_begin;
1043 msg.timestamp_end = index->timestamp_end;
1044 msg.events_discarded = index->events_discarded;
1045 msg.stream_id = index->stream_id;
1046
1047 if (rsock->minor >= 8) {
1048 msg.stream_instance_id = index->stream_instance_id;
1049 msg.packet_seq_num = index->packet_seq_num;
1050 }
1051
1052 /* Send command */
1053 ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
1054 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
1055 rsock->minor),
1056 lttng_to_index_minor(rsock->major, rsock->minor)),
1057 0);
1058 if (ret < 0) {
1059 goto error;
1060 }
1061
1062 /* Receive response */
1063 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1064 if (ret < 0) {
1065 goto error;
1066 }
1067
1068 reply.ret_code = be32toh(reply.ret_code);
1069
1070 /* Return session id or negative ret code. */
1071 if (reply.ret_code != LTTNG_OK) {
1072 ret = -1;
1073 ERR("Relayd send index replied error %d", reply.ret_code);
1074 } else {
1075 /* Success */
1076 ret = 0;
1077 }
1078
1079 error:
1080 return ret;
1081 }
1082
1083 /*
1084 * Ask the relay to reset the metadata trace file (regeneration).
1085 */
1086 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
1087 uint64_t stream_id, uint64_t version)
1088 {
1089 int ret;
1090 struct lttcomm_relayd_reset_metadata msg;
1091 struct lttcomm_relayd_generic_reply reply;
1092
1093 /* Code flow error. Safety net. */
1094 assert(rsock);
1095
1096 /* Should have been prevented by the sessiond. */
1097 if (rsock->minor < 8) {
1098 ERR("Metadata regeneration unsupported before 2.8");
1099 ret = -1;
1100 goto error;
1101 }
1102
1103 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1104
1105 memset(&msg, 0, sizeof(msg));
1106 msg.stream_id = htobe64(stream_id);
1107 msg.version = htobe64(version);
1108
1109 /* Send command */
1110 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1111 if (ret < 0) {
1112 goto error;
1113 }
1114
1115 /* Receive response */
1116 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1117 if (ret < 0) {
1118 goto error;
1119 }
1120
1121 reply.ret_code = be32toh(reply.ret_code);
1122
1123 /* Return session id or negative ret code. */
1124 if (reply.ret_code != LTTNG_OK) {
1125 ret = -1;
1126 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1127 } else {
1128 /* Success */
1129 ret = 0;
1130 }
1131
1132 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1133
1134 error:
1135 return ret;
1136 }
1137
1138 int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
1139 unsigned int stream_count, const uint64_t *new_chunk_id,
1140 const struct relayd_stream_rotation_position *positions)
1141 {
1142 int ret;
1143 unsigned int i;
1144 struct lttng_dynamic_buffer payload;
1145 struct lttcomm_relayd_generic_reply reply = {};
1146 const struct lttcomm_relayd_rotate_streams msg = {
1147 .stream_count = htobe32((uint32_t) stream_count),
1148 .new_chunk_id = (typeof(msg.new_chunk_id)) {
1149 .is_set = !!new_chunk_id,
1150 .value = htobe64(new_chunk_id ? *new_chunk_id : 0),
1151 },
1152 };
1153 char new_chunk_id_buf[MAX_INT_DEC_LEN(*new_chunk_id)] = {};
1154 const char *new_chunk_id_str;
1155
1156 lttng_dynamic_buffer_init(&payload);
1157
1158 /* Code flow error. Safety net. */
1159 assert(sock);
1160
1161 if (new_chunk_id) {
1162 ret = snprintf(new_chunk_id_buf, sizeof(new_chunk_id_buf),
1163 "%" PRIu64, *new_chunk_id);
1164 if (ret == -1 || ret >= sizeof(new_chunk_id_buf)) {
1165 new_chunk_id_str = "formatting error";
1166 } else {
1167 new_chunk_id_str = new_chunk_id_buf;
1168 }
1169 } else {
1170 new_chunk_id_str = "none";
1171 }
1172
1173 DBG("Preparing \"rotate streams\" command payload: new_chunk_id = %s, stream_count = %u",
1174 new_chunk_id_str, stream_count);
1175
1176 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1177 if (ret) {
1178 ERR("Failed to allocate \"rotate streams\" command payload");
1179 goto error;
1180 }
1181
1182 for (i = 0; i < stream_count; i++) {
1183 const struct relayd_stream_rotation_position *position =
1184 &positions[i];
1185 const struct lttcomm_relayd_stream_rotation_position comm_position = {
1186 .stream_id = htobe64(position->stream_id),
1187 .rotate_at_seq_num = htobe64(
1188 position->rotate_at_seq_num),
1189 };
1190
1191 DBG("Rotate stream %" PRIu64 "at sequence number %" PRIu64,
1192 position->stream_id,
1193 position->rotate_at_seq_num);
1194 ret = lttng_dynamic_buffer_append(&payload, &comm_position,
1195 sizeof(comm_position));
1196 if (ret) {
1197 ERR("Failed to allocate \"rotate streams\" command payload");
1198 goto error;
1199 }
1200 }
1201
1202 /* Send command. */
1203 ret = send_command(sock, RELAYD_ROTATE_STREAMS, payload.data,
1204 payload.size, 0);
1205 if (ret < 0) {
1206 ERR("Failed to send \"rotate stream\" command");
1207 goto error;
1208 }
1209
1210 /* Receive response. */
1211 ret = recv_reply(sock, &reply, sizeof(reply));
1212 if (ret < 0) {
1213 ERR("Failed to receive \"rotate streams\" command reply");
1214 goto error;
1215 }
1216
1217 reply.ret_code = be32toh(reply.ret_code);
1218 if (reply.ret_code != LTTNG_OK) {
1219 ret = -1;
1220 ERR("Relayd rotate streams replied error %d", reply.ret_code);
1221 } else {
1222 /* Success. */
1223 ret = 0;
1224 DBG("Relayd rotated streams successfully");
1225 }
1226
1227 error:
1228 lttng_dynamic_buffer_reset(&payload);
1229 return ret;
1230 }
1231
1232 int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
1233 struct lttng_trace_chunk *chunk)
1234 {
1235 int ret = 0;
1236 enum lttng_trace_chunk_status status;
1237 struct lttcomm_relayd_create_trace_chunk msg = {};
1238 struct lttcomm_relayd_generic_reply reply = {};
1239 struct lttng_dynamic_buffer payload;
1240 uint64_t chunk_id;
1241 time_t creation_timestamp;
1242 const char *chunk_name;
1243 size_t chunk_name_length;
1244 bool overridden_name;
1245
1246 lttng_dynamic_buffer_init(&payload);
1247
1248 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1249 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1250 ret = -1;
1251 goto end;
1252 }
1253
1254 status = lttng_trace_chunk_get_creation_timestamp(
1255 chunk, &creation_timestamp);
1256 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1257 ret = -1;
1258 goto end;
1259 }
1260
1261 status = lttng_trace_chunk_get_name(
1262 chunk, &chunk_name, &overridden_name);
1263 if (status != LTTNG_TRACE_CHUNK_STATUS_OK &&
1264 status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
1265 ret = -1;
1266 goto end;
1267 }
1268
1269 chunk_name_length = overridden_name ? (strlen(chunk_name) + 1) : 0;
1270 msg = (typeof(msg)){
1271 .chunk_id = htobe64(chunk_id),
1272 .creation_timestamp = htobe64((uint64_t) creation_timestamp),
1273 .override_name_length = htobe32((uint32_t) chunk_name_length),
1274 };
1275
1276 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1277 if (ret) {
1278 goto end;
1279 }
1280 if (chunk_name_length) {
1281 ret = lttng_dynamic_buffer_append(
1282 &payload, chunk_name, chunk_name_length);
1283 if (ret) {
1284 goto end;
1285 }
1286 }
1287
1288 ret = send_command(sock, RELAYD_CREATE_TRACE_CHUNK, payload.data,
1289 payload.size, 0);
1290 if (ret < 0) {
1291 ERR("Failed to send trace chunk creation command to relay daemon");
1292 goto end;
1293 }
1294
1295 ret = recv_reply(sock, &reply, sizeof(reply));
1296 if (ret < 0) {
1297 ERR("Failed to receive relay daemon trace chunk creation command reply");
1298 goto end;
1299 }
1300
1301 reply.ret_code = be32toh(reply.ret_code);
1302 if (reply.ret_code != LTTNG_OK) {
1303 ret = -1;
1304 ERR("Relayd trace chunk create replied error %d",
1305 reply.ret_code);
1306 } else {
1307 ret = 0;
1308 DBG("Relayd successfully created trace chunk: chunk_id = %" PRIu64,
1309 chunk_id);
1310 }
1311
1312 end:
1313 lttng_dynamic_buffer_reset(&payload);
1314 return ret;
1315 }
1316
1317 int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
1318 struct lttng_trace_chunk *chunk)
1319 {
1320 int ret = 0;
1321 enum lttng_trace_chunk_status status;
1322 struct lttcomm_relayd_close_trace_chunk msg = {};
1323 struct lttcomm_relayd_generic_reply reply = {};
1324 uint64_t chunk_id;
1325 time_t close_timestamp;
1326 LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
1327
1328 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1329 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1330 ERR("Failed to get trace chunk id");
1331 ret = -1;
1332 goto end;
1333 }
1334
1335 status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
1336 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1337 ERR("Failed to get trace chunk close timestamp");
1338 ret = -1;
1339 goto end;
1340 }
1341
1342 status = lttng_trace_chunk_get_close_command(chunk,
1343 &close_command.value);
1344 switch (status) {
1345 case LTTNG_TRACE_CHUNK_STATUS_OK:
1346 close_command.is_set = 1;
1347 break;
1348 case LTTNG_TRACE_CHUNK_STATUS_NONE:
1349 break;
1350 default:
1351 ERR("Failed to get trace chunk close command");
1352 ret = -1;
1353 goto end;
1354 }
1355
1356 msg = (typeof(msg)){
1357 .chunk_id = htobe64(chunk_id),
1358 .close_timestamp = htobe64((uint64_t) close_timestamp),
1359 .close_command = {
1360 .value = htobe32((uint32_t) close_command.value),
1361 .is_set = close_command.is_set,
1362 },
1363 };
1364
1365 ret = send_command(sock, RELAYD_CLOSE_TRACE_CHUNK, &msg, sizeof(msg),
1366 0);
1367 if (ret < 0) {
1368 ERR("Failed to send trace chunk close command to relay daemon");
1369 goto end;
1370 }
1371
1372 ret = recv_reply(sock, &reply, sizeof(reply));
1373 if (ret < 0) {
1374 ERR("Failed to receive relay daemon trace chunk close command reply");
1375 goto end;
1376 }
1377
1378 reply.ret_code = be32toh(reply.ret_code);
1379 if (reply.ret_code != LTTNG_OK) {
1380 ret = -1;
1381 ERR("Relayd trace chunk close replied error %d",
1382 reply.ret_code);
1383 } else {
1384 ret = 0;
1385 DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64,
1386 chunk_id);
1387 }
1388 end:
1389 return ret;
1390 }
1391
1392 int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
1393 uint64_t chunk_id, bool *chunk_exists)
1394 {
1395 int ret = 0;
1396 struct lttcomm_relayd_trace_chunk_exists msg = {};
1397 struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
1398
1399 msg = (typeof(msg)){
1400 .chunk_id = htobe64(chunk_id),
1401 };
1402
1403 ret = send_command(sock, RELAYD_TRACE_CHUNK_EXISTS, &msg, sizeof(msg),
1404 0);
1405 if (ret < 0) {
1406 ERR("Failed to send trace chunk exists command to relay daemon");
1407 goto end;
1408 }
1409
1410 ret = recv_reply(sock, &reply, sizeof(reply));
1411 if (ret < 0) {
1412 ERR("Failed to receive relay daemon trace chunk close command reply");
1413 goto end;
1414 }
1415
1416 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1417 if (reply.generic.ret_code != LTTNG_OK) {
1418 ret = -1;
1419 ERR("Relayd trace chunk close replied error %d",
1420 reply.generic.ret_code);
1421 } else {
1422 ret = 0;
1423 DBG("Relayd successfully checked trace chunk existence: chunk_id = %" PRIu64
1424 ", exists = %s", chunk_id,
1425 reply.trace_chunk_exists ? "true" : "false");
1426 *chunk_exists = !!reply.trace_chunk_exists;
1427 }
1428 end:
1429 return ret;
1430 }
This page took 0.059954 seconds and 3 git commands to generate.