relayd: send sessiond uuid and session id as part of create session
[lttng-tools.git] / src / common / relayd / relayd.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <assert.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <sys/stat.h>
24 #include <inttypes.h>
25
26 #include <common/common.h>
27 #include <common/defaults.h>
28 #include <common/compat/endian.h>
29 #include <common/compat/string.h>
30 #include <common/sessiond-comm/relayd.h>
31 #include <common/index/ctf-index.h>
32
33 #include "relayd.h"
34
35 /*
36 * Send command. Fill up the header and append the data.
37 */
38 static int send_command(struct lttcomm_relayd_sock *rsock,
39 enum lttcomm_relayd_command cmd, const void *data, size_t size,
40 int flags)
41 {
42 int ret;
43 struct lttcomm_relayd_hdr header;
44 char *buf;
45 uint64_t buf_size = sizeof(header);
46
47 if (rsock->sock.fd < 0) {
48 return -ECONNRESET;
49 }
50
51 if (data) {
52 buf_size += size;
53 }
54
55 buf = zmalloc(buf_size);
56 if (buf == NULL) {
57 PERROR("zmalloc relayd send command buf");
58 ret = -1;
59 goto alloc_error;
60 }
61
62 memset(&header, 0, sizeof(header));
63 header.cmd = htobe32(cmd);
64 header.data_size = htobe64(size);
65
66 /* Zeroed for now since not used. */
67 header.cmd_version = 0;
68 header.circuit_id = 0;
69
70 /* Prepare buffer to send. */
71 memcpy(buf, &header, sizeof(header));
72 if (data) {
73 memcpy(buf + sizeof(header), data, size);
74 }
75
76 DBG3("Relayd sending command %d of size %" PRIu64, (int) cmd, buf_size);
77 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
78 if (ret < 0) {
79 PERROR("Failed to send command %d of size %" PRIu64,
80 (int) cmd, buf_size);
81 ret = -errno;
82 goto error;
83 }
84 error:
85 free(buf);
86 alloc_error:
87 return ret;
88 }
89
90 /*
91 * Receive reply data on socket. This MUST be call after send_command or else
92 * could result in unexpected behavior(s).
93 */
94 static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
95 {
96 int ret;
97
98 if (rsock->sock.fd < 0) {
99 return -ECONNRESET;
100 }
101
102 DBG3("Relayd waiting for reply of size %zu", size);
103
104 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
105 if (ret <= 0 || ret != size) {
106 if (ret == 0) {
107 /* Orderly shutdown. */
108 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
109 } else {
110 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
111 rsock->sock.fd, size, ret);
112 }
113 /* Always return -1 here and the caller can use errno. */
114 ret = -1;
115 goto error;
116 }
117
118 error:
119 return ret;
120 }
121
122 /*
123 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name & hostname)
124 * have no length restriction on the sender side.
125 * Length for both payloads is stored in the msg struct. A new dynamic size
126 * payload size is introduced.
127 */
128 static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
129 char *session_name, char *hostname,
130 int session_live_timer, unsigned int snapshot,
131 uint64_t sessiond_session_id, const lttng_uuid sessiond_uuid)
132 {
133 int ret;
134 struct lttcomm_relayd_create_session_2_11 *msg = NULL;
135 size_t session_name_len;
136 size_t hostname_len;
137 size_t msg_length;
138
139 /* The two names are sent with a '\0' delimiter between them. */
140 session_name_len = strlen(session_name) + 1;
141 hostname_len = strlen(hostname) + 1;
142
143 msg_length = sizeof(*msg) + session_name_len + hostname_len;
144 msg = zmalloc(msg_length);
145 if (!msg) {
146 PERROR("zmalloc create_session_2_11 command message");
147 ret = -1;
148 goto error;
149 }
150
151 assert(session_name_len <= UINT32_MAX);
152 msg->session_name_len = htobe32(session_name_len);
153
154 assert(hostname_len <= UINT32_MAX);
155 msg->hostname_len = htobe32(hostname_len);
156
157 if (lttng_strncpy(msg->names, session_name, session_name_len)) {
158 ret = -1;
159 goto error;
160 }
161 if (lttng_strncpy(msg->names + session_name_len, hostname, hostname_len)) {
162 ret = -1;
163 goto error;
164 }
165
166 msg->live_timer = htobe32(session_live_timer);
167 msg->snapshot = !!snapshot;
168
169 lttng_uuid_copy(msg->sessiond_uuid, sessiond_uuid);
170 msg->session_id = htobe64(sessiond_session_id);
171
172 /* Send command */
173 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
174 if (ret < 0) {
175 goto error;
176 }
177 error:
178 free(msg);
179 return ret;
180 }
181 /*
182 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
183 * support the live reading capability.
184 */
185 static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
186 char *session_name, char *hostname, int session_live_timer,
187 unsigned int snapshot)
188 {
189 int ret;
190 struct lttcomm_relayd_create_session_2_4 msg;
191
192 if (lttng_strncpy(msg.session_name, session_name,
193 sizeof(msg.session_name))) {
194 ret = -1;
195 goto error;
196 }
197 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
198 ret = -1;
199 goto error;
200 }
201 msg.live_timer = htobe32(session_live_timer);
202 msg.snapshot = htobe32(snapshot);
203
204 /* Send command */
205 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
206 if (ret < 0) {
207 goto error;
208 }
209
210 error:
211 return ret;
212 }
213
214 /*
215 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
216 */
217 static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock)
218 {
219 int ret;
220
221 /* Send command */
222 ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
223 if (ret < 0) {
224 goto error;
225 }
226
227 error:
228 return ret;
229 }
230
231 /*
232 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
233 * set session_id of the relayd if we have a successful reply from the relayd.
234 *
235 * On success, return 0 else a negative value which is either an errno error or
236 * a lttng error code from the relayd.
237 */
238 int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *relayd_session_id,
239 char *session_name, char *hostname, int session_live_timer,
240 unsigned int snapshot, uint64_t sessiond_session_id,
241 const lttng_uuid sessiond_uuid)
242 {
243 int ret;
244 struct lttcomm_relayd_status_session reply;
245
246 assert(rsock);
247 assert(relayd_session_id);
248
249 DBG("Relayd create session");
250
251 if (rsock->minor < 4) {
252 /* From 2.1 to 2.3 */
253 ret = relayd_create_session_2_1(rsock);
254 } else if (rsock->minor >= 4 && rsock->minor < 11) {
255 /* From 2.4 to 2.10 */
256 ret = relayd_create_session_2_4(rsock, session_name,
257 hostname, session_live_timer, snapshot);
258 } else {
259 /* From 2.11 to ... */
260 ret = relayd_create_session_2_11(rsock, session_name,
261 hostname, session_live_timer, snapshot,
262 sessiond_session_id, sessiond_uuid);
263 }
264
265 if (ret < 0) {
266 goto error;
267 }
268
269 /* Receive response */
270 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
271 if (ret < 0) {
272 goto error;
273 }
274
275 reply.session_id = be64toh(reply.session_id);
276 reply.ret_code = be32toh(reply.ret_code);
277
278 /* Return session id or negative ret code. */
279 if (reply.ret_code != LTTNG_OK) {
280 ret = -1;
281 ERR("Relayd create session replied error %d", reply.ret_code);
282 goto error;
283 } else {
284 ret = 0;
285 *relayd_session_id = reply.session_id;
286 }
287
288 DBG("Relayd session created with id %" PRIu64, reply.session_id);
289
290 error:
291 return ret;
292 }
293
294 static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
295 const char *channel_name, const char *pathname)
296 {
297 int ret;
298 struct lttcomm_relayd_add_stream msg;
299
300 memset(&msg, 0, sizeof(msg));
301 if (lttng_strncpy(msg.channel_name, channel_name,
302 sizeof(msg.channel_name))) {
303 ret = -1;
304 goto error;
305 }
306
307 if (lttng_strncpy(msg.pathname, pathname,
308 sizeof(msg.pathname))) {
309 ret = -1;
310 goto error;
311 }
312
313 /* Send command */
314 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
315 if (ret < 0) {
316 ret = -1;
317 goto error;
318 }
319 ret = 0;
320 error:
321 return ret;
322 }
323
324 static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
325 const char *channel_name, const char *pathname,
326 uint64_t tracefile_size, uint64_t tracefile_count)
327 {
328 int ret;
329 struct lttcomm_relayd_add_stream_2_2 msg;
330
331 memset(&msg, 0, sizeof(msg));
332 /* Compat with relayd 2.2 to 2.10 */
333 if (lttng_strncpy(msg.channel_name, channel_name,
334 sizeof(msg.channel_name))) {
335 ret = -1;
336 goto error;
337 }
338 if (lttng_strncpy(msg.pathname, pathname,
339 sizeof(msg.pathname))) {
340 ret = -1;
341 goto error;
342 }
343 msg.tracefile_size = htobe64(tracefile_size);
344 msg.tracefile_count = htobe64(tracefile_count);
345
346 /* Send command */
347 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
348 if (ret < 0) {
349 goto error;
350 }
351 ret = 0;
352 error:
353 return ret;
354 }
355
356 static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
357 const char *channel_name, const char *pathname,
358 uint64_t tracefile_size, uint64_t tracefile_count,
359 uint64_t trace_archive_id)
360 {
361 int ret;
362 struct lttcomm_relayd_add_stream_2_11 *msg = NULL;
363 size_t channel_name_len;
364 size_t pathname_len;
365 size_t msg_length;
366
367 /* The two names are sent with a '\0' delimiter between them. */
368 channel_name_len = strlen(channel_name) + 1;
369 pathname_len = strlen(pathname) + 1;
370
371 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
372 msg = zmalloc(msg_length);
373 if (!msg) {
374 PERROR("zmalloc add_stream_2_11 command message");
375 ret = -1;
376 goto error;
377 }
378
379 assert(channel_name_len <= UINT32_MAX);
380 msg->channel_name_len = htobe32(channel_name_len);
381
382 assert(pathname_len <= UINT32_MAX);
383 msg->pathname_len = htobe32(pathname_len);
384
385 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
386 ret = -1;
387 goto error;
388 }
389 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
390 ret = -1;
391 goto error;
392 }
393
394 msg->tracefile_size = htobe64(tracefile_size);
395 msg->tracefile_count = htobe64(tracefile_count);
396 msg->trace_archive_id = htobe64(trace_archive_id);
397
398 /* Send command */
399 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
400 if (ret < 0) {
401 goto error;
402 }
403 ret = 0;
404 error:
405 free(msg);
406 return ret;
407 }
408
409 /*
410 * Add stream on the relayd and assign stream handle to the stream_id argument.
411 *
412 * On success return 0 else return ret_code negative value.
413 */
414 int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
415 const char *pathname, uint64_t *stream_id,
416 uint64_t tracefile_size, uint64_t tracefile_count,
417 uint64_t trace_archive_id)
418 {
419 int ret;
420 struct lttcomm_relayd_status_stream reply;
421
422 /* Code flow error. Safety net. */
423 assert(rsock);
424 assert(channel_name);
425 assert(pathname);
426
427 DBG("Relayd adding stream for channel name %s", channel_name);
428
429 /* Compat with relayd 2.1 */
430 if (rsock->minor == 1) {
431 /* For 2.1 */
432 ret = relayd_add_stream_2_1(rsock, channel_name, pathname);
433
434 } else if (rsock->minor > 1 && rsock->minor < 11) {
435 /* From 2.2 to 2.10 */
436 ret = relayd_add_stream_2_2(rsock, channel_name, pathname,
437 tracefile_size, tracefile_count);
438 } else {
439 /* From 2.11 to ...*/
440 ret = relayd_add_stream_2_11(rsock, channel_name, pathname,
441 tracefile_size, tracefile_count,
442 trace_archive_id);
443 }
444
445 if (ret) {
446 ret = -1;
447 goto error;
448 }
449
450 /* Waiting for reply */
451 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
452 if (ret < 0) {
453 goto error;
454 }
455
456 /* Back to host bytes order. */
457 reply.handle = be64toh(reply.handle);
458 reply.ret_code = be32toh(reply.ret_code);
459
460 /* Return session id or negative ret code. */
461 if (reply.ret_code != LTTNG_OK) {
462 ret = -1;
463 ERR("Relayd add stream replied error %d", reply.ret_code);
464 } else {
465 /* Success */
466 ret = 0;
467 *stream_id = reply.handle;
468 }
469
470 DBG("Relayd stream added successfully with handle %" PRIu64,
471 reply.handle);
472
473 error:
474 return ret;
475 }
476
477 /*
478 * Inform the relay that all the streams for the current channel has been sent.
479 *
480 * On success return 0 else return ret_code negative value.
481 */
482 int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
483 {
484 int ret;
485 struct lttcomm_relayd_generic_reply reply;
486
487 /* Code flow error. Safety net. */
488 assert(rsock);
489
490 DBG("Relayd sending streams sent.");
491
492 /* This feature was introduced in 2.4, ignore it for earlier versions. */
493 if (rsock->minor < 4) {
494 ret = 0;
495 goto end;
496 }
497
498 /* Send command */
499 ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
500 if (ret < 0) {
501 goto error;
502 }
503
504 /* Waiting for reply */
505 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
506 if (ret < 0) {
507 goto error;
508 }
509
510 /* Back to host bytes order. */
511 reply.ret_code = be32toh(reply.ret_code);
512
513 /* Return session id or negative ret code. */
514 if (reply.ret_code != LTTNG_OK) {
515 ret = -1;
516 ERR("Relayd streams sent replied error %d", reply.ret_code);
517 goto error;
518 } else {
519 /* Success */
520 ret = 0;
521 }
522
523 DBG("Relayd streams sent success");
524
525 error:
526 end:
527 return ret;
528 }
529
530 /*
531 * Check version numbers on the relayd.
532 * If major versions are compatible, we assign minor_to_use to the
533 * minor version of the procotol we are going to use for this session.
534 *
535 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
536 * otherwise, or a negative value on network errors.
537 */
538 int relayd_version_check(struct lttcomm_relayd_sock *rsock)
539 {
540 int ret;
541 struct lttcomm_relayd_version msg;
542
543 /* Code flow error. Safety net. */
544 assert(rsock);
545
546 DBG("Relayd version check for major.minor %u.%u", rsock->major,
547 rsock->minor);
548
549 memset(&msg, 0, sizeof(msg));
550 /* Prepare network byte order before transmission. */
551 msg.major = htobe32(rsock->major);
552 msg.minor = htobe32(rsock->minor);
553
554 /* Send command */
555 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
556 if (ret < 0) {
557 goto error;
558 }
559
560 /* Receive response */
561 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
562 if (ret < 0) {
563 goto error;
564 }
565
566 /* Set back to host bytes order */
567 msg.major = be32toh(msg.major);
568 msg.minor = be32toh(msg.minor);
569
570 /*
571 * Only validate the major version. If the other side is higher,
572 * communication is not possible. Only major version equal can talk to each
573 * other. If the minor version differs, the lowest version is used by both
574 * sides.
575 */
576 if (msg.major != rsock->major) {
577 /* Not compatible */
578 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
579 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
580 msg.major, rsock->major);
581 goto error;
582 }
583
584 /*
585 * If the relayd's minor version is higher, it will adapt to our version so
586 * we can continue to use the latest relayd communication data structure.
587 * If the received minor version is higher, the relayd should adapt to us.
588 */
589 if (rsock->minor > msg.minor) {
590 rsock->minor = msg.minor;
591 }
592
593 /* Version number compatible */
594 DBG2("Relayd version is compatible, using protocol version %u.%u",
595 rsock->major, rsock->minor);
596 ret = 0;
597
598 error:
599 return ret;
600 }
601
602 /*
603 * Add stream on the relayd and assign stream handle to the stream_id argument.
604 *
605 * On success return 0 else return ret_code negative value.
606 */
607 int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
608 {
609 int ret;
610
611 /* Code flow error. Safety net. */
612 assert(rsock);
613
614 DBG("Relayd sending metadata of size %zu", len);
615
616 /* Send command */
617 ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
618 if (ret < 0) {
619 goto error;
620 }
621
622 DBG2("Relayd metadata added successfully");
623
624 /*
625 * After that call, the metadata data MUST be sent to the relayd so the
626 * receive size on the other end matches the len of the metadata packet
627 * header. This is why we don't wait for a reply here.
628 */
629
630 error:
631 return ret;
632 }
633
634 /*
635 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
636 */
637 int relayd_connect(struct lttcomm_relayd_sock *rsock)
638 {
639 /* Code flow error. Safety net. */
640 assert(rsock);
641
642 if (!rsock->sock.ops) {
643 /*
644 * Attempting a connect on a non-initialized socket.
645 */
646 return -ECONNRESET;
647 }
648
649 DBG3("Relayd connect ...");
650
651 return rsock->sock.ops->connect(&rsock->sock);
652 }
653
654 /*
655 * Close relayd socket with an allocated lttcomm_relayd_sock.
656 *
657 * If no socket operations are found, simply return 0 meaning that everything
658 * is fine. Without operations, the socket can not possibly be opened or used.
659 * This is possible if the socket was allocated but not created. However, the
660 * caller could simply use it to store a valid file descriptor for instance
661 * passed over a Unix socket and call this to cleanup but still without a valid
662 * ops pointer.
663 *
664 * Return the close returned value. On error, a negative value is usually
665 * returned back from close(2).
666 */
667 int relayd_close(struct lttcomm_relayd_sock *rsock)
668 {
669 int ret;
670
671 /* Code flow error. Safety net. */
672 assert(rsock);
673
674 /* An invalid fd is fine, return success. */
675 if (rsock->sock.fd < 0) {
676 ret = 0;
677 goto end;
678 }
679
680 DBG3("Relayd closing socket %d", rsock->sock.fd);
681
682 if (rsock->sock.ops) {
683 ret = rsock->sock.ops->close(&rsock->sock);
684 } else {
685 /* Default call if no specific ops found. */
686 ret = close(rsock->sock.fd);
687 if (ret < 0) {
688 PERROR("relayd_close default close");
689 }
690 }
691 rsock->sock.fd = -1;
692
693 end:
694 return ret;
695 }
696
697 /*
698 * Send data header structure to the relayd.
699 */
700 int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
701 struct lttcomm_relayd_data_hdr *hdr, size_t size)
702 {
703 int ret;
704
705 /* Code flow error. Safety net. */
706 assert(rsock);
707 assert(hdr);
708
709 if (rsock->sock.fd < 0) {
710 return -ECONNRESET;
711 }
712
713 DBG3("Relayd sending data header of size %zu", size);
714
715 /* Again, safety net */
716 if (size == 0) {
717 size = sizeof(struct lttcomm_relayd_data_hdr);
718 }
719
720 /* Only send data header. */
721 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
722 if (ret < 0) {
723 ret = -errno;
724 goto error;
725 }
726
727 /*
728 * The data MUST be sent right after that command for the receive on the
729 * other end to match the size in the header.
730 */
731
732 error:
733 return ret;
734 }
735
736 /*
737 * Send close stream command to the relayd.
738 */
739 int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
740 uint64_t last_net_seq_num)
741 {
742 int ret;
743 struct lttcomm_relayd_close_stream msg;
744 struct lttcomm_relayd_generic_reply reply;
745
746 /* Code flow error. Safety net. */
747 assert(rsock);
748
749 DBG("Relayd closing stream id %" PRIu64, stream_id);
750
751 memset(&msg, 0, sizeof(msg));
752 msg.stream_id = htobe64(stream_id);
753 msg.last_net_seq_num = htobe64(last_net_seq_num);
754
755 /* Send command */
756 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
757 if (ret < 0) {
758 goto error;
759 }
760
761 /* Receive response */
762 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
763 if (ret < 0) {
764 goto error;
765 }
766
767 reply.ret_code = be32toh(reply.ret_code);
768
769 /* Return session id or negative ret code. */
770 if (reply.ret_code != LTTNG_OK) {
771 ret = -1;
772 ERR("Relayd close stream replied error %d", reply.ret_code);
773 } else {
774 /* Success */
775 ret = 0;
776 }
777
778 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
779
780 error:
781 return ret;
782 }
783
784 /*
785 * Check for data availability for a given stream id.
786 *
787 * Return 0 if NOT pending, 1 if so and a negative value on error.
788 */
789 int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
790 uint64_t last_net_seq_num)
791 {
792 int ret;
793 struct lttcomm_relayd_data_pending msg;
794 struct lttcomm_relayd_generic_reply reply;
795
796 /* Code flow error. Safety net. */
797 assert(rsock);
798
799 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
800
801 memset(&msg, 0, sizeof(msg));
802 msg.stream_id = htobe64(stream_id);
803 msg.last_net_seq_num = htobe64(last_net_seq_num);
804
805 /* Send command */
806 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
807 sizeof(msg), 0);
808 if (ret < 0) {
809 goto error;
810 }
811
812 /* Receive response */
813 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
814 if (ret < 0) {
815 goto error;
816 }
817
818 reply.ret_code = be32toh(reply.ret_code);
819
820 /* Return session id or negative ret code. */
821 if (reply.ret_code >= LTTNG_OK) {
822 ERR("Relayd data pending replied error %d", reply.ret_code);
823 }
824
825 /* At this point, the ret code is either 1 or 0 */
826 ret = reply.ret_code;
827
828 DBG("Relayd data is %s pending for stream id %" PRIu64,
829 ret == 1 ? "" : "NOT", stream_id);
830
831 error:
832 return ret;
833 }
834
835 /*
836 * Check on the relayd side for a quiescent state on the control socket.
837 */
838 int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
839 uint64_t metadata_stream_id)
840 {
841 int ret;
842 struct lttcomm_relayd_quiescent_control msg;
843 struct lttcomm_relayd_generic_reply reply;
844
845 /* Code flow error. Safety net. */
846 assert(rsock);
847
848 DBG("Relayd checking quiescent control state");
849
850 memset(&msg, 0, sizeof(msg));
851 msg.stream_id = htobe64(metadata_stream_id);
852
853 /* Send command */
854 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
855 if (ret < 0) {
856 goto error;
857 }
858
859 /* Receive response */
860 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
861 if (ret < 0) {
862 goto error;
863 }
864
865 reply.ret_code = be32toh(reply.ret_code);
866
867 /* Return session id or negative ret code. */
868 if (reply.ret_code != LTTNG_OK) {
869 ret = -1;
870 ERR("Relayd quiescent control replied error %d", reply.ret_code);
871 goto error;
872 }
873
874 /* Control socket is quiescent */
875 return 0;
876
877 error:
878 return ret;
879 }
880
881 /*
882 * Begin a data pending command for a specific session id.
883 */
884 int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
885 {
886 int ret;
887 struct lttcomm_relayd_begin_data_pending msg;
888 struct lttcomm_relayd_generic_reply reply;
889
890 /* Code flow error. Safety net. */
891 assert(rsock);
892
893 DBG("Relayd begin data pending");
894
895 memset(&msg, 0, sizeof(msg));
896 msg.session_id = htobe64(id);
897
898 /* Send command */
899 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
900 if (ret < 0) {
901 goto error;
902 }
903
904 /* Receive response */
905 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
906 if (ret < 0) {
907 goto error;
908 }
909
910 reply.ret_code = be32toh(reply.ret_code);
911
912 /* Return session id or negative ret code. */
913 if (reply.ret_code != LTTNG_OK) {
914 ret = -1;
915 ERR("Relayd begin data pending replied error %d", reply.ret_code);
916 goto error;
917 }
918
919 return 0;
920
921 error:
922 return ret;
923 }
924
925 /*
926 * End a data pending command for a specific session id.
927 *
928 * Return 0 on success and set is_data_inflight to 0 if no data is being
929 * streamed or 1 if it is the case.
930 */
931 int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
932 unsigned int *is_data_inflight)
933 {
934 int ret, recv_ret;
935 struct lttcomm_relayd_end_data_pending msg;
936 struct lttcomm_relayd_generic_reply reply;
937
938 /* Code flow error. Safety net. */
939 assert(rsock);
940
941 DBG("Relayd end data pending");
942
943 memset(&msg, 0, sizeof(msg));
944 msg.session_id = htobe64(id);
945
946 /* Send command */
947 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
948 if (ret < 0) {
949 goto error;
950 }
951
952 /* Receive response */
953 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
954 if (ret < 0) {
955 goto error;
956 }
957
958 recv_ret = be32toh(reply.ret_code);
959 if (recv_ret < 0) {
960 ret = recv_ret;
961 goto error;
962 }
963
964 *is_data_inflight = recv_ret;
965
966 DBG("Relayd end data pending is data inflight: %d", recv_ret);
967
968 return 0;
969
970 error:
971 return ret;
972 }
973
974 /*
975 * Send index to the relayd.
976 */
977 int relayd_send_index(struct lttcomm_relayd_sock *rsock,
978 struct ctf_packet_index *index, uint64_t relay_stream_id,
979 uint64_t net_seq_num)
980 {
981 int ret;
982 struct lttcomm_relayd_index msg;
983 struct lttcomm_relayd_generic_reply reply;
984
985 /* Code flow error. Safety net. */
986 assert(rsock);
987
988 if (rsock->minor < 4) {
989 DBG("Not sending indexes before protocol 2.4");
990 ret = 0;
991 goto error;
992 }
993
994 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
995
996 memset(&msg, 0, sizeof(msg));
997 msg.relay_stream_id = htobe64(relay_stream_id);
998 msg.net_seq_num = htobe64(net_seq_num);
999
1000 /* The index is already in big endian. */
1001 msg.packet_size = index->packet_size;
1002 msg.content_size = index->content_size;
1003 msg.timestamp_begin = index->timestamp_begin;
1004 msg.timestamp_end = index->timestamp_end;
1005 msg.events_discarded = index->events_discarded;
1006 msg.stream_id = index->stream_id;
1007
1008 if (rsock->minor >= 8) {
1009 msg.stream_instance_id = index->stream_instance_id;
1010 msg.packet_seq_num = index->packet_seq_num;
1011 }
1012
1013 /* Send command */
1014 ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
1015 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
1016 rsock->minor),
1017 lttng_to_index_minor(rsock->major, rsock->minor)),
1018 0);
1019 if (ret < 0) {
1020 goto error;
1021 }
1022
1023 /* Receive response */
1024 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1025 if (ret < 0) {
1026 goto error;
1027 }
1028
1029 reply.ret_code = be32toh(reply.ret_code);
1030
1031 /* Return session id or negative ret code. */
1032 if (reply.ret_code != LTTNG_OK) {
1033 ret = -1;
1034 ERR("Relayd send index replied error %d", reply.ret_code);
1035 } else {
1036 /* Success */
1037 ret = 0;
1038 }
1039
1040 error:
1041 return ret;
1042 }
1043
1044 /*
1045 * Ask the relay to reset the metadata trace file (regeneration).
1046 */
1047 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
1048 uint64_t stream_id, uint64_t version)
1049 {
1050 int ret;
1051 struct lttcomm_relayd_reset_metadata msg;
1052 struct lttcomm_relayd_generic_reply reply;
1053
1054 /* Code flow error. Safety net. */
1055 assert(rsock);
1056
1057 /* Should have been prevented by the sessiond. */
1058 if (rsock->minor < 8) {
1059 ERR("Metadata regeneration unsupported before 2.8");
1060 ret = -1;
1061 goto error;
1062 }
1063
1064 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1065
1066 memset(&msg, 0, sizeof(msg));
1067 msg.stream_id = htobe64(stream_id);
1068 msg.version = htobe64(version);
1069
1070 /* Send command */
1071 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1072 if (ret < 0) {
1073 goto error;
1074 }
1075
1076 /* Receive response */
1077 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1078 if (ret < 0) {
1079 goto error;
1080 }
1081
1082 reply.ret_code = be32toh(reply.ret_code);
1083
1084 /* Return session id or negative ret code. */
1085 if (reply.ret_code != LTTNG_OK) {
1086 ret = -1;
1087 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1088 } else {
1089 /* Success */
1090 ret = 0;
1091 }
1092
1093 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1094
1095 error:
1096 return ret;
1097 }
1098
1099 int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
1100 const char *new_pathname, uint64_t new_chunk_id,
1101 uint64_t seq_num)
1102 {
1103 int ret;
1104 struct lttcomm_relayd_rotate_stream *msg = NULL;
1105 struct lttcomm_relayd_generic_reply reply;
1106 size_t len;
1107 int msg_len;
1108
1109 /* Code flow error. Safety net. */
1110 assert(rsock);
1111
1112 DBG("Sending rotate stream id %" PRIu64 " command to relayd", stream_id);
1113
1114 /* Account for the trailing NULL. */
1115 len = lttng_strnlen(new_pathname, LTTNG_PATH_MAX) + 1;
1116 if (len > LTTNG_PATH_MAX) {
1117 ERR("Path used in relayd rotate stream command exceeds the maximal allowed length");
1118 ret = -1;
1119 goto error;
1120 }
1121
1122 msg_len = offsetof(struct lttcomm_relayd_rotate_stream, new_pathname) + len;
1123 msg = zmalloc(msg_len);
1124 if (!msg) {
1125 PERROR("Failed to allocate relayd rotate stream command of %d bytes",
1126 msg_len);
1127 ret = -1;
1128 goto error;
1129 }
1130
1131 if (lttng_strncpy(msg->new_pathname, new_pathname, len)) {
1132 ret = -1;
1133 ERR("Failed to copy relayd rotate stream command's new path name");
1134 goto error;
1135 }
1136
1137 msg->pathname_length = htobe32(len);
1138 msg->stream_id = htobe64(stream_id);
1139 msg->new_chunk_id = htobe64(new_chunk_id);
1140 /*
1141 * The seq_num is invalid for metadata streams, but it is ignored on
1142 * the relay.
1143 */
1144 msg->rotate_at_seq_num = htobe64(seq_num);
1145
1146 /* Send command. */
1147 ret = send_command(rsock, RELAYD_ROTATE_STREAM, (void *) msg, msg_len, 0);
1148 if (ret < 0) {
1149 ERR("Send rotate command");
1150 goto error;
1151 }
1152
1153 /* Receive response. */
1154 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1155 if (ret < 0) {
1156 ERR("Receive rotate reply");
1157 goto error;
1158 }
1159
1160 reply.ret_code = be32toh(reply.ret_code);
1161
1162 /* Return session id or negative ret code. */
1163 if (reply.ret_code != LTTNG_OK) {
1164 ret = -1;
1165 ERR("Relayd rotate stream replied error %d", reply.ret_code);
1166 } else {
1167 /* Success. */
1168 ret = 0;
1169 DBG("Relayd rotated stream id %" PRIu64 " successfully", stream_id);
1170 }
1171
1172 error:
1173 free(msg);
1174 return ret;
1175 }
1176
1177 int relayd_rotate_rename(struct lttcomm_relayd_sock *rsock,
1178 const char *old_path, const char *new_path)
1179 {
1180 int ret;
1181 struct lttcomm_relayd_rotate_rename *msg = NULL;
1182 struct lttcomm_relayd_generic_reply reply;
1183 size_t old_path_length, new_path_length;
1184 size_t msg_length;
1185
1186 /* Code flow error. Safety net. */
1187 assert(rsock);
1188
1189 DBG("Relayd rename chunk %s to %s", old_path, new_path);
1190
1191 /* The two paths are sent with a '\0' delimiter between them. */
1192 old_path_length = strlen(old_path) + 1;
1193 new_path_length = strlen(new_path) + 1;
1194
1195 msg_length = sizeof(*msg) + old_path_length + new_path_length;
1196 msg = zmalloc(msg_length);
1197 if (!msg) {
1198 PERROR("zmalloc rotate-rename command message");
1199 ret = -1;
1200 goto error;
1201 }
1202
1203 assert(old_path_length <= UINT32_MAX);
1204 msg->old_path_length = htobe32(old_path_length);
1205
1206 assert(new_path_length <= UINT32_MAX);
1207 msg->new_path_length = htobe32(new_path_length);
1208
1209 strcpy(msg->paths, old_path);
1210 strcpy(msg->paths + old_path_length, new_path);
1211
1212 /* Send command */
1213 ret = send_command(rsock, RELAYD_ROTATE_RENAME, (const void *) msg,
1214 msg_length, 0);
1215 if (ret < 0) {
1216 goto error;
1217 }
1218
1219 /* Receive response */
1220 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1221 if (ret < 0) {
1222 goto error;
1223 }
1224
1225 reply.ret_code = be32toh(reply.ret_code);
1226
1227 /* Return session id or negative ret code. */
1228 if (reply.ret_code != LTTNG_OK) {
1229 ret = -1;
1230 ERR("Relayd rotate rename replied error %d", reply.ret_code);
1231 } else {
1232 /* Success */
1233 ret = 0;
1234 }
1235
1236 DBG("Relayd rotate rename completed successfully");
1237
1238 error:
1239 free(msg);
1240 return ret;
1241 }
1242
1243 int relayd_rotate_pending(struct lttcomm_relayd_sock *rsock, uint64_t chunk_id)
1244 {
1245 int ret;
1246 struct lttcomm_relayd_rotate_pending msg;
1247 struct lttcomm_relayd_rotate_pending_reply reply;
1248
1249 /* Code flow error. Safety net. */
1250 assert(rsock);
1251
1252 DBG("Querying relayd for rotate pending with chunk_id %" PRIu64,
1253 chunk_id);
1254
1255 memset(&msg, 0, sizeof(msg));
1256 msg.chunk_id = htobe64(chunk_id);
1257
1258 /* Send command */
1259 ret = send_command(rsock, RELAYD_ROTATE_PENDING, (void *) &msg,
1260 sizeof(msg), 0);
1261 if (ret < 0) {
1262 goto error;
1263 }
1264
1265 /* Receive response */
1266 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1267 if (ret < 0) {
1268 goto error;
1269 }
1270
1271 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1272
1273 /* Return session id or negative ret code. */
1274 if (reply.generic.ret_code != LTTNG_OK) {
1275 ret = -reply.generic.ret_code;
1276 ERR("Relayd rotate pending replied with error %d", ret);
1277 goto error;
1278 } else {
1279 /* No error, just rotate pending state */
1280 if (reply.is_pending == 0 || reply.is_pending == 1) {
1281 ret = reply.is_pending;
1282 DBG("Relayd rotate pending command completed successfully with result \"%s\"",
1283 ret ? "rotation pending" : "rotation NOT pending");
1284 } else {
1285 ret = -LTTNG_ERR_UNK;
1286 }
1287 }
1288
1289 error:
1290 return ret;
1291 }
1292
1293 int relayd_mkdir(struct lttcomm_relayd_sock *rsock, const char *path)
1294 {
1295 int ret;
1296 struct lttcomm_relayd_mkdir *msg;
1297 struct lttcomm_relayd_generic_reply reply;
1298 size_t len;
1299
1300 /* Code flow error. Safety net. */
1301 assert(rsock);
1302
1303 DBG("Relayd mkdir path %s", path);
1304
1305 len = strlen(path) + 1;
1306 msg = zmalloc(sizeof(msg->length) + len);
1307 if (!msg) {
1308 PERROR("Alloc mkdir msg");
1309 ret = -1;
1310 goto error;
1311 }
1312 msg->length = htobe32((uint32_t) len);
1313
1314 if (lttng_strncpy(msg->path, path, len)) {
1315 ret = -1;
1316 goto error;
1317 }
1318
1319 /* Send command */
1320 ret = send_command(rsock, RELAYD_MKDIR, (void *) msg,
1321 sizeof(msg->length) + len, 0);
1322 if (ret < 0) {
1323 goto error;
1324 }
1325
1326 /* Receive response */
1327 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1328 if (ret < 0) {
1329 goto error;
1330 }
1331
1332 reply.ret_code = be32toh(reply.ret_code);
1333
1334 /* Return session id or negative ret code. */
1335 if (reply.ret_code != LTTNG_OK) {
1336 ret = -1;
1337 ERR("Relayd mkdir replied error %d", reply.ret_code);
1338 } else {
1339 /* Success */
1340 ret = 0;
1341 }
1342
1343 DBG("Relayd mkdir completed successfully");
1344
1345 error:
1346 free(msg);
1347 return ret;
1348 }
This page took 0.088268 seconds and 5 git commands to generate.