d5da03e4d7d4417ed7bdfee5e996430cc4204d85
[lttng-tools.git] / tests / regression / tools / live / live_test.c
1 /*
2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include <assert.h>
9 #include <errno.h>
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <unistd.h>
14 #include <common/compat/time.h>
15 #include <sys/types.h>
16 #include <inttypes.h>
17 #include <stdlib.h>
18 #include <sys/socket.h>
19 #include <netinet/in.h>
20 #include <netdb.h>
21 #include <fcntl.h>
22 #include <sys/mman.h>
23 #include <sys/stat.h>
24
25 #include <tap/tap.h>
26 #include <lttng/lttng.h>
27
28 #include <urcu/list.h>
29 #include <common/common.h>
30
31 #include <bin/lttng-relayd/lttng-viewer-abi.h>
32 #include <common/index/ctf-index.h>
33
34 #include <common/compat/endian.h>
35
36 #define SESSION1 "test1"
37 #define RELAYD_URL "net://localhost"
38 #define LIVE_TIMER 2000000
39
40 /* Number of TAP tests in this file */
41 #define NUM_TESTS 11
42 #define mmap_size 524288
43
44 static int control_sock;
45 struct live_session *session;
46
47 static int first_packet_offset;
48 static int first_packet_len;
49 static int first_packet_stream_id = -1;
50
51 struct viewer_stream {
52 uint64_t id;
53 uint64_t ctf_trace_id;
54 void *mmap_base;
55 int fd;
56 int metadata_flag;
57 int first_read;
58 char path[PATH_MAX];
59 };
60
61 struct live_session {
62 struct viewer_stream *streams;
63 uint64_t live_timer_interval;
64 uint64_t stream_count;
65 };
66
67 static
68 ssize_t lttng_live_recv(int fd, void *buf, size_t len)
69 {
70 ssize_t ret;
71 size_t copied = 0, to_copy = len;
72
73 do {
74 ret = recv(fd, buf + copied, to_copy, 0);
75 if (ret > 0) {
76 assert(ret <= to_copy);
77 copied += ret;
78 to_copy -= ret;
79 }
80 } while ((ret > 0 && to_copy > 0)
81 || (ret < 0 && errno == EINTR));
82 if (ret > 0)
83 ret = copied;
84 /* ret = 0 means orderly shutdown, ret < 0 is error. */
85 return ret;
86 }
87
88 static
89 ssize_t lttng_live_send(int fd, const void *buf, size_t len)
90 {
91 ssize_t ret;
92
93 do {
94 ret = send(fd, buf, len, MSG_NOSIGNAL);
95 } while (ret < 0 && errno == EINTR);
96 return ret;
97 }
98
99 static
100 int connect_viewer(char *hostname)
101 {
102 struct hostent *host;
103 struct sockaddr_in server_addr;
104 int ret;
105
106 host = gethostbyname(hostname);
107 if (!host) {
108 ret = -1;
109 goto end;
110 }
111
112 if ((control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
113 PERROR("Socket");
114 ret = -1;
115 goto end;
116 }
117
118 server_addr.sin_family = AF_INET;
119 server_addr.sin_port = htons(5344);
120 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
121 bzero(&(server_addr.sin_zero), 8);
122
123 if (connect(control_sock, (struct sockaddr *) &server_addr,
124 sizeof(struct sockaddr)) == -1) {
125 PERROR("Connect");
126 ret = -1;
127 goto end;
128 }
129
130 server_addr.sin_family = AF_INET;
131 server_addr.sin_port = htons(5345);
132 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
133 bzero(&(server_addr.sin_zero), 8);
134
135 ret = 0;
136
137 end:
138 return ret;
139 }
140
141 static
142 int establish_connection(void)
143 {
144 struct lttng_viewer_cmd cmd;
145 struct lttng_viewer_connect connect;
146 ssize_t ret_len;
147
148 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
149 cmd.data_size = htobe64(sizeof(connect));
150 cmd.cmd_version = htobe32(0);
151
152 memset(&connect, 0, sizeof(connect));
153 connect.major = htobe32(VERSION_MAJOR);
154 connect.minor = htobe32(VERSION_MINOR);
155 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
156
157 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
158 if (ret_len < 0) {
159 diag("Error sending cmd");
160 goto error;
161 }
162 ret_len = lttng_live_send(control_sock, &connect, sizeof(connect));
163 if (ret_len < 0) {
164 diag("Error sending version");
165 goto error;
166 }
167
168 ret_len = lttng_live_recv(control_sock, &connect, sizeof(connect));
169 if (ret_len == 0) {
170 diag("[error] Remote side has closed connection");
171 goto error;
172 }
173 if (ret_len < 0) {
174 diag("Error receiving version");
175 goto error;
176 }
177 return 0;
178
179 error:
180 return -1;
181 }
182
183 /*
184 * Returns the number of sessions, should be 1 during the unit test.
185 */
186 static
187 int list_sessions(uint64_t *session_id)
188 {
189 struct lttng_viewer_cmd cmd;
190 struct lttng_viewer_list_sessions list;
191 struct lttng_viewer_session lsession;
192 int i;
193 ssize_t ret_len;
194 int first_session = 0;
195
196 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
197 cmd.data_size = htobe64(0);
198 cmd.cmd_version = htobe32(0);
199
200 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
201 if (ret_len < 0) {
202 diag("Error sending cmd");
203 goto error;
204 }
205
206 ret_len = lttng_live_recv(control_sock, &list, sizeof(list));
207 if (ret_len == 0) {
208 diag("[error] Remote side has closed connection");
209 goto error;
210 }
211 if (ret_len < 0) {
212 diag("Error receiving session list");
213 goto error;
214 }
215
216 for (i = 0; i < be32toh(list.sessions_count); i++) {
217 ret_len = lttng_live_recv(control_sock, &lsession, sizeof(lsession));
218 if (ret_len < 0) {
219 diag("Error receiving session");
220 goto error;
221 }
222 if (lsession.streams > 0 && first_session <= 0) {
223 first_session = be64toh(lsession.id);
224 *session_id = first_session;
225 }
226 }
227
228 return be32toh(list.sessions_count);
229
230 error:
231 return -1;
232 }
233
234 static
235 int create_viewer_session(void)
236 {
237 struct lttng_viewer_cmd cmd;
238 struct lttng_viewer_create_session_response resp;
239 ssize_t ret_len;
240
241 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
242 cmd.data_size = htobe64(0);
243 cmd.cmd_version = htobe32(0);
244
245 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
246 if (ret_len < 0) {
247 diag("[error] Error sending cmd");
248 goto error;
249 }
250 assert(ret_len == sizeof(cmd));
251
252 ret_len = lttng_live_recv(control_sock, &resp, sizeof(resp));
253 if (ret_len == 0) {
254 diag("[error] Remote side has closed connection");
255 goto error;
256 }
257 if (ret_len < 0) {
258 diag("[error] Error receiving create session reply");
259 goto error;
260 }
261 assert(ret_len == sizeof(resp));
262
263 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
264 diag("[error] Error creating viewer session");
265 goto error;
266 }
267 return 0;
268
269 error:
270 return -1;
271 }
272
273 static
274 int attach_session(uint64_t id)
275 {
276 struct lttng_viewer_cmd cmd;
277 struct lttng_viewer_attach_session_request rq;
278 struct lttng_viewer_attach_session_response rp;
279 struct lttng_viewer_stream stream;
280 int i;
281 ssize_t ret_len;
282
283 session = zmalloc(sizeof(struct live_session));
284 if (!session) {
285 goto error;
286 }
287
288 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
289 cmd.data_size = htobe64(sizeof(rq));
290 cmd.cmd_version = htobe32(0);
291
292 memset(&rq, 0, sizeof(rq));
293 rq.session_id = htobe64(id);
294 rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
295
296 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
297 if (ret_len < 0) {
298 diag("Error sending cmd LTTNG_VIEWER_ATTACH_SESSION");
299 goto error;
300 }
301 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
302 if (ret_len < 0) {
303 diag("Error sending attach request");
304 goto error;
305 }
306
307 ret_len = lttng_live_recv(control_sock, &rp, sizeof(rp));
308 if (ret_len == 0) {
309 diag("[error] Remote side has closed connection");
310 goto error;
311 }
312 if (ret_len < 0) {
313 diag("Error receiving attach response");
314 goto error;
315 }
316 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
317 goto error;
318 }
319
320 session->stream_count = be32toh(rp.streams_count);
321 if (session->stream_count == 0) {
322 diag("Got session stream count == 0");
323 goto error;
324 }
325 session->streams = zmalloc(session->stream_count *
326 sizeof(struct viewer_stream));
327 if (!session->streams) {
328 goto error;
329 }
330
331 for (i = 0; i < be32toh(rp.streams_count); i++) {
332 ret_len = lttng_live_recv(control_sock, &stream, sizeof(stream));
333 if (ret_len == 0) {
334 diag("[error] Remote side has closed connection");
335 goto error;
336 }
337 if (ret_len < 0) {
338 diag("Error receiving stream");
339 goto error;
340 }
341 session->streams[i].id = be64toh(stream.id);
342
343 session->streams[i].ctf_trace_id = be64toh(stream.ctf_trace_id);
344 session->streams[i].first_read = 1;
345 session->streams[i].mmap_base = mmap(NULL, mmap_size,
346 PROT_READ | PROT_WRITE,
347 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
348 if (session->streams[i].mmap_base == MAP_FAILED) {
349 diag("mmap error");
350 goto error;
351 }
352
353 if (be32toh(stream.metadata_flag)) {
354 session->streams[i].metadata_flag = 1;
355 }
356 }
357 return session->stream_count;
358
359 error:
360 return -1;
361 }
362
363 static
364 int get_metadata(void)
365 {
366 struct lttng_viewer_cmd cmd;
367 struct lttng_viewer_get_metadata rq;
368 struct lttng_viewer_metadata_packet rp;
369 ssize_t ret_len;
370 int ret;
371 uint64_t i;
372 char *data = NULL;
373 uint64_t len = 0;
374 int metadata_stream_id = -1;
375
376 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
377 cmd.data_size = htobe64(sizeof(rq));
378 cmd.cmd_version = htobe32(0);
379
380 for (i = 0; i < session->stream_count; i++) {
381 if (session->streams[i].metadata_flag) {
382 metadata_stream_id = i;
383 break;
384 }
385 }
386
387 if (metadata_stream_id < 0) {
388 diag("No metadata stream found");
389 goto error;
390 }
391
392 rq.stream_id = htobe64(session->streams[metadata_stream_id].id);
393
394 retry:
395 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
396 if (ret_len < 0) {
397 diag("Error sending cmd");
398 goto error;
399 }
400 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
401 if (ret_len < 0) {
402 diag("Error sending get_metadata request");
403 goto error;
404 }
405 ret_len = lttng_live_recv(control_sock, &rp, sizeof(rp));
406 if (ret_len == 0) {
407 diag("[error] Remote side has closed connection");
408 goto error;
409 }
410 if (ret_len < 0) {
411 diag("Error receiving metadata response");
412 goto error;
413 }
414 switch (be32toh(rp.status)) {
415 case LTTNG_VIEWER_METADATA_OK:
416 break;
417 case LTTNG_VIEWER_NO_NEW_METADATA:
418 diag("Got LTTNG_VIEWER_NO_NEW_METADATA:");
419 usleep(50);
420 goto retry;
421 case LTTNG_VIEWER_METADATA_ERR:
422 diag("Got LTTNG_VIEWER_METADATA_ERR:");
423 goto error;
424 default:
425 diag("Got unknown status during LTTNG_VIEWER_GET_METADATA");
426 goto error;
427 }
428
429 len = be64toh(rp.len);
430 if (len <= 0) {
431 goto error;
432 }
433
434 data = zmalloc(len);
435 if (!data) {
436 PERROR("relay data zmalloc");
437 goto error;
438 }
439 ret_len = lttng_live_recv(control_sock, data, len);
440 if (ret_len == 0) {
441 diag("[error] Remote side has closed connection");
442 goto error_free_data;
443 }
444 if (ret_len < 0) {
445 diag("Error receiving trace packet");
446 goto error_free_data;
447 }
448 free(data);
449 ret = len;
450
451 return ret;
452
453 error_free_data:
454 free(data);
455 error:
456 return -1;
457 }
458
459 static
460 int get_next_index(void)
461 {
462 struct lttng_viewer_cmd cmd;
463 struct lttng_viewer_get_next_index rq;
464 struct lttng_viewer_index rp;
465 ssize_t ret_len;
466 int id;
467
468 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
469 cmd.data_size = htobe64(sizeof(rq));
470 cmd.cmd_version = htobe32(0);
471
472 for (id = 0; id < session->stream_count; id++) {
473 if (session->streams[id].metadata_flag) {
474 continue;
475 }
476 memset(&rq, 0, sizeof(rq));
477 rq.stream_id = htobe64(session->streams[id].id);
478
479 retry:
480 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
481 if (ret_len < 0) {
482 diag("Error sending cmd");
483 goto error;
484 }
485 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
486 if (ret_len < 0) {
487 diag("Error sending get_next_index request");
488 goto error;
489 }
490 ret_len = lttng_live_recv(control_sock, &rp, sizeof(rp));
491 if (ret_len == 0) {
492 diag("[error] Remote side has closed connection");
493 goto error;
494 }
495 if (ret_len < 0) {
496 diag("Error receiving index response");
497 goto error;
498 }
499
500 rp.flags = be32toh(rp.flags);
501
502 switch (be32toh(rp.status)) {
503 case LTTNG_VIEWER_INDEX_INACTIVE:
504 /* Skip this stream. */
505 diag("Got LTTNG_VIEWER_INDEX_INACTIVE");
506 continue;
507 case LTTNG_VIEWER_INDEX_OK:
508 break;
509 case LTTNG_VIEWER_INDEX_RETRY:
510 sleep(1);
511 goto retry;
512 case LTTNG_VIEWER_INDEX_HUP:
513 diag("Got LTTNG_VIEWER_INDEX_HUP");
514 session->streams[id].id = -1ULL;
515 session->streams[id].fd = -1;
516 goto error;
517 case LTTNG_VIEWER_INDEX_ERR:
518 diag("Got LTTNG_VIEWER_INDEX_ERR");
519 goto error;
520 default:
521 diag("Unknown reply status during LTTNG_VIEWER_GET_NEXT_INDEX (%d)", be32toh(rp.status));
522 goto error;
523 }
524 if (first_packet_stream_id < 0) {
525 /*
526 * Initialize the first packet stream id. That is,
527 * the first active stream encoutered.
528 */
529 first_packet_offset = be64toh(rp.offset);
530 first_packet_len = be64toh(rp.packet_size) / CHAR_BIT;
531 first_packet_stream_id = id;
532 diag("Got first packet index with offset %d and len %d",
533 first_packet_offset, first_packet_len);
534 }
535 }
536 return 0;
537
538 error:
539 return -1;
540 }
541
542 static
543 int get_data_packet(int id, uint64_t offset,
544 uint64_t len)
545 {
546 struct lttng_viewer_cmd cmd;
547 struct lttng_viewer_get_packet rq;
548 struct lttng_viewer_trace_packet rp;
549 ssize_t ret_len;
550
551 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
552 cmd.data_size = htobe64(sizeof(rq));
553 cmd.cmd_version = htobe32(0);
554
555 memset(&rq, 0, sizeof(rq));
556 rq.stream_id = htobe64(session->streams[id].id);
557 /* Already in big endian. */
558 rq.offset = offset;
559 rq.len = htobe32(len);
560
561 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
562 if (ret_len < 0) {
563 diag("Error sending cmd");
564 goto error;
565 }
566 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
567 if (ret_len < 0) {
568 diag("Error sending get_data_packet request");
569 goto error;
570 }
571 ret_len = lttng_live_recv(control_sock, &rp, sizeof(rp));
572 if (ret_len == 0) {
573 diag("[error] Remote side has closed connection");
574 goto error;
575 }
576 if (ret_len < 0) {
577 diag("Error receiving data response");
578 goto error;
579 }
580 rp.flags = be32toh(rp.flags);
581
582 switch (be32toh(rp.status)) {
583 case LTTNG_VIEWER_GET_PACKET_OK:
584 len = be32toh(rp.len);
585 if (len == 0) {
586 diag("Got LTTNG_VIEWER_GET_PACKET_OK, but len == 0");
587 goto error;
588 }
589 break;
590 case LTTNG_VIEWER_GET_PACKET_RETRY:
591 diag("Got LTTNG_VIEWER_GET_PACKET_RETRY:");
592 goto error;
593 case LTTNG_VIEWER_GET_PACKET_ERR:
594 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
595 diag("Got LTTNG_VIEWER_GET_PACKET_ERR with NEW_METADATA flag");
596 goto end;
597 }
598 diag("Got LTTNG_VIEWER_GET_PACKET_ERR:");
599 goto error;
600 default:
601 diag("Got unknown status code during LTTNG_VIEWER_GET_PACKET");
602 goto error;
603 }
604
605 if (len > mmap_size) {
606 diag("mmap_size not big enough");
607 goto error;
608 }
609
610 ret_len = lttng_live_recv(control_sock, session->streams[id].mmap_base, len);
611 if (ret_len == 0) {
612 diag("[error] Remote side has closed connection");
613 goto error;
614 }
615 if (ret_len < 0) {
616 diag("Error receiving trace packet");
617 goto error;
618 }
619 end:
620 return 0;
621 error:
622 return -1;
623 }
624
625 static
626 int detach_viewer_session(uint64_t id)
627 {
628 struct lttng_viewer_cmd cmd;
629 struct lttng_viewer_detach_session_response resp;
630 struct lttng_viewer_detach_session_request rq;
631 int ret;
632 ssize_t ret_len;
633
634 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
635 cmd.data_size = htobe64(sizeof(rq));
636 cmd.cmd_version = htobe32(0);
637
638 memset(&rq, 0, sizeof(rq));
639 rq.session_id = htobe64(id);
640
641 ret_len = lttng_live_send(control_sock, &cmd, sizeof(cmd));
642 if (ret_len < 0) {
643 fprintf(stderr, "[error] Error sending cmd\n");
644 ret = ret_len;
645 goto error;
646 }
647
648 ret_len = lttng_live_send(control_sock, &rq, sizeof(rq));
649 if (ret_len < 0) {
650 fprintf(stderr, "Error sending attach request\n");
651 ret = ret_len;
652 goto error;
653 }
654
655 ret_len = lttng_live_recv(control_sock, &resp, sizeof(resp));
656 if (ret_len < 0) {
657 fprintf(stderr, "[error] Error receiving detach session reply\n");
658 ret = ret_len;
659 goto error;
660 }
661
662 if (be32toh(resp.status) != LTTNG_VIEWER_DETACH_SESSION_OK) {
663 fprintf(stderr, "[error] Error detaching viewer session\n");
664 ret = -1;
665 goto error;
666 }
667 ret = 0;
668
669 error:
670 return ret;
671 }
672
673 int main(int argc, char **argv)
674 {
675 int ret;
676 uint64_t session_id;
677
678 plan_tests(NUM_TESTS);
679
680 diag("Live unit tests");
681
682 ret = connect_viewer("localhost");
683 ok(ret == 0, "Connect viewer to relayd");
684
685 ret = establish_connection();
686 ok(ret == 0, "Established connection and version check with %d.%d",
687 VERSION_MAJOR, VERSION_MINOR);
688
689 ret = list_sessions(&session_id);
690 ok(ret > 0, "List sessions : %d session(s)", ret);
691 if (ret < 0) {
692 goto end;
693 }
694
695 ret = create_viewer_session();
696 ok(ret == 0, "Create viewer session");
697
698 ret = attach_session(session_id);
699 ok(ret > 0, "Attach to session, %d stream(s) received", ret);
700
701 ret = get_metadata();
702 ok(ret > 0, "Get metadata, received %d bytes", ret);
703
704 ret = get_next_index();
705 ok(ret == 0, "Get one index per stream");
706
707 ret = get_data_packet(first_packet_stream_id, first_packet_offset,
708 first_packet_len);
709 ok(ret == 0,
710 "Get one data packet for stream %d, offset %d, len %d",
711 first_packet_stream_id, first_packet_offset,
712 first_packet_len);
713
714 ret = detach_viewer_session(session_id);
715 ok(ret == 0, "Detach viewer session");
716
717 ret = list_sessions(&session_id);
718 ok(ret > 0, "List sessions : %d session(s)", ret);
719
720 ret = attach_session(session_id);
721 ok(ret > 0, "Attach to session, %d streams received", ret);
722 end:
723 return exit_status();
724 }
This page took 0.043096 seconds and 3 git commands to generate.