2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <netinet/in.h>
37 #include "lttng-viewer.h"
38 #include "ctf-index.h"
39 #include "network-live.h"
41 #include <babeltrace/babeltrace.h>
42 #include <babeltrace/ctf/events.h>
43 #include <babeltrace/ctf/callbacks.h>
44 #include <babeltrace/ctf/iterator.h>
46 /* for packet_index */
47 #include <babeltrace/ctf/types.h>
49 #include <babeltrace/ctf/metadata.h>
50 #include <babeltrace/ctf-text/types.h>
51 #include <babeltrace/ctf/events-internal.h>
54 * Memory allocation zeroed
/* Zero-filled allocation helper: expands to calloc(1, x). */
56 #define zmalloc(x) calloc(1, x)
/*
 * mmap_size: byte length of the anonymous mapping attach_session() creates
 * for every stream; get_data_packet() rejects packets longer than this.
 */
57 /* FIXME : completely arbitrary */
58 #define mmap_size 524288
/*
 * Socket descriptor shared by every request in this file. It is assigned
 * from socket() in connect_viewer() and then used by all send()/recv()
 * calls.
 */
60 static int control_sock
;
/*
 * Global state of the attached session; allocated with zmalloc() in
 * setup_network_live().
 */
61 struct live_session
*session
;
/*
 * Per-stream bookkeeping. Only part of the member list is visible in this
 * listing; other functions also use id, fd, path, mmap_base, metadata_flag
 * and first_read on elements of session->streams.
 */
63 struct viewer_stream
{
65 uint64_t ctf_trace_id
;
/*
 * NOTE(review): streams and stream_count are reached as session->streams
 * and session->stream_count elsewhere in this file, so the members below
 * presumably belong to struct live_session, whose opening line is not
 * visible here. Confirm against the full file.
 */
74 struct viewer_stream
*streams
;
75 uint64_t live_timer_interval
;
76 uint64_t stream_count
;
/*
 * Open the control connection to the relay.
 *
 * hostname: name handed to gethostbyname() to find the server address.
 *
 * Creates an AF_INET/SOCK_STREAM socket, stores it in the file-scope
 * control_sock and connects it to port 5344 of the resolved host.
 * The error handling and the return statements are not visible in this
 * listing, so the exact return convention has to be confirmed against the
 * full file (setup_network_live() stores the result in ret).
 *
 * NOTE(review): gethostbyname() is marked obsolete in its manual page and
 * is IPv4-oriented; getaddrinfo() is the documented replacement.
 */
80 int connect_viewer(char *hostname
)
83 struct sockaddr_in server_addr
;
/* NOTE(review): host is dereferenced below; confirm the lines not visible here reject a NULL result. */
86 host
= gethostbyname(hostname
);
92 if ((control_sock
= socket(AF_INET
, SOCK_STREAM
, 0)) == -1) {
/* Address of the control port (5344), using the first resolved address. */
98 server_addr
.sin_family
= AF_INET
;
99 server_addr
.sin_port
= htons(5344);
100 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
101 bzero(&(server_addr
.sin_zero
), 8);
/*
 * NOTE(review): the length passed is sizeof(struct sockaddr) while the
 * object is a struct sockaddr_in; sizeof(server_addr) would state the
 * intent directly.
 */
103 if (connect(control_sock
, (struct sockaddr
*) &server_addr
,
104 sizeof(struct sockaddr
)) == -1) {
/*
 * The same address structure is filled again for port 5345.
 * NOTE(review): no use of this second address is visible in this listing;
 * check whether a second connect() follows or whether this is dead code.
 */
110 server_addr
.sin_family
= AF_INET
;
111 server_addr
.sin_port
= htons(5345);
112 server_addr
.sin_addr
= *((struct in_addr
*) host
->h_addr
);
113 bzero(&(server_addr
.sin_zero
), 8);
/*
 * Perform the VIEWER_CONNECT handshake on control_sock.
 *
 * Sends a command header, then a struct lttng_viewer_connect announcing
 * version 2.4 and type VIEWER_CLIENT_COMMAND, then reads the peer's
 * struct lttng_viewer_connect back into the same object and logs the
 * received viewer session id and version on stderr.
 * Every send()/recv() is retried while it fails with EINTR.
 * The error branches and return statements are not visible in this listing.
 *
 * NOTE(review): the local variable named connect hides the connect()
 * socket function for the rest of this function.
 */
122 int establish_connection(void)
124 struct lttng_viewer_cmd cmd
;
125 struct lttng_viewer_connect connect
;
128 cmd
.cmd
= htobe32(VIEWER_CONNECT
);
/*
 * NOTE(review): data_size is stored in host byte order although cmd.cmd
 * and every other field sent here is converted with htobe32(). If the wire
 * format defined in lttng-viewer.h expects big-endian, a conversion is
 * missing. Confirm.
 */
129 cmd
.data_size
= sizeof(connect
);
/* Protocol version announced to the peer: 2.4. */
132 connect
.major
= htobe32(2);
133 connect
.minor
= htobe32(4);
134 connect
.type
= htobe32(VIEWER_CLIENT_COMMAND
);
137 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
138 } while (ret
< 0 && errno
== EINTR
);
140 fprintf(stderr
, "Error sending cmd\n");
144 ret
= send(control_sock
, &connect
, sizeof(connect
), 0);
145 } while (ret
< 0 && errno
== EINTR
);
147 fprintf(stderr
, "Error sending version\n");
/*
 * NOTE(review): recv() is called with flags 0, so it may return fewer than
 * sizeof(connect) bytes; confirm the lines not visible here handle a short
 * read.
 */
152 ret
= recv(control_sock
, &connect
, sizeof(connect
), 0);
153 } while (ret
< 0 && errno
== EINTR
);
155 fprintf(stderr
, "Error receiving version\n");
158 fprintf(stderr
, " - Received viewer session ID : %" PRIu64
"\n",
159 be64toh(connect
.viewer_session_id
));
160 fprintf(stderr
, " - Received version : %u.%u\n", be32toh(connect
.major
),
161 be32toh(connect
.minor
));
/*
 * Ask the relay for its session list and log every entry on stderr.
 *
 * Sends VIEWER_LIST_SESSIONS, reads a struct lttng_viewer_list_sessions
 * header, then reads sessions_count struct lttng_viewer_session records.
 * The id of the first record whose (truncated) id is positive is kept in
 * first_session and copied into ret at the end; setup_network_live()
 * treats a result of 0 as "no session" and a negative result as an error.
 * The error branches and return statements are not visible in this listing.
 */
169 int list_sessions(void)
171 struct lttng_viewer_cmd cmd
;
172 struct lttng_viewer_list_sessions list
;
173 struct lttng_viewer_session lsession
;
/* 0 doubles as "nothing recorded yet"; see the test on first_session in the loop. */
175 int first_session
= 0;
177 cmd
.cmd
= htobe32(VIEWER_LIST_SESSIONS
);
182 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
183 } while (ret
< 0 && errno
== EINTR
);
185 fprintf(stderr
, "Error sending cmd\n");
190 ret
= recv(control_sock
, &list
, sizeof(list
), 0);
191 } while (ret
< 0 && errno
== EINTR
);
193 fprintf(stderr
, "Error receiving session list\n");
197 fprintf(stderr
, " - %u active session(s)\n", be32toh(list
.sessions_count
));
/* One fixed-size session record is read per iteration. */
198 for (i
= 0; i
< be32toh(list
.sessions_count
); i
++) {
200 ret
= recv(control_sock
, &lsession
, sizeof(lsession
), 0);
201 } while (ret
< 0 && errno
== EINTR
);
203 fprintf(stderr
, "Error receiving session\n");
/*
 * NOTE(review): session_name and hostname come straight from the socket
 * and are printed with %s; confirm they are guaranteed to be
 * NUL-terminated.
 */
206 fprintf(stderr
, " - %" PRIu64
" : %s on host %s (timer = %u, "
207 "%u client(s) connected)\n",
208 be64toh(lsession
.id
), lsession
.session_name
,
209 lsession
.hostname
, be32toh(lsession
.live_timer
),
210 be32toh(lsession
.clients
));
/*
 * NOTE(review): the 64-bit id is narrowed to int here, and an id of 0 is
 * indistinguishable from "no session found" for the caller.
 */
211 if (first_session
<= 0) {
212 first_session
= be64toh(lsession
.id
);
216 /* I know, type mismatch */
217 ret
= (int) first_session
;
/*
 * Attach to a session and set up local state for each of its streams.
 *
 * id:    session id sent to the relay (converted with htobe64).
 * begin: presumably selects between VIEWER_SEEK_BEGINNING and
 *        VIEWER_SEEK_LAST; the condition itself is not visible in this
 *        listing. TODO confirm.
 *
 * Sends VIEWER_ATTACH_SESSION plus the request, reads the response, and on
 * VIEWER_ATTACH_OK allocates session->streams and reads one
 * struct lttng_viewer_stream per announced stream. For each stream it
 * records id and ctf_trace_id, sets first_read, and creates a private
 * anonymous mapping of mmap_size bytes. For a metadata stream it also
 * creates the "testlivetrace" directory and opens an output file whose
 * descriptor is stored in the stream entry.
 * The error branches and return statements are not visible in this listing.
 */
224 int attach_session(int id
, int begin
)
226 struct lttng_viewer_cmd cmd
;
227 struct lttng_viewer_attach_session_request rq
;
228 struct lttng_viewer_attach_session_response rp
;
229 struct lttng_viewer_stream stream
;
232 cmd
.cmd
= htobe32(VIEWER_ATTACH_SESSION
);
/* NOTE(review): stored without byte-order conversion, unlike cmd.cmd; confirm against the wire format. */
233 cmd
.data_size
= sizeof(rq
);
/* NOTE(review): id is an int widened to 64 bits; a negative value would sign-extend. */
236 rq
.session_id
= htobe64(id
);
238 rq
.seek
= htobe32(VIEWER_SEEK_BEGINNING
);
240 rq
.seek
= htobe32(VIEWER_SEEK_LAST
);
244 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
245 } while (ret
< 0 && errno
== EINTR
);
247 fprintf(stderr
, "Error sending cmd\n");
251 ret
= send(control_sock
, &rq
, sizeof(rq
), 0);
252 } while (ret
< 0 && errno
== EINTR
);
254 fprintf(stderr
, "Error sending attach request\n");
259 ret
= recv(control_sock
, &rp
, sizeof(rp
), 0);
260 } while (ret
< 0 && errno
== EINTR
);
262 fprintf(stderr
, "Error receiving attach response\n");
265 fprintf(stderr
, " - session attach response : %u\n", be32toh(rp
.status
));
266 if (be32toh(rp
.status
) != VIEWER_ATTACH_OK
) {
271 session
->stream_count
= be32toh(rp
.streams_count
);
272 fprintf(stderr
, " - Waiting for %" PRIu64
" streams\n", session
->stream_count
);
/*
 * NOTE(review): the element count comes from the peer and the
 * multiplication is unchecked; calloc(count, size) would perform the
 * overflow check itself.
 */
273 session
->streams
= zmalloc(session
->stream_count
*
274 sizeof(struct viewer_stream
));
275 if (!session
->streams
) {
/* One stream descriptor is read from the socket per iteration. */
280 for (i
= 0; i
< be32toh(rp
.streams_count
); i
++) {
282 ret
= recv(control_sock
, &stream
, sizeof(stream
), 0);
283 } while (ret
< 0 && errno
== EINTR
);
285 fprintf(stderr
, "Error receiving stream\n");
288 fprintf(stderr
, " - stream %" PRIu64
" : %s/%s\n",
289 be64toh(stream
.id
), stream
.path_name
,
290 stream
.channel_name
);
291 session
->streams
[i
].id
= be64toh(stream
.id
);
293 session
->streams
[i
].ctf_trace_id
= be64toh(stream
.ctf_trace_id
);
294 session
->streams
[i
].first_read
= 1;
/* Receive buffer for this stream's packets; get_data_packet() recv()s into it. */
295 session
->streams
[i
].mmap_base
= mmap(NULL
, mmap_size
, PROT_READ
| PROT_WRITE
,
296 MAP_PRIVATE
| MAP_ANONYMOUS
, -1, 0);
297 if (session
->streams
[i
].mmap_base
== MAP_FAILED
) {
298 fprintf(stderr
, "mmap error\n");
/* Metadata streams are written to a local file instead of being decoded in place. */
303 if (be32toh(stream
.metadata_flag
)) {
304 session
->streams
[i
].metadata_flag
= 1;
/*
 * NOTE(review): unlink() is called on the same name that mkdir() then
 * creates. unlink() does not remove directories, and neither return value
 * is checked in the visible lines.
 */
305 unlink("testlivetrace");
306 mkdir("testlivetrace", S_IRWXU
| S_IRWXG
);
/* The format string of this snprintf() is on a line not visible in this listing. */
307 snprintf(session
->streams
[i
].path
,
308 sizeof(session
->streams
[i
].path
),
310 stream
.channel_name
);
311 ret
= open(session
->streams
[i
].path
,
312 O_WRONLY
| O_CREAT
| O_TRUNC
,
313 S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IWGRP
);
/* The descriptor is later used by get_new_metadata() as the write() target. */
317 session
->streams
[i
].fd
= ret
;
330 void dump_packet_index(struct lttng_packet_index
*index
)
332 printf(" - index : %lu, %lu, %lu, %lu, %lu, %lu, %lu\n",
333 be64toh(index
->offset
),
334 be64toh(index
->packet_size
),
335 be64toh(index
->content_size
),
336 be64toh(index
->timestamp_begin
),
337 be64toh(index
->timestamp_end
),
338 be64toh(index
->events_discarded
),
339 be64toh(index
->stream_id
));
/*
 * Fetch one trace packet of a stream into that stream's mapping.
 *
 * id:     index into session->streams (not the relay's stream id).
 * offset: packet offset; per the comment below it is expected to be in
 *         big-endian form already. The assignment that uses it is on a
 *         line not visible in this listing.
 * The rest of the parameter list is cut off in this listing; a value named
 * len is used below as the requested length.
 *
 * Sends VIEWER_GET_PACKET plus the request, reads the reply header, logs
 * the status, and on success receives the announced number of bytes into
 * session->streams[id].mmap_base after checking it against mmap_size.
 * The error branches and return statements are not visible in this listing.
 */
344 int get_data_packet(int id
, uint64_t offset
,
347 struct lttng_viewer_cmd cmd
;
348 struct lttng_viewer_get_packet rq
;
349 struct lttng_viewer_trace_packet rp
;
352 cmd
.cmd
= htobe32(VIEWER_GET_PACKET
);
/* NOTE(review): stored without byte-order conversion, unlike cmd.cmd; confirm against the wire format. */
353 cmd
.data_size
= sizeof(rq
);
/* id selects the local entry; the relay's own stream id is what goes on the wire. */
356 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
357 /* Already in big endian. */
/* NOTE(review): len is converted as a 32-bit value here but printed with PRIu64 further down; confirm its declared type. */
359 rq
.len
= htobe32(len
);
360 fprintf(stderr
, " - get_packet ");
363 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
364 } while (ret
< 0 && errno
== EINTR
);
366 fprintf(stderr
, "Error sending cmd\n");
370 ret
= send(control_sock
, &rq
, sizeof(rq
), 0);
371 } while (ret
< 0 && errno
== EINTR
);
373 fprintf(stderr
, "Error sending get_data_packet request\n");
377 ret
= recv(control_sock
, &rp
, sizeof(rp
), 0);
378 } while (ret
< 0 && errno
== EINTR
);
380 fprintf(stderr
, "Error receiving data response\n");
/* flags is converted to host order in place; status is converted at the point of use. */
383 rp
.flags
= be32toh(rp
.flags
);
385 switch (be32toh(rp
.status
)) {
386 case VIEWER_GET_PACKET_OK
:
387 fprintf(stderr
, "OK\n");
389 case VIEWER_GET_PACKET_RETRY
:
390 fprintf(stderr
, "RETRY\n");
393 case VIEWER_GET_PACKET_ERR
:
394 if (rp
.flags
& LTTNG_VIEWER_FLAG_NEW_METADATA
) {
395 fprintf(stderr
, "NEW_METADATA\n");
399 fprintf(stderr
, "ERR\n");
403 fprintf(stderr
, "UNKNOWN\n");
/* From here on len holds the length announced by the relay, not the requested one. */
408 len
= be32toh(rp
.len
);
409 fprintf(stderr
, " - writing %" PRIu64
" bytes to tracefile\n", len
);
/* The destination mapping is mmap_size bytes long, so longer packets are refused. */
414 if (len
> mmap_size
) {
415 fprintf(stderr
, "mmap_size not big enough\n");
/* MSG_WAITALL asks recv() to block until len bytes have arrived. */
421 ret
= recv(control_sock
, session
->streams
[id
].mmap_base
, len
, MSG_WAITALL
);
422 } while (ret
< 0 && errno
== EINTR
);
424 fprintf(stderr
, "Error receiving trace packet\n");
434 * Return number of metadata bytes written or a negative value on error.
/*
 * Fetch pending metadata for the trace that stream `id` belongs to and
 * append it to the local metadata file.
 *
 * id: index into session->streams of any stream of the trace.
 *
 * Looks up the stream entry that has metadata_flag set and the same
 * ctf_trace_id as streams[id], sends VIEWER_GET_METADATA for it, logs the
 * reply status, then receives the announced number of bytes into a buffer
 * and write()s them to that metadata stream's fd.
 * The error branches and return statements are not visible in this listing.
 */
437 int get_new_metadata(int id
)
439 struct lttng_viewer_cmd cmd
;
440 struct lttng_viewer_get_metadata rq
;
441 struct lttng_viewer_metadata_packet rp
;
/* -1 means "no matching metadata stream found". */
446 int metadata_stream_id
= -1;
448 cmd
.cmd
= htobe32(VIEWER_GET_METADATA
);
/* NOTE(review): stored without byte-order conversion, unlike cmd.cmd; confirm against the wire format. */
449 cmd
.data_size
= sizeof(rq
);
452 /* find the metadata stream for this ctf_trace */
453 for (i
= 0; i
< session
->stream_count
; i
++) {
454 if (session
->streams
[i
].metadata_flag
&&
455 session
->streams
[i
].ctf_trace_id
==
456 session
->streams
[id
].ctf_trace_id
) {
457 metadata_stream_id
= i
;
461 if (metadata_stream_id
< 0) {
462 fprintf(stderr
, "No metadata stream found\n");
467 rq
.stream_id
= htobe64(session
->streams
[metadata_stream_id
].id
);
468 fprintf(stderr
, " - get_metadata ");
471 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
472 } while (ret
< 0 && errno
== EINTR
);
474 fprintf(stderr
, "Error sending cmd\n");
478 ret
= send(control_sock
, &rq
, sizeof(rq
), 0);
479 } while (ret
< 0 && errno
== EINTR
);
481 fprintf(stderr
, "Error sending get_metadata request\n");
485 ret
= recv(control_sock
, &rp
, sizeof(rp
), 0);
486 } while (ret
< 0 && errno
== EINTR
);
488 fprintf(stderr
, "Error receiving metadata response\n");
491 switch (be32toh(rp
.status
)) {
492 case VIEWER_METADATA_OK
:
493 fprintf(stderr
, "OK\n");
495 case VIEWER_NO_NEW_METADATA
:
496 fprintf(stderr
, "NO NEW\n");
499 case VIEWER_METADATA_ERR
:
500 fprintf(stderr
, "ERR\n");
504 fprintf(stderr
, "UNKNOWN\n");
/*
 * NOTE(review): len is taken from the peer as a 64-bit value and, as far
 * as the visible lines show, is used unbounded as the receive size.
 */
509 len
= be64toh(rp
.len
);
510 fprintf(stderr
, " - writing %" PRIu64
" bytes to metadata\n", len
);
/* Reported when the receive buffer could not be allocated (the allocation line is not visible here). */
517 perror("relay data zmalloc");
521 ret
= recv(control_sock
, data
, len
, MSG_WAITALL
);
522 } while (ret
< 0 && errno
== EINTR
);
524 fprintf(stderr
, "Error receiving trace packet\n");
/*
 * NOTE(review): write() may write fewer than len bytes; only EINTR is
 * retried here. Also confirm that data is freed on every path.
 */
529 ret
= write(session
->streams
[metadata_stream_id
].fd
, data
, len
);
530 } while (ret
< 0 && errno
== EINTR
);
545 * Get one index for a stream.
/*
 * Request the next packet index of a stream and decode the reply.
 *
 * id:    index into session->streams.
 * index: output; filled in host byte order depending on the reply status.
 *
 * Reply handling, as far as the visible lines show:
 *  - VIEWER_INDEX_INACTIVE: *index is zeroed and only timestamp_end is set.
 *  - VIEWER_INDEX_OK: offset, packet_size, content_size, timestamp_begin,
 *    timestamp_end and events_discarded are copied; when the reply carries
 *    LTTNG_VIEWER_FLAG_NEW_METADATA, get_new_metadata(id) is called.
 *  - VIEWER_INDEX_HUP: the local stream entry gets id -1ULL and fd -1.
 *  - VIEWER_INDEX_RETRY, VIEWER_INDEX_ERR and unknown statuses are logged.
 * What follows each log line (retry, break, goto) and the return
 * statements are not visible in this listing.
 */
547 int get_next_index(int id
, struct packet_index
*index
)
549 struct lttng_viewer_cmd cmd
;
550 struct lttng_viewer_get_next_index rq
;
551 struct lttng_viewer_index rp
;
554 cmd
.cmd
= htobe32(VIEWER_GET_NEXT_INDEX
);
/* NOTE(review): stored without byte-order conversion, unlike cmd.cmd; confirm against the wire format. */
555 cmd
.data_size
= sizeof(rq
);
558 fprintf(stderr
, " - get next index for stream %" PRIu64
"\n",
559 session
->streams
[id
].id
);
560 rq
.stream_id
= htobe64(session
->streams
[id
].id
);
564 ret
= send(control_sock
, &cmd
, sizeof(cmd
), 0);
565 } while (ret
< 0 && errno
== EINTR
);
567 fprintf(stderr
, "Error sending cmd\n");
571 ret
= send(control_sock
, &rq
, sizeof(rq
), 0);
572 } while (ret
< 0 && errno
== EINTR
);
574 fprintf(stderr
, "Error sending get_next_index request\n");
578 ret
= recv(control_sock
, &rp
, sizeof(rp
), 0);
579 } while (ret
< 0 && errno
== EINTR
);
581 fprintf(stderr
, "Error receiving index response\n");
584 fprintf(stderr
, " - reply : %u ", be32toh(rp
.status
));
/* flags is converted to host order in place; status is converted at the point of use. */
586 rp
.flags
= be32toh(rp
.flags
);
588 switch (be32toh(rp
.status
)) {
589 case VIEWER_INDEX_INACTIVE
:
590 fprintf(stderr
, "(INACTIVE)\n");
/* Leaves packet_size at 0, which ctf_live_packet_seek() tests for. */
591 memset(index
, 0, sizeof(struct packet_index
));
592 index
->timestamp_end
= be64toh(rp
.timestamp_end
);
594 case VIEWER_INDEX_OK
:
595 fprintf(stderr
, "(OK), need metadata update : %u\n",
596 rp
.flags
& LTTNG_VIEWER_FLAG_NEW_METADATA
);
597 index
->offset
= be64toh(rp
.offset
);
598 index
->packet_size
= be64toh(rp
.packet_size
);
599 index
->content_size
= be64toh(rp
.content_size
);
600 index
->timestamp_begin
= be64toh(rp
.timestamp_begin
);
601 index
->timestamp_end
= be64toh(rp
.timestamp_end
);
602 index
->events_discarded
= be64toh(rp
.events_discarded
);
604 if (rp
.flags
& LTTNG_VIEWER_FLAG_NEW_METADATA
) {
605 fprintf(stderr
, "NEW METADATA NEEDED\n");
606 ret
= get_new_metadata(id
);
612 case VIEWER_INDEX_RETRY
:
613 fprintf(stderr
, "(RETRY)\n");
616 case VIEWER_INDEX_HUP
:
617 fprintf(stderr
, "(HUP)\n");
/*
 * NOTE(review): index is not written in this branch in the visible lines,
 * although ctf_live_packet_seek() reads packet_index.offset afterwards;
 * confirm it is set on a line not visible here.
 */
618 session
->streams
[id
].id
= -1ULL;
619 session
->streams
[id
].fd
= -1;
621 case VIEWER_INDEX_ERR
:
622 fprintf(stderr
, "(ERR)\n");
626 fprintf(stderr
, "SHOULD NOT HAPPEN\n");
/*
 * Packet-seek callback handed to bt_context_add_trace() by open_trace().
 *
 * stream_pos: babeltrace stream position; pos->fd holds the index into
 *             session->streams that open_trace() stored there, not a real
 *             file descriptor.
 * index:      not referenced in the visible lines.
 * The rest of the parameter list is cut off in this listing.
 *
 * Fetches the next index for the stream, copies packet_size and
 * content_size into pos, updates the stream timestamps from
 * timestamp_end, downloads the packet with get_data_packet(), points
 * pos->base_mma at the stream's mapping and re-reads the packet header
 * and packet context when the position is not write-only.
 * The error branches and early exits are not visible in this listing.
 */
635 void ctf_live_packet_seek(struct bt_stream_pos
*stream_pos
, size_t index
,
638 struct ctf_stream_pos
*pos
;
639 struct ctf_file_stream
*file_stream
;
640 struct packet_index packet_index
;
643 pos
= ctf_pos(stream_pos
);
644 file_stream
= container_of(pos
, struct ctf_file_stream
, pos
);
646 fprintf(stderr
, "BT GET_NEXT_INDEX %d\n", pos
->fd
);
647 ret
= get_next_index(pos
->fd
, &packet_index
);
649 fprintf(stderr
, "get_next_index failed\n");
653 pos
->packet_size
= packet_index
.packet_size
;
654 pos
->content_size
= packet_index
.content_size
;
655 pos
->mmap_base_offset
= 0;
/*
 * NOTE(review): EOF is an int constant compared with the index offset;
 * which producer sets the offset to this sentinel is not visible in this
 * listing.
 */
657 if (packet_index
.offset
== EOF
) {
663 file_stream
->parent
.cycles_timestamp
= packet_index
.timestamp_end
;
664 file_stream
->parent
.real_timestamp
= ctf_get_real_timestamp(
665 &file_stream
->parent
, packet_index
.timestamp_end
);
/* A zero packet size is what get_next_index() leaves behind for an inactive stream. */
667 if (pos
->packet_size
== 0) {
671 fprintf(stderr
, "BT GET_DATA_PACKET\n");
/*
 * get_next_index() already converted offset to host order; be64toh() is
 * applied again here, which turns it back into big-endian form, matching
 * the "Already in big endian" expectation in get_data_packet().
 * packet_size is divided by CHAR_BIT, i.e. it is treated as a bit count.
 * NOTE(review): htobe64() would express the intent of the conversion.
 */
672 ret
= get_data_packet(pos
->fd
, be64toh(packet_index
.offset
),
673 packet_index
.packet_size
/ CHAR_BIT
);
/* NOTE(review): unlike the other messages, this one has no trailing newline. */
675 fprintf(stderr
, "get_data_packet failed");
679 fprintf(stderr
, "BT MMAP %d\n", pos
->fd
);
/* NOTE(review): %lu is used for the packet_index fields; if they are uint64_t, PRIu64 is the matching format. */
680 fprintf(stderr
, "packet_size : %lu, offset %lu, content_size %lu, timestamp_end : %lu, real : %lu\n",
681 packet_index
.packet_size
,
683 packet_index
.content_size
,
684 packet_index
.timestamp_end
,
685 ctf_get_real_timestamp(
686 &file_stream
->parent
, packet_index
.timestamp_end
));
/* base_mma is allocated once, on first use, and reused afterwards. */
687 if (!pos
->base_mma
) {
688 pos
->base_mma
= zmalloc(sizeof(*pos
->base_mma
));
689 if (!pos
->base_mma
) {
690 fprintf(stderr
, "alloc pos->base_mma\n");
/* No new mapping is created here: the stream's existing buffer is installed as the base address. */
695 mmap_align_set_addr(pos
->base_mma
, session
->streams
[pos
->fd
].mmap_base
);
/*
 * NOTE(review): base_mma comes from zmalloc() above, which reports failure
 * as NULL, so this MAP_FAILED comparison does not look reachable. Confirm
 * what was meant to be checked.
 */
696 if (pos
->base_mma
== MAP_FAILED
) {
697 perror("Error mmaping");
701 /* update trace_packet_header and stream_packet_context */
702 if (pos
->prot
!= PROT_WRITE
&& file_stream
->parent
.trace_packet_header
) {
703 /* Read packet header */
704 ret
= generic_rw(&pos
->parent
, &file_stream
->parent
.trace_packet_header
->p
);
707 if (pos
->prot
!= PROT_WRITE
&& file_stream
->parent
.stream_packet_context
) {
708 /* Read packet context */
709 ret
= generic_rw(&pos
->parent
, &file_stream
->parent
.stream_packet_context
->p
);
/*
 * Build a babeltrace context for the attached session.
 *
 * bt_ctx: output; *bt_ctx receives the context from bt_context_create().
 *
 * Walks session->streams. Every non-metadata stream is added to a list of
 * bt_mmap_stream entries whose fd field carries the stream's index. For
 * the other streams, metadata is pulled with get_new_metadata() and the
 * resulting file is opened for reading. The trace is then registered with
 * bt_context_add_trace(), using ctf_live_packet_seek as the packet-seek
 * callback.
 * The error branches, the return statements and most of the event loop at
 * the end are not visible in this listing.
 */
717 int open_trace(struct bt_context
**bt_ctx
)
719 struct bt_mmap_stream
*new_mmap_stream
;
720 struct bt_mmap_stream_list mmap_list
;
721 FILE *metadata_fp
= NULL
;
/* NOTE(review): no check of the bt_context_create() result is visible. */
725 *bt_ctx
= bt_context_create();
726 BT_INIT_LIST_HEAD(&mmap_list
.head
);
728 for (i
= 0; i
< session
->stream_count
; i
++) {
729 int total_metadata
= 0;
731 if (!session
->streams
[i
].metadata_flag
) {
/* NOTE(review): the result is dereferenced below; no NULL check is visible. */
732 new_mmap_stream
= zmalloc(sizeof(struct bt_mmap_stream
));
734 * The FD is unused when we handle manually the
735 * packet seek, so we store here the ID of the
736 * stream in our stream list to be able to use it
739 new_mmap_stream
->fd
= i
;
740 bt_list_add(&new_mmap_stream
->list
, &mmap_list
.head
);
742 /* Get all possible metadata before starting */
744 ret
= get_new_metadata(i
);
746 total_metadata
+= ret
;
/*
 * Keeps asking while the last call reported data, or while nothing has
 * been accumulated yet.
 * NOTE(review): if get_new_metadata() keeps failing, total_metadata stays
 * 0 and this loop does not terminate unless a line not visible here
 * breaks out.
 */
748 } while (ret
> 0 || total_metadata
== 0);
749 metadata_fp
= fopen(session
->streams
[i
].path
, "r");
754 fprintf(stderr
, "No metadata stream opened\n");
758 ret
= bt_context_add_trace(*bt_ctx
, NULL
, "ctf",
759 ctf_live_packet_seek
, &mmap_list
, metadata_fp
);
761 fprintf(stderr
, "Error adding trace\n");
/*
 * Event iteration.
 * NOTE(review): bt_ctx (the pointer-to-pointer parameter) is passed to
 * bt_ctf_iter_create() below, whereas bt_context_add_trace() above
 * receives *bt_ctx. Confirm which is intended, and whether this section is
 * compiled at all.
 */
766 begin_pos.type = BT_SEEK_BEGIN;
767 iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL);
768 while ((event = bt_ctf_iter_read_event(iter)) != NULL) {
770 ret = sout->parent.event_cb(&sout->parent, event->parent->stream);
772 fprintf(stderr, "[error] Writing event failed.\n");
777 ret = bt_iter_next(bt_ctf_get_iter(iter));
780 } else if (ret == EAGAIN) {
/*
 * Entry point: connect to a relay and attach to a live session.
 *
 * hostname: relay host, forwarded to connect_viewer().
 * begin:    forwarded unchanged to attach_session().
 *
 * Allocates the global session object, then runs connect_viewer(),
 * establish_connection() and list_sessions() in that order, and calls
 * attach_session() inside a loop that repeats while the session reports
 * zero streams. Finally frees session->streams and logs the last value of
 * ret.
 * The error branches, the code between the attach loop and the clean-up,
 * and the return statements are not visible in this listing.
 */
792 int setup_network_live(char *hostname
, int begin
)
797 session
= zmalloc(sizeof(struct live_session
));
802 ret
= connect_viewer(hostname
);
806 fprintf(stderr
, "* Connected\n");
808 fprintf(stderr
, "* Establish connection and version check\n");
809 ret
= establish_connection();
814 fprintf(stderr
, "* List sessions\n");
/* A negative result is reported as a list error, 0 as "no session". */
815 ret
= list_sessions();
817 fprintf(stderr
, "* List error\n");
819 } else if (ret
== 0) {
820 fprintf(stderr
, "* No session to attach to, exiting\n");
/*
 * NOTE(review): the message prints ret while attach_session() receives
 * session_id; the assignment of session_id is not visible in this listing.
 * Confirm both hold the same value at this point.
 */
827 fprintf(stderr
, "* Attach session %d\n", ret
);
828 ret
= attach_session(session_id
, begin
);
/* Re-attaches until the session reports at least one stream. */
832 } while (session
->stream_count
== 0);
/* NOTE(review): only session->streams is released in the visible lines; confirm session itself is freed. */
838 free(session
->streams
);
839 fprintf(stderr
, "* Exiting %d\n", ret
);