46a5b4d2d0c317cd80de5be78c527b24c7871452
[lttngtop.git] / src / network-live.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <netinet/in.h>
27 #include <netdb.h>
28 #include <stdio.h>
29 #include <string.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <errno.h>
33 #include <inttypes.h>
34 #include <fcntl.h>
35 #include <sys/mman.h>
36
37 #include "lttng-viewer.h"
38 #include "ctf-index.h"
39 #include "network-live.h"
40
41 #include <babeltrace/babeltrace.h>
42 #include <babeltrace/ctf/events.h>
43 #include <babeltrace/ctf/callbacks.h>
44 #include <babeltrace/ctf/iterator.h>
45
46 /* for packet_index */
47 #include <babeltrace/ctf/types.h>
48
49 #include <babeltrace/ctf/metadata.h>
50 #include <babeltrace/ctf-text/types.h>
51 #include <babeltrace/ctf/events-internal.h>
52
/*
 * Zero-initialized allocation of x bytes. The result must be released with
 * free(). The argument is parenthesized so that any expression can be passed
 * safely.
 */
#define zmalloc(x) calloc(1, (x))
/*
 * Size in bytes of the anonymous mapping allocated for each stream.
 * FIXME : completely arbitrary
 */
#define mmap_size 524288
59
/* Socket connected to the relay; assigned by connect_viewer(). */
static int control_sock;
/* State of the attached live session; allocated by setup_network_live(). */
struct live_session *session;
62
/*
 * Local bookkeeping for one stream announced by the relay when attaching to
 * a session.
 */
struct viewer_stream {
	/* Stream id as received from the relay; set to -1ULL on hang-up. */
	uint64_t id;
	/* Id of the CTF trace this stream belongs to. */
	uint64_t ctf_trace_id;
	/* Anonymous mapping of mmap_size bytes receiving the packet data. */
	void *mmap_base;
	/* For metadata streams: descriptor of the local metadata file. */
	int fd;
	/* Set to 1 when the relay flags this stream as a metadata stream. */
	int metadata_flag;
	/* Set to 1 at attach time; not read anywhere in this file. */
	int first_read;
	/* For metadata streams: path of the local metadata file. */
	char path[PATH_MAX];
};
72
/*
 * State of the live session the viewer is attached to.
 */
struct live_session {
	/* Array of stream_count entries, allocated by attach_session(). */
	struct viewer_stream *streams;
	/* Not written anywhere in this file. */
	uint64_t live_timer_interval;
	/* Number of streams announced in the attach response. */
	uint64_t stream_count;
};
78
79 static
80 int connect_viewer(char *hostname)
81 {
82 struct hostent *host;
83 struct sockaddr_in server_addr;
84 int ret;
85
86 host = gethostbyname(hostname);
87 if (!host) {
88 ret = -1;
89 goto end;
90 }
91
92 if ((control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
93 perror("Socket");
94 ret = -1;
95 goto end;
96 }
97
98 server_addr.sin_family = AF_INET;
99 server_addr.sin_port = htons(5344);
100 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
101 bzero(&(server_addr.sin_zero), 8);
102
103 if (connect(control_sock, (struct sockaddr *) &server_addr,
104 sizeof(struct sockaddr)) == -1) {
105 perror("Connect");
106 ret = -1;
107 goto end;
108 }
109
110 server_addr.sin_family = AF_INET;
111 server_addr.sin_port = htons(5345);
112 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
113 bzero(&(server_addr.sin_zero), 8);
114
115 ret = 0;
116
117 end:
118 return ret;
119 }
120
121 static
122 int establish_connection(void)
123 {
124 struct lttng_viewer_cmd cmd;
125 struct lttng_viewer_connect connect;
126 int ret;
127
128 cmd.cmd = htobe32(VIEWER_CONNECT);
129 cmd.data_size = sizeof(connect);
130 cmd.cmd_version = 0;
131
132 connect.major = htobe32(2);
133 connect.minor = htobe32(4);
134 connect.type = htobe32(VIEWER_CLIENT_COMMAND);
135
136 do {
137 ret = send(control_sock, &cmd, sizeof(cmd), 0);
138 } while (ret < 0 && errno == EINTR);
139 if (ret < 0) {
140 fprintf(stderr, "Error sending cmd\n");
141 goto error;
142 }
143 do {
144 ret = send(control_sock, &connect, sizeof(connect), 0);
145 } while (ret < 0 && errno == EINTR);
146 if (ret < 0) {
147 fprintf(stderr, "Error sending version\n");
148 goto error;
149 }
150
151 do {
152 ret = recv(control_sock, &connect, sizeof(connect), 0);
153 } while (ret < 0 && errno == EINTR);
154 if (ret < 0) {
155 fprintf(stderr, "Error receiving version\n");
156 goto error;
157 }
158 fprintf(stderr, " - Received viewer session ID : %" PRIu64 "\n",
159 be64toh(connect.viewer_session_id));
160 fprintf(stderr, " - Received version : %u.%u\n", be32toh(connect.major),
161 be32toh(connect.minor));
162
163 ret = 0;
164
165 error:
166 return ret;
167 }
168
169 int list_sessions(void)
170 {
171 struct lttng_viewer_cmd cmd;
172 struct lttng_viewer_list_sessions list;
173 struct lttng_viewer_session lsession;
174 int i, ret;
175 int first_session = 0;
176
177 cmd.cmd = htobe32(VIEWER_LIST_SESSIONS);
178 cmd.data_size = 0;
179 cmd.cmd_version = 0;
180
181 do {
182 ret = send(control_sock, &cmd, sizeof(cmd), 0);
183 } while (ret < 0 && errno == EINTR);
184 if (ret < 0) {
185 fprintf(stderr, "Error sending cmd\n");
186 goto error;
187 }
188
189 do {
190 ret = recv(control_sock, &list, sizeof(list), 0);
191 } while (ret < 0 && errno == EINTR);
192 if (ret < 0) {
193 fprintf(stderr, "Error receiving session list\n");
194 goto error;
195 }
196
197 fprintf(stderr, " - %u active session(s)\n", be32toh(list.sessions_count));
198 for (i = 0; i < be32toh(list.sessions_count); i++) {
199 do {
200 ret = recv(control_sock, &lsession, sizeof(lsession), 0);
201 } while (ret < 0 && errno == EINTR);
202 if (ret < 0) {
203 fprintf(stderr, "Error receiving session\n");
204 goto error;
205 }
206 fprintf(stderr, " - %" PRIu64 " : %s on host %s (timer = %u, "
207 "%u client(s) connected)\n",
208 be64toh(lsession.id), lsession.session_name,
209 lsession.hostname, be32toh(lsession.live_timer),
210 be32toh(lsession.clients));
211 if (first_session <= 0) {
212 first_session = be64toh(lsession.id);
213 }
214 }
215
216 /* I know, type mismatch */
217 ret = (int) first_session;
218
219 error:
220 return ret;
221 }
222
223 static
224 int attach_session(int id, int begin)
225 {
226 struct lttng_viewer_cmd cmd;
227 struct lttng_viewer_attach_session_request rq;
228 struct lttng_viewer_attach_session_response rp;
229 struct lttng_viewer_stream stream;
230 int ret, i;
231
232 cmd.cmd = htobe32(VIEWER_ATTACH_SESSION);
233 cmd.data_size = sizeof(rq);
234 cmd.cmd_version = 0;
235
236 rq.session_id = htobe64(id);
237 if (begin) {
238 rq.seek = htobe32(VIEWER_SEEK_BEGINNING);
239 } else {
240 rq.seek = htobe32(VIEWER_SEEK_LAST);
241 }
242
243 do {
244 ret = send(control_sock, &cmd, sizeof(cmd), 0);
245 } while (ret < 0 && errno == EINTR);
246 if (ret < 0) {
247 fprintf(stderr, "Error sending cmd\n");
248 goto error;
249 }
250 do {
251 ret = send(control_sock, &rq, sizeof(rq), 0);
252 } while (ret < 0 && errno == EINTR);
253 if (ret < 0) {
254 fprintf(stderr, "Error sending attach request\n");
255 goto error;
256 }
257
258 do {
259 ret = recv(control_sock, &rp, sizeof(rp), 0);
260 } while (ret < 0 && errno == EINTR);
261 if (ret < 0) {
262 fprintf(stderr, "Error receiving attach response\n");
263 goto error;
264 }
265 fprintf(stderr, " - session attach response : %u\n", be32toh(rp.status));
266 if (be32toh(rp.status) != VIEWER_ATTACH_OK) {
267 ret = 1;
268 goto end;
269 }
270
271 session->stream_count = be32toh(rp.streams_count);
272 fprintf(stderr, " - Waiting for %" PRIu64 " streams\n", session->stream_count);
273 session->streams = zmalloc(session->stream_count *
274 sizeof(struct viewer_stream));
275 if (!session->streams) {
276 ret = -1;
277 goto error;
278 }
279
280 for (i = 0; i < be32toh(rp.streams_count); i++) {
281 do {
282 ret = recv(control_sock, &stream, sizeof(stream), 0);
283 } while (ret < 0 && errno == EINTR);
284 if (ret < 0) {
285 fprintf(stderr, "Error receiving stream\n");
286 goto error;
287 }
288 fprintf(stderr, " - stream %" PRIu64 " : %s/%s\n",
289 be64toh(stream.id), stream.path_name,
290 stream.channel_name);
291 session->streams[i].id = be64toh(stream.id);
292
293 session->streams[i].ctf_trace_id = be64toh(stream.ctf_trace_id);
294 session->streams[i].first_read = 1;
295 session->streams[i].mmap_base = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE,
296 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
297 if (session->streams[i].mmap_base == MAP_FAILED) {
298 fprintf(stderr, "mmap error\n");
299 ret = -1;
300 goto error;
301 }
302
303 if (be32toh(stream.metadata_flag)) {
304 session->streams[i].metadata_flag = 1;
305 unlink("testlivetrace");
306 mkdir("testlivetrace", S_IRWXU | S_IRWXG);
307 snprintf(session->streams[i].path,
308 sizeof(session->streams[i].path),
309 "testlivetrace/%s",
310 stream.channel_name);
311 ret = open(session->streams[i].path,
312 O_WRONLY | O_CREAT | O_TRUNC,
313 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
314 if (ret < 0) {
315 goto error;
316 }
317 session->streams[i].fd = ret;
318 }
319 }
320 ret = 0;
321
322 end:
323 error:
324 return ret;
325 }
326
#if 0
/*
 * Useful debug: print every field of a packet index, converted from big
 * endian. Compiled out by default.
 */
static
void dump_packet_index(struct lttng_packet_index *index)
{
	/* PRIu64 matches the 64-bit values on every ABI, unlike "%lu". */
	printf(" - index : %" PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" PRIu64
			", %" PRIu64 ", %" PRIu64 ", %" PRIu64 "\n",
			(uint64_t) be64toh(index->offset),
			(uint64_t) be64toh(index->packet_size),
			(uint64_t) be64toh(index->content_size),
			(uint64_t) be64toh(index->timestamp_begin),
			(uint64_t) be64toh(index->timestamp_end),
			(uint64_t) be64toh(index->events_discarded),
			(uint64_t) be64toh(index->stream_id));
}
#endif
342
343 static
344 int get_data_packet(int id, uint64_t offset,
345 uint64_t len)
346 {
347 struct lttng_viewer_cmd cmd;
348 struct lttng_viewer_get_packet rq;
349 struct lttng_viewer_trace_packet rp;
350 int ret;
351
352 cmd.cmd = htobe32(VIEWER_GET_PACKET);
353 cmd.data_size = sizeof(rq);
354 cmd.cmd_version = 0;
355
356 rq.stream_id = htobe64(session->streams[id].id);
357 /* Already in big endian. */
358 rq.offset = offset;
359 rq.len = htobe32(len);
360 fprintf(stderr, " - get_packet ");
361
362 do {
363 ret = send(control_sock, &cmd, sizeof(cmd), 0);
364 } while (ret < 0 && errno == EINTR);
365 if (ret < 0) {
366 fprintf(stderr, "Error sending cmd\n");
367 goto error;
368 }
369 do {
370 ret = send(control_sock, &rq, sizeof(rq), 0);
371 } while (ret < 0 && errno == EINTR);
372 if (ret < 0) {
373 fprintf(stderr, "Error sending get_data_packet request\n");
374 goto error;
375 }
376 do {
377 ret = recv(control_sock, &rp, sizeof(rp), 0);
378 } while (ret < 0 && errno == EINTR);
379 if (ret < 0) {
380 fprintf(stderr, "Error receiving data response\n");
381 goto error;
382 }
383 rp.flags = be32toh(rp.flags);
384
385 switch (be32toh(rp.status)) {
386 case VIEWER_GET_PACKET_OK:
387 fprintf(stderr, "OK\n");
388 break;
389 case VIEWER_GET_PACKET_RETRY:
390 fprintf(stderr, "RETRY\n");
391 ret = -1;
392 goto end;
393 case VIEWER_GET_PACKET_ERR:
394 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
395 fprintf(stderr, "NEW_METADATA\n");
396 ret = 0;
397 goto end;
398 }
399 fprintf(stderr, "ERR\n");
400 ret = -1;
401 goto end;
402 default:
403 fprintf(stderr, "UNKNOWN\n");
404 ret = -1;
405 goto end;
406 }
407
408 len = be32toh(rp.len);
409 fprintf(stderr, " - writing %" PRIu64" bytes to tracefile\n", len);
410 if (len <= 0) {
411 goto end;
412 }
413
414 if (len > mmap_size) {
415 fprintf(stderr, "mmap_size not big enough\n");
416 ret = -1;
417 goto error;
418 }
419
420 do {
421 ret = recv(control_sock, session->streams[id].mmap_base, len, MSG_WAITALL);
422 } while (ret < 0 && errno == EINTR);
423 if (ret < 0) {
424 fprintf(stderr, "Error receiving trace packet\n");
425 goto error;
426 }
427
428 end:
429 error:
430 return ret;
431 }
432
433 /*
434 * Return number of metadata bytes written or a negative value on error.
435 */
436 static
437 int get_new_metadata(int id)
438 {
439 struct lttng_viewer_cmd cmd;
440 struct lttng_viewer_get_metadata rq;
441 struct lttng_viewer_metadata_packet rp;
442 int ret;
443 uint64_t i;
444 char *data = NULL;
445 uint64_t len = 0;
446 int metadata_stream_id = -1;
447
448 cmd.cmd = htobe32(VIEWER_GET_METADATA);
449 cmd.data_size = sizeof(rq);
450 cmd.cmd_version = 0;
451
452 /* find the metadata stream for this ctf_trace */
453 for (i = 0; i < session->stream_count; i++) {
454 if (session->streams[i].metadata_flag &&
455 session->streams[i].ctf_trace_id ==
456 session->streams[id].ctf_trace_id) {
457 metadata_stream_id = i;
458 break;
459 }
460 }
461 if (metadata_stream_id < 0) {
462 fprintf(stderr, "No metadata stream found\n");
463 ret = -1;
464 goto error;
465 }
466
467 rq.stream_id = htobe64(session->streams[metadata_stream_id].id);
468 fprintf(stderr, " - get_metadata ");
469
470 do {
471 ret = send(control_sock, &cmd, sizeof(cmd), 0);
472 } while (ret < 0 && errno == EINTR);
473 if (ret < 0) {
474 fprintf(stderr, "Error sending cmd\n");
475 goto error;
476 }
477 do {
478 ret = send(control_sock, &rq, sizeof(rq), 0);
479 } while (ret < 0 && errno == EINTR);
480 if (ret < 0) {
481 fprintf(stderr, "Error sending get_metadata request\n");
482 goto error;
483 }
484 do {
485 ret = recv(control_sock, &rp, sizeof(rp), 0);
486 } while (ret < 0 && errno == EINTR);
487 if (ret < 0) {
488 fprintf(stderr, "Error receiving metadata response\n");
489 goto error;
490 }
491 switch (be32toh(rp.status)) {
492 case VIEWER_METADATA_OK:
493 fprintf(stderr, "OK\n");
494 break;
495 case VIEWER_NO_NEW_METADATA:
496 fprintf(stderr, "NO NEW\n");
497 ret = -1;
498 goto end;
499 case VIEWER_METADATA_ERR:
500 fprintf(stderr, "ERR\n");
501 ret = -1;
502 goto end;
503 default:
504 fprintf(stderr, "UNKNOWN\n");
505 ret = -1;
506 goto end;
507 }
508
509 len = be64toh(rp.len);
510 fprintf(stderr, " - writing %" PRIu64" bytes to metadata\n", len);
511 if (len <= 0) {
512 goto end;
513 }
514
515 data = zmalloc(len);
516 if (!data) {
517 perror("relay data zmalloc");
518 goto error;
519 }
520 do {
521 ret = recv(control_sock, data, len, MSG_WAITALL);
522 } while (ret < 0 && errno == EINTR);
523 if (ret < 0) {
524 fprintf(stderr, "Error receiving trace packet\n");
525 free(data);
526 goto error;
527 }
528 do {
529 ret = write(session->streams[metadata_stream_id].fd, data, len);
530 } while (ret < 0 && errno == EINTR);
531 if (ret < 0) {
532 free(data);
533 goto error;
534 }
535 free(data);
536
537 /* FIXME : bad */
538 ret = (int) len;
539 end:
540 error:
541 return ret;
542 }
543
544 /*
545 * Get one index for a stream.
546 */
547 int get_next_index(int id, struct packet_index *index)
548 {
549 struct lttng_viewer_cmd cmd;
550 struct lttng_viewer_get_next_index rq;
551 struct lttng_viewer_index rp;
552 int ret;
553
554 cmd.cmd = htobe32(VIEWER_GET_NEXT_INDEX);
555 cmd.data_size = sizeof(rq);
556 cmd.cmd_version = 0;
557
558 fprintf(stderr, " - get next index for stream %" PRIu64 "\n",
559 session->streams[id].id);
560 rq.stream_id = htobe64(session->streams[id].id);
561
562 retry:
563 do {
564 ret = send(control_sock, &cmd, sizeof(cmd), 0);
565 } while (ret < 0 && errno == EINTR);
566 if (ret < 0) {
567 fprintf(stderr, "Error sending cmd\n");
568 goto error;
569 }
570 do {
571 ret = send(control_sock, &rq, sizeof(rq), 0);
572 } while (ret < 0 && errno == EINTR);
573 if (ret < 0) {
574 fprintf(stderr, "Error sending get_next_index request\n");
575 goto error;
576 }
577 do {
578 ret = recv(control_sock, &rp, sizeof(rp), 0);
579 } while (ret < 0 && errno == EINTR);
580 if (ret < 0) {
581 fprintf(stderr, "Error receiving index response\n");
582 goto error;
583 }
584 fprintf(stderr, " - reply : %u ", be32toh(rp.status));
585
586 rp.flags = be32toh(rp.flags);
587
588 switch (be32toh(rp.status)) {
589 case VIEWER_INDEX_INACTIVE:
590 fprintf(stderr, "(INACTIVE)\n");
591 memset(index, 0, sizeof(struct packet_index));
592 index->timestamp_end = be64toh(rp.timestamp_end);
593 break;
594 case VIEWER_INDEX_OK:
595 fprintf(stderr, "(OK), need metadata update : %u\n",
596 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
597 index->offset = be64toh(rp.offset);
598 index->packet_size = be64toh(rp.packet_size);
599 index->content_size = be64toh(rp.content_size);
600 index->timestamp_begin = be64toh(rp.timestamp_begin);
601 index->timestamp_end = be64toh(rp.timestamp_end);
602 index->events_discarded = be64toh(rp.events_discarded);
603
604 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
605 fprintf(stderr, "NEW METADATA NEEDED\n");
606 ret = get_new_metadata(id);
607 if (ret < 0) {
608 goto error;
609 }
610 }
611 break;
612 case VIEWER_INDEX_RETRY:
613 fprintf(stderr, "(RETRY)\n");
614 sleep(1);
615 goto retry;
616 case VIEWER_INDEX_HUP:
617 fprintf(stderr, "(HUP)\n");
618 session->streams[id].id = -1ULL;
619 session->streams[id].fd = -1;
620 break;
621 case VIEWER_INDEX_ERR:
622 fprintf(stderr, "(ERR)\n");
623 ret = -1;
624 goto error;
625 default:
626 fprintf(stderr, "SHOULD NOT HAPPEN\n");
627 ret = -1;
628 goto error;
629 }
630
631 error:
632 return ret;
633 }
634
635 void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
636 int whence)
637 {
638 struct ctf_stream_pos *pos;
639 struct ctf_file_stream *file_stream;
640 struct packet_index packet_index;
641 int ret;
642
643 pos = ctf_pos(stream_pos);
644 file_stream = container_of(pos, struct ctf_file_stream, pos);
645
646 fprintf(stderr, "BT GET_NEXT_INDEX %d\n", pos->fd);
647 ret = get_next_index(pos->fd, &packet_index);
648 if (ret < 0) {
649 fprintf(stderr, "get_next_index failed\n");
650 return;
651 }
652
653 pos->packet_size = packet_index.packet_size;
654 pos->content_size = packet_index.content_size;
655 pos->mmap_base_offset = 0;
656 pos->offset = 0;
657 if (packet_index.offset == EOF) {
658 pos->offset = EOF;
659 } else {
660 pos->offset = 0;
661 }
662
663 file_stream->parent.cycles_timestamp = packet_index.timestamp_end;
664 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
665 &file_stream->parent, packet_index.timestamp_end);
666
667 if (pos->packet_size == 0) {
668 goto end;
669 }
670
671 fprintf(stderr, "BT GET_DATA_PACKET\n");
672 ret = get_data_packet(pos->fd, be64toh(packet_index.offset),
673 packet_index.packet_size / CHAR_BIT);
674 if (ret < 0) {
675 fprintf(stderr, "get_data_packet failed");
676 return;
677 }
678
679 fprintf(stderr, "BT MMAP %d\n", pos->fd);
680 fprintf(stderr, "packet_size : %lu, offset %lu, content_size %lu, timestamp_end : %lu, real : %lu\n",
681 packet_index.packet_size,
682 packet_index.offset,
683 packet_index.content_size,
684 packet_index.timestamp_end,
685 ctf_get_real_timestamp(
686 &file_stream->parent, packet_index.timestamp_end));
687 if (!pos->base_mma) {
688 pos->base_mma = zmalloc(sizeof(*pos->base_mma));
689 if (!pos->base_mma) {
690 fprintf(stderr, "alloc pos->base_mma\n");
691 return;
692 }
693 }
694
695 mmap_align_set_addr(pos->base_mma, session->streams[pos->fd].mmap_base);
696 if (pos->base_mma == MAP_FAILED) {
697 perror("Error mmaping");
698 return;
699 }
700
701 /* update trace_packet_header and stream_packet_context */
702 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
703 /* Read packet header */
704 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
705 assert(!ret);
706 }
707 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
708 /* Read packet context */
709 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
710 assert(!ret);
711 }
712
713 end:
714 return;
715 }
716
717 int open_trace(struct bt_context **bt_ctx)
718 {
719 struct bt_mmap_stream *new_mmap_stream;
720 struct bt_mmap_stream_list mmap_list;
721 FILE *metadata_fp = NULL;
722 int i;
723 int ret = 0;
724
725 *bt_ctx = bt_context_create();
726 BT_INIT_LIST_HEAD(&mmap_list.head);
727
728 for (i = 0; i < session->stream_count; i++) {
729 int total_metadata = 0;
730
731 if (!session->streams[i].metadata_flag) {
732 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
733 /*
734 * The FD is unused when we handle manually the
735 * packet seek, so we store here the ID of the
736 * stream in our stream list to be able to use it
737 * later.
738 */
739 new_mmap_stream->fd = i;
740 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
741 } else {
742 /* Get all possible metadata before starting */
743 do {
744 ret = get_new_metadata(i);
745 if (ret > 0) {
746 total_metadata += ret;
747 }
748 } while (ret > 0 || total_metadata == 0);
749 metadata_fp = fopen(session->streams[i].path, "r");
750 }
751 }
752
753 if (!metadata_fp) {
754 fprintf(stderr, "No metadata stream opened\n");
755 goto end;
756 }
757
758 ret = bt_context_add_trace(*bt_ctx, NULL, "ctf",
759 ctf_live_packet_seek, &mmap_list, metadata_fp);
760 if (ret < 0) {
761 fprintf(stderr, "Error adding trace\n");
762 goto end;
763 }
764
765 /*
766 begin_pos.type = BT_SEEK_BEGIN;
767 iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL);
768 while ((event = bt_ctf_iter_read_event(iter)) != NULL) {
769 if (!skip) {
770 ret = sout->parent.event_cb(&sout->parent, event->parent->stream);
771 if (ret) {
772 fprintf(stderr, "[error] Writing event failed.\n");
773 goto end;
774 }
775 }
776
777 ret = bt_iter_next(bt_ctf_get_iter(iter));
778 if (ret < 0) {
779 goto end;
780 } else if (ret == EAGAIN) {
781 skip = 1;
782 continue;
783 }
784 skip = 0;
785 }
786 */
787
788 end:
789 return ret;
790 }
791
792 int setup_network_live(char *hostname, int begin)
793 {
794 int ret;
795 int session_id;
796
797 session = zmalloc(sizeof(struct live_session));
798 if (!session) {
799 goto error;
800 }
801
802 ret = connect_viewer(hostname);
803 if (ret < 0) {
804 goto error;
805 }
806 fprintf(stderr, "* Connected\n");
807
808 fprintf(stderr, "* Establish connection and version check\n");
809 ret = establish_connection();
810 if (ret < 0) {
811 goto error;
812 }
813
814 fprintf(stderr, "* List sessions\n");
815 ret = list_sessions();
816 if (ret < 0) {
817 fprintf(stderr, "* List error\n");
818 goto error;
819 } else if (ret == 0) {
820 fprintf(stderr, "* No session to attach to, exiting\n");
821 ret = 0;
822 goto end;
823 }
824 session_id = ret;
825
826 do {
827 fprintf(stderr, "* Attach session %d\n", ret);
828 ret = attach_session(session_id, begin);
829 if (ret < 0) {
830 goto error;
831 }
832 } while (session->stream_count == 0);
833
834 end:
835 return 0;
836
837 error:
838 free(session->streams);
839 fprintf(stderr, "* Exiting %d\n", ret);
840 return ret;
841 }
This page took 0.043819 seconds and 3 git commands to generate.