Fix: tap.h conflicts with Clang C++ headers
[lttng-tools.git] / tests / regression / tools / live / live_test.cpp
1 /*
2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <string.h>
11 #include <unistd.h>
12 #include <common/compat/time.hpp>
13 #include <sys/types.h>
14 #include <inttypes.h>
15 #include <stdlib.h>
16 #include <sys/socket.h>
17 #include <netinet/in.h>
18 #include <netdb.h>
19 #include <fcntl.h>
20 #include <sys/mman.h>
21 #include <sys/stat.h>
22
23 #include <lttng/lttng.h>
24
25 #include <urcu/list.h>
26 #include <common/common.hpp>
27
28 #include <bin/lttng-relayd/lttng-viewer-abi.hpp>
29 #include <common/index/ctf-index.hpp>
30
31 #include <common/compat/errno.hpp>
32 #include <common/compat/endian.hpp>
33
34 #include <tap/tap.h>
35
36 #define SESSION1 "test1"
37 #define RELAYD_URL "net://localhost"
38 #define LIVE_TIMER 2000000
39
40 /* Number of TAP tests in this file */
41 #define NUM_TESTS 11
42 #define mmap_size 524288
43
44 #ifdef HAVE_LIBLTTNG_UST_CTL
45 #include <lttng/lttng-export.h>
46 #include <lttng/ust-sigbus.h>
47 LTTNG_EXPORT DEFINE_LTTNG_UST_SIGBUS_STATE();
48 #endif
49
50 namespace {
51 struct live_session *session;
52 int control_sock;
53
54 int first_packet_offset;
55 int first_packet_len;
56 int first_packet_stream_id = -1;
57
58 struct viewer_stream {
59 uint64_t id;
60 uint64_t ctf_trace_id;
61 void *mmap_base;
62 int fd;
63 int metadata_flag;
64 int first_read;
65 char path[PATH_MAX];
66 };
67
68 struct live_session {
69 struct viewer_stream *streams;
70 uint64_t live_timer_interval;
71 uint64_t stream_count;
72 };
73 } /* namespace */
74
75 static
76 ssize_t lttng_live_recv(int fd, void *buf, size_t len)
77 {
78 ssize_t ret;
79 size_t copied = 0, to_copy = len;
80
81 do {
82 ret = recv(fd, (char *) buf + copied, to_copy, 0);
83 if (ret > 0) {
84 LTTNG_ASSERT(ret <= to_copy);
85 copied += ret;
86 to_copy -= ret;
87 }
88 } while ((ret > 0 && to_copy > 0)
89 || (ret < 0 && errno == EINTR));
90 if (ret > 0)
91 ret = copied;
92 /* ret = 0 means orderly shutdown, ret < 0 is error. */
93 return ret;
94 }
95
96 static
97 ssize_t lttng_live_send(int fd, const void *buf, size_t len)
98 {
99 ssize_t ret;
100
101 do {
102 ret = send(fd, buf, len, MSG_NOSIGNAL);
103 } while (ret < 0 && errno == EINTR);
104 return ret;
105 }
106
107 static
108 int connect_viewer(const char *hostname)
109 {
110 struct hostent *host;
111 struct sockaddr_in server_addr;
112 int ret;
113
114 host = gethostbyname(hostname);
115 if (!host) {
116 ret = -1;
117 goto end;
118 }
119
120 if ((control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
121 PERROR("Socket");
122 ret = -1;
123 goto end;
124 }
125
126 server_addr.sin_family = AF_INET;
127 server_addr.sin_port = htons(5344);
128 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
129 bzero(&(server_addr.sin_zero), 8);
130
131 if (connect(control_sock, (struct sockaddr *) &server_addr,
132 sizeof(struct sockaddr)) == -1) {
133 PERROR("Connect");
134 ret = -1;
135 goto end;
136 }
137
138 server_addr.sin_family = AF_INET;
139 server_addr.sin_port = htons(5345);
140 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
141 bzero(&(server_addr.sin_zero), 8);
142
143 ret = 0;
144
145 end:
146 return ret;
147 }
148
149 static
150 int establish_connection(void)
151 {
152 struct lttng_viewer_cmd cmd;
153 struct lttng_viewer_connect connect;
154 ssize_t ret_len;
155
156 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
157 cmd.data_size = htobe64(sizeof(connect));
158 cmd.cmd_version = htobe32(0);
159
160 memset(&connect, 0, sizeof(connect));
161 connect.major = htobe32(VERSION_MAJOR);
162 connect.minor = htobe32(VERSION_MINOR);
163 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
164
165 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
166 if (ret_len < 0) {
167 diag("Error sending cmd");
168 goto error;
169 }
170 ret_len = lttng_live_send(control_sock, &connect, sizeof(connect));
171 if (ret_len < 0) {
172 diag("Error sending version");
173 goto error;
174 }
175
176 ret_len = lttng_live_recv(control_sock, &connect, sizeof(connect));
177 if (ret_len == 0) {
178 diag("[error] Remote side has closed connection");
179 goto error;
180 }
181 if (ret_len < 0) {
182 diag("Error receiving version");
183 goto error;
184 }
185 return 0;
186
187 error:
188 return -1;
189 }
190
191 /*
192 * Returns the number of sessions, should be 1 during the unit test.
193 */
194 static
195 int list_sessions(uint64_t *session_id)
196 {
197 struct lttng_viewer_cmd cmd;
198 struct lttng_viewer_list_sessions list;
199 struct lttng_viewer_session lsession;
200 int i;
201 ssize_t ret_len;
202 int first_session = 0;
203
204 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
205 cmd.data_size = htobe64(0);
206 cmd.cmd_version = htobe32(0);
207
208 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
209 if (ret_len < 0) {
210 diag("Error sending cmd");
211 goto error;
212 }
213
214 ret_len = lttng_live_recv(control_sock, &list, sizeof(list));
215 if (ret_len == 0) {
216 diag("[error] Remote side has closed connection");
217 goto error;
218 }
219 if (ret_len < 0) {
220 diag("Error receiving session list");
221 goto error;
222 }
223
224 for (i = 0; i < be32toh(list.sessions_count); i++) {
225 ret_len = lttng_live_recv(control_sock, &lsession, sizeof(lsession));
226 if (ret_len < 0) {
227 diag("Error receiving session");
228 goto error;
229 }
230 if (lsession.streams > 0 && first_session <= 0) {
231 first_session = be64toh(lsession.id);
232 *session_id = first_session;
233 }
234 }
235
236 return be32toh(list.sessions_count);
237
238 error:
239 return -1;
240 }
241
242 static
243 int create_viewer_session(void)
244 {
245 struct lttng_viewer_cmd cmd;
246 struct lttng_viewer_create_session_response resp;
247 ssize_t ret_len;
248
249 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
250 cmd.data_size = htobe64(0);
251 cmd.cmd_version = htobe32(0);
252
253 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
254 if (ret_len < 0) {
255 diag("[error] Error sending cmd");
256 goto error;
257 }
258 LTTNG_ASSERT(ret_len == sizeof(cmd));
259
260 ret_len = lttng_live_recv(control_sock, &resp, sizeof(resp));
261 if (ret_len == 0) {
262 diag("[error] Remote side has closed connection");
263 goto error;
264 }
265 if (ret_len < 0) {
266 diag("[error] Error receiving create session reply");
267 goto error;
268 }
269 LTTNG_ASSERT(ret_len == sizeof(resp));
270
271 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
272 diag("[error] Error creating viewer session");
273 goto error;
274 }
275 return 0;
276
277 error:
278 return -1;
279 }
280
281 static
282 int attach_session(uint64_t id)
283 {
284 struct lttng_viewer_cmd cmd;
285 struct lttng_viewer_attach_session_request rq;
286 struct lttng_viewer_attach_session_response rp;
287 struct lttng_viewer_stream stream;
288 int i;
289 ssize_t ret_len;
290
291 session = zmalloc<live_session>();
292 if (!session) {
293 goto error;
294 }
295
296 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
297 cmd.data_size = htobe64(sizeof(rq));
298 cmd.cmd_version = htobe32(0);
299
300 memset(&rq, 0, sizeof(rq));
301 rq.session_id = htobe64(id);
302 rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
303
304 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
305 if (ret_len < 0) {
306 diag("Error sending cmd LTTNG_VIEWER_ATTACH_SESSION");
307 goto error;
308 }
309 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
310 if (ret_len < 0) {
311 diag("Error sending attach request");
312 goto error;
313 }
314
315 ret_len = lttng_live_recv(control_sock, &rp, sizeof(rp));
316 if (ret_len == 0) {
317 diag("[error] Remote side has closed connection");
318 goto error;
319 }
320 if (ret_len < 0) {
321 diag("Error receiving attach response");
322 goto error;
323 }
324 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
325 goto error;
326 }
327
328 session->stream_count = be32toh(rp.streams_count);
329 if (session->stream_count == 0) {
330 diag("Got session stream count == 0");
331 goto error;
332 }
333 session->streams = calloc<viewer_stream>(session->stream_count);
334 if (!session->streams) {
335 goto error;
336 }
337
338 for (i = 0; i < be32toh(rp.streams_count); i++) {
339 ret_len = lttng_live_recv(control_sock, &stream, sizeof(stream));
340 if (ret_len == 0) {
341 diag("[error] Remote side has closed connection");
342 goto error;
343 }
344 if (ret_len < 0) {
345 diag("Error receiving stream");
346 goto error;
347 }
348 session->streams[i].id = be64toh(stream.id);
349
350 session->streams[i].ctf_trace_id = be64toh(stream.ctf_trace_id);
351 session->streams[i].first_read = 1;
352 session->streams[i].mmap_base = mmap(NULL, mmap_size,
353 PROT_READ | PROT_WRITE,
354 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
355 if (session->streams[i].mmap_base == MAP_FAILED) {
356 diag("mmap error");
357 goto error;
358 }
359
360 if (be32toh(stream.metadata_flag)) {
361 session->streams[i].metadata_flag = 1;
362 }
363 }
364 return session->stream_count;
365
366 error:
367 return -1;
368 }
369
370 static
371 int get_metadata(void)
372 {
373 struct lttng_viewer_cmd cmd;
374 struct lttng_viewer_get_metadata rq;
375 struct lttng_viewer_metadata_packet rp;
376 ssize_t ret_len;
377 int ret;
378 uint64_t i;
379 char *data = NULL;
380 uint64_t len = 0;
381 int metadata_stream_id = -1;
382
383 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
384 cmd.data_size = htobe64(sizeof(rq));
385 cmd.cmd_version = htobe32(0);
386
387 for (i = 0; i < session->stream_count; i++) {
388 if (session->streams[i].metadata_flag) {
389 metadata_stream_id = i;
390 break;
391 }
392 }
393
394 if (metadata_stream_id < 0) {
395 diag("No metadata stream found");
396 goto error;
397 }
398
399 rq.stream_id = htobe64(session->streams[metadata_stream_id].id);
400
401 retry:
402 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
403 if (ret_len < 0) {
404 diag("Error sending cmd");
405 goto error;
406 }
407 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
408 if (ret_len < 0) {
409 diag("Error sending get_metadata request");
410 goto error;
411 }
412 ret_len = lttng_live_recv(control_sock, &rp, sizeof(rp));
413 if (ret_len == 0) {
414 diag("[error] Remote side has closed connection");
415 goto error;
416 }
417 if (ret_len < 0) {
418 diag("Error receiving metadata response");
419 goto error;
420 }
421 switch (be32toh(rp.status)) {
422 case LTTNG_VIEWER_METADATA_OK:
423 break;
424 case LTTNG_VIEWER_NO_NEW_METADATA:
425 diag("Got LTTNG_VIEWER_NO_NEW_METADATA:");
426 usleep(50);
427 goto retry;
428 case LTTNG_VIEWER_METADATA_ERR:
429 diag("Got LTTNG_VIEWER_METADATA_ERR:");
430 goto error;
431 default:
432 diag("Got unknown status during LTTNG_VIEWER_GET_METADATA");
433 goto error;
434 }
435
436 len = be64toh(rp.len);
437 if (len <= 0) {
438 goto error;
439 }
440
441 data = calloc<char>(len);
442 if (!data) {
443 PERROR("relay data zmalloc");
444 goto error;
445 }
446 ret_len = lttng_live_recv(control_sock, data, len);
447 if (ret_len == 0) {
448 diag("[error] Remote side has closed connection");
449 goto error_free_data;
450 }
451 if (ret_len < 0) {
452 diag("Error receiving trace packet");
453 goto error_free_data;
454 }
455 free(data);
456 ret = len;
457
458 return ret;
459
460 error_free_data:
461 free(data);
462 error:
463 return -1;
464 }
465
466 static
467 int get_next_index(void)
468 {
469 struct lttng_viewer_cmd cmd;
470 struct lttng_viewer_get_next_index rq;
471 struct lttng_viewer_index rp;
472 ssize_t ret_len;
473 int id;
474
475 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
476 cmd.data_size = htobe64(sizeof(rq));
477 cmd.cmd_version = htobe32(0);
478
479 for (id = 0; id < session->stream_count; id++) {
480 if (session->streams[id].metadata_flag) {
481 continue;
482 }
483 memset(&rq, 0, sizeof(rq));
484 rq.stream_id = htobe64(session->streams[id].id);
485
486 retry:
487 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
488 if (ret_len < 0) {
489 diag("Error sending cmd");
490 goto error;
491 }
492 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
493 if (ret_len < 0) {
494 diag("Error sending get_next_index request");
495 goto error;
496 }
497 ret_len = lttng_live_recv(control_sock, &rp, sizeof(rp));
498 if (ret_len == 0) {
499 diag("[error] Remote side has closed connection");
500 goto error;
501 }
502 if (ret_len < 0) {
503 diag("Error receiving index response");
504 goto error;
505 }
506
507 rp.flags = be32toh(rp.flags);
508
509 switch (be32toh(rp.status)) {
510 case LTTNG_VIEWER_INDEX_INACTIVE:
511 /* Skip this stream. */
512 diag("Got LTTNG_VIEWER_INDEX_INACTIVE");
513 continue;
514 case LTTNG_VIEWER_INDEX_OK:
515 break;
516 case LTTNG_VIEWER_INDEX_RETRY:
517 sleep(1);
518 goto retry;
519 case LTTNG_VIEWER_INDEX_HUP:
520 diag("Got LTTNG_VIEWER_INDEX_HUP");
521 session->streams[id].id = -1ULL;
522 session->streams[id].fd = -1;
523 goto error;
524 case LTTNG_VIEWER_INDEX_ERR:
525 diag("Got LTTNG_VIEWER_INDEX_ERR");
526 goto error;
527 default:
528 diag("Unknown reply status during LTTNG_VIEWER_GET_NEXT_INDEX (%d)", be32toh(rp.status));
529 goto error;
530 }
531 if (first_packet_stream_id < 0) {
532 /*
533 * Initialize the first packet stream id. That is,
534 * the first active stream encoutered.
535 */
536 first_packet_offset = be64toh(rp.offset);
537 first_packet_len = be64toh(rp.packet_size) / CHAR_BIT;
538 first_packet_stream_id = id;
539 diag("Got first packet index with offset %d and len %d",
540 first_packet_offset, first_packet_len);
541 }
542 }
543 return 0;
544
545 error:
546 return -1;
547 }
548
549 static
550 int get_data_packet(int id, uint64_t offset,
551 uint64_t len)
552 {
553 struct lttng_viewer_cmd cmd;
554 struct lttng_viewer_get_packet rq;
555 struct lttng_viewer_trace_packet rp;
556 ssize_t ret_len;
557
558 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
559 cmd.data_size = htobe64(sizeof(rq));
560 cmd.cmd_version = htobe32(0);
561
562 memset(&rq, 0, sizeof(rq));
563 rq.stream_id = htobe64(session->streams[id].id);
564 /* Already in big endian. */
565 rq.offset = offset;
566 rq.len = htobe32(len);
567
568 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
569 if (ret_len < 0) {
570 diag("Error sending cmd");
571 goto error;
572 }
573 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
574 if (ret_len < 0) {
575 diag("Error sending get_data_packet request");
576 goto error;
577 }
578 ret_len = lttng_live_recv(control_sock, &rp, sizeof(rp));
579 if (ret_len == 0) {
580 diag("[error] Remote side has closed connection");
581 goto error;
582 }
583 if (ret_len < 0) {
584 diag("Error receiving data response");
585 goto error;
586 }
587 rp.flags = be32toh(rp.flags);
588
589 switch (be32toh(rp.status)) {
590 case LTTNG_VIEWER_GET_PACKET_OK:
591 len = be32toh(rp.len);
592 if (len == 0) {
593 diag("Got LTTNG_VIEWER_GET_PACKET_OK, but len == 0");
594 goto error;
595 }
596 break;
597 case LTTNG_VIEWER_GET_PACKET_RETRY:
598 diag("Got LTTNG_VIEWER_GET_PACKET_RETRY:");
599 goto error;
600 case LTTNG_VIEWER_GET_PACKET_ERR:
601 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
602 diag("Got LTTNG_VIEWER_GET_PACKET_ERR with NEW_METADATA flag");
603 goto end;
604 }
605 diag("Got LTTNG_VIEWER_GET_PACKET_ERR:");
606 goto error;
607 default:
608 diag("Got unknown status code during LTTNG_VIEWER_GET_PACKET");
609 goto error;
610 }
611
612 if (len > mmap_size) {
613 diag("mmap_size not big enough");
614 goto error;
615 }
616
617 ret_len = lttng_live_recv(control_sock, session->streams[id].mmap_base, len);
618 if (ret_len == 0) {
619 diag("[error] Remote side has closed connection");
620 goto error;
621 }
622 if (ret_len < 0) {
623 diag("Error receiving trace packet");
624 goto error;
625 }
626 end:
627 return 0;
628 error:
629 return -1;
630 }
631
632 static
633 int detach_viewer_session(uint64_t id)
634 {
635 struct lttng_viewer_cmd cmd;
636 struct lttng_viewer_detach_session_response resp;
637 struct lttng_viewer_detach_session_request rq;
638 int ret;
639 ssize_t ret_len;
640
641 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
642 cmd.data_size = htobe64(sizeof(rq));
643 cmd.cmd_version = htobe32(0);
644
645 memset(&rq, 0, sizeof(rq));
646 rq.session_id = htobe64(id);
647
648 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
649 if (ret_len < 0) {
650 fprintf(stderr, "[error] Error sending cmd\n");
651 ret = ret_len;
652 goto error;
653 }
654
655 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
656 if (ret_len < 0) {
657 fprintf(stderr, "Error sending attach request\n");
658 ret = ret_len;
659 goto error;
660 }
661
662 ret_len = lttng_live_recv(control_sock, &resp, sizeof(resp));
663 if (ret_len < 0) {
664 fprintf(stderr, "[error] Error receiving detach session reply\n");
665 ret = ret_len;
666 goto error;
667 }
668
669 if (be32toh(resp.status) != LTTNG_VIEWER_DETACH_SESSION_OK) {
670 fprintf(stderr, "[error] Error detaching viewer session\n");
671 ret = -1;
672 goto error;
673 }
674 ret = 0;
675
676 error:
677 return ret;
678 }
679
680 int main(void)
681 {
682 int ret;
683 uint64_t session_id;
684
685 plan_tests(NUM_TESTS);
686
687 diag("Live unit tests");
688
689 ret = connect_viewer("localhost");
690 ok(ret == 0, "Connect viewer to relayd");
691
692 ret = establish_connection();
693 ok(ret == 0, "Established connection and version check with %d.%d",
694 VERSION_MAJOR, VERSION_MINOR);
695
696 ret = list_sessions(&session_id);
697 ok(ret > 0, "List sessions : %d session(s)", ret);
698 if (ret < 0) {
699 goto end;
700 }
701
702 ret = create_viewer_session();
703 ok(ret == 0, "Create viewer session");
704
705 ret = attach_session(session_id);
706 ok(ret > 0, "Attach to session, %d stream(s) received", ret);
707
708 ret = get_metadata();
709 ok(ret > 0, "Get metadata, received %d bytes", ret);
710
711 ret = get_next_index();
712 ok(ret == 0, "Get one index per stream");
713
714 ret = get_data_packet(first_packet_stream_id, first_packet_offset,
715 first_packet_len);
716 ok(ret == 0,
717 "Get one data packet for stream %d, offset %d, len %d",
718 first_packet_stream_id, first_packet_offset,
719 first_packet_len);
720
721 ret = detach_viewer_session(session_id);
722 ok(ret == 0, "Detach viewer session");
723
724 ret = list_sessions(&session_id);
725 ok(ret > 0, "List sessions : %d session(s)", ret);
726
727 ret = attach_session(session_id);
728 ok(ret > 0, "Attach to session, %d streams received", ret);
729 end:
730 return exit_status();
731 }
This page took 0.043262 seconds and 4 git commands to generate.