2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
4 * SPDX-License-Identifier: GPL-2.0-only
13 #include <common/compat/time.h>
14 #include <sys/types.h>
17 #include <sys/socket.h>
18 #include <netinet/in.h>
25 #include <lttng/lttng.h>
27 #include <urcu/list.h>
28 #include <common/common.h>
30 #include <bin/lttng-relayd/lttng-viewer-abi.h>
31 #include <common/index/ctf-index.h>
33 #include <common/compat/errno.h>
34 #include <common/compat/endian.h>
36 #define SESSION1 "test1"
37 #define RELAYD_URL "net://localhost"
38 #define LIVE_TIMER 2000000
40 /* Number of TAP tests in this file */
42 #define mmap_size 524288
44 static int control_sock
;
45 struct live_session
*session
;
47 static int first_packet_offset
;
48 static int first_packet_len
;
49 static int first_packet_stream_id
= -1;
51 struct viewer_stream
{
53 uint64_t ctf_trace_id
;
62 struct viewer_stream
*streams
;
63 uint64_t live_timer_interval
;
64 uint64_t stream_count
;
68 ssize_t
lttng_live_recv(int fd
, void *buf
, size_t len
)
71 size_t copied
= 0, to_copy
= len
;
74 ret
= recv(fd
, buf
+ copied
, to_copy
, 0);
76 assert(ret
<= to_copy
);
80 } while ((ret
> 0 && to_copy
> 0)
81 || (ret
< 0 && errno
== EINTR
));
84 /* ret = 0 means orderly shutdown, ret < 0 is error. */
89 ssize_t
lttng_live_send(int fd
, const void *buf
, size_t len
)
94 ret
= send(fd
, buf
, len
, MSG_NOSIGNAL
);
95 } while (ret
< 0 && errno
== EINTR
);
100 int connect_viewer(const char *hostname
)
102 struct hostent
*host
;
103 struct sockaddr_in server_addr
;
106 host
= gethostbyname(hostname
);
112 if ((control_sock
= socket(AF_INET
, SOCK_STREAM
, 0)) == -1) {
118 server_addr
.sin_family
= AF_INET
;
119 server_addr
.sin_port
= htons(5344);
120 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
121 bzero(&(server_addr
.sin_zero
), 8);
123 if (connect(control_sock
, (struct sockaddr
*) &server_addr
,
124 sizeof(struct sockaddr
)) == -1) {
130 server_addr
.sin_family
= AF_INET
;
131 server_addr
.sin_port
= htons(5345);
132 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
133 bzero(&(server_addr
.sin_zero
), 8);
142 int establish_connection(void)
144 struct lttng_viewer_cmd cmd
;
145 struct lttng_viewer_connect connect
;
148 cmd
.cmd
= htobe32(LTTNG_VIEWER_CONNECT
);
149 cmd
.data_size
= htobe64(sizeof(connect
));
150 cmd
.cmd_version
= htobe32(0);
152 memset(&connect
, 0, sizeof(connect
));
153 connect
.major
= htobe32(VERSION_MAJOR
);
154 connect
.minor
= htobe32(VERSION_MINOR
);
155 connect
.type
= htobe32(LTTNG_VIEWER_CLIENT_COMMAND
);
157 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
159 diag("Error sending cmd");
162 ret_len
= lttng_live_send(control_sock
, &connect
, sizeof(connect
));
164 diag("Error sending version");
168 ret_len
= lttng_live_recv(control_sock
, &connect
, sizeof(connect
));
170 diag("[error] Remote side has closed connection");
174 diag("Error receiving version");
184 * Returns the number of sessions, should be 1 during the unit test.
187 int list_sessions(uint64_t *session_id
)
189 struct lttng_viewer_cmd cmd
;
190 struct lttng_viewer_list_sessions list
;
191 struct lttng_viewer_session lsession
;
194 int first_session
= 0;
196 cmd
.cmd
= htobe32(LTTNG_VIEWER_LIST_SESSIONS
);
197 cmd
.data_size
= htobe64(0);
198 cmd
.cmd_version
= htobe32(0);
200 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
202 diag("Error sending cmd");
206 ret_len
= lttng_live_recv(control_sock
, &list
, sizeof(list
));
208 diag("[error] Remote side has closed connection");
212 diag("Error receiving session list");
216 for (i
= 0; i
< be32toh(list
.sessions_count
); i
++) {
217 ret_len
= lttng_live_recv(control_sock
, &lsession
, sizeof(lsession
));
219 diag("Error receiving session");
222 if (lsession
.streams
> 0 && first_session
<= 0) {
223 first_session
= be64toh(lsession
.id
);
224 *session_id
= first_session
;
228 return be32toh(list
.sessions_count
);
235 int create_viewer_session(void)
237 struct lttng_viewer_cmd cmd
;
238 struct lttng_viewer_create_session_response resp
;
241 cmd
.cmd
= htobe32(LTTNG_VIEWER_CREATE_SESSION
);
242 cmd
.data_size
= htobe64(0);
243 cmd
.cmd_version
= htobe32(0);
245 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
247 diag("[error] Error sending cmd");
250 assert(ret_len
== sizeof(cmd
));
252 ret_len
= lttng_live_recv(control_sock
, &resp
, sizeof(resp
));
254 diag("[error] Remote side has closed connection");
258 diag("[error] Error receiving create session reply");
261 assert(ret_len
== sizeof(resp
));
263 if (be32toh(resp
.status
) != LTTNG_VIEWER_CREATE_SESSION_OK
) {
264 diag("[error] Error creating viewer session");
274 int attach_session(uint64_t id
)
276 struct lttng_viewer_cmd cmd
;
277 struct lttng_viewer_attach_session_request rq
;
278 struct lttng_viewer_attach_session_response rp
;
279 struct lttng_viewer_stream stream
;
283 session
= zmalloc(sizeof(struct live_session
));
288 cmd
.cmd
= htobe32(LTTNG_VIEWER_ATTACH_SESSION
);
289 cmd
.data_size
= htobe64(sizeof(rq
));
290 cmd
.cmd_version
= htobe32(0);
292 memset(&rq
, 0, sizeof(rq
));
293 rq
.session_id
= htobe64(id
);
294 rq
.seek
= htobe32(LTTNG_VIEWER_SEEK_BEGINNING
);
296 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
298 diag("Error sending cmd LTTNG_VIEWER_ATTACH_SESSION");
301 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
303 diag("Error sending attach request");
307 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
309 diag("[error] Remote side has closed connection");
313 diag("Error receiving attach response");
316 if (be32toh(rp
.status
) != LTTNG_VIEWER_ATTACH_OK
) {
320 session
->stream_count
= be32toh(rp
.streams_count
);
321 if (session
->stream_count
== 0) {
322 diag("Got session stream count == 0");
325 session
->streams
= zmalloc(session
->stream_count
*
326 sizeof(struct viewer_stream
));
327 if (!session
->streams
) {
331 for (i
= 0; i
< be32toh(rp
.streams_count
); i
++) {
332 ret_len
= lttng_live_recv(control_sock
, &stream
, sizeof(stream
));
334 diag("[error] Remote side has closed connection");
338 diag("Error receiving stream");
341 session
->streams
[i
].id
= be64toh(stream
.id
);
343 session
->streams
[i
].ctf_trace_id
= be64toh(stream
.ctf_trace_id
);
344 session
->streams
[i
].first_read
= 1;
345 session
->streams
[i
].mmap_base
= mmap(NULL
, mmap_size
,
346 PROT_READ
| PROT_WRITE
,
347 MAP_PRIVATE
| MAP_ANONYMOUS
, -1, 0);
348 if (session
->streams
[i
].mmap_base
== MAP_FAILED
) {
353 if (be32toh(stream
.metadata_flag
)) {
354 session
->streams
[i
].metadata_flag
= 1;
357 return session
->stream_count
;
364 int get_metadata(void)
366 struct lttng_viewer_cmd cmd
;
367 struct lttng_viewer_get_metadata rq
;
368 struct lttng_viewer_metadata_packet rp
;
374 int metadata_stream_id
= -1;
376 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_METADATA
);
377 cmd
.data_size
= htobe64(sizeof(rq
));
378 cmd
.cmd_version
= htobe32(0);
380 for (i
= 0; i
< session
->stream_count
; i
++) {
381 if (session
->streams
[i
].metadata_flag
) {
382 metadata_stream_id
= i
;
387 if (metadata_stream_id
< 0) {
388 diag("No metadata stream found");
392 rq
.stream_id
= htobe64(session
->streams
[metadata_stream_id
].id
);
395 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
397 diag("Error sending cmd");
400 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
402 diag("Error sending get_metadata request");
405 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
407 diag("[error] Remote side has closed connection");
411 diag("Error receiving metadata response");
414 switch (be32toh(rp
.status
)) {
415 case LTTNG_VIEWER_METADATA_OK
:
417 case LTTNG_VIEWER_NO_NEW_METADATA
:
418 diag("Got LTTNG_VIEWER_NO_NEW_METADATA:");
421 case LTTNG_VIEWER_METADATA_ERR
:
422 diag("Got LTTNG_VIEWER_METADATA_ERR:");
425 diag("Got unknown status during LTTNG_VIEWER_GET_METADATA");
429 len
= be64toh(rp
.len
);
436 PERROR("relay data zmalloc");
439 ret_len
= lttng_live_recv(control_sock
, data
, len
);
441 diag("[error] Remote side has closed connection");
442 goto error_free_data
;
445 diag("Error receiving trace packet");
446 goto error_free_data
;
460 int get_next_index(void)
462 struct lttng_viewer_cmd cmd
;
463 struct lttng_viewer_get_next_index rq
;
464 struct lttng_viewer_index rp
;
468 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_NEXT_INDEX
);
469 cmd
.data_size
= htobe64(sizeof(rq
));
470 cmd
.cmd_version
= htobe32(0);
472 for (id
= 0; id
< session
->stream_count
; id
++) {
473 if (session
->streams
[id
].metadata_flag
) {
476 memset(&rq
, 0, sizeof(rq
));
477 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
480 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
482 diag("Error sending cmd");
485 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
487 diag("Error sending get_next_index request");
490 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
492 diag("[error] Remote side has closed connection");
496 diag("Error receiving index response");
500 rp
.flags
= be32toh(rp
.flags
);
502 switch (be32toh(rp
.status
)) {
503 case LTTNG_VIEWER_INDEX_INACTIVE
:
504 /* Skip this stream. */
505 diag("Got LTTNG_VIEWER_INDEX_INACTIVE");
507 case LTTNG_VIEWER_INDEX_OK
:
509 case LTTNG_VIEWER_INDEX_RETRY
:
512 case LTTNG_VIEWER_INDEX_HUP
:
513 diag("Got LTTNG_VIEWER_INDEX_HUP");
514 session
->streams
[id
].id
= -1ULL;
515 session
->streams
[id
].fd
= -1;
517 case LTTNG_VIEWER_INDEX_ERR
:
518 diag("Got LTTNG_VIEWER_INDEX_ERR");
521 diag("Unknown reply status during LTTNG_VIEWER_GET_NEXT_INDEX (%d)", be32toh(rp
.status
));
524 if (first_packet_stream_id
< 0) {
526 * Initialize the first packet stream id. That is,
527 * the first active stream encountered.
529 first_packet_offset
= be64toh(rp
.offset
);
530 first_packet_len
= be64toh(rp
.packet_size
) / CHAR_BIT
;
531 first_packet_stream_id
= id
;
532 diag("Got first packet index with offset %d and len %d",
533 first_packet_offset
, first_packet_len
);
543 int get_data_packet(int id
, uint64_t offset
,
546 struct lttng_viewer_cmd cmd
;
547 struct lttng_viewer_get_packet rq
;
548 struct lttng_viewer_trace_packet rp
;
551 cmd
.cmd
= htobe32(LTTNG_VIEWER_GET_PACKET
);
552 cmd
.data_size
= htobe64(sizeof(rq
));
553 cmd
.cmd_version
= htobe32(0);
555 memset(&rq
, 0, sizeof(rq
));
556 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
557 /* Already in big endian. */
559 rq
.len
= htobe32(len
);
561 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
563 diag("Error sending cmd");
566 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
568 diag("Error sending get_data_packet request");
571 ret_len
= lttng_live_recv(control_sock
, &rp
, sizeof(rp
));
573 diag("[error] Remote side has closed connection");
577 diag("Error receiving data response");
580 rp
.flags
= be32toh(rp
.flags
);
582 switch (be32toh(rp
.status
)) {
583 case LTTNG_VIEWER_GET_PACKET_OK
:
584 len
= be32toh(rp
.len
);
586 diag("Got LTTNG_VIEWER_GET_PACKET_OK, but len == 0");
590 case LTTNG_VIEWER_GET_PACKET_RETRY
:
591 diag("Got LTTNG_VIEWER_GET_PACKET_RETRY:");
593 case LTTNG_VIEWER_GET_PACKET_ERR
:
594 if (rp
.flags
& LTTNG_VIEWER_FLAG_NEW_METADATA
) {
595 diag("Got LTTNG_VIEWER_GET_PACKET_ERR with NEW_METADATA flag");
598 diag("Got LTTNG_VIEWER_GET_PACKET_ERR:");
601 diag("Got unknown status code during LTTNG_VIEWER_GET_PACKET");
605 if (len
> mmap_size
) {
606 diag("mmap_size not big enough");
610 ret_len
= lttng_live_recv(control_sock
, session
->streams
[id
].mmap_base
, len
);
612 diag("[error] Remote side has closed connection");
616 diag("Error receiving trace packet");
626 int detach_viewer_session(uint64_t id
)
628 struct lttng_viewer_cmd cmd
;
629 struct lttng_viewer_detach_session_response resp
;
630 struct lttng_viewer_detach_session_request rq
;
634 cmd
.cmd
= htobe32(LTTNG_VIEWER_DETACH_SESSION
);
635 cmd
.data_size
= htobe64(sizeof(rq
));
636 cmd
.cmd_version
= htobe32(0);
638 memset(&rq
, 0, sizeof(rq
));
639 rq
.session_id
= htobe64(id
);
641 ret_len
= lttng_live_send(control_sock
, &cmd
, sizeof(cmd
));
643 fprintf(stderr
, "[error] Error sending cmd\n");
648 ret_len
= lttng_live_send(control_sock
, &rq
, sizeof(rq
));
650 fprintf(stderr
, "Error sending attach request\n");
655 ret_len
= lttng_live_recv(control_sock
, &resp
, sizeof(resp
));
657 fprintf(stderr
, "[error] Error receiving detach session reply\n");
662 if (be32toh(resp
.status
) != LTTNG_VIEWER_DETACH_SESSION_OK
) {
663 fprintf(stderr
, "[error] Error detaching viewer session\n");
673 int main(int argc
, char **argv
)
678 plan_tests(NUM_TESTS
);
680 diag("Live unit tests");
682 ret
= connect_viewer("localhost");
683 ok(ret
== 0, "Connect viewer to relayd");
685 ret
= establish_connection();
686 ok(ret
== 0, "Established connection and version check with %d.%d",
687 VERSION_MAJOR
, VERSION_MINOR
);
689 ret
= list_sessions(&session_id
);
690 ok(ret
> 0, "List sessions : %d session(s)", ret
);
695 ret
= create_viewer_session();
696 ok(ret
== 0, "Create viewer session");
698 ret
= attach_session(session_id
);
699 ok(ret
> 0, "Attach to session, %d stream(s) received", ret
);
701 ret
= get_metadata();
702 ok(ret
> 0, "Get metadata, received %d bytes", ret
);
704 ret
= get_next_index();
705 ok(ret
== 0, "Get one index per stream");
707 ret
= get_data_packet(first_packet_stream_id
, first_packet_offset
,
710 "Get one data packet for stream %d, offset %d, len %d",
711 first_packet_stream_id
, first_packet_offset
,
714 ret
= detach_viewer_session(session_id
);
715 ok(ret
== 0, "Detach viewer session");
717 ret
= list_sessions(&session_id
);
718 ok(ret
> 0, "List sessions : %d session(s)", ret
);
720 ret
= attach_session(session_id
);
721 ok(ret
> 0, "Attach to session, %d streams received", ret
);
723 return exit_status();