076c4391d55c64b0ca3d01fd9642dba80f8249f4
[lttngtop.git] / src / liblttng-live.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <netinet/in.h>
27 #include <netdb.h>
28 #include <stdio.h>
29 #include <string.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <errno.h>
33 #include <inttypes.h>
34 #include <fcntl.h>
35 #include <sys/mman.h>
36
37 #include <babeltrace/ctf/ctf-index.h>
38
39 #include <babeltrace/babeltrace.h>
40 #include <babeltrace/ctf/events.h>
41 #include <babeltrace/ctf/callbacks.h>
42 #include <babeltrace/ctf/iterator.h>
43
44 /* for packet_index */
45 #include <babeltrace/ctf/types.h>
46
47 #include <babeltrace/ctf/metadata.h>
48 #include <babeltrace/ctf-text/types.h>
49 #include <babeltrace/ctf/events-internal.h>
50 /*
51 #include <formats/ctf/events-private.h>
52 replaced with
53 */
54 #include "network-live.h"
55
56 #include "liblttng-live.h"
57 #include "lttng-viewer-abi.h"
58 #include "lttngtop.h"
59
/*
 * Allocate x bytes of zero-initialised memory.
 * Evaluates to NULL on allocation failure; release the result with free().
 */
#define zmalloc(x)	calloc(1, (x))

/*
 * Maximum of a and b once both have been converted to "type".
 * Each argument is evaluated twice: do not pass expressions with side
 * effects.
 */
#ifndef max_t
#define max_t(type, a, b)	\
	((type) (a) > (type) (b) ? (type) (a) : (type) (b))
#endif
69
70 int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
71 int port)
72 {
73 struct hostent *host;
74 struct sockaddr_in server_addr;
75 int ret;
76
77 host = gethostbyname(hostname);
78 if (!host) {
79 ret = -1;
80 goto end;
81 }
82
83 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
84 perror("Socket");
85 ret = -1;
86 goto end;
87 }
88
89 server_addr.sin_family = AF_INET;
90 server_addr.sin_port = htons(port);
91 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
92 bzero(&(server_addr.sin_zero), 8);
93
94 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
95 sizeof(struct sockaddr)) == -1) {
96 perror("Connect");
97 ret = -1;
98 goto end;
99 }
100
101 ret = 0;
102
103 end:
104 return ret;
105 }
106
107 int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
108 {
109 struct lttng_viewer_cmd cmd;
110 struct lttng_viewer_connect connect;
111 int ret;
112 ssize_t ret_len;
113
114 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
115 cmd.data_size = sizeof(connect);
116 cmd.cmd_version = 0;
117
118 connect.viewer_session_id = -1ULL; /* will be set on recv */
119 connect.major = htobe32(LTTNG_LIVE_MAJOR);
120 connect.minor = htobe32(LTTNG_LIVE_MINOR);
121 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
122
123 do {
124 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
125 } while (ret_len < 0 && errno == EINTR);
126 if (ret_len < 0) {
127 fprintf(stderr, "[error] Error sending cmd\n");
128 ret = ret_len;
129 goto error;
130 }
131 assert(ret_len == sizeof(cmd));
132
133 do {
134 ret_len = send(ctx->control_sock, &connect, sizeof(connect), 0);
135 } while (ret_len < 0 && errno == EINTR);
136 if (ret_len < 0) {
137 fprintf(stderr, "[error] Error sending version\n");
138 ret = ret_len;
139 goto error;
140 }
141 assert(ret_len == sizeof(connect));
142
143 do {
144 ret_len = recv(ctx->control_sock, &connect, sizeof(connect), 0);
145 } while (ret_len < 0 && errno == EINTR);
146 if (ret_len < 0) {
147 fprintf(stderr, "[error] Error receiving version\n");
148 ret = ret_len;
149 goto error;
150 }
151 assert(ret_len == sizeof(connect));
152
153 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
154 be64toh(connect.viewer_session_id));
155 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
156 be32toh(connect.minor));
157
158 ret = 0;
159
160 error:
161 return ret;
162 }
163
164 int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
165 {
166 struct lttng_viewer_cmd cmd;
167 struct lttng_viewer_list_sessions list;
168 struct lttng_viewer_session lsession;
169 int i, ret;
170 ssize_t ret_len;
171 int sessions_count;
172
173 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
174 cmd.data_size = 0;
175 cmd.cmd_version = 0;
176
177 do {
178 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
179 } while (ret_len < 0 && errno == EINTR);
180 if (ret_len < 0) {
181 fprintf(stderr, "[error] Error sending cmd\n");
182 ret = ret_len;
183 goto error;
184 }
185 assert(ret_len == sizeof(cmd));
186
187 do {
188 ret_len = recv(ctx->control_sock, &list, sizeof(list), 0);
189 } while (ret_len < 0 && errno == EINTR);
190 if (ret_len < 0) {
191 fprintf(stderr, "[error] Error receiving session list\n");
192 ret = ret_len;
193 goto error;
194 }
195 assert(ret_len == sizeof(list));
196
197 sessions_count = be32toh(list.sessions_count);
198 fprintf(stdout, "%u active session(s)%c\n", sessions_count,
199 sessions_count > 0 ? ':' : ' ');
200 for (i = 0; i < sessions_count; i++) {
201 do {
202 ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
203 } while (ret_len < 0 && errno == EINTR);
204 if (ret_len < 0) {
205 fprintf(stderr, "[error] Error receiving session\n");
206 ret = ret_len;
207 goto error;
208 }
209 assert(ret_len == sizeof(lsession));
210 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
211 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
212
213 fprintf(stdout, "%s/%" PRIu64 " : %s on host %s (timer = %u, "
214 "%u stream(s), %u client(s) connected)\n",
215 path, be64toh(lsession.id),
216 lsession.session_name, lsession.hostname,
217 be32toh(lsession.live_timer),
218 be32toh(lsession.streams),
219 be32toh(lsession.clients));
220 }
221
222 ret = 0;
223
224 error:
225 return ret;
226 }
227
228 int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
229 uint64_t ctf_trace_id)
230 {
231 struct lttng_live_ctf_trace *trace;
232 int ret = 0;
233
234 trace = g_hash_table_lookup(stream->session->ctf_traces,
235 (gpointer) ctf_trace_id);
236 if (!trace) {
237 trace = g_new0(struct lttng_live_ctf_trace, 1);
238 trace->ctf_trace_id = ctf_trace_id;
239 trace->streams = g_ptr_array_new();
240 g_hash_table_insert(stream->session->ctf_traces,
241 (gpointer) ctf_trace_id,
242 trace);
243 }
244 if (stream->metadata_flag)
245 trace->metadata_stream = stream;
246
247 stream->ctf_trace = trace;
248 g_ptr_array_add(trace->streams, stream);
249
250 return ret;
251 }
252
253 int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
254 {
255 struct lttng_viewer_cmd cmd;
256 struct lttng_viewer_attach_session_request rq;
257 struct lttng_viewer_attach_session_response rp;
258 struct lttng_viewer_stream stream;
259 int ret, i;
260 ssize_t ret_len;
261
262 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
263 cmd.data_size = sizeof(rq);
264 cmd.cmd_version = 0;
265
266 memset(&rq, 0, sizeof(rq));
267 rq.session_id = htobe64(id);
268 // TODO: add cmd line parameter to select seek beginning
269 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
270 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
271
272 do {
273 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
274 } while (ret_len < 0 && errno == EINTR);
275 if (ret_len < 0) {
276 fprintf(stderr, "[error] Error sending cmd\n");
277 ret = ret_len;
278 goto error;
279 }
280 assert(ret_len == sizeof(cmd));
281
282 do {
283 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
284 } while (ret_len < 0 && errno == EINTR);
285 if (ret_len < 0) {
286 fprintf(stderr, "[error] Error sending attach request\n");
287 ret = ret_len;
288 goto error;
289 }
290 assert(ret_len == sizeof(rq));
291
292 do {
293 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
294 } while (ret_len < 0 && errno == EINTR);
295 if (ret_len < 0) {
296 fprintf(stderr, "[error] Error receiving attach response\n");
297 ret = ret_len;
298 goto error;
299 }
300 assert(ret_len == sizeof(rp));
301
302 switch(be32toh(rp.status)) {
303 case LTTNG_VIEWER_ATTACH_OK:
304 break;
305 case LTTNG_VIEWER_ATTACH_UNK:
306 ret = -LTTNG_VIEWER_ATTACH_UNK;
307 goto end;
308 case LTTNG_VIEWER_ATTACH_ALREADY:
309 fprintf(stderr, "[error] Already a viewer attached\n");
310 ret = -1;
311 goto end;
312 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
313 fprintf(stderr, "[error] Not a live session\n");
314 ret = -1;
315 goto end;
316 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
317 fprintf(stderr, "[error] Wrong seek parameter\n");
318 ret = -1;
319 goto end;
320 default:
321 fprintf(stderr, "[error] Unknown attach return code %u\n",
322 be32toh(rp.status));
323 ret = -1;
324 goto end;
325 }
326 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
327 ret = -1;
328 goto end;
329 }
330
331 ctx->session->stream_count = be32toh(rp.streams_count);
332 /*
333 * When the session is created but not started, we do an active wait
334 * until it starts. It allows the viewer to start processing the trace
335 * as soon as the session starts.
336 */
337 if (ctx->session->stream_count == 0) {
338 ret = 0;
339 goto end;
340 }
341 printf_verbose("Waiting for %" PRIu64 " streams:\n",
342 ctx->session->stream_count);
343 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
344 ctx->session->stream_count);
345 for (i = 0; i < be32toh(rp.streams_count); i++) {
346 do {
347 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
348 } while (ret_len < 0 && errno == EINTR);
349 if (ret_len < 0) {
350 fprintf(stderr, "[error] Error receiving stream\n");
351 ret = ret_len;
352 goto error;
353 }
354 assert(ret_len == sizeof(stream));
355 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
356 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
357
358 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
359 be64toh(stream.id), stream.path_name,
360 stream.channel_name);
361 ctx->session->streams[i].id = be64toh(stream.id);
362 ctx->session->streams[i].session = ctx->session;
363
364 ctx->session->streams[i].first_read = 1;
365 ctx->session->streams[i].mmap_size = 0;
366
367 if (be32toh(stream.metadata_flag)) {
368 char *path;
369
370 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
371 if (!path) {
372 perror("strdup");
373 ret = -1;
374 goto error;
375 }
376 if (!mkdtemp(path)) {
377 perror("mkdtemp");
378 free(path);
379 ret = -1;
380 goto error;
381 }
382 ctx->session->streams[i].metadata_flag = 1;
383 snprintf(ctx->session->streams[i].path,
384 sizeof(ctx->session->streams[i].path),
385 "%s/%s", path,
386 stream.channel_name);
387 ret = open(ctx->session->streams[i].path,
388 O_WRONLY | O_CREAT | O_TRUNC,
389 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
390 if (ret < 0) {
391 perror("open");
392 free(path);
393 goto error;
394 }
395 ctx->session->streams[i].fd = ret;
396 free(path);
397 }
398 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
399 be64toh(stream.ctf_trace_id));
400 if (ret < 0) {
401 goto error;
402 }
403
404 }
405 ret = 0;
406
407 end:
408 error:
409 return ret;
410 }
411
412 static
413 int get_data_packet(struct lttng_live_ctx *ctx,
414 struct ctf_stream_pos *pos,
415 struct lttng_live_viewer_stream *stream, uint64_t offset,
416 uint64_t len)
417 {
418 struct lttng_viewer_cmd cmd;
419 struct lttng_viewer_get_packet rq;
420 struct lttng_viewer_trace_packet rp;
421 ssize_t ret_len;
422 int ret;
423
424 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
425 cmd.data_size = sizeof(rq);
426 cmd.cmd_version = 0;
427
428 memset(&rq, 0, sizeof(rq));
429 rq.stream_id = htobe64(stream->id);
430 /* Already in big endian. */
431 rq.offset = offset;
432 rq.len = htobe32(len);
433
434 do {
435 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
436 } while (ret_len < 0 && errno == EINTR);
437 if (ret_len < 0) {
438 fprintf(stderr, "[error] Error sending cmd\n");
439 ret = ret_len;
440 goto error;
441 }
442 assert(ret_len == sizeof(cmd));
443
444 do {
445 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
446 } while (ret_len < 0 && errno == EINTR);
447 if (ret_len < 0) {
448 fprintf(stderr, "[error] Error sending get_data_packet request\n");
449 ret = ret_len;
450 goto error;
451 }
452 assert(ret_len == sizeof(rq));
453
454 do {
455 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
456 } while (ret_len < 0 && errno == EINTR);
457 if (ret_len < 0) {
458 fprintf(stderr, "[error] Error receiving data response\n");
459 ret = ret_len;
460 goto error;
461 }
462 if (ret_len != sizeof(rp)) {
463 fprintf(stderr, "[error] get_data_packet: expected %" PRId64
464 ", received %" PRId64 "\n", ret_len,
465 sizeof(rp));
466 ret = -1;
467 goto error;
468 }
469
470 rp.flags = be32toh(rp.flags);
471
472 switch (be32toh(rp.status)) {
473 case LTTNG_VIEWER_GET_PACKET_OK:
474 len = be32toh(rp.len);
475 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
476 "\n", len);
477 break;
478 case LTTNG_VIEWER_GET_PACKET_RETRY:
479 printf_verbose("get_data_packet: retry\n");
480 ret = -1;
481 goto end;
482 case LTTNG_VIEWER_GET_PACKET_ERR:
483 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
484 printf_verbose("get_data_packet: new metadata needed\n");
485 ret = 0;
486 goto end;
487 }
488 fprintf(stderr, "[error] get_data_packet: error\n");
489 ret = -1;
490 goto end;
491 case LTTNG_VIEWER_GET_PACKET_EOF:
492 ret = -2;
493 goto error;
494 default:
495 printf_verbose("get_data_packet: unknown\n");
496 assert(0);
497 ret = -1;
498 goto end;
499 }
500
501 if (len <= 0) {
502 ret = -1;
503 goto end;
504 }
505
506 if (len > stream->mmap_size) {
507 uint64_t new_size;
508
509 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
510 if (pos->base_mma) {
511 /* unmap old base */
512 ret = munmap_align(pos->base_mma);
513 if (ret) {
514 fprintf(stderr, "[error] Unable to unmap old base: %s.\n",
515 strerror(errno));
516 ret = -1;
517 goto error;
518 }
519 pos->base_mma = NULL;
520 }
521 pos->base_mma = mmap_align(new_size,
522 PROT_READ | PROT_WRITE,
523 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
524 if (pos->base_mma == MAP_FAILED) {
525 fprintf(stderr, "[error] mmap error %s.\n",
526 strerror(errno));
527 pos->base_mma = NULL;
528 ret = -1;
529 goto error;
530 }
531
532 stream->mmap_size = new_size;
533 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
534 stream->mmap_size);
535 }
536
537 do {
538 ret_len = recv(ctx->control_sock,
539 mmap_align_addr(pos->base_mma), len,
540 MSG_WAITALL);
541 } while (ret_len < 0 && errno == EINTR);
542 if (ret_len < 0) {
543 fprintf(stderr, "[error] Error receiving trace packet\n");
544 ret = ret_len;
545 goto error;
546 }
547 assert(ret_len == len);
548
549 end:
550 error:
551 return ret;
552 }
553
554 /*
555 * Return number of metadata bytes written or a negative value on error.
556 */
557 static
558 int get_new_metadata(struct lttng_live_ctx *ctx,
559 struct lttng_live_viewer_stream *viewer_stream,
560 uint64_t *metadata_len)
561 {
562 uint64_t len = 0;
563 int ret;
564 struct lttng_viewer_cmd cmd;
565 struct lttng_viewer_get_metadata rq;
566 struct lttng_viewer_metadata_packet rp;
567 struct lttng_live_viewer_stream *metadata_stream;
568 char *data = NULL;
569 ssize_t ret_len;
570
571 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
572 cmd.data_size = sizeof(rq);
573 cmd.cmd_version = 0;
574
575 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
576 rq.stream_id = htobe64(metadata_stream->id);
577
578 do {
579 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
580 } while (ret_len < 0 && errno == EINTR);
581 if (ret_len < 0) {
582 fprintf(stderr, "[error] Error sending cmd\n");
583 ret = ret_len;
584 goto error;
585 }
586 assert(ret_len == sizeof(cmd));
587
588 do {
589 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
590 } while (ret_len < 0 && errno == EINTR);
591 if (ret_len < 0) {
592 fprintf(stderr, "[error] Error sending get_metadata request\n");
593 ret = ret_len;
594 goto error;
595 }
596 assert(ret_len == sizeof(rq));
597
598 do {
599 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
600 } while (ret_len < 0 && errno == EINTR);
601 if (ret_len < 0) {
602 fprintf(stderr, "[error] Error receiving metadata response\n");
603 ret = ret_len;
604 goto error;
605 }
606 assert(ret_len == sizeof(rp));
607
608 switch (be32toh(rp.status)) {
609 case LTTNG_VIEWER_METADATA_OK:
610 printf_verbose("get_metadata : OK\n");
611 break;
612 case LTTNG_VIEWER_NO_NEW_METADATA:
613 printf_verbose("get_metadata : NO NEW\n");
614 ret = -1;
615 goto end;
616 case LTTNG_VIEWER_METADATA_ERR:
617 printf_verbose("get_metadata : ERR\n");
618 ret = -1;
619 goto end;
620 default:
621 printf_verbose("get_metadata : UNKNOWN\n");
622 ret = -1;
623 goto end;
624 }
625
626 len = be64toh(rp.len);
627 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
628 if (len <= 0) {
629 ret = -1;
630 goto end;
631 }
632
633 data = zmalloc(len);
634 if (!data) {
635 perror("relay data zmalloc");
636 ret = -1;
637 goto error;
638 }
639 do {
640 ret_len = recv(ctx->control_sock, data, len, MSG_WAITALL);
641 } while (ret_len < 0 && errno == EINTR);
642 if (ret_len < 0) {
643 fprintf(stderr, "[error] Error receiving trace packet\n");
644 ret = ret_len;
645 free(data);
646 goto error;
647 }
648 assert(ret_len == len);
649
650 do {
651 ret_len = write(metadata_stream->fd, data, len);
652 } while (ret_len < 0 && errno == EINTR);
653 if (ret_len < 0) {
654 free(data);
655 ret = ret_len;
656 goto error;
657 }
658 assert(ret_len == len);
659
660 free(data);
661
662 *metadata_len = len;
663 ret = 0;
664 end:
665 error:
666 return ret;
667 }
668
669 /*
670 * Get one index for a stream.
671 *
672 * Returns 0 on success or a negative value on error.
673 */
674 static
675 int get_next_index(struct lttng_live_ctx *ctx,
676 struct lttng_live_viewer_stream *viewer_stream,
677 struct packet_index *index)
678 {
679 struct lttng_viewer_cmd cmd;
680 struct lttng_viewer_get_next_index rq;
681 struct lttng_viewer_index rp;
682 int ret;
683 uint64_t metadata_len;
684 ssize_t ret_len;
685
686 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
687 cmd.data_size = sizeof(rq);
688 cmd.cmd_version = 0;
689
690 memset(&rq, 0, sizeof(rq));
691 rq.stream_id = htobe64(viewer_stream->id);
692
693 retry:
694 do {
695 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
696 } while (ret_len < 0 && errno == EINTR);
697 if (ret_len < 0) {
698 fprintf(stderr, "[error] Error sending cmd\n");
699 ret = ret_len;
700 goto error;
701 }
702 assert(ret_len == sizeof(cmd));
703
704 do {
705 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
706 } while (ret_len < 0 && errno == EINTR);
707 if (ret_len < 0) {
708 fprintf(stderr, "[error] Error sending get_next_index request\n");
709 ret = ret_len;
710 goto error;
711 }
712 assert(ret_len == sizeof(rq));
713
714 do {
715 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
716 } while (ret_len < 0 && errno == EINTR);
717 if (ret_len < 0) {
718 fprintf(stderr, "[error] Error receiving index response\n");
719 ret = ret_len;
720 goto error;
721 }
722 assert(ret_len == sizeof(rp));
723
724 rp.flags = be32toh(rp.flags);
725
726 switch (be32toh(rp.status)) {
727 case LTTNG_VIEWER_INDEX_INACTIVE:
728 printf_verbose("get_next_index: inactive\n");
729 memset(index, 0, sizeof(struct packet_index));
730 index->timestamp_end = be64toh(rp.timestamp_end);
731 break;
732 case LTTNG_VIEWER_INDEX_OK:
733 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
734 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
735 index->offset = be64toh(rp.offset);
736 index->packet_size = be64toh(rp.packet_size);
737 index->content_size = be64toh(rp.content_size);
738 index->timestamp_begin = be64toh(rp.timestamp_begin);
739 index->timestamp_end = be64toh(rp.timestamp_end);
740 index->events_discarded = be64toh(rp.events_discarded);
741
742 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
743 printf_verbose("get_next_index: new metadata needed\n");
744 ret = get_new_metadata(ctx, viewer_stream,
745 &metadata_len);
746 if (ret < 0) {
747 goto error;
748 }
749 }
750 break;
751 case LTTNG_VIEWER_INDEX_RETRY:
752 printf_verbose("get_next_index: retry\n");
753 sleep(1);
754 goto retry;
755 case LTTNG_VIEWER_INDEX_HUP:
756 printf_verbose("get_next_index: stream hung up\n");
757 viewer_stream->id = -1ULL;
758 viewer_stream->fd = -1;
759 index->offset = EOF;
760 break;
761 case LTTNG_VIEWER_INDEX_ERR:
762 fprintf(stderr, "[error] get_next_index: error\n");
763 ret = -1;
764 goto error;
765 default:
766 fprintf(stderr, "[error] get_next_index: unkwown value\n");
767 ret = -1;
768 goto error;
769 }
770
771 ret = 0;
772
773 error:
774 return ret;
775 }
776
777 void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
778 int whence)
779 {
780 struct ctf_stream_pos *pos;
781 struct ctf_file_stream *file_stream;
782 struct packet_index packet_index;
783 struct lttng_live_viewer_stream *viewer_stream;
784 struct lttng_live_session *session;
785 int ret;
786
787 retry:
788 pos = ctf_pos(stream_pos);
789 file_stream = container_of(pos, struct ctf_file_stream, pos);
790 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
791 session = viewer_stream->session;
792
793 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
794 ret = get_next_index(session->ctx, viewer_stream, &packet_index);
795 if (ret < 0) {
796 pos->offset = EOF;
797 fprintf(stderr, "[error] get_next_index failed\n");
798 return;
799 }
800
801 pos->packet_size = packet_index.packet_size;
802 pos->content_size = packet_index.content_size;
803 pos->mmap_base_offset = 0;
804 if (packet_index.offset == EOF) {
805 pos->offset = EOF;
806 } else {
807 pos->offset = 0;
808 }
809
810 if (packet_index.content_size == 0) {
811 file_stream->parent.cycles_timestamp = packet_index.timestamp_end;
812 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
813 &file_stream->parent, packet_index.timestamp_end);
814 } else {
815 file_stream->parent.cycles_timestamp = packet_index.timestamp_begin;
816 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
817 &file_stream->parent, packet_index.timestamp_begin);
818 }
819
820 if (pos->packet_size == 0 || pos->offset == EOF) {
821 goto end;
822 }
823
824 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
825 viewer_stream->id);
826 ret = get_data_packet(session->ctx, pos, viewer_stream,
827 be64toh(packet_index.offset),
828 packet_index.packet_size / CHAR_BIT);
829 if (ret == -2) {
830 goto retry;
831 } else if (ret < 0) {
832 pos->offset = EOF;
833 fprintf(stderr, "[error] get_data_packet failed\n");
834 return;
835 }
836
837 printf_verbose("Index received : packet_size : %" PRIu64
838 ", offset %" PRIu64 ", content_size %" PRIu64
839 ", timestamp_end : %" PRIu64 "\n",
840 packet_index.packet_size, packet_index.offset,
841 packet_index.content_size, packet_index.timestamp_end);
842
843 /* update trace_packet_header and stream_packet_context */
844 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
845 /* Read packet header */
846 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
847 if (ret) {
848 pos->offset = EOF;
849 fprintf(stderr, "[error] trace packet header read failed\n");
850 goto end;
851 }
852 }
853 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
854 /* Read packet context */
855 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
856 if (ret) {
857 pos->offset = EOF;
858 fprintf(stderr, "[error] stream packet context read failed\n");
859 goto end;
860 }
861 }
862 pos->data_offset = pos->offset;
863
864 end:
865 return;
866 }
867
868 static int del_traces(gpointer key, gpointer value, gpointer user_data)
869 {
870 struct bt_context *bt_ctx = user_data;
871 struct lttng_live_ctf_trace *trace = value;
872 int ret;
873
874 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
875 if (ret < 0)
876 fprintf(stderr, "[error] removing trace from context\n");
877
878 /* remove the key/value pair from the HT. */
879 return 1;
880 }
881
882 static void add_traces(gpointer key, gpointer value, gpointer user_data)
883 {
884 int i, ret, total_metadata = 0;
885 uint64_t metadata_len;
886 struct bt_context *bt_ctx = user_data;
887 struct lttng_live_ctf_trace *trace = value;
888 struct lttng_live_viewer_stream *stream;
889 struct bt_mmap_stream *new_mmap_stream;
890 struct bt_mmap_stream_list mmap_list;
891 struct lttng_live_ctx *ctx = NULL;
892
893 BT_INIT_LIST_HEAD(&mmap_list.head);
894
895 for (i = 0; i < trace->streams->len; i++) {
896 stream = g_ptr_array_index(trace->streams, i);
897 ctx = stream->session->ctx;
898
899 if (!stream->metadata_flag) {
900 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
901 new_mmap_stream->priv = (void *) stream;
902 new_mmap_stream->fd = -1;
903 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
904 } else {
905 /* Get all possible metadata before starting */
906 do {
907 ret = get_new_metadata(ctx, stream,
908 &metadata_len);
909 if (ret == 0) {
910 total_metadata += metadata_len;
911 }
912 } while (ret == 0 || total_metadata == 0);
913 trace->metadata_fp = fopen(stream->path, "r");
914 }
915 }
916
917 if (!trace->metadata_fp) {
918 fprintf(stderr, "[error] No metadata stream opened\n");
919 goto end_free;
920 }
921
922 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
923 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
924 if (ret < 0) {
925 fprintf(stderr, "[error] Error adding trace\n");
926 goto end_free;
927 }
928 trace->trace_id = ret;
929
930 goto end;
931
932 end_free:
933 bt_context_put(bt_ctx);
934 end:
935 return;
936 }
937
938 void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
939 {
940 int ret, active_session = 0;
941 struct bt_context *bt_ctx;
942
943 bt_ctx = bt_context_create();
944 if (!bt_ctx) {
945 fprintf(stderr, "[error] bt_context_create allocation\n");
946 goto end;
947 }
948
949 /*
950 * As long as the session is active, we try to reattach to it,
951 * even if all the streams get closed.
952 */
953 // do {
954 do {
955 ret = lttng_live_attach_session(ctx, session_id);
956 printf_verbose("Attaching session returns %d\n", ret);
957 if (ret < 0) {
958 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
959 if (active_session)
960 goto end_free;
961 fprintf(stderr, "[error] Unknown "
962 "session ID\n");
963 }
964 goto end_free;
965 } else {
966 active_session = 1;
967 }
968 } while (ctx->session->stream_count == 0);
969
970 g_hash_table_foreach(ctx->session->ctf_traces, add_traces, bt_ctx);
971
972 ret = check_requirements(bt_ctx);
973 if (ret < 0) {
974 fprintf(stderr, "[error] some mandatory contexts "
975 "were missing, exiting.\n");
976 goto end;
977 }
978
979 if (!opt_textdump) {
980 pthread_create(&display_thread, NULL, ncurses_display,
981 (void *) NULL);
982 pthread_create(&timer_thread, NULL, refresh_thread,
983 (void *) NULL);
984 }
985
986 iter_trace(bt_ctx);
987
988 g_hash_table_foreach_remove(ctx->session->ctf_traces, del_traces, bt_ctx);
989 // } while (active_session);
990
991 end_free:
992 bt_context_put(bt_ctx);
993 end:
994 return;
995 }
This page took 0.046756 seconds and 3 git commands to generate.