Run clang-format on the whole tree
[lttng-tools.git] / tests / regression / tools / live / live_test.cpp
1 /*
2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include <common/common.hpp>
9 #include <common/compat/endian.hpp>
10 #include <common/compat/errno.hpp>
11 #include <common/compat/time.hpp>
12 #include <common/index/ctf-index.hpp>
13
14 #include <lttng/lttng.h>
15
16 #include <bin/lttng-relayd/lttng-viewer-abi.hpp>
17 #include <fcntl.h>
18 #include <inttypes.h>
19 #include <netdb.h>
20 #include <netinet/in.h>
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <sys/mman.h>
25 #include <sys/socket.h>
26 #include <sys/stat.h>
27 #include <sys/types.h>
28 #include <tap/tap.h>
29 #include <unistd.h>
30 #include <urcu/list.h>
31
32 #define SESSION1 "test1"
33 #define RELAYD_URL "net://localhost"
34 #define LIVE_TIMER 2000000
35
36 /* Number of TAP tests in this file */
37 #define NUM_TESTS 11
38 #define mmap_size 524288
39
40 #ifdef HAVE_LIBLTTNG_UST_CTL
41 #include <lttng/lttng-export.h>
42 #include <lttng/ust-sigbus.h>
43 LTTNG_EXPORT DEFINE_LTTNG_UST_SIGBUS_STATE();
44 #endif
45
46 namespace {
47 struct live_session *session;
48 int control_sock;
49
50 int first_packet_offset;
51 int first_packet_len;
52 int first_packet_stream_id = -1;
53
54 struct viewer_stream {
55 uint64_t id;
56 uint64_t ctf_trace_id;
57 void *mmap_base;
58 int fd;
59 int metadata_flag;
60 int first_read;
61 char path[PATH_MAX];
62 };
63
64 struct live_session {
65 struct viewer_stream *streams;
66 uint64_t live_timer_interval;
67 uint64_t stream_count;
68 };
69 } /* namespace */
70
71 static ssize_t lttng_live_recv(int fd, void *buf, size_t len)
72 {
73 ssize_t ret;
74 size_t copied = 0, to_copy = len;
75
76 do {
77 ret = recv(fd, (char *) buf + copied, to_copy, 0);
78 if (ret > 0) {
79 LTTNG_ASSERT(ret <= to_copy);
80 copied += ret;
81 to_copy -= ret;
82 }
83 } while ((ret > 0 && to_copy > 0) || (ret < 0 && errno == EINTR));
84 if (ret > 0)
85 ret = copied;
86 /* ret = 0 means orderly shutdown, ret < 0 is error. */
87 return ret;
88 }
89
90 static ssize_t lttng_live_send(int fd, const void *buf, size_t len)
91 {
92 ssize_t ret;
93
94 do {
95 ret = send(fd, buf, len, MSG_NOSIGNAL);
96 } while (ret < 0 && errno == EINTR);
97 return ret;
98 }
99
100 static int connect_viewer(const char *hostname)
101 {
102 struct hostent *host;
103 struct sockaddr_in server_addr;
104 int ret;
105
106 host = gethostbyname(hostname);
107 if (!host) {
108 ret = -1;
109 goto end;
110 }
111
112 if ((control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
113 PERROR("Socket");
114 ret = -1;
115 goto end;
116 }
117
118 server_addr.sin_family = AF_INET;
119 server_addr.sin_port = htons(5344);
120 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
121 bzero(&(server_addr.sin_zero), 8);
122
123 if (connect(control_sock, (struct sockaddr *) &server_addr, sizeof(struct sockaddr)) ==
124 -1) {
125 PERROR("Connect");
126 ret = -1;
127 goto end;
128 }
129
130 server_addr.sin_family = AF_INET;
131 server_addr.sin_port = htons(5345);
132 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
133 bzero(&(server_addr.sin_zero), 8);
134
135 ret = 0;
136
137 end:
138 return ret;
139 }
140
141 static int establish_connection(void)
142 {
143 struct lttng_viewer_cmd cmd;
144 struct lttng_viewer_connect connect;
145 ssize_t ret_len;
146
147 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
148 cmd.data_size = htobe64(sizeof(connect));
149 cmd.cmd_version = htobe32(0);
150
151 memset(&connect, 0, sizeof(connect));
152 connect.major = htobe32(VERSION_MAJOR);
153 connect.minor = htobe32(VERSION_MINOR);
154 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
155
156 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
157 if (ret_len < 0) {
158 diag("Error sending cmd");
159 goto error;
160 }
161 ret_len = lttng_live_send(control_sock, &connect, sizeof(connect));
162 if (ret_len < 0) {
163 diag("Error sending version");
164 goto error;
165 }
166
167 ret_len = lttng_live_recv(control_sock, &connect, sizeof(connect));
168 if (ret_len == 0) {
169 diag("[error] Remote side has closed connection");
170 goto error;
171 }
172 if (ret_len < 0) {
173 diag("Error receiving version");
174 goto error;
175 }
176 return 0;
177
178 error:
179 return -1;
180 }
181
182 /*
183 * Returns the number of sessions, should be 1 during the unit test.
184 */
185 static int list_sessions(uint64_t *session_id)
186 {
187 struct lttng_viewer_cmd cmd;
188 struct lttng_viewer_list_sessions list;
189 struct lttng_viewer_session lsession;
190 int i;
191 ssize_t ret_len;
192 int first_session = 0;
193
194 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
195 cmd.data_size = htobe64(0);
196 cmd.cmd_version = htobe32(0);
197
198 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
199 if (ret_len < 0) {
200 diag("Error sending cmd");
201 goto error;
202 }
203
204 ret_len = lttng_live_recv(control_sock, &list, sizeof(list));
205 if (ret_len == 0) {
206 diag("[error] Remote side has closed connection");
207 goto error;
208 }
209 if (ret_len < 0) {
210 diag("Error receiving session list");
211 goto error;
212 }
213
214 for (i = 0; i < be32toh(list.sessions_count); i++) {
215 ret_len = lttng_live_recv(control_sock, &lsession, sizeof(lsession));
216 if (ret_len < 0) {
217 diag("Error receiving session");
218 goto error;
219 }
220 if (lsession.streams > 0 && first_session <= 0) {
221 first_session = be64toh(lsession.id);
222 *session_id = first_session;
223 }
224 }
225
226 return be32toh(list.sessions_count);
227
228 error:
229 return -1;
230 }
231
232 static int create_viewer_session(void)
233 {
234 struct lttng_viewer_cmd cmd;
235 struct lttng_viewer_create_session_response resp;
236 ssize_t ret_len;
237
238 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
239 cmd.data_size = htobe64(0);
240 cmd.cmd_version = htobe32(0);
241
242 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
243 if (ret_len < 0) {
244 diag("[error] Error sending cmd");
245 goto error;
246 }
247 LTTNG_ASSERT(ret_len == sizeof(cmd));
248
249 ret_len = lttng_live_recv(control_sock, &resp, sizeof(resp));
250 if (ret_len == 0) {
251 diag("[error] Remote side has closed connection");
252 goto error;
253 }
254 if (ret_len < 0) {
255 diag("[error] Error receiving create session reply");
256 goto error;
257 }
258 LTTNG_ASSERT(ret_len == sizeof(resp));
259
260 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
261 diag("[error] Error creating viewer session");
262 goto error;
263 }
264 return 0;
265
266 error:
267 return -1;
268 }
269
270 static int attach_session(uint64_t id)
271 {
272 struct lttng_viewer_cmd cmd;
273 struct lttng_viewer_attach_session_request rq;
274 struct lttng_viewer_attach_session_response rp;
275 struct lttng_viewer_stream stream;
276 int i;
277 ssize_t ret_len;
278
279 session = zmalloc<live_session>();
280 if (!session) {
281 goto error;
282 }
283
284 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
285 cmd.data_size = htobe64(sizeof(rq));
286 cmd.cmd_version = htobe32(0);
287
288 memset(&rq, 0, sizeof(rq));
289 rq.session_id = htobe64(id);
290 rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
291
292 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
293 if (ret_len < 0) {
294 diag("Error sending cmd LTTNG_VIEWER_ATTACH_SESSION");
295 goto error;
296 }
297 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
298 if (ret_len < 0) {
299 diag("Error sending attach request");
300 goto error;
301 }
302
303 ret_len = lttng_live_recv(control_sock, &rp, sizeof(rp));
304 if (ret_len == 0) {
305 diag("[error] Remote side has closed connection");
306 goto error;
307 }
308 if (ret_len < 0) {
309 diag("Error receiving attach response");
310 goto error;
311 }
312 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
313 goto error;
314 }
315
316 session->stream_count = be32toh(rp.streams_count);
317 if (session->stream_count == 0) {
318 diag("Got session stream count == 0");
319 goto error;
320 }
321 session->streams = calloc<viewer_stream>(session->stream_count);
322 if (!session->streams) {
323 goto error;
324 }
325
326 for (i = 0; i < be32toh(rp.streams_count); i++) {
327 ret_len = lttng_live_recv(control_sock, &stream, sizeof(stream));
328 if (ret_len == 0) {
329 diag("[error] Remote side has closed connection");
330 goto error;
331 }
332 if (ret_len < 0) {
333 diag("Error receiving stream");
334 goto error;
335 }
336 session->streams[i].id = be64toh(stream.id);
337
338 session->streams[i].ctf_trace_id = be64toh(stream.ctf_trace_id);
339 session->streams[i].first_read = 1;
340 session->streams[i].mmap_base = mmap(
341 NULL, mmap_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
342 if (session->streams[i].mmap_base == MAP_FAILED) {
343 diag("mmap error");
344 goto error;
345 }
346
347 if (be32toh(stream.metadata_flag)) {
348 session->streams[i].metadata_flag = 1;
349 }
350 }
351 return session->stream_count;
352
353 error:
354 return -1;
355 }
356
357 static int get_metadata(void)
358 {
359 struct lttng_viewer_cmd cmd;
360 struct lttng_viewer_get_metadata rq;
361 struct lttng_viewer_metadata_packet rp;
362 ssize_t ret_len;
363 int ret;
364 uint64_t i;
365 char *data = NULL;
366 uint64_t len = 0;
367 int metadata_stream_id = -1;
368
369 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
370 cmd.data_size = htobe64(sizeof(rq));
371 cmd.cmd_version = htobe32(0);
372
373 for (i = 0; i < session->stream_count; i++) {
374 if (session->streams[i].metadata_flag) {
375 metadata_stream_id = i;
376 break;
377 }
378 }
379
380 if (metadata_stream_id < 0) {
381 diag("No metadata stream found");
382 goto error;
383 }
384
385 rq.stream_id = htobe64(session->streams[metadata_stream_id].id);
386
387 retry:
388 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
389 if (ret_len < 0) {
390 diag("Error sending cmd");
391 goto error;
392 }
393 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
394 if (ret_len < 0) {
395 diag("Error sending get_metadata request");
396 goto error;
397 }
398 ret_len = lttng_live_recv(control_sock, &rp, sizeof(rp));
399 if (ret_len == 0) {
400 diag("[error] Remote side has closed connection");
401 goto error;
402 }
403 if (ret_len < 0) {
404 diag("Error receiving metadata response");
405 goto error;
406 }
407 switch (be32toh(rp.status)) {
408 case LTTNG_VIEWER_METADATA_OK:
409 break;
410 case LTTNG_VIEWER_NO_NEW_METADATA:
411 diag("Got LTTNG_VIEWER_NO_NEW_METADATA:");
412 usleep(50);
413 goto retry;
414 case LTTNG_VIEWER_METADATA_ERR:
415 diag("Got LTTNG_VIEWER_METADATA_ERR:");
416 goto error;
417 default:
418 diag("Got unknown status during LTTNG_VIEWER_GET_METADATA");
419 goto error;
420 }
421
422 len = be64toh(rp.len);
423 if (len <= 0) {
424 goto error;
425 }
426
427 data = calloc<char>(len);
428 if (!data) {
429 PERROR("relay data zmalloc");
430 goto error;
431 }
432 ret_len = lttng_live_recv(control_sock, data, len);
433 if (ret_len == 0) {
434 diag("[error] Remote side has closed connection");
435 goto error_free_data;
436 }
437 if (ret_len < 0) {
438 diag("Error receiving trace packet");
439 goto error_free_data;
440 }
441 free(data);
442 ret = len;
443
444 return ret;
445
446 error_free_data:
447 free(data);
448 error:
449 return -1;
450 }
451
452 static int get_next_index(void)
453 {
454 struct lttng_viewer_cmd cmd;
455 struct lttng_viewer_get_next_index rq;
456 struct lttng_viewer_index rp;
457 ssize_t ret_len;
458 int id;
459
460 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
461 cmd.data_size = htobe64(sizeof(rq));
462 cmd.cmd_version = htobe32(0);
463
464 for (id = 0; id < session->stream_count; id++) {
465 if (session->streams[id].metadata_flag) {
466 continue;
467 }
468 memset(&rq, 0, sizeof(rq));
469 rq.stream_id = htobe64(session->streams[id].id);
470
471 retry:
472 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
473 if (ret_len < 0) {
474 diag("Error sending cmd");
475 goto error;
476 }
477 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
478 if (ret_len < 0) {
479 diag("Error sending get_next_index request");
480 goto error;
481 }
482 ret_len = lttng_live_recv(control_sock, &rp, sizeof(rp));
483 if (ret_len == 0) {
484 diag("[error] Remote side has closed connection");
485 goto error;
486 }
487 if (ret_len < 0) {
488 diag("Error receiving index response");
489 goto error;
490 }
491
492 rp.flags = be32toh(rp.flags);
493
494 switch (be32toh(rp.status)) {
495 case LTTNG_VIEWER_INDEX_INACTIVE:
496 /* Skip this stream. */
497 diag("Got LTTNG_VIEWER_INDEX_INACTIVE");
498 continue;
499 case LTTNG_VIEWER_INDEX_OK:
500 break;
501 case LTTNG_VIEWER_INDEX_RETRY:
502 sleep(1);
503 goto retry;
504 case LTTNG_VIEWER_INDEX_HUP:
505 diag("Got LTTNG_VIEWER_INDEX_HUP");
506 session->streams[id].id = -1ULL;
507 session->streams[id].fd = -1;
508 goto error;
509 case LTTNG_VIEWER_INDEX_ERR:
510 diag("Got LTTNG_VIEWER_INDEX_ERR");
511 goto error;
512 default:
513 diag("Unknown reply status during LTTNG_VIEWER_GET_NEXT_INDEX (%d)",
514 be32toh(rp.status));
515 goto error;
516 }
517 if (first_packet_stream_id < 0) {
518 /*
519 * Initialize the first packet stream id. That is,
520 * the first active stream encoutered.
521 */
522 first_packet_offset = be64toh(rp.offset);
523 first_packet_len = be64toh(rp.packet_size) / CHAR_BIT;
524 first_packet_stream_id = id;
525 diag("Got first packet index with offset %d and len %d",
526 first_packet_offset,
527 first_packet_len);
528 }
529 }
530 return 0;
531
532 error:
533 return -1;
534 }
535
536 static int get_data_packet(int id, uint64_t offset, uint64_t len)
537 {
538 struct lttng_viewer_cmd cmd;
539 struct lttng_viewer_get_packet rq;
540 struct lttng_viewer_trace_packet rp;
541 ssize_t ret_len;
542
543 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
544 cmd.data_size = htobe64(sizeof(rq));
545 cmd.cmd_version = htobe32(0);
546
547 memset(&rq, 0, sizeof(rq));
548 rq.stream_id = htobe64(session->streams[id].id);
549 /* Already in big endian. */
550 rq.offset = offset;
551 rq.len = htobe32(len);
552
553 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
554 if (ret_len < 0) {
555 diag("Error sending cmd");
556 goto error;
557 }
558 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
559 if (ret_len < 0) {
560 diag("Error sending get_data_packet request");
561 goto error;
562 }
563 ret_len = lttng_live_recv(control_sock, &rp, sizeof(rp));
564 if (ret_len == 0) {
565 diag("[error] Remote side has closed connection");
566 goto error;
567 }
568 if (ret_len < 0) {
569 diag("Error receiving data response");
570 goto error;
571 }
572 rp.flags = be32toh(rp.flags);
573
574 switch (be32toh(rp.status)) {
575 case LTTNG_VIEWER_GET_PACKET_OK:
576 len = be32toh(rp.len);
577 if (len == 0) {
578 diag("Got LTTNG_VIEWER_GET_PACKET_OK, but len == 0");
579 goto error;
580 }
581 break;
582 case LTTNG_VIEWER_GET_PACKET_RETRY:
583 diag("Got LTTNG_VIEWER_GET_PACKET_RETRY:");
584 goto error;
585 case LTTNG_VIEWER_GET_PACKET_ERR:
586 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
587 diag("Got LTTNG_VIEWER_GET_PACKET_ERR with NEW_METADATA flag");
588 goto end;
589 }
590 diag("Got LTTNG_VIEWER_GET_PACKET_ERR:");
591 goto error;
592 default:
593 diag("Got unknown status code during LTTNG_VIEWER_GET_PACKET");
594 goto error;
595 }
596
597 if (len > mmap_size) {
598 diag("mmap_size not big enough");
599 goto error;
600 }
601
602 ret_len = lttng_live_recv(control_sock, session->streams[id].mmap_base, len);
603 if (ret_len == 0) {
604 diag("[error] Remote side has closed connection");
605 goto error;
606 }
607 if (ret_len < 0) {
608 diag("Error receiving trace packet");
609 goto error;
610 }
611 end:
612 return 0;
613 error:
614 return -1;
615 }
616
617 static int detach_viewer_session(uint64_t id)
618 {
619 struct lttng_viewer_cmd cmd;
620 struct lttng_viewer_detach_session_response resp;
621 struct lttng_viewer_detach_session_request rq;
622 int ret;
623 ssize_t ret_len;
624
625 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
626 cmd.data_size = htobe64(sizeof(rq));
627 cmd.cmd_version = htobe32(0);
628
629 memset(&rq, 0, sizeof(rq));
630 rq.session_id = htobe64(id);
631
632 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
633 if (ret_len < 0) {
634 fprintf(stderr, "[error] Error sending cmd\n");
635 ret = ret_len;
636 goto error;
637 }
638
639 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
640 if (ret_len < 0) {
641 fprintf(stderr, "Error sending attach request\n");
642 ret = ret_len;
643 goto error;
644 }
645
646 ret_len = lttng_live_recv(control_sock, &resp, sizeof(resp));
647 if (ret_len < 0) {
648 fprintf(stderr, "[error] Error receiving detach session reply\n");
649 ret = ret_len;
650 goto error;
651 }
652
653 if (be32toh(resp.status) != LTTNG_VIEWER_DETACH_SESSION_OK) {
654 fprintf(stderr, "[error] Error detaching viewer session\n");
655 ret = -1;
656 goto error;
657 }
658 ret = 0;
659
660 error:
661 return ret;
662 }
663
664 int main(void)
665 {
666 int ret;
667 uint64_t session_id;
668
669 plan_tests(NUM_TESTS);
670
671 diag("Live unit tests");
672
673 ret = connect_viewer("localhost");
674 ok(ret == 0, "Connect viewer to relayd");
675
676 ret = establish_connection();
677 ok(ret == 0,
678 "Established connection and version check with %d.%d",
679 VERSION_MAJOR,
680 VERSION_MINOR);
681
682 ret = list_sessions(&session_id);
683 ok(ret > 0, "List sessions : %d session(s)", ret);
684 if (ret < 0) {
685 goto end;
686 }
687
688 ret = create_viewer_session();
689 ok(ret == 0, "Create viewer session");
690
691 ret = attach_session(session_id);
692 ok(ret > 0, "Attach to session, %d stream(s) received", ret);
693
694 ret = get_metadata();
695 ok(ret > 0, "Get metadata, received %d bytes", ret);
696
697 ret = get_next_index();
698 ok(ret == 0, "Get one index per stream");
699
700 ret = get_data_packet(first_packet_stream_id, first_packet_offset, first_packet_len);
701 ok(ret == 0,
702 "Get one data packet for stream %d, offset %d, len %d",
703 first_packet_stream_id,
704 first_packet_offset,
705 first_packet_len);
706
707 ret = detach_viewer_session(session_id);
708 ok(ret == 0, "Detach viewer session");
709
710 ret = list_sessions(&session_id);
711 ok(ret > 0, "List sessions : %d session(s)", ret);
712
713 ret = attach_session(session_id);
714 ok(ret > 0, "Attach to session, %d streams received", ret);
715 end:
716 return exit_status();
717 }
This page took 0.044864 seconds and 4 git commands to generate.