Live unit test
[lttng-tools.git] / tests / regression / tools / live / live_test.c
1 /*
2 * Copyright (c) - 2013 Julien Desfossez <jdesfossez@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License as published by as
6 * published by the Free Software Foundation; only version 2 of the License.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _GNU_SOURCE
19 #include <assert.h>
20 #include <errno.h>
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <unistd.h>
25 #include <time.h>
26 #include <sys/types.h>
27 #include <inttypes.h>
28 #include <stdlib.h>
29 #include <sys/socket.h>
30 #include <netinet/in.h>
31 #include <netdb.h>
32 #include <fcntl.h>
33 #include <sys/mman.h>
34 #include <sys/stat.h>
35
36 #include <tap/tap.h>
37 #include <lttng/lttng.h>
38
39 #include <urcu/list.h>
40 #include <bin/lttng-sessiond/session.h>
41 #include <common/common.h>
42
43 #include <bin/lttng-relayd/lttng-viewer.h>
44 #include <common/index/lttng-index.h>
45
46 #define SESSION1 "test1"
47 #define RELAYD_URL "net://localhost"
48 #define LIVE_TIMER 2000000
49
50 /* Number of TAP tests in this file */
51 #define NUM_TESTS 7
52 #define mmap_size 524288
53
54 int ust_consumerd32_fd;
55 int ust_consumerd64_fd;
56
57 static int control_sock;
58 struct live_session *session;
59
60 static int first_packet_offset;
61 static int first_packet_len;
62 static int first_packet_stream_id;
63
64 struct viewer_stream {
65 uint64_t id;
66 uint64_t ctf_trace_id;
67 void *mmap_base;
68 int fd;
69 int metadata_flag;
70 int first_read;
71 char path[PATH_MAX];
72 };
73
74 struct live_session {
75 struct viewer_stream *streams;
76 uint64_t live_timer_interval;
77 uint64_t stream_count;
78 };
79
80 static
81 int connect_viewer(char *hostname)
82 {
83 struct hostent *host;
84 struct sockaddr_in server_addr;
85 int ret;
86
87 host = gethostbyname(hostname);
88 if (!host) {
89 ret = -1;
90 goto end;
91 }
92
93 if ((control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
94 perror("Socket");
95 ret = -1;
96 goto end;
97 }
98
99 server_addr.sin_family = AF_INET;
100 server_addr.sin_port = htons(5344);
101 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
102 bzero(&(server_addr.sin_zero), 8);
103
104 if (connect(control_sock, (struct sockaddr *) &server_addr,
105 sizeof(struct sockaddr)) == -1) {
106 perror("Connect");
107 ret = -1;
108 goto end;
109 }
110
111 server_addr.sin_family = AF_INET;
112 server_addr.sin_port = htons(5345);
113 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
114 bzero(&(server_addr.sin_zero), 8);
115
116 ret = 0;
117
118 end:
119 return ret;
120 }
121
122 int establish_connection(void)
123 {
124 struct lttng_viewer_cmd cmd;
125 struct lttng_viewer_connect connect;
126 int ret;
127
128 cmd.cmd = htobe32(VIEWER_CONNECT);
129 cmd.data_size = sizeof(connect);
130 cmd.cmd_version = 0;
131
132 connect.major = htobe32(VERSION_MAJOR);
133 connect.minor = htobe32(VERSION_MINOR);
134 connect.type = htobe32(VIEWER_CLIENT_COMMAND);
135
136 do {
137 ret = send(control_sock, &cmd, sizeof(cmd), 0);
138 } while (ret < 0 && errno == EINTR);
139 if (ret < 0) {
140 fprintf(stderr, "Error sending cmd\n");
141 goto error;
142 }
143 do {
144 ret = send(control_sock, &connect, sizeof(connect), 0);
145 } while (ret < 0 && errno == EINTR);
146 if (ret < 0) {
147 fprintf(stderr, "Error sending version\n");
148 goto error;
149 }
150
151 do {
152 ret = recv(control_sock, &connect, sizeof(connect), 0);
153 } while (ret < 0 && errno == EINTR);
154 if (ret < 0) {
155 fprintf(stderr, "Error receiving version\n");
156 goto error;
157 }
158 ret = 0;
159
160 error:
161 return ret;
162 }
163
164 /*
165 * Returns the number of sessions, should be 1 during the unit test.
166 */
167 int list_sessions(int *session_id)
168 {
169 struct lttng_viewer_cmd cmd;
170 struct lttng_viewer_list_sessions list;
171 struct lttng_viewer_session lsession;
172 int i, ret;
173 int first_session = 0;
174
175 cmd.cmd = htobe32(VIEWER_LIST_SESSIONS);
176 cmd.data_size = 0;
177 cmd.cmd_version = 0;
178
179 do {
180 ret = send(control_sock, &cmd, sizeof(cmd), 0);
181 } while (ret < 0 && errno == EINTR);
182 if (ret < 0) {
183 fprintf(stderr, "Error sending cmd\n");
184 goto error;
185 }
186
187 do {
188 ret = recv(control_sock, &list, sizeof(list), 0);
189 } while (ret < 0 && errno == EINTR);
190 if (ret < 0) {
191 fprintf(stderr, "Error receiving session list\n");
192 goto error;
193 }
194
195 for (i = 0; i < be32toh(list.sessions_count); i++) {
196 do {
197 ret = recv(control_sock, &lsession, sizeof(lsession), 0);
198 } while (ret < 0 && errno == EINTR);
199 if (ret < 0) {
200 fprintf(stderr, "Error receiving session\n");
201 goto error;
202 }
203 if (lsession.streams > 0 && first_session <= 0) {
204 first_session = be64toh(lsession.id);
205 *session_id = first_session;
206 }
207 }
208
209 ret = be32toh(list.sessions_count);
210
211 error:
212 return ret;
213 }
214
215 int attach_session(int id)
216 {
217 struct lttng_viewer_cmd cmd;
218 struct lttng_viewer_attach_session_request rq;
219 struct lttng_viewer_attach_session_response rp;
220 struct lttng_viewer_stream stream;
221 int ret, i;
222
223 session = zmalloc(sizeof(struct live_session));
224 if (!session) {
225 ret = -1;
226 goto error;
227 }
228
229 cmd.cmd = htobe32(VIEWER_ATTACH_SESSION);
230 cmd.data_size = sizeof(rq);
231 cmd.cmd_version = 0;
232
233 rq.session_id = htobe64(id);
234 rq.seek = htobe32(VIEWER_SEEK_BEGINNING);
235
236 do {
237 ret = send(control_sock, &cmd, sizeof(cmd), 0);
238 } while (ret < 0 && errno == EINTR);
239 if (ret < 0) {
240 fprintf(stderr, "Error sending cmd\n");
241 goto error;
242 }
243 do {
244 ret = send(control_sock, &rq, sizeof(rq), 0);
245 } while (ret < 0 && errno == EINTR);
246 if (ret < 0) {
247 fprintf(stderr, "Error sending attach request\n");
248 goto error;
249 }
250
251 do {
252 ret = recv(control_sock, &rp, sizeof(rp), 0);
253 } while (ret < 0 && errno == EINTR);
254 if (ret < 0) {
255 fprintf(stderr, "Error receiving attach response\n");
256 goto error;
257 }
258 if (be32toh(rp.status) != VIEWER_ATTACH_OK) {
259 ret = -1;
260 goto end;
261 }
262
263 session->stream_count = be32toh(rp.streams_count);
264 session->streams = zmalloc(session->stream_count *
265 sizeof(struct viewer_stream));
266 if (!session->streams) {
267 ret = -1;
268 goto error;
269 }
270
271 for (i = 0; i < be32toh(rp.streams_count); i++) {
272 do {
273 ret = recv(control_sock, &stream, sizeof(stream), 0);
274 } while (ret < 0 && errno == EINTR);
275 if (ret < 0) {
276 fprintf(stderr, "Error receiving stream\n");
277 goto error;
278 }
279 session->streams[i].id = be64toh(stream.id);
280
281 session->streams[i].ctf_trace_id = be64toh(stream.ctf_trace_id);
282 session->streams[i].first_read = 1;
283 session->streams[i].mmap_base = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE,
284 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
285 if (session->streams[i].mmap_base == MAP_FAILED) {
286 fprintf(stderr, "mmap error\n");
287 ret = -1;
288 goto error;
289 }
290
291 if (be32toh(stream.metadata_flag)) {
292 session->streams[i].metadata_flag = 1;
293 }
294 }
295 ret = session->stream_count;
296
297 end:
298 error:
299 return ret;
300 }
301
302 int get_metadata(void)
303 {
304 struct lttng_viewer_cmd cmd;
305 struct lttng_viewer_get_metadata rq;
306 struct lttng_viewer_metadata_packet rp;
307 int ret;
308 uint64_t i;
309 char *data = NULL;
310 uint64_t len = 0;
311 int metadata_stream_id = -1;
312
313 cmd.cmd = htobe32(VIEWER_GET_METADATA);
314 cmd.data_size = sizeof(rq);
315 cmd.cmd_version = 0;
316
317 for (i = 0; i < session->stream_count; i++) {
318 if (session->streams[i].metadata_flag) {
319 metadata_stream_id = i;
320 break;
321 }
322 }
323
324 if (metadata_stream_id < 0) {
325 fprintf(stderr, "No metadata stream found\n");
326 ret = -1;
327 goto error;
328 }
329
330 rq.stream_id = htobe64(session->streams[metadata_stream_id].id);
331
332 do {
333 ret = send(control_sock, &cmd, sizeof(cmd), 0);
334 } while (ret < 0 && errno == EINTR);
335 if (ret < 0) {
336 fprintf(stderr, "Error sending cmd\n");
337 goto error;
338 }
339 do {
340 ret = send(control_sock, &rq, sizeof(rq), 0);
341 } while (ret < 0 && errno == EINTR);
342 if (ret < 0) {
343 fprintf(stderr, "Error sending get_metadata request\n");
344 goto error;
345 }
346 do {
347 ret = recv(control_sock, &rp, sizeof(rp), 0);
348 } while (ret < 0 && errno == EINTR);
349 if (ret < 0) {
350 fprintf(stderr, "Error receiving metadata response\n");
351 goto error;
352 }
353 switch (be32toh(rp.status)) {
354 case VIEWER_METADATA_OK:
355 break;
356 case VIEWER_NO_NEW_METADATA:
357 fprintf(stderr, "NO NEW\n");
358 ret = -1;
359 goto end;
360 case VIEWER_METADATA_ERR:
361 fprintf(stderr, "ERR\n");
362 ret = -1;
363 goto end;
364 default:
365 fprintf(stderr, "UNKNOWN\n");
366 ret = -1;
367 goto end;
368 }
369
370 len = be64toh(rp.len);
371 if (len <= 0) {
372 goto end;
373 }
374
375 data = zmalloc(len);
376 if (!data) {
377 perror("relay data zmalloc");
378 goto error;
379 }
380 do {
381 ret = recv(control_sock, data, len, MSG_WAITALL);
382 } while (ret < 0 && errno == EINTR);
383 if (ret < 0) {
384 fprintf(stderr, "Error receiving trace packet\n");
385 free(data);
386 goto error;
387 }
388 free(data);
389
390 ret = (int) len;
391 end:
392 error:
393 return ret;
394 }
395
396 int get_next_index(void)
397 {
398 struct lttng_viewer_cmd cmd;
399 struct lttng_viewer_get_next_index rq;
400 struct lttng_viewer_index rp;
401 int ret;
402 int id;
403
404 cmd.cmd = htobe32(VIEWER_GET_NEXT_INDEX);
405 cmd.data_size = sizeof(rq);
406 cmd.cmd_version = 0;
407
408 for (id = 0; id < session->stream_count; id++) {
409 if (session->streams[id].metadata_flag) {
410 continue;
411 }
412 rq.stream_id = htobe64(session->streams[id].id);
413
414 retry:
415 do {
416 ret = send(control_sock, &cmd, sizeof(cmd), 0);
417 } while (ret < 0 && errno == EINTR);
418 if (ret < 0) {
419 fprintf(stderr, "Error sending cmd\n");
420 goto error;
421 }
422 do {
423 ret = send(control_sock, &rq, sizeof(rq), 0);
424 } while (ret < 0 && errno == EINTR);
425 if (ret < 0) {
426 fprintf(stderr, "Error sending get_next_index request\n");
427 goto error;
428 }
429 do {
430 ret = recv(control_sock, &rp, sizeof(rp), 0);
431 } while (ret < 0 && errno == EINTR);
432 if (ret < 0) {
433 fprintf(stderr, "Error receiving index response\n");
434 goto error;
435 }
436
437 rp.flags = be32toh(rp.flags);
438
439 switch (be32toh(rp.status)) {
440 case VIEWER_INDEX_INACTIVE:
441 fprintf(stderr, "(INACTIVE)\n");
442 break;
443 case VIEWER_INDEX_OK:
444 break;
445 case VIEWER_INDEX_RETRY:
446 sleep(1);
447 goto retry;
448 case VIEWER_INDEX_HUP:
449 fprintf(stderr, "(HUP)\n");
450 session->streams[id].id = -1ULL;
451 session->streams[id].fd = -1;
452 break;
453 case VIEWER_INDEX_ERR:
454 fprintf(stderr, "(ERR)\n");
455 ret = -1;
456 goto error;
457 default:
458 fprintf(stderr, "SHOULD NOT HAPPEN\n");
459 ret = -1;
460 goto error;
461 }
462 if (!first_packet_stream_id) {
463 first_packet_offset = be64toh(rp.offset);
464 first_packet_len = be64toh(rp.packet_size) / CHAR_BIT;
465 first_packet_stream_id = id;
466 }
467 }
468 ret = 0;
469
470 error:
471 return ret;
472 }
473
474 static
475 int get_data_packet(int id, uint64_t offset,
476 uint64_t len)
477 {
478 struct lttng_viewer_cmd cmd;
479 struct lttng_viewer_get_packet rq;
480 struct lttng_viewer_trace_packet rp;
481 int ret;
482
483 cmd.cmd = htobe32(VIEWER_GET_PACKET);
484 cmd.data_size = sizeof(rq);
485 cmd.cmd_version = 0;
486
487 rq.stream_id = htobe64(session->streams[id].id);
488 /* Already in big endian. */
489 rq.offset = offset;
490 rq.len = htobe32(len);
491
492 do {
493 ret = send(control_sock, &cmd, sizeof(cmd), 0);
494 } while (ret < 0 && errno == EINTR);
495 if (ret < 0) {
496 fprintf(stderr, "Error sending cmd\n");
497 goto error;
498 }
499 do {
500 ret = send(control_sock, &rq, sizeof(rq), 0);
501 } while (ret < 0 && errno == EINTR);
502 if (ret < 0) {
503 fprintf(stderr, "Error sending get_data_packet request\n");
504 goto error;
505 }
506 do {
507 ret = recv(control_sock, &rp, sizeof(rp), 0);
508 } while (ret < 0 && errno == EINTR);
509 if (ret < 0) {
510 fprintf(stderr, "Error receiving data response\n");
511 goto error;
512 }
513 rp.flags = be32toh(rp.flags);
514
515 switch (be32toh(rp.status)) {
516 case VIEWER_GET_PACKET_OK:
517 break;
518 case VIEWER_GET_PACKET_RETRY:
519 fprintf(stderr, "RETRY\n");
520 ret = -1;
521 goto end;
522 case VIEWER_GET_PACKET_ERR:
523 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
524 fprintf(stderr, "NEW_METADATA\n");
525 ret = 0;
526 goto end;
527 }
528 fprintf(stderr, "ERR\n");
529 ret = -1;
530 goto end;
531 default:
532 fprintf(stderr, "UNKNOWN\n");
533 ret = -1;
534 goto end;
535 }
536
537 len = be32toh(rp.len);
538 if (len <= 0) {
539 goto end;
540 }
541
542 if (len > mmap_size) {
543 fprintf(stderr, "mmap_size not big enough\n");
544 ret = -1;
545 goto error;
546 }
547
548 do {
549 ret = recv(control_sock, session->streams[id].mmap_base, len, MSG_WAITALL);
550 } while (ret < 0 && errno == EINTR);
551 if (ret < 0) {
552 fprintf(stderr, "Error receiving trace packet\n");
553 goto error;
554 }
555 ret = len;
556
557 end:
558 error:
559 return ret;
560 }
561
562 int main(int argc, char **argv)
563 {
564 int ret;
565 int session_id;
566
567 plan_tests(NUM_TESTS);
568
569 diag("Live unit tests");
570
571 ret = connect_viewer("localhost");
572 ok(ret == 0, "Connect viewer to relayd");
573
574 ret = establish_connection();
575 ok(ret == 0, "Established connection and version check with %d.%d",
576 VERSION_MAJOR, VERSION_MINOR);
577
578 ret = list_sessions(&session_id);
579 ok(ret > 0, "List sessions : %d session(s)", ret);
580
581 ret = attach_session(session_id);
582 ok(ret > 0, "Attach to session, %d streams received", ret);
583
584 ret = get_metadata();
585 ok(ret > 0, "Get metadata, received %d bytes", ret);
586
587 ret = get_next_index();
588 ok(ret == 0, "Get one index per stream");
589
590 ret = get_data_packet(first_packet_stream_id, first_packet_offset,
591 first_packet_len);
592 ok(ret == first_packet_len,
593 "Get one data packet for stream %d, offset %d, len %d",
594 first_packet_stream_id, first_packet_offset,
595 first_packet_len);
596
597 return exit_status();
598 }
This page took 0.041565 seconds and 5 git commands to generate.