/* Source: lttngtop.git, src/network-live.c (gitweb object 1fb761ef828878ef9279343b7b214ee9a6bef42d) */
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <netinet/in.h>
27 #include <netdb.h>
28 #include <stdio.h>
29 #include <string.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <errno.h>
33 #include <inttypes.h>
34 #include <fcntl.h>
35 #include <sys/mman.h>
36
37 #include "lttng-viewer.h"
38 #include "lttng-index.h"
39 #include "network-live.h"
40
41 #include <babeltrace/babeltrace.h>
42 #include <babeltrace/ctf/events.h>
43 #include <babeltrace/ctf/callbacks.h>
44 #include <babeltrace/ctf/iterator.h>
45
46 /* for packet_index */
47 #include <babeltrace/ctf/types.h>
48
49 #include <babeltrace/ctf/metadata.h>
50 #include <babeltrace/ctf-text/types.h>
51 #include <babeltrace/ctf/events-internal.h>
52
/*
 * Zero-initialized allocation of x bytes. Release with free().
 */
#define zmalloc(x) calloc(1, (x))
/*
 * Size in bytes of the anonymous mapping created for every stream; also
 * the largest trace packet that can be received.
 * FIXME : completely arbitrary
 */
#define mmap_size 524288

/* Control socket to the relay daemon; invalid (-1) until connected. */
static int control_sock = -1;
/* The live session this viewer works on. */
struct live_session *session;
62
/*
 * Viewer-side state kept for one stream announced by the relay daemon.
 */
struct viewer_stream {
	/* Stream ID as announced by the relay (host byte order). */
	uint64_t id;
	/* ID of the CTF trace this stream belongs to. */
	uint64_t ctf_trace_id;
	/* Anonymous mapping of mmap_size bytes receiving packet data. */
	void *mmap_base;
	/* File descriptor of the local metadata file (metadata streams). */
	int fd;
	/* Non-zero when this stream carries the trace metadata. */
	int metadata_flag;
	/* Set to 1 when the stream is attached. */
	int first_read;
	/* Path of the local file backing a metadata stream. */
	char path[PATH_MAX];
};
72
/*
 * Live session the viewer is attached to.
 */
struct live_session {
	/* Array of stream_count stream descriptors. */
	struct viewer_stream *streams;
	/* Live timer interval of the session. */
	uint64_t live_timer_interval;
	/* Number of entries in streams. */
	uint64_t stream_count;
};
78
79 static
80 int connect_viewer(char *hostname)
81 {
82 struct hostent *host;
83 struct sockaddr_in server_addr;
84 int ret;
85
86 host = gethostbyname(hostname);
87 if (!host) {
88 ret = -1;
89 goto end;
90 }
91
92 if ((control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
93 perror("Socket");
94 ret = -1;
95 goto end;
96 }
97
98 server_addr.sin_family = AF_INET;
99 server_addr.sin_port = htons(5344);
100 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
101 bzero(&(server_addr.sin_zero), 8);
102
103 if (connect(control_sock, (struct sockaddr *) &server_addr,
104 sizeof(struct sockaddr)) == -1) {
105 perror("Connect");
106 ret = -1;
107 goto end;
108 }
109
110 server_addr.sin_family = AF_INET;
111 server_addr.sin_port = htons(5345);
112 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
113 bzero(&(server_addr.sin_zero), 8);
114
115 ret = 0;
116
117 end:
118 return ret;
119 }
120
121 static
122 int establish_connection(void)
123 {
124 struct lttng_viewer_cmd cmd;
125 struct lttng_viewer_connect connect;
126 int ret;
127
128 cmd.cmd = htobe32(VIEWER_CONNECT);
129 cmd.data_size = sizeof(connect);
130 cmd.cmd_version = 0;
131
132 connect.major = htobe32(2);
133 connect.minor = htobe32(4);
134 connect.type = htobe32(VIEWER_CLIENT_COMMAND);
135
136 do {
137 ret = send(control_sock, &cmd, sizeof(cmd), 0);
138 } while (ret < 0 && errno == EINTR);
139 if (ret < 0) {
140 fprintf(stderr, "Error sending cmd\n");
141 goto error;
142 }
143 do {
144 ret = send(control_sock, &connect, sizeof(connect), 0);
145 } while (ret < 0 && errno == EINTR);
146 if (ret < 0) {
147 fprintf(stderr, "Error sending version\n");
148 goto error;
149 }
150
151 do {
152 ret = recv(control_sock, &connect, sizeof(connect), 0);
153 } while (ret < 0 && errno == EINTR);
154 if (ret < 0) {
155 fprintf(stderr, "Error receiving version\n");
156 goto error;
157 }
158 fprintf(stderr, " - Received viewer session ID : %" PRIu64 "\n",
159 be64toh(connect.viewer_session_id));
160 fprintf(stderr, " - Received version : %u.%u\n", be32toh(connect.major),
161 be32toh(connect.minor));
162
163 ret = 0;
164
165 error:
166 return ret;
167 }
168
169 int list_sessions(void)
170 {
171 struct lttng_viewer_cmd cmd;
172 struct lttng_viewer_list_sessions list;
173 struct lttng_viewer_session lsession;
174 int i, ret;
175 int first_session = 0;
176
177 cmd.cmd = htobe32(VIEWER_LIST_SESSIONS);
178 cmd.data_size = 0;
179 cmd.cmd_version = 0;
180
181 do {
182 ret = send(control_sock, &cmd, sizeof(cmd), 0);
183 } while (ret < 0 && errno == EINTR);
184 if (ret < 0) {
185 fprintf(stderr, "Error sending cmd\n");
186 goto error;
187 }
188
189 do {
190 ret = recv(control_sock, &list, sizeof(list), 0);
191 } while (ret < 0 && errno == EINTR);
192 if (ret < 0) {
193 fprintf(stderr, "Error receiving session list\n");
194 goto error;
195 }
196
197 fprintf(stderr, " - %u active session(s)\n", be32toh(list.sessions_count));
198 for (i = 0; i < be32toh(list.sessions_count); i++) {
199 do {
200 ret = recv(control_sock, &lsession, sizeof(lsession), 0);
201 } while (ret < 0 && errno == EINTR);
202 if (ret < 0) {
203 fprintf(stderr, "Error receiving session\n");
204 goto error;
205 }
206 fprintf(stderr, " - %" PRIu64 " : %s on host %s (timer = %u, "
207 "%u client(s) connected)\n",
208 be64toh(lsession.id), lsession.session_name,
209 lsession.hostname, be32toh(lsession.live_timer),
210 be32toh(lsession.clients));
211 if (first_session <= 0) {
212 first_session = be64toh(lsession.id);
213 }
214 }
215
216 /* I know, type mismatch */
217 ret = (int) first_session;
218
219 error:
220 return ret;
221 }
222
223 int write_index_header(int fd)
224 {
225 struct lttng_packet_index_file_hdr hdr;
226 int ret;
227
228 memcpy(hdr.magic, INDEX_MAGIC, sizeof(hdr.magic));
229 hdr.index_major = htobe32(INDEX_MAJOR);
230 hdr.index_minor = htobe32(INDEX_MINOR);
231
232 do {
233 ret = write(fd, &hdr, sizeof(hdr));
234 } while (ret < 0 && errno == EINTR);
235 if (ret < 0) {
236 perror("write index header");
237 goto error;
238 }
239
240 error:
241 return ret;
242 }
243
244 static
245 int attach_session(int id, int begin)
246 {
247 struct lttng_viewer_cmd cmd;
248 struct lttng_viewer_attach_session_request rq;
249 struct lttng_viewer_attach_session_response rp;
250 struct lttng_viewer_stream stream;
251 int ret, i;
252
253 cmd.cmd = htobe32(VIEWER_ATTACH_SESSION);
254 cmd.data_size = sizeof(rq);
255 cmd.cmd_version = 0;
256
257 rq.session_id = htobe64(id);
258 if (begin) {
259 rq.seek = htobe32(VIEWER_SEEK_BEGINNING);
260 } else {
261 rq.seek = htobe32(VIEWER_SEEK_LAST);
262 }
263
264 do {
265 ret = send(control_sock, &cmd, sizeof(cmd), 0);
266 } while (ret < 0 && errno == EINTR);
267 if (ret < 0) {
268 fprintf(stderr, "Error sending cmd\n");
269 goto error;
270 }
271 do {
272 ret = send(control_sock, &rq, sizeof(rq), 0);
273 } while (ret < 0 && errno == EINTR);
274 if (ret < 0) {
275 fprintf(stderr, "Error sending attach request\n");
276 goto error;
277 }
278
279 do {
280 ret = recv(control_sock, &rp, sizeof(rp), 0);
281 } while (ret < 0 && errno == EINTR);
282 if (ret < 0) {
283 fprintf(stderr, "Error receiving attach response\n");
284 goto error;
285 }
286 fprintf(stderr, " - session attach response : %u\n", be32toh(rp.status));
287 if (be32toh(rp.status) != VIEWER_ATTACH_OK) {
288 ret = 1;
289 goto end;
290 }
291
292 session->stream_count = be32toh(rp.streams_count);
293 fprintf(stderr, " - Waiting for %" PRIu64 " streams\n", session->stream_count);
294 session->streams = zmalloc(session->stream_count *
295 sizeof(struct viewer_stream));
296 if (!session->streams) {
297 ret = -1;
298 goto error;
299 }
300
301 for (i = 0; i < be32toh(rp.streams_count); i++) {
302 do {
303 ret = recv(control_sock, &stream, sizeof(stream), 0);
304 } while (ret < 0 && errno == EINTR);
305 if (ret < 0) {
306 fprintf(stderr, "Error receiving stream\n");
307 goto error;
308 }
309 fprintf(stderr, " - stream %" PRIu64 " : %s/%s\n",
310 be64toh(stream.id), stream.path_name,
311 stream.channel_name);
312 session->streams[i].id = be64toh(stream.id);
313
314 session->streams[i].ctf_trace_id = be64toh(stream.ctf_trace_id);
315 session->streams[i].first_read = 1;
316 session->streams[i].mmap_base = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE,
317 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
318 if (session->streams[i].mmap_base == MAP_FAILED) {
319 fprintf(stderr, "mmap error\n");
320 ret = -1;
321 goto error;
322 }
323
324 if (be32toh(stream.metadata_flag)) {
325 session->streams[i].metadata_flag = 1;
326 unlink("testlivetrace");
327 mkdir("testlivetrace", S_IRWXU | S_IRWXG);
328 snprintf(session->streams[i].path,
329 sizeof(session->streams[i].path),
330 "testlivetrace/%s",
331 stream.channel_name);
332 ret = open(session->streams[i].path,
333 O_WRONLY | O_CREAT | O_TRUNC,
334 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
335 if (ret < 0) {
336 goto error;
337 }
338 session->streams[i].fd = ret;
339 }
340 }
341 ret = 0;
342
343 end:
344 error:
345 return ret;
346 }
347
#if 0
/*
 * Useful debug: print every field of a packet index received from the
 * relay (fields are converted from big endian before printing).
 */
static
void dump_packet_index(struct lttng_packet_index *index)
{
	printf(" - index : %" PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" PRIu64
			", %" PRIu64 ", %" PRIu64 ", %" PRIu64 "\n",
			(uint64_t) be64toh(index->offset),
			(uint64_t) be64toh(index->packet_size),
			(uint64_t) be64toh(index->content_size),
			(uint64_t) be64toh(index->timestamp_begin),
			(uint64_t) be64toh(index->timestamp_end),
			(uint64_t) be64toh(index->events_discarded),
			(uint64_t) be64toh(index->stream_id));
}
#endif
363
364 static
365 int get_data_packet(int id, uint64_t offset,
366 uint64_t len)
367 {
368 struct lttng_viewer_cmd cmd;
369 struct lttng_viewer_get_packet rq;
370 struct lttng_viewer_trace_packet rp;
371 int ret;
372
373 cmd.cmd = htobe32(VIEWER_GET_PACKET);
374 cmd.data_size = sizeof(rq);
375 cmd.cmd_version = 0;
376
377 rq.stream_id = htobe64(session->streams[id].id);
378 /* Already in big endian. */
379 rq.offset = offset;
380 rq.len = htobe32(len);
381 fprintf(stderr, " - get_packet ");
382
383 do {
384 ret = send(control_sock, &cmd, sizeof(cmd), 0);
385 } while (ret < 0 && errno == EINTR);
386 if (ret < 0) {
387 fprintf(stderr, "Error sending cmd\n");
388 goto error;
389 }
390 do {
391 ret = send(control_sock, &rq, sizeof(rq), 0);
392 } while (ret < 0 && errno == EINTR);
393 if (ret < 0) {
394 fprintf(stderr, "Error sending get_data_packet request\n");
395 goto error;
396 }
397 do {
398 ret = recv(control_sock, &rp, sizeof(rp), 0);
399 } while (ret < 0 && errno == EINTR);
400 if (ret < 0) {
401 fprintf(stderr, "Error receiving data response\n");
402 goto error;
403 }
404 rp.flags = be32toh(rp.flags);
405
406 switch (be32toh(rp.status)) {
407 case VIEWER_GET_PACKET_OK:
408 fprintf(stderr, "OK\n");
409 break;
410 case VIEWER_GET_PACKET_RETRY:
411 fprintf(stderr, "RETRY\n");
412 ret = -1;
413 goto end;
414 case VIEWER_GET_PACKET_ERR:
415 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
416 fprintf(stderr, "NEW_METADATA\n");
417 ret = 0;
418 goto end;
419 }
420 fprintf(stderr, "ERR\n");
421 ret = -1;
422 goto end;
423 default:
424 fprintf(stderr, "UNKNOWN\n");
425 ret = -1;
426 goto end;
427 }
428
429 len = be32toh(rp.len);
430 fprintf(stderr, " - writing %" PRIu64" bytes to tracefile\n", len);
431 if (len <= 0) {
432 goto end;
433 }
434
435 if (len > mmap_size) {
436 fprintf(stderr, "mmap_size not big enough\n");
437 ret = -1;
438 goto error;
439 }
440
441 do {
442 ret = recv(control_sock, session->streams[id].mmap_base, len, MSG_WAITALL);
443 } while (ret < 0 && errno == EINTR);
444 if (ret < 0) {
445 fprintf(stderr, "Error receiving trace packet\n");
446 goto error;
447 }
448
449 end:
450 error:
451 return ret;
452 }
453
454 /*
455 * Return number of metadata bytes written or a negative value on error.
456 */
457 static
458 int get_new_metadata(int id)
459 {
460 struct lttng_viewer_cmd cmd;
461 struct lttng_viewer_get_metadata rq;
462 struct lttng_viewer_metadata_packet rp;
463 int ret;
464 uint64_t i;
465 char *data = NULL;
466 uint64_t len = 0;
467 int metadata_stream_id = -1;
468
469 cmd.cmd = htobe32(VIEWER_GET_METADATA);
470 cmd.data_size = sizeof(rq);
471 cmd.cmd_version = 0;
472
473 /* find the metadata stream for this ctf_trace */
474 for (i = 0; i < session->stream_count; i++) {
475 if (session->streams[i].metadata_flag &&
476 session->streams[i].ctf_trace_id ==
477 session->streams[id].ctf_trace_id) {
478 metadata_stream_id = i;
479 break;
480 }
481 }
482 if (metadata_stream_id < 0) {
483 fprintf(stderr, "No metadata stream found\n");
484 ret = -1;
485 goto error;
486 }
487
488 rq.stream_id = htobe64(session->streams[metadata_stream_id].id);
489 fprintf(stderr, " - get_metadata ");
490
491 do {
492 ret = send(control_sock, &cmd, sizeof(cmd), 0);
493 } while (ret < 0 && errno == EINTR);
494 if (ret < 0) {
495 fprintf(stderr, "Error sending cmd\n");
496 goto error;
497 }
498 do {
499 ret = send(control_sock, &rq, sizeof(rq), 0);
500 } while (ret < 0 && errno == EINTR);
501 if (ret < 0) {
502 fprintf(stderr, "Error sending get_metadata request\n");
503 goto error;
504 }
505 do {
506 ret = recv(control_sock, &rp, sizeof(rp), 0);
507 } while (ret < 0 && errno == EINTR);
508 if (ret < 0) {
509 fprintf(stderr, "Error receiving metadata response\n");
510 goto error;
511 }
512 switch (be32toh(rp.status)) {
513 case VIEWER_METADATA_OK:
514 fprintf(stderr, "OK\n");
515 break;
516 case VIEWER_NO_NEW_METADATA:
517 fprintf(stderr, "NO NEW\n");
518 ret = -1;
519 goto end;
520 case VIEWER_METADATA_ERR:
521 fprintf(stderr, "ERR\n");
522 ret = -1;
523 goto end;
524 default:
525 fprintf(stderr, "UNKNOWN\n");
526 ret = -1;
527 goto end;
528 }
529
530 len = be64toh(rp.len);
531 fprintf(stderr, " - writing %" PRIu64" bytes to metadata\n", len);
532 if (len <= 0) {
533 goto end;
534 }
535
536 data = zmalloc(len);
537 if (!data) {
538 perror("relay data zmalloc");
539 goto error;
540 }
541 do {
542 ret = recv(control_sock, data, len, MSG_WAITALL);
543 } while (ret < 0 && errno == EINTR);
544 if (ret < 0) {
545 fprintf(stderr, "Error receiving trace packet\n");
546 free(data);
547 goto error;
548 }
549 do {
550 ret = write(session->streams[metadata_stream_id].fd, data, len);
551 } while (ret < 0 && errno == EINTR);
552 if (ret < 0) {
553 free(data);
554 goto error;
555 }
556 free(data);
557
558 /* FIXME : bad */
559 ret = (int) len;
560 end:
561 error:
562 return ret;
563 }
564
565 /*
566 * Get one index for a stream.
567 */
568 int get_next_index(int id, struct packet_index *index)
569 {
570 struct lttng_viewer_cmd cmd;
571 struct lttng_viewer_get_next_index rq;
572 struct lttng_viewer_index rp;
573 int ret;
574
575 cmd.cmd = htobe32(VIEWER_GET_NEXT_INDEX);
576 cmd.data_size = sizeof(rq);
577 cmd.cmd_version = 0;
578
579 fprintf(stderr, " - get next index for stream %" PRIu64 "\n",
580 session->streams[id].id);
581 rq.stream_id = htobe64(session->streams[id].id);
582
583 retry:
584 do {
585 ret = send(control_sock, &cmd, sizeof(cmd), 0);
586 } while (ret < 0 && errno == EINTR);
587 if (ret < 0) {
588 fprintf(stderr, "Error sending cmd\n");
589 goto error;
590 }
591 do {
592 ret = send(control_sock, &rq, sizeof(rq), 0);
593 } while (ret < 0 && errno == EINTR);
594 if (ret < 0) {
595 fprintf(stderr, "Error sending get_next_index request\n");
596 goto error;
597 }
598 do {
599 ret = recv(control_sock, &rp, sizeof(rp), 0);
600 } while (ret < 0 && errno == EINTR);
601 if (ret < 0) {
602 fprintf(stderr, "Error receiving index response\n");
603 goto error;
604 }
605 fprintf(stderr, " - reply : %u ", be32toh(rp.status));
606
607 rp.flags = be32toh(rp.flags);
608
609 switch (be32toh(rp.status)) {
610 case VIEWER_INDEX_INACTIVE:
611 fprintf(stderr, "(INACTIVE)\n");
612 memset(index, 0, sizeof(struct packet_index));
613 index->timestamp_end = be64toh(rp.timestamp_end);
614 break;
615 case VIEWER_INDEX_OK:
616 fprintf(stderr, "(OK), need metadata update : %u\n",
617 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
618 index->offset = be64toh(rp.offset);
619 index->packet_size = be64toh(rp.packet_size);
620 index->content_size = be64toh(rp.content_size);
621 index->timestamp_begin = be64toh(rp.timestamp_begin);
622 index->timestamp_end = be64toh(rp.timestamp_end);
623 index->events_discarded = be64toh(rp.events_discarded);
624
625 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
626 fprintf(stderr, "NEW METADATA NEEDED\n");
627 ret = get_new_metadata(id);
628 if (ret < 0) {
629 goto error;
630 }
631 }
632 break;
633 case VIEWER_INDEX_RETRY:
634 fprintf(stderr, "(RETRY)\n");
635 sleep(1);
636 goto retry;
637 case VIEWER_INDEX_HUP:
638 fprintf(stderr, "(HUP)\n");
639 session->streams[id].id = -1ULL;
640 session->streams[id].fd = -1;
641 break;
642 case VIEWER_INDEX_ERR:
643 fprintf(stderr, "(ERR)\n");
644 ret = -1;
645 goto error;
646 default:
647 fprintf(stderr, "SHOULD NOT HAPPEN\n");
648 ret = -1;
649 goto error;
650 }
651
652 error:
653 return ret;
654 }
655
656 void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
657 int whence)
658 {
659 struct ctf_stream_pos *pos;
660 struct ctf_file_stream *file_stream;
661 struct packet_index packet_index;
662 int ret;
663
664 pos = ctf_pos(stream_pos);
665 file_stream = container_of(pos, struct ctf_file_stream, pos);
666
667 fprintf(stderr, "BT GET_NEXT_INDEX %d\n", pos->fd);
668 ret = get_next_index(pos->fd, &packet_index);
669 if (ret < 0) {
670 fprintf(stderr, "get_next_index failed\n");
671 return;
672 }
673
674 pos->packet_size = packet_index.packet_size;
675 pos->content_size = packet_index.content_size;
676 pos->mmap_base_offset = 0;
677 pos->offset = 0;
678
679 file_stream->parent.cycles_timestamp = packet_index.timestamp_end;
680 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
681 &file_stream->parent, packet_index.timestamp_end);
682
683 if (pos->packet_size == 0) {
684 goto end;
685 }
686
687 fprintf(stderr, "BT GET_DATA_PACKET\n");
688 ret = get_data_packet(pos->fd, be64toh(packet_index.offset),
689 packet_index.packet_size / CHAR_BIT);
690 if (ret < 0) {
691 fprintf(stderr, "get_data_packet failed");
692 return;
693 }
694
695 fprintf(stderr, "BT MMAP %d\n", pos->fd);
696 fprintf(stderr, "packet_size : %lu, offset %lu, content_size %lu, timestamp_end : %lu, real : %lu\n",
697 packet_index.packet_size,
698 packet_index.offset,
699 packet_index.content_size,
700 packet_index.timestamp_end,
701 ctf_get_real_timestamp(
702 &file_stream->parent, packet_index.timestamp_end));
703 if (!pos->base_mma) {
704 pos->base_mma = zmalloc(sizeof(*pos->base_mma));
705 if (!pos->base_mma) {
706 fprintf(stderr, "alloc pos->base_mma\n");
707 return;
708 }
709 }
710
711 mmap_align_set_addr(pos->base_mma, session->streams[pos->fd].mmap_base);
712 if (pos->base_mma == MAP_FAILED) {
713 perror("Error mmaping");
714 return;
715 }
716
717 /* update trace_packet_header and stream_packet_context */
718 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
719 /* Read packet header */
720 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
721 assert(!ret);
722 }
723 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
724 /* Read packet context */
725 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
726 assert(!ret);
727 }
728
729 end:
730 return;
731 }
732
733 int open_trace(struct bt_context **bt_ctx)
734 {
735 struct bt_mmap_stream *new_mmap_stream;
736 struct bt_mmap_stream_list mmap_list;
737 FILE *metadata_fp = NULL;
738 int i;
739 int ret = 0;
740
741 *bt_ctx = bt_context_create();
742 BT_INIT_LIST_HEAD(&mmap_list.head);
743
744 for (i = 0; i < session->stream_count; i++) {
745 int total_metadata = 0;
746
747 if (!session->streams[i].metadata_flag) {
748 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
749 /*
750 * The FD is unused when we handle manually the
751 * packet seek, so we store here the ID of the
752 * stream in our stream list to be able to use it
753 * later.
754 */
755 new_mmap_stream->fd = i;
756 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
757 } else {
758 /* Get all possible metadata before starting */
759 do {
760 ret = get_new_metadata(i);
761 if (ret > 0) {
762 total_metadata += ret;
763 }
764 } while (ret > 0 || total_metadata == 0);
765 metadata_fp = fopen(session->streams[i].path, "r");
766 }
767 }
768
769 if (!metadata_fp) {
770 fprintf(stderr, "No metadata stream opened\n");
771 goto end;
772 }
773
774 ret = bt_context_add_trace(*bt_ctx, NULL, "ctf",
775 ctf_live_packet_seek, &mmap_list, metadata_fp);
776 if (ret < 0) {
777 fprintf(stderr, "Error adding trace\n");
778 goto end;
779 }
780
781 /*
782 begin_pos.type = BT_SEEK_BEGIN;
783 iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL);
784 while ((event = bt_ctf_iter_read_event(iter)) != NULL) {
785 if (!skip) {
786 ret = sout->parent.event_cb(&sout->parent, event->parent->stream);
787 if (ret) {
788 fprintf(stderr, "[error] Writing event failed.\n");
789 goto end;
790 }
791 }
792
793 ret = bt_iter_next(bt_ctf_get_iter(iter));
794 if (ret < 0) {
795 goto end;
796 } else if (ret == EAGAIN) {
797 skip = 1;
798 continue;
799 }
800 skip = 0;
801 }
802 */
803
804 end:
805 return ret;
806 }
807
808 int setup_network_live(char *hostname, int begin)
809 {
810 int ret;
811 int session_id;
812
813 session = zmalloc(sizeof(struct live_session));
814 if (!session) {
815 goto error;
816 }
817
818 ret = connect_viewer(hostname);
819 if (ret < 0) {
820 goto error;
821 }
822 fprintf(stderr, "* Connected\n");
823
824 fprintf(stderr, "* Establish connection and version check\n");
825 ret = establish_connection();
826 if (ret < 0) {
827 goto error;
828 }
829
830 fprintf(stderr, "* List sessions\n");
831 ret = list_sessions();
832 if (ret < 0) {
833 fprintf(stderr, "* List error\n");
834 goto error;
835 } else if (ret == 0) {
836 fprintf(stderr, "* No session to attach to, exiting\n");
837 ret = 0;
838 goto end;
839 }
840 session_id = ret;
841
842 do {
843 fprintf(stderr, "* Attach session %d\n", ret);
844 ret = attach_session(session_id, begin);
845 if (ret < 0) {
846 goto error;
847 }
848 } while (session->stream_count == 0);
849
850 end:
851 return 0;
852
853 error:
854 free(session->streams);
855 fprintf(stderr, "* Exiting %d\n", ret);
856 return ret;
857 }
/* gitweb footer: this page took 0.044161 seconds and 3 git commands to generate. */