Fix: Tests with racy event validation
[lttng-tools.git] / tests / regression / tools / live / live_test.c
... / ...
CommitLineData
1/*
2 * Copyright (c) - 2013 Julien Desfossez <jdesfossez@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License as published by as
6 * published by the Free Software Foundation; only version 2 of the License.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _GNU_SOURCE
19#include <assert.h>
20#include <errno.h>
21#include <stdio.h>
22#include <stdlib.h>
23#include <string.h>
24#include <unistd.h>
25#include <time.h>
26#include <sys/types.h>
27#include <inttypes.h>
28#include <stdlib.h>
29#include <sys/socket.h>
30#include <netinet/in.h>
31#include <netdb.h>
32#include <fcntl.h>
33#include <sys/mman.h>
34#include <sys/stat.h>
35
36#include <tap/tap.h>
37#include <lttng/lttng.h>
38
39#include <urcu/list.h>
40#include <bin/lttng-sessiond/session.h>
41#include <common/common.h>
42
43#include <bin/lttng-relayd/lttng-viewer-abi.h>
44#include <common/index/ctf-index.h>
45
46#include <common/compat/endian.h>
47
48#define SESSION1 "test1"
49#define RELAYD_URL "net://localhost"
50#define LIVE_TIMER 2000000
51
52/* Number of TAP tests in this file */
53#define NUM_TESTS 8
54#define mmap_size 524288
55
56int ust_consumerd32_fd;
57int ust_consumerd64_fd;
58
59static int control_sock;
60struct live_session *session;
61
62static int first_packet_offset;
63static int first_packet_len;
64static int first_packet_stream_id;
65
66struct viewer_stream {
67 uint64_t id;
68 uint64_t ctf_trace_id;
69 void *mmap_base;
70 int fd;
71 int metadata_flag;
72 int first_read;
73 char path[PATH_MAX];
74};
75
76struct live_session {
77 struct viewer_stream *streams;
78 uint64_t live_timer_interval;
79 uint64_t stream_count;
80};
81
82static
83int connect_viewer(char *hostname)
84{
85 struct hostent *host;
86 struct sockaddr_in server_addr;
87 int ret;
88
89 host = gethostbyname(hostname);
90 if (!host) {
91 ret = -1;
92 goto end;
93 }
94
95 if ((control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
96 perror("Socket");
97 ret = -1;
98 goto end;
99 }
100
101 server_addr.sin_family = AF_INET;
102 server_addr.sin_port = htons(5344);
103 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
104 bzero(&(server_addr.sin_zero), 8);
105
106 if (connect(control_sock, (struct sockaddr *) &server_addr,
107 sizeof(struct sockaddr)) == -1) {
108 perror("Connect");
109 ret = -1;
110 goto end;
111 }
112
113 server_addr.sin_family = AF_INET;
114 server_addr.sin_port = htons(5345);
115 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
116 bzero(&(server_addr.sin_zero), 8);
117
118 ret = 0;
119
120end:
121 return ret;
122}
123
124int establish_connection(void)
125{
126 struct lttng_viewer_cmd cmd;
127 struct lttng_viewer_connect connect;
128 int ret;
129
130 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
131 cmd.data_size = sizeof(connect);
132 cmd.cmd_version = 0;
133
134 memset(&connect, 0, sizeof(connect));
135 connect.major = htobe32(VERSION_MAJOR);
136 connect.minor = htobe32(VERSION_MINOR);
137 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
138
139 do {
140 ret = send(control_sock, &cmd, sizeof(cmd), 0);
141 } while (ret < 0 && errno == EINTR);
142 if (ret < 0) {
143 fprintf(stderr, "Error sending cmd\n");
144 goto error;
145 }
146 do {
147 ret = send(control_sock, &connect, sizeof(connect), 0);
148 } while (ret < 0 && errno == EINTR);
149 if (ret < 0) {
150 fprintf(stderr, "Error sending version\n");
151 goto error;
152 }
153
154 do {
155 ret = recv(control_sock, &connect, sizeof(connect), 0);
156 } while (ret < 0 && errno == EINTR);
157 if (ret < 0) {
158 fprintf(stderr, "Error receiving version\n");
159 goto error;
160 }
161 ret = 0;
162
163error:
164 return ret;
165}
166
167/*
168 * Returns the number of sessions, should be 1 during the unit test.
169 */
170int list_sessions(int *session_id)
171{
172 struct lttng_viewer_cmd cmd;
173 struct lttng_viewer_list_sessions list;
174 struct lttng_viewer_session lsession;
175 int i, ret;
176 int first_session = 0;
177
178 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
179 cmd.data_size = 0;
180 cmd.cmd_version = 0;
181
182 do {
183 ret = send(control_sock, &cmd, sizeof(cmd), 0);
184 } while (ret < 0 && errno == EINTR);
185 if (ret < 0) {
186 fprintf(stderr, "Error sending cmd\n");
187 goto error;
188 }
189
190 do {
191 ret = recv(control_sock, &list, sizeof(list), 0);
192 } while (ret < 0 && errno == EINTR);
193 if (ret < 0) {
194 fprintf(stderr, "Error receiving session list\n");
195 goto error;
196 }
197
198 for (i = 0; i < be32toh(list.sessions_count); i++) {
199 do {
200 ret = recv(control_sock, &lsession, sizeof(lsession), 0);
201 } while (ret < 0 && errno == EINTR);
202 if (ret < 0) {
203 fprintf(stderr, "Error receiving session\n");
204 goto error;
205 }
206 if (lsession.streams > 0 && first_session <= 0) {
207 first_session = be64toh(lsession.id);
208 *session_id = first_session;
209 }
210 }
211
212 ret = be32toh(list.sessions_count);
213
214error:
215 return ret;
216}
217
218int create_viewer_session()
219{
220 struct lttng_viewer_cmd cmd;
221 struct lttng_viewer_create_session_response resp;
222 int ret;
223 ssize_t ret_len;
224
225 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
226 cmd.data_size = 0;
227 cmd.cmd_version = 0;
228
229 do {
230 ret_len = send(control_sock, &cmd, sizeof(cmd), 0);
231 } while (ret_len < 0 && errno == EINTR);
232 if (ret_len < 0) {
233 fprintf(stderr, "[error] Error sending cmd\n");
234 ret = ret_len;
235 goto error;
236 }
237 assert(ret_len == sizeof(cmd));
238
239 do {
240 ret_len = recv(control_sock, &resp, sizeof(resp), 0);
241 } while (ret_len < 0 && errno == EINTR);
242 if (ret_len < 0) {
243 fprintf(stderr, "[error] Error receiving create session reply\n");
244 ret = ret_len;
245 goto error;
246 }
247 assert(ret_len == sizeof(resp));
248
249 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
250 fprintf(stderr, "[error] Error creating viewer session\n");
251 ret = -1;
252 goto error;
253 }
254 ret = 0;
255
256error:
257 return ret;
258}
259
260int attach_session(int id)
261{
262 struct lttng_viewer_cmd cmd;
263 struct lttng_viewer_attach_session_request rq;
264 struct lttng_viewer_attach_session_response rp;
265 struct lttng_viewer_stream stream;
266 int ret, i;
267
268 session = zmalloc(sizeof(struct live_session));
269 if (!session) {
270 ret = -1;
271 goto error;
272 }
273
274 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
275 cmd.data_size = sizeof(rq);
276 cmd.cmd_version = 0;
277
278 memset(&rq, 0, sizeof(rq));
279 rq.session_id = htobe64(id);
280 rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
281
282 do {
283 ret = send(control_sock, &cmd, sizeof(cmd), 0);
284 } while (ret < 0 && errno == EINTR);
285 if (ret < 0) {
286 fprintf(stderr, "Error sending cmd\n");
287 goto error;
288 }
289 do {
290 ret = send(control_sock, &rq, sizeof(rq), 0);
291 } while (ret < 0 && errno == EINTR);
292 if (ret < 0) {
293 fprintf(stderr, "Error sending attach request\n");
294 goto error;
295 }
296
297 do {
298 ret = recv(control_sock, &rp, sizeof(rp), 0);
299 } while (ret < 0 && errno == EINTR);
300 if (ret < 0) {
301 fprintf(stderr, "Error receiving attach response\n");
302 goto error;
303 }
304 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
305 ret = -1;
306 goto end;
307 }
308
309 session->stream_count = be32toh(rp.streams_count);
310 session->streams = zmalloc(session->stream_count *
311 sizeof(struct viewer_stream));
312 if (!session->streams) {
313 ret = -1;
314 goto error;
315 }
316
317 for (i = 0; i < be32toh(rp.streams_count); i++) {
318 do {
319 ret = recv(control_sock, &stream, sizeof(stream), 0);
320 } while (ret < 0 && errno == EINTR);
321 if (ret < 0) {
322 fprintf(stderr, "Error receiving stream\n");
323 goto error;
324 }
325 session->streams[i].id = be64toh(stream.id);
326
327 session->streams[i].ctf_trace_id = be64toh(stream.ctf_trace_id);
328 session->streams[i].first_read = 1;
329 session->streams[i].mmap_base = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE,
330 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
331 if (session->streams[i].mmap_base == MAP_FAILED) {
332 fprintf(stderr, "mmap error\n");
333 ret = -1;
334 goto error;
335 }
336
337 if (be32toh(stream.metadata_flag)) {
338 session->streams[i].metadata_flag = 1;
339 }
340 }
341 ret = session->stream_count;
342
343end:
344error:
345 return ret;
346}
347
348int get_metadata(void)
349{
350 struct lttng_viewer_cmd cmd;
351 struct lttng_viewer_get_metadata rq;
352 struct lttng_viewer_metadata_packet rp;
353 int ret;
354 uint64_t i;
355 char *data = NULL;
356 uint64_t len = 0;
357 int metadata_stream_id = -1;
358
359 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
360 cmd.data_size = sizeof(rq);
361 cmd.cmd_version = 0;
362
363 for (i = 0; i < session->stream_count; i++) {
364 if (session->streams[i].metadata_flag) {
365 metadata_stream_id = i;
366 break;
367 }
368 }
369
370 if (metadata_stream_id < 0) {
371 fprintf(stderr, "No metadata stream found\n");
372 ret = -1;
373 goto error;
374 }
375
376 rq.stream_id = htobe64(session->streams[metadata_stream_id].id);
377
378 do {
379 ret = send(control_sock, &cmd, sizeof(cmd), 0);
380 } while (ret < 0 && errno == EINTR);
381 if (ret < 0) {
382 fprintf(stderr, "Error sending cmd\n");
383 goto error;
384 }
385 do {
386 ret = send(control_sock, &rq, sizeof(rq), 0);
387 } while (ret < 0 && errno == EINTR);
388 if (ret < 0) {
389 fprintf(stderr, "Error sending get_metadata request\n");
390 goto error;
391 }
392 do {
393 ret = recv(control_sock, &rp, sizeof(rp), 0);
394 } while (ret < 0 && errno == EINTR);
395 if (ret < 0) {
396 fprintf(stderr, "Error receiving metadata response\n");
397 goto error;
398 }
399 switch (be32toh(rp.status)) {
400 case LTTNG_VIEWER_METADATA_OK:
401 break;
402 case LTTNG_VIEWER_NO_NEW_METADATA:
403 fprintf(stderr, "NO NEW\n");
404 ret = -1;
405 goto end;
406 case LTTNG_VIEWER_METADATA_ERR:
407 fprintf(stderr, "ERR\n");
408 ret = -1;
409 goto end;
410 default:
411 fprintf(stderr, "UNKNOWN\n");
412 ret = -1;
413 goto end;
414 }
415
416 len = be64toh(rp.len);
417 if (len <= 0) {
418 goto end;
419 }
420
421 data = zmalloc(len);
422 if (!data) {
423 perror("relay data zmalloc");
424 goto error;
425 }
426 do {
427 ret = recv(control_sock, data, len, MSG_WAITALL);
428 } while (ret < 0 && errno == EINTR);
429 if (ret < 0) {
430 fprintf(stderr, "Error receiving trace packet\n");
431 free(data);
432 goto error;
433 }
434 free(data);
435
436 ret = (int) len;
437end:
438error:
439 return ret;
440}
441
442int get_next_index(void)
443{
444 struct lttng_viewer_cmd cmd;
445 struct lttng_viewer_get_next_index rq;
446 struct lttng_viewer_index rp;
447 int ret;
448 int id;
449
450 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
451 cmd.data_size = sizeof(rq);
452 cmd.cmd_version = 0;
453
454 for (id = 0; id < session->stream_count; id++) {
455 if (session->streams[id].metadata_flag) {
456 continue;
457 }
458 rq.stream_id = htobe64(session->streams[id].id);
459
460retry:
461 do {
462 ret = send(control_sock, &cmd, sizeof(cmd), 0);
463 } while (ret < 0 && errno == EINTR);
464 if (ret < 0) {
465 fprintf(stderr, "Error sending cmd\n");
466 goto error;
467 }
468 do {
469 ret = send(control_sock, &rq, sizeof(rq), 0);
470 } while (ret < 0 && errno == EINTR);
471 if (ret < 0) {
472 fprintf(stderr, "Error sending get_next_index request\n");
473 goto error;
474 }
475 do {
476 ret = recv(control_sock, &rp, sizeof(rp), 0);
477 } while (ret < 0 && errno == EINTR);
478 if (ret < 0) {
479 fprintf(stderr, "Error receiving index response\n");
480 goto error;
481 }
482
483 rp.flags = be32toh(rp.flags);
484
485 switch (be32toh(rp.status)) {
486 case LTTNG_VIEWER_INDEX_INACTIVE:
487 fprintf(stderr, "(INACTIVE)\n");
488 break;
489 case LTTNG_VIEWER_INDEX_OK:
490 break;
491 case LTTNG_VIEWER_INDEX_RETRY:
492 sleep(1);
493 goto retry;
494 case LTTNG_VIEWER_INDEX_HUP:
495 fprintf(stderr, "(HUP)\n");
496 session->streams[id].id = -1ULL;
497 session->streams[id].fd = -1;
498 break;
499 case LTTNG_VIEWER_INDEX_ERR:
500 fprintf(stderr, "(ERR)\n");
501 ret = -1;
502 goto error;
503 default:
504 fprintf(stderr, "SHOULD NOT HAPPEN\n");
505 ret = -1;
506 goto error;
507 }
508 if (!first_packet_stream_id) {
509 first_packet_offset = be64toh(rp.offset);
510 first_packet_len = be64toh(rp.packet_size) / CHAR_BIT;
511 first_packet_stream_id = id;
512 }
513 }
514 ret = 0;
515
516error:
517 return ret;
518}
519
520static
521int get_data_packet(int id, uint64_t offset,
522 uint64_t len)
523{
524 struct lttng_viewer_cmd cmd;
525 struct lttng_viewer_get_packet rq;
526 struct lttng_viewer_trace_packet rp;
527 int ret;
528
529 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
530 cmd.data_size = sizeof(rq);
531 cmd.cmd_version = 0;
532
533 rq.stream_id = htobe64(session->streams[id].id);
534 /* Already in big endian. */
535 rq.offset = offset;
536 rq.len = htobe32(len);
537
538 do {
539 ret = send(control_sock, &cmd, sizeof(cmd), 0);
540 } while (ret < 0 && errno == EINTR);
541 if (ret < 0) {
542 fprintf(stderr, "Error sending cmd\n");
543 goto error;
544 }
545 do {
546 ret = send(control_sock, &rq, sizeof(rq), 0);
547 } while (ret < 0 && errno == EINTR);
548 if (ret < 0) {
549 fprintf(stderr, "Error sending get_data_packet request\n");
550 goto error;
551 }
552 do {
553 ret = recv(control_sock, &rp, sizeof(rp), 0);
554 } while (ret < 0 && errno == EINTR);
555 if (ret < 0) {
556 fprintf(stderr, "Error receiving data response\n");
557 goto error;
558 }
559 rp.flags = be32toh(rp.flags);
560
561 switch (be32toh(rp.status)) {
562 case LTTNG_VIEWER_GET_PACKET_OK:
563 break;
564 case LTTNG_VIEWER_GET_PACKET_RETRY:
565 fprintf(stderr, "RETRY\n");
566 ret = -1;
567 goto end;
568 case LTTNG_VIEWER_GET_PACKET_ERR:
569 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
570 fprintf(stderr, "NEW_METADATA\n");
571 ret = 0;
572 goto end;
573 }
574 fprintf(stderr, "ERR\n");
575 ret = -1;
576 goto end;
577 default:
578 fprintf(stderr, "UNKNOWN\n");
579 ret = -1;
580 goto end;
581 }
582
583 len = be32toh(rp.len);
584 if (len <= 0) {
585 goto end;
586 }
587
588 if (len > mmap_size) {
589 fprintf(stderr, "mmap_size not big enough\n");
590 ret = -1;
591 goto error;
592 }
593
594 do {
595 ret = recv(control_sock, session->streams[id].mmap_base, len, MSG_WAITALL);
596 } while (ret < 0 && errno == EINTR);
597 if (ret < 0) {
598 fprintf(stderr, "Error receiving trace packet\n");
599 goto error;
600 }
601 ret = len;
602
603end:
604error:
605 return ret;
606}
607
608int main(int argc, char **argv)
609{
610 int ret;
611 int session_id;
612
613 plan_tests(NUM_TESTS);
614
615 diag("Live unit tests");
616
617 ret = connect_viewer("localhost");
618 ok(ret == 0, "Connect viewer to relayd");
619
620 ret = establish_connection();
621 ok(ret == 0, "Established connection and version check with %d.%d",
622 VERSION_MAJOR, VERSION_MINOR);
623
624 ret = list_sessions(&session_id);
625 ok(ret > 0, "List sessions : %d session(s)", ret);
626
627 ret = create_viewer_session();
628 ok(ret == 0, "Create viewer session");
629
630 ret = attach_session(session_id);
631 ok(ret > 0, "Attach to session, %d streams received", ret);
632
633 ret = get_metadata();
634 ok(ret > 0, "Get metadata, received %d bytes", ret);
635
636 ret = get_next_index();
637 ok(ret == 0, "Get one index per stream");
638
639 ret = get_data_packet(first_packet_stream_id, first_packet_offset,
640 first_packet_len);
641 ok(ret == first_packet_len,
642 "Get one data packet for stream %d, offset %d, len %d",
643 first_packet_stream_id, first_packet_offset,
644 first_packet_len);
645
646 return exit_status();
647}
This page took 0.026729 seconds and 4 git commands to generate.