Big cleanup of network live
[lttngtop.git] / src / lttng-live-functions.c
... / ...
CommitLineData
1/*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24#include <sys/socket.h>
25#include <sys/types.h>
26#include <netinet/in.h>
27#include <netdb.h>
28#include <stdio.h>
29#include <string.h>
30#include <stdlib.h>
31#include <unistd.h>
32#include <errno.h>
33#include <inttypes.h>
34#include <fcntl.h>
35#include <sys/mman.h>
36
37#include <babeltrace/ctf/ctf-index.h>
38
39#include <babeltrace/babeltrace.h>
40#include <babeltrace/ctf/events.h>
41#include <babeltrace/ctf/callbacks.h>
42#include <babeltrace/ctf/iterator.h>
43
44/* for packet_index */
45#include <babeltrace/ctf/types.h>
46
47#include <babeltrace/ctf/metadata.h>
48#include <babeltrace/ctf-text/types.h>
49#include <babeltrace/ctf/events-internal.h>
50/*
51#include <formats/ctf/events-private.h>
52replaced with
53*/
54#include "network-live.h"
55
56#include "lttng-live-functions.h"
57#include "lttng-viewer.h"
58#include "lttngtop.h"
59
/*
 * Allocate x bytes of zero-initialized memory. Returns NULL on allocation
 * failure, like calloc(). The argument is parenthesized so that any
 * expression can safely be passed.
 */
#define zmalloc(x) calloc(1, (x))

/*
 * Return the larger of a and b after converting both to `type`.
 * Both arguments are evaluated twice: do not pass expressions with side
 * effects.
 */
#ifndef max_t
#define max_t(type, a, b)	\
	((type) (a) > (type) (b) ? (type) (a) : (type) (b))
#endif
69
70int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
71 int port)
72{
73 struct hostent *host;
74 struct sockaddr_in server_addr;
75 int ret;
76
77 host = gethostbyname(hostname);
78 if (!host) {
79 ret = -1;
80 goto end;
81 }
82
83 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
84 perror("Socket");
85 ret = -1;
86 goto end;
87 }
88
89 server_addr.sin_family = AF_INET;
90 server_addr.sin_port = htons(port);
91 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
92 bzero(&(server_addr.sin_zero), 8);
93
94 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
95 sizeof(struct sockaddr)) == -1) {
96 perror("Connect");
97 ret = -1;
98 goto end;
99 }
100
101 ret = 0;
102
103end:
104 return ret;
105}
106
107int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
108{
109 struct lttng_viewer_cmd cmd;
110 struct lttng_viewer_connect connect;
111 int ret;
112 ssize_t ret_len;
113
114 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
115 cmd.data_size = sizeof(connect);
116 cmd.cmd_version = 0;
117
118 connect.viewer_session_id = -1ULL; /* will be set on recv */
119 connect.major = htobe32(LTTNG_LIVE_MAJOR);
120 connect.minor = htobe32(LTTNG_LIVE_MINOR);
121 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
122
123 do {
124 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
125 } while (ret_len < 0 && errno == EINTR);
126 if (ret_len < 0) {
127 fprintf(stderr, "[error] Error sending cmd\n");
128 ret = ret_len;
129 goto error;
130 }
131 assert(ret_len == sizeof(cmd));
132
133 do {
134 ret_len = send(ctx->control_sock, &connect, sizeof(connect), 0);
135 } while (ret_len < 0 && errno == EINTR);
136 if (ret_len < 0) {
137 fprintf(stderr, "[error] Error sending version\n");
138 ret = ret_len;
139 goto error;
140 }
141 assert(ret_len == sizeof(connect));
142
143 do {
144 ret_len = recv(ctx->control_sock, &connect, sizeof(connect), 0);
145 } while (ret_len < 0 && errno == EINTR);
146 if (ret_len < 0) {
147 fprintf(stderr, "[error] Error receiving version\n");
148 ret = ret_len;
149 goto error;
150 }
151 assert(ret_len == sizeof(connect));
152
153 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
154 be64toh(connect.viewer_session_id));
155 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
156 be32toh(connect.minor));
157
158 ret = 0;
159
160error:
161 return ret;
162}
163
164int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
165{
166 struct lttng_viewer_cmd cmd;
167 struct lttng_viewer_list_sessions list;
168 struct lttng_viewer_session lsession;
169 int i, ret;
170 ssize_t ret_len;
171 int sessions_count;
172
173 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
174 cmd.data_size = 0;
175 cmd.cmd_version = 0;
176
177 do {
178 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
179 } while (ret_len < 0 && errno == EINTR);
180 if (ret_len < 0) {
181 fprintf(stderr, "[error] Error sending cmd\n");
182 ret = ret_len;
183 goto error;
184 }
185 assert(ret_len == sizeof(cmd));
186
187 do {
188 ret_len = recv(ctx->control_sock, &list, sizeof(list), 0);
189 } while (ret_len < 0 && errno == EINTR);
190 if (ret_len < 0) {
191 fprintf(stderr, "[error] Error receiving session list\n");
192 ret = ret_len;
193 goto error;
194 }
195 assert(ret_len == sizeof(list));
196
197 sessions_count = be32toh(list.sessions_count);
198 fprintf(stdout, "%u active session(s)%c\n", sessions_count,
199 sessions_count > 0 ? ':' : ' ');
200 for (i = 0; i < sessions_count; i++) {
201 do {
202 ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
203 } while (ret_len < 0 && errno == EINTR);
204 if (ret_len < 0) {
205 fprintf(stderr, "[error] Error receiving session\n");
206 ret = ret_len;
207 goto error;
208 }
209 assert(ret_len == sizeof(lsession));
210 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
211 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
212
213 fprintf(stdout, "%s/%" PRIu64 " : %s on host %s (timer = %u, "
214 "%u stream(s), %u client(s) connected)\n",
215 path, be64toh(lsession.id),
216 lsession.session_name, lsession.hostname,
217 be32toh(lsession.live_timer),
218 be32toh(lsession.streams),
219 be32toh(lsession.clients));
220 }
221
222 ret = 0;
223
224error:
225 return ret;
226}
227
228int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
229 uint64_t ctf_trace_id)
230{
231 struct lttng_live_ctf_trace *trace;
232 int ret = 0;
233
234 trace = g_hash_table_lookup(stream->session->ctf_traces,
235 (gpointer) ctf_trace_id);
236 if (!trace) {
237 trace = g_new0(struct lttng_live_ctf_trace, 1);
238 trace->ctf_trace_id = ctf_trace_id;
239 trace->streams = g_ptr_array_new();
240 g_hash_table_insert(stream->session->ctf_traces,
241 (gpointer) ctf_trace_id,
242 trace);
243 }
244 if (stream->metadata_flag)
245 trace->metadata_stream = stream;
246
247 stream->ctf_trace = trace;
248 g_ptr_array_add(trace->streams, stream);
249
250 return ret;
251}
252
253int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
254{
255 struct lttng_viewer_cmd cmd;
256 struct lttng_viewer_attach_session_request rq;
257 struct lttng_viewer_attach_session_response rp;
258 struct lttng_viewer_stream stream;
259 int ret, i;
260 ssize_t ret_len;
261
262 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
263 cmd.data_size = sizeof(rq);
264 cmd.cmd_version = 0;
265
266 memset(&rq, 0, sizeof(rq));
267 rq.session_id = htobe64(id);
268 // TODO: add cmd line parameter to select seek beginning
269 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
270 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
271
272 do {
273 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
274 } while (ret_len < 0 && errno == EINTR);
275 if (ret_len < 0) {
276 fprintf(stderr, "[error] Error sending cmd\n");
277 ret = ret_len;
278 goto error;
279 }
280 assert(ret_len == sizeof(cmd));
281
282 do {
283 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
284 } while (ret_len < 0 && errno == EINTR);
285 if (ret_len < 0) {
286 fprintf(stderr, "[error] Error sending attach request\n");
287 ret = ret_len;
288 goto error;
289 }
290 assert(ret_len == sizeof(rq));
291
292 do {
293 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
294 } while (ret_len < 0 && errno == EINTR);
295 if (ret_len < 0) {
296 fprintf(stderr, "[error] Error receiving attach response\n");
297 ret = ret_len;
298 goto error;
299 }
300 assert(ret_len == sizeof(rp));
301
302 switch(be32toh(rp.status)) {
303 case LTTNG_VIEWER_ATTACH_OK:
304 break;
305 case LTTNG_VIEWER_ATTACH_UNK:
306 ret = -LTTNG_VIEWER_ATTACH_UNK;
307 goto end;
308 case LTTNG_VIEWER_ATTACH_ALREADY:
309 fprintf(stderr, "[error] Already a viewer attached\n");
310 ret = -1;
311 goto end;
312 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
313 fprintf(stderr, "[error] Not a live session\n");
314 ret = -1;
315 goto end;
316 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
317 fprintf(stderr, "[error] Wrong seek parameter\n");
318 ret = -1;
319 goto end;
320 default:
321 fprintf(stderr, "[error] Unknown attach return code %u\n",
322 be32toh(rp.status));
323 ret = -1;
324 goto end;
325 }
326 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
327 ret = -1;
328 goto end;
329 }
330
331 ctx->session->stream_count = be32toh(rp.streams_count);
332 /*
333 * When the session is created but not started, we do an active wait
334 * until it starts. It allows the viewer to start processing the trace
335 * as soon as the session starts.
336 */
337 if (ctx->session->stream_count == 0) {
338 ret = 0;
339 goto end;
340 }
341 printf_verbose("Waiting for %" PRIu64 " streams:\n",
342 ctx->session->stream_count);
343 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
344 ctx->session->stream_count);
345 for (i = 0; i < be32toh(rp.streams_count); i++) {
346 do {
347 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
348 } while (ret_len < 0 && errno == EINTR);
349 if (ret_len < 0) {
350 fprintf(stderr, "[error] Error receiving stream\n");
351 ret = ret_len;
352 goto error;
353 }
354 assert(ret_len == sizeof(stream));
355 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
356 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
357
358 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
359 be64toh(stream.id), stream.path_name,
360 stream.channel_name);
361 ctx->session->streams[i].id = be64toh(stream.id);
362 ctx->session->streams[i].session = ctx->session;
363
364 ctx->session->streams[i].first_read = 1;
365 ctx->session->streams[i].mmap_size = 0;
366
367 if (be32toh(stream.metadata_flag)) {
368 char *path;
369
370 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
371 if (!path) {
372 perror("strdup");
373 ret = -1;
374 goto error;
375 }
376 if (!mkdtemp(path)) {
377 perror("mkdtemp");
378 free(path);
379 ret = -1;
380 goto error;
381 }
382 ctx->session->streams[i].metadata_flag = 1;
383 snprintf(ctx->session->streams[i].path,
384 sizeof(ctx->session->streams[i].path),
385 "%s/%s", path,
386 stream.channel_name);
387 ret = open(ctx->session->streams[i].path,
388 O_WRONLY | O_CREAT | O_TRUNC,
389 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
390 if (ret < 0) {
391 perror("open");
392 free(path);
393 goto error;
394 }
395 ctx->session->streams[i].fd = ret;
396 free(path);
397 }
398 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
399 be64toh(stream.ctf_trace_id));
400 if (ret < 0) {
401 goto error;
402 }
403
404 }
405 ret = 0;
406
407end:
408error:
409 return ret;
410}
411
412static
413int get_data_packet(struct lttng_live_ctx *ctx,
414 struct ctf_stream_pos *pos,
415 struct lttng_live_viewer_stream *stream, uint64_t offset,
416 uint64_t len)
417{
418 struct lttng_viewer_cmd cmd;
419 struct lttng_viewer_get_packet rq;
420 struct lttng_viewer_trace_packet rp;
421 ssize_t ret_len;
422 int ret;
423
424 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
425 cmd.data_size = sizeof(rq);
426 cmd.cmd_version = 0;
427
428 memset(&rq, 0, sizeof(rq));
429 rq.stream_id = htobe64(stream->id);
430 /* Already in big endian. */
431 rq.offset = offset;
432 rq.len = htobe32(len);
433
434 do {
435 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
436 } while (ret_len < 0 && errno == EINTR);
437 if (ret_len < 0) {
438 fprintf(stderr, "[error] Error sending cmd\n");
439 ret = ret_len;
440 goto error;
441 }
442 assert(ret_len == sizeof(cmd));
443
444 do {
445 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
446 } while (ret_len < 0 && errno == EINTR);
447 if (ret_len < 0) {
448 fprintf(stderr, "[error] Error sending get_data_packet request\n");
449 ret = ret_len;
450 goto error;
451 }
452 assert(ret_len == sizeof(rq));
453
454 do {
455 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
456 } while (ret_len < 0 && errno == EINTR);
457 if (ret_len < 0) {
458 fprintf(stderr, "[error] Error receiving data response\n");
459 ret = ret_len;
460 goto error;
461 }
462 if (ret_len != sizeof(rp)) {
463 fprintf(stderr, "[error] get_data_packet: expected %" PRId64
464 ", received %" PRId64 "\n", ret_len,
465 sizeof(rp));
466 ret = -1;
467 goto error;
468 }
469
470 rp.flags = be32toh(rp.flags);
471
472 switch (be32toh(rp.status)) {
473 case LTTNG_VIEWER_GET_PACKET_OK:
474 len = be32toh(rp.len);
475 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
476 "\n", len);
477 break;
478 case LTTNG_VIEWER_GET_PACKET_RETRY:
479 printf_verbose("get_data_packet: retry\n");
480 ret = -1;
481 goto end;
482 case LTTNG_VIEWER_GET_PACKET_ERR:
483 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
484 printf_verbose("get_data_packet: new metadata needed\n");
485 ret = 0;
486 goto end;
487 }
488 fprintf(stderr, "[error] get_data_packet: error\n");
489 ret = -1;
490 goto end;
491 case LTTNG_VIEWER_GET_PACKET_EOF:
492 ret = -2;
493 goto error;
494 default:
495 printf_verbose("get_data_packet: unknown\n");
496 assert(0);
497 ret = -1;
498 goto end;
499 }
500
501 if (len <= 0) {
502 ret = -1;
503 goto end;
504 }
505
506 if (len > stream->mmap_size) {
507 uint64_t new_size;
508
509 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
510 if (pos->base_mma) {
511 /* unmap old base */
512 ret = munmap_align(pos->base_mma);
513 if (ret) {
514 fprintf(stderr, "[error] Unable to unmap old base: %s.\n",
515 strerror(errno));
516 ret = -1;
517 goto error;
518 }
519 pos->base_mma = NULL;
520 }
521 pos->base_mma = mmap_align(new_size,
522 PROT_READ | PROT_WRITE,
523 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
524 if (pos->base_mma == MAP_FAILED) {
525 fprintf(stderr, "[error] mmap error %s.\n",
526 strerror(errno));
527 pos->base_mma = NULL;
528 ret = -1;
529 goto error;
530 }
531
532 stream->mmap_size = new_size;
533 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
534 stream->mmap_size);
535 }
536
537 do {
538 ret_len = recv(ctx->control_sock,
539 mmap_align_addr(pos->base_mma), len,
540 MSG_WAITALL);
541 } while (ret_len < 0 && errno == EINTR);
542 if (ret_len < 0) {
543 fprintf(stderr, "[error] Error receiving trace packet\n");
544 ret = ret_len;
545 goto error;
546 }
547 assert(ret_len == len);
548
549end:
550error:
551 return ret;
552}
553
554/*
555 * Return number of metadata bytes written or a negative value on error.
556 */
557static
558int get_new_metadata(struct lttng_live_ctx *ctx,
559 struct lttng_live_viewer_stream *viewer_stream,
560 uint64_t *metadata_len)
561{
562 uint64_t len = 0;
563 int ret;
564 struct lttng_viewer_cmd cmd;
565 struct lttng_viewer_get_metadata rq;
566 struct lttng_viewer_metadata_packet rp;
567 struct lttng_live_viewer_stream *metadata_stream;
568 char *data = NULL;
569 ssize_t ret_len;
570
571 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
572 cmd.data_size = sizeof(rq);
573 cmd.cmd_version = 0;
574
575 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
576 rq.stream_id = htobe64(metadata_stream->id);
577
578 do {
579 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
580 } while (ret_len < 0 && errno == EINTR);
581 if (ret_len < 0) {
582 fprintf(stderr, "[error] Error sending cmd\n");
583 ret = ret_len;
584 goto error;
585 }
586 assert(ret_len == sizeof(cmd));
587
588 do {
589 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
590 } while (ret_len < 0 && errno == EINTR);
591 if (ret_len < 0) {
592 fprintf(stderr, "[error] Error sending get_metadata request\n");
593 ret = ret_len;
594 goto error;
595 }
596 assert(ret_len == sizeof(rq));
597
598 do {
599 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
600 } while (ret_len < 0 && errno == EINTR);
601 if (ret_len < 0) {
602 fprintf(stderr, "[error] Error receiving metadata response\n");
603 ret = ret_len;
604 goto error;
605 }
606 assert(ret_len == sizeof(rp));
607
608 switch (be32toh(rp.status)) {
609 case LTTNG_VIEWER_METADATA_OK:
610 printf_verbose("get_metadata : OK\n");
611 break;
612 case LTTNG_VIEWER_NO_NEW_METADATA:
613 printf_verbose("get_metadata : NO NEW\n");
614 ret = -1;
615 goto end;
616 case LTTNG_VIEWER_METADATA_ERR:
617 printf_verbose("get_metadata : ERR\n");
618 ret = -1;
619 goto end;
620 default:
621 printf_verbose("get_metadata : UNKNOWN\n");
622 ret = -1;
623 goto end;
624 }
625
626 len = be64toh(rp.len);
627 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
628 if (len <= 0) {
629 ret = -1;
630 goto end;
631 }
632
633 data = zmalloc(len);
634 if (!data) {
635 perror("relay data zmalloc");
636 ret = -1;
637 goto error;
638 }
639 do {
640 ret_len = recv(ctx->control_sock, data, len, MSG_WAITALL);
641 } while (ret_len < 0 && errno == EINTR);
642 if (ret_len < 0) {
643 fprintf(stderr, "[error] Error receiving trace packet\n");
644 ret = ret_len;
645 free(data);
646 goto error;
647 }
648 assert(ret_len == len);
649
650 do {
651 ret_len = write(metadata_stream->fd, data, len);
652 } while (ret_len < 0 && errno == EINTR);
653 if (ret_len < 0) {
654 free(data);
655 ret = ret_len;
656 goto error;
657 }
658 assert(ret_len == len);
659
660 free(data);
661
662 *metadata_len = len;
663 ret = 0;
664end:
665error:
666 return ret;
667}
668
669/*
670 * Get one index for a stream.
671 *
672 * Returns 0 on success or a negative value on error.
673 */
674static
675int get_next_index(struct lttng_live_ctx *ctx,
676 struct lttng_live_viewer_stream *viewer_stream,
677 struct packet_index *index)
678{
679 struct lttng_viewer_cmd cmd;
680 struct lttng_viewer_get_next_index rq;
681 struct lttng_viewer_index rp;
682 int ret;
683 uint64_t metadata_len;
684 ssize_t ret_len;
685
686 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
687 cmd.data_size = sizeof(rq);
688 cmd.cmd_version = 0;
689
690 memset(&rq, 0, sizeof(rq));
691 rq.stream_id = htobe64(viewer_stream->id);
692
693retry:
694 do {
695 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
696 } while (ret_len < 0 && errno == EINTR);
697 if (ret_len < 0) {
698 fprintf(stderr, "[error] Error sending cmd\n");
699 ret = ret_len;
700 goto error;
701 }
702 assert(ret_len == sizeof(cmd));
703
704 do {
705 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
706 } while (ret_len < 0 && errno == EINTR);
707 if (ret_len < 0) {
708 fprintf(stderr, "[error] Error sending get_next_index request\n");
709 ret = ret_len;
710 goto error;
711 }
712 assert(ret_len == sizeof(rq));
713
714 do {
715 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
716 } while (ret_len < 0 && errno == EINTR);
717 if (ret_len < 0) {
718 fprintf(stderr, "[error] Error receiving index response\n");
719 ret = ret_len;
720 goto error;
721 }
722 assert(ret_len == sizeof(rp));
723
724 rp.flags = be32toh(rp.flags);
725
726 switch (be32toh(rp.status)) {
727 case LTTNG_VIEWER_INDEX_INACTIVE:
728 printf_verbose("get_next_index: inactive\n");
729 memset(index, 0, sizeof(struct packet_index));
730 index->timestamp_end = be64toh(rp.timestamp_end);
731 break;
732 case LTTNG_VIEWER_INDEX_OK:
733 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
734 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
735 index->offset = be64toh(rp.offset);
736 index->packet_size = be64toh(rp.packet_size);
737 index->content_size = be64toh(rp.content_size);
738 index->timestamp_begin = be64toh(rp.timestamp_begin);
739 index->timestamp_end = be64toh(rp.timestamp_end);
740 index->events_discarded = be64toh(rp.events_discarded);
741
742 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
743 printf_verbose("get_next_index: new metadata needed\n");
744 ret = get_new_metadata(ctx, viewer_stream,
745 &metadata_len);
746 if (ret < 0) {
747 goto error;
748 }
749 }
750 break;
751 case LTTNG_VIEWER_INDEX_RETRY:
752 printf_verbose("get_next_index: retry\n");
753 sleep(1);
754 goto retry;
755 case LTTNG_VIEWER_INDEX_HUP:
756 printf_verbose("get_next_index: stream hung up\n");
757 viewer_stream->id = -1ULL;
758 viewer_stream->fd = -1;
759 index->offset = EOF;
760 break;
761 case LTTNG_VIEWER_INDEX_ERR:
762 fprintf(stderr, "[error] get_next_index: error\n");
763 ret = -1;
764 goto error;
765 default:
766 fprintf(stderr, "[error] get_next_index: unkwown value\n");
767 ret = -1;
768 goto error;
769 }
770
771 ret = 0;
772
773error:
774 return ret;
775}
776
777void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
778 int whence)
779{
780 struct ctf_stream_pos *pos;
781 struct ctf_file_stream *file_stream;
782 struct packet_index packet_index;
783 struct lttng_live_viewer_stream *viewer_stream;
784 struct lttng_live_session *session;
785 int ret;
786
787retry:
788 pos = ctf_pos(stream_pos);
789 file_stream = container_of(pos, struct ctf_file_stream, pos);
790 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
791 session = viewer_stream->session;
792
793 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
794 ret = get_next_index(session->ctx, viewer_stream, &packet_index);
795 if (ret < 0) {
796 pos->offset = EOF;
797 fprintf(stderr, "[error] get_next_index failed\n");
798 return;
799 }
800
801 pos->packet_size = packet_index.packet_size;
802 pos->content_size = packet_index.content_size;
803 pos->mmap_base_offset = 0;
804 if (packet_index.offset == EOF) {
805 pos->offset = EOF;
806 } else {
807 pos->offset = 0;
808 }
809
810 if (packet_index.content_size == 0) {
811 file_stream->parent.cycles_timestamp = packet_index.timestamp_end;
812 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
813 &file_stream->parent, packet_index.timestamp_end);
814 } else {
815 /* Append to the cycles_index. */
816 g_array_append_val(file_stream->pos.packet_cycles_index,
817 packet_index);
818 /* Convert the timestamps and append to the real_index. */
819 packet_index.timestamp_begin = ctf_get_real_timestamp(
820 &file_stream->parent, packet_index.timestamp_begin);
821 packet_index.timestamp_end = ctf_get_real_timestamp(
822 &file_stream->parent, packet_index.timestamp_end);
823 g_array_append_val(file_stream->pos.packet_real_index,
824 packet_index);
825
826 compute_discarded_events(file_stream, pos);
827
828 packet_index = g_array_index(pos->packet_cycles_index,
829 struct packet_index, pos->cur_index);
830 file_stream->parent.cycles_timestamp = packet_index.timestamp_begin;
831
832 packet_index = g_array_index(pos->packet_real_index,
833 struct packet_index, pos->cur_index);
834 file_stream->parent.real_timestamp = packet_index.timestamp_begin;
835 ++pos->cur_index;
836 }
837
838 if (pos->packet_size == 0 || pos->offset == EOF) {
839 goto end;
840 }
841
842 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
843 viewer_stream->id);
844 ret = get_data_packet(session->ctx, pos, viewer_stream,
845 be64toh(packet_index.offset),
846 packet_index.packet_size / CHAR_BIT);
847 if (ret == -2) {
848 goto retry;
849 } else if (ret < 0) {
850 pos->offset = EOF;
851 fprintf(stderr, "[error] get_data_packet failed\n");
852 return;
853 }
854
855 printf_verbose("Index received : packet_size : %" PRIu64
856 ", offset %" PRIu64 ", content_size %" PRIu64
857 ", timestamp_end : %" PRIu64 "\n",
858 packet_index.packet_size, packet_index.offset,
859 packet_index.content_size, packet_index.timestamp_end);
860
861 /* update trace_packet_header and stream_packet_context */
862 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
863 /* Read packet header */
864 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
865 if (ret) {
866 pos->offset = EOF;
867 fprintf(stderr, "[error] trace packet header read failed\n");
868 goto end;
869 }
870 }
871 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
872 /* Read packet context */
873 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
874 if (ret) {
875 pos->offset = EOF;
876 fprintf(stderr, "[error] stream packet context read failed\n");
877 goto end;
878 }
879 }
880 pos->data_offset = pos->offset;
881
882end:
883 return;
884}
885
886static int del_traces(gpointer key, gpointer value, gpointer user_data)
887{
888 struct bt_context *bt_ctx = user_data;
889 struct lttng_live_ctf_trace *trace = value;
890 int ret;
891
892 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
893 if (ret < 0)
894 fprintf(stderr, "[error] removing trace from context\n");
895
896 /* remove the key/value pair from the HT. */
897 return 1;
898}
899
900static void add_traces(gpointer key, gpointer value, gpointer user_data)
901{
902 int i, ret, total_metadata = 0;
903 uint64_t metadata_len;
904 struct bt_context *bt_ctx = user_data;
905 struct lttng_live_ctf_trace *trace = value;
906 struct lttng_live_viewer_stream *stream;
907 struct bt_mmap_stream *new_mmap_stream;
908 struct bt_mmap_stream_list mmap_list;
909 struct lttng_live_ctx *ctx = NULL;
910
911 BT_INIT_LIST_HEAD(&mmap_list.head);
912
913 for (i = 0; i < trace->streams->len; i++) {
914 stream = g_ptr_array_index(trace->streams, i);
915 ctx = stream->session->ctx;
916
917 if (!stream->metadata_flag) {
918 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
919 new_mmap_stream->priv = (void *) stream;
920 new_mmap_stream->fd = -1;
921 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
922 } else {
923 /* Get all possible metadata before starting */
924 do {
925 ret = get_new_metadata(ctx, stream,
926 &metadata_len);
927 if (ret == 0) {
928 total_metadata += metadata_len;
929 }
930 } while (ret == 0 || total_metadata == 0);
931 trace->metadata_fp = fopen(stream->path, "r");
932 }
933 }
934
935 if (!trace->metadata_fp) {
936 fprintf(stderr, "[error] No metadata stream opened\n");
937 goto end_free;
938 }
939
940 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
941 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
942 if (ret < 0) {
943 fprintf(stderr, "[error] Error adding trace\n");
944 goto end_free;
945 }
946 trace->trace_id = ret;
947
948 goto end;
949
950end_free:
951 bt_context_put(bt_ctx);
952end:
953 return;
954}
955
956void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
957{
958 int ret, active_session = 0;
959 struct bt_context *bt_ctx;
960
961 bt_ctx = bt_context_create();
962 if (!bt_ctx) {
963 fprintf(stderr, "[error] bt_context_create allocation\n");
964 goto end;
965 }
966
967 /*
968 * As long as the session is active, we try to reattach to it,
969 * even if all the streams get closed.
970 */
971// do {
972 do {
973 ret = lttng_live_attach_session(ctx, session_id);
974 printf_verbose("Attaching session returns %d\n", ret);
975 if (ret < 0) {
976 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
977 if (active_session)
978 goto end_free;
979 fprintf(stderr, "[error] Unknown "
980 "session ID\n");
981 }
982 goto end_free;
983 } else {
984 active_session = 1;
985 }
986 } while (ctx->session->stream_count == 0);
987
988 g_hash_table_foreach(ctx->session->ctf_traces, add_traces, bt_ctx);
989
990 ret = check_requirements(bt_ctx);
991 if (ret < 0) {
992 fprintf(stderr, "[error] some mandatory contexts "
993 "were missing, exiting.\n");
994 goto end;
995 }
996
997 if (!opt_textdump) {
998 pthread_create(&display_thread, NULL, ncurses_display,
999 (void *) NULL);
1000 pthread_create(&timer_thread, NULL, refresh_thread,
1001 (void *) NULL);
1002 }
1003
1004 iter_trace(bt_ctx);
1005
1006 g_hash_table_foreach_remove(ctx->session->ctf_traces, del_traces, bt_ctx);
1007// } while (active_session);
1008
1009end_free:
1010 bt_context_put(bt_ctx);
1011end:
1012 return;
1013}
This page took 0.025127 seconds and 4 git commands to generate.