begin support for UST in live, not yet perfect but starting to work
[lttngtop.git] / src / liblttng-live.c
CommitLineData
12a91e9d
JD
1/*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24#include <sys/socket.h>
25#include <sys/types.h>
26#include <netinet/in.h>
27#include <netdb.h>
28#include <stdio.h>
29#include <string.h>
30#include <stdlib.h>
31#include <unistd.h>
32#include <errno.h>
33#include <inttypes.h>
34#include <fcntl.h>
35#include <sys/mman.h>
36
37#include <babeltrace/ctf/ctf-index.h>
38
39#include <babeltrace/babeltrace.h>
40#include <babeltrace/ctf/events.h>
41#include <babeltrace/ctf/callbacks.h>
42#include <babeltrace/ctf/iterator.h>
43
44/* for packet_index */
45#include <babeltrace/ctf/types.h>
46
47#include <babeltrace/ctf/metadata.h>
48#include <babeltrace/ctf-text/types.h>
49#include <babeltrace/ctf/events-internal.h>
50/*
51#include <formats/ctf/events-private.h>
d81179de 52replaced with
12a91e9d
JD
53*/
54#include "network-live.h"
d81179de 55#include "lttngtop.h"
12a91e9d 56
e3db2966 57#include "liblttng-live.h"
0c445945 58#include "lttng-viewer-abi.h"
12a91e9d
JD
59
60/*
61 * Memory allocation zeroed
62 */
63#define zmalloc(x) calloc(1, x)
64
65#ifndef max_t
66#define max_t(type, a, b) \
67 ((type) (a) > (type) (b) ? (type) (a) : (type) (b))
68#endif
69
d81179de
JD
70static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
71 size_t index, int whence);
72static void add_traces(gpointer key, gpointer value, gpointer user_data);
73static int del_traces(gpointer key, gpointer value, gpointer user_data);
74
75int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
12a91e9d
JD
76{
77 struct hostent *host;
78 struct sockaddr_in server_addr;
79 int ret;
80
d81179de 81 host = gethostbyname(ctx->relay_hostname);
12a91e9d
JD
82 if (!host) {
83 ret = -1;
84 goto end;
85 }
86
87 if ((ctx->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
88 perror("Socket");
89 ret = -1;
90 goto end;
91 }
92
93 server_addr.sin_family = AF_INET;
d81179de 94 server_addr.sin_port = htons(ctx->port);
12a91e9d
JD
95 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
96 bzero(&(server_addr.sin_zero), 8);
97
98 if (connect(ctx->control_sock, (struct sockaddr *) &server_addr,
99 sizeof(struct sockaddr)) == -1) {
100 perror("Connect");
101 ret = -1;
102 goto end;
103 }
104
105 ret = 0;
106
107end:
108 return ret;
109}
110
111int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
112{
113 struct lttng_viewer_cmd cmd;
114 struct lttng_viewer_connect connect;
115 int ret;
116 ssize_t ret_len;
117
118 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
119 cmd.data_size = sizeof(connect);
120 cmd.cmd_version = 0;
121
122 connect.viewer_session_id = -1ULL; /* will be set on recv */
123 connect.major = htobe32(LTTNG_LIVE_MAJOR);
124 connect.minor = htobe32(LTTNG_LIVE_MINOR);
125 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
126
127 do {
128 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
129 } while (ret_len < 0 && errno == EINTR);
130 if (ret_len < 0) {
131 fprintf(stderr, "[error] Error sending cmd\n");
132 ret = ret_len;
133 goto error;
134 }
135 assert(ret_len == sizeof(cmd));
136
137 do {
138 ret_len = send(ctx->control_sock, &connect, sizeof(connect), 0);
139 } while (ret_len < 0 && errno == EINTR);
140 if (ret_len < 0) {
141 fprintf(stderr, "[error] Error sending version\n");
142 ret = ret_len;
143 goto error;
144 }
145 assert(ret_len == sizeof(connect));
146
147 do {
148 ret_len = recv(ctx->control_sock, &connect, sizeof(connect), 0);
149 } while (ret_len < 0 && errno == EINTR);
150 if (ret_len < 0) {
151 fprintf(stderr, "[error] Error receiving version\n");
152 ret = ret_len;
153 goto error;
154 }
155 assert(ret_len == sizeof(connect));
156
157 printf_verbose("Received viewer session ID : %" PRIu64 "\n",
158 be64toh(connect.viewer_session_id));
159 printf_verbose("Relayd version : %u.%u\n", be32toh(connect.major),
160 be32toh(connect.minor));
161
162 ret = 0;
163
164error:
165 return ret;
166}
167
d81179de
JD
168static
169void free_session_list(GPtrArray *session_list)
170{
171 int i;
172 struct lttng_live_relay_session *relay_session;
173
174 for (i = 0; i < session_list->len; i++) {
175 relay_session = g_ptr_array_index(session_list, i);
176 free(relay_session->name);
177 free(relay_session->hostname);
178 }
179 g_ptr_array_free(session_list, TRUE);
180}
181
182static
183void print_session_list(GPtrArray *session_list, const char *path)
184{
185 int i;
186 struct lttng_live_relay_session *relay_session;
187
188 for (i = 0; i < session_list->len; i++) {
189 relay_session = g_ptr_array_index(session_list, i);
190 fprintf(stdout, "%s/host/%s/%s (timer = %u, "
191 "%u stream(s), %u client(s) connected)\n",
192 path, relay_session->hostname,
193 relay_session->name, relay_session->timer,
194 relay_session->streams, relay_session->clients);
195 }
196}
197
198static
199void update_session_list(GPtrArray *session_list, char *hostname,
200 char *session_name, uint32_t streams, uint32_t clients,
201 uint32_t timer)
202{
203 int i, found = 0;
204 struct lttng_live_relay_session *relay_session;
205
206 for (i = 0; i < session_list->len; i++) {
207 relay_session = g_ptr_array_index(session_list, i);
208 if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
209 strncmp(relay_session->name,
210 session_name, NAME_MAX) == 0) {
211 relay_session->streams += streams;
212 if (relay_session->clients < clients)
213 relay_session->clients = clients;
214 found = 1;
215 break;
216 }
217 }
218 if (found)
219 return;
220
221 relay_session = g_new0(struct lttng_live_relay_session, 1);
222 relay_session->hostname = strndup(hostname, NAME_MAX);
223 relay_session->name = strndup(session_name, NAME_MAX);
224 relay_session->clients = clients;
225 relay_session->streams = streams;
226 relay_session->timer = timer;
227 g_ptr_array_add(session_list, relay_session);
228}
229
12a91e9d
JD
230int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
231{
232 struct lttng_viewer_cmd cmd;
233 struct lttng_viewer_list_sessions list;
234 struct lttng_viewer_session lsession;
d81179de 235 int i, ret, sessions_count, print_list = 0;
12a91e9d 236 ssize_t ret_len;
d81179de
JD
237 uint64_t session_id;
238 GPtrArray *session_list = NULL;
239
240 if (strlen(ctx->session_name) == 0) {
241 print_list = 1;
242 session_list = g_ptr_array_new();
243 }
12a91e9d
JD
244
245 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
246 cmd.data_size = 0;
247 cmd.cmd_version = 0;
248
249 do {
250 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
251 } while (ret_len < 0 && errno == EINTR);
252 if (ret_len < 0) {
253 fprintf(stderr, "[error] Error sending cmd\n");
254 ret = ret_len;
255 goto error;
256 }
257 assert(ret_len == sizeof(cmd));
258
259 do {
260 ret_len = recv(ctx->control_sock, &list, sizeof(list), 0);
261 } while (ret_len < 0 && errno == EINTR);
262 if (ret_len < 0) {
263 fprintf(stderr, "[error] Error receiving session list\n");
264 ret = ret_len;
265 goto error;
266 }
267 assert(ret_len == sizeof(list));
268
269 sessions_count = be32toh(list.sessions_count);
12a91e9d
JD
270 for (i = 0; i < sessions_count; i++) {
271 do {
272 ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
273 } while (ret_len < 0 && errno == EINTR);
274 if (ret_len < 0) {
275 fprintf(stderr, "[error] Error receiving session\n");
276 ret = ret_len;
277 goto error;
278 }
279 assert(ret_len == sizeof(lsession));
280 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
281 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
d81179de
JD
282 session_id = be64toh(lsession.id);
283
284 if (print_list) {
285 update_session_list(session_list,
286 lsession.hostname,
287 lsession.session_name,
288 be32toh(lsession.streams),
289 be32toh(lsession.clients),
290 be32toh(lsession.live_timer));
291 } else {
292 if ((strncmp(lsession.session_name, ctx->session_name,
293 NAME_MAX) == 0) && (strncmp(lsession.hostname,
294 ctx->traced_hostname, NAME_MAX) == 0)) {
295 printf_verbose("Reading from session %" PRIu64 "\n",
296 session_id);
297 g_array_append_val(ctx->session_ids,
298 session_id);
299 }
300 }
301 }
12a91e9d 302
d81179de
JD
303 if (print_list) {
304 print_session_list(session_list, path);
305 free_session_list(session_list);
12a91e9d
JD
306 }
307
308 ret = 0;
309
310error:
311 return ret;
312}
313
314int lttng_live_ctf_trace_assign(struct lttng_live_viewer_stream *stream,
315 uint64_t ctf_trace_id)
316{
317 struct lttng_live_ctf_trace *trace;
318 int ret = 0;
319
320 trace = g_hash_table_lookup(stream->session->ctf_traces,
321 (gpointer) ctf_trace_id);
322 if (!trace) {
323 trace = g_new0(struct lttng_live_ctf_trace, 1);
324 trace->ctf_trace_id = ctf_trace_id;
325 trace->streams = g_ptr_array_new();
326 g_hash_table_insert(stream->session->ctf_traces,
327 (gpointer) ctf_trace_id,
328 trace);
329 }
330 if (stream->metadata_flag)
331 trace->metadata_stream = stream;
332
333 stream->ctf_trace = trace;
334 g_ptr_array_add(trace->streams, stream);
335
336 return ret;
337}
338
339int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
340{
341 struct lttng_viewer_cmd cmd;
342 struct lttng_viewer_attach_session_request rq;
343 struct lttng_viewer_attach_session_response rp;
344 struct lttng_viewer_stream stream;
345 int ret, i;
346 ssize_t ret_len;
347
348 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
349 cmd.data_size = sizeof(rq);
350 cmd.cmd_version = 0;
351
352 memset(&rq, 0, sizeof(rq));
353 rq.session_id = htobe64(id);
354 // TODO: add cmd line parameter to select seek beginning
355 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
356 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
357
358 do {
359 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
360 } while (ret_len < 0 && errno == EINTR);
361 if (ret_len < 0) {
362 fprintf(stderr, "[error] Error sending cmd\n");
363 ret = ret_len;
364 goto error;
365 }
366 assert(ret_len == sizeof(cmd));
367
368 do {
369 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
370 } while (ret_len < 0 && errno == EINTR);
371 if (ret_len < 0) {
372 fprintf(stderr, "[error] Error sending attach request\n");
373 ret = ret_len;
374 goto error;
375 }
376 assert(ret_len == sizeof(rq));
377
378 do {
379 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
380 } while (ret_len < 0 && errno == EINTR);
381 if (ret_len < 0) {
382 fprintf(stderr, "[error] Error receiving attach response\n");
383 ret = ret_len;
384 goto error;
385 }
386 assert(ret_len == sizeof(rp));
387
388 switch(be32toh(rp.status)) {
389 case LTTNG_VIEWER_ATTACH_OK:
390 break;
391 case LTTNG_VIEWER_ATTACH_UNK:
392 ret = -LTTNG_VIEWER_ATTACH_UNK;
393 goto end;
394 case LTTNG_VIEWER_ATTACH_ALREADY:
395 fprintf(stderr, "[error] Already a viewer attached\n");
396 ret = -1;
397 goto end;
398 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
399 fprintf(stderr, "[error] Not a live session\n");
400 ret = -1;
401 goto end;
402 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
403 fprintf(stderr, "[error] Wrong seek parameter\n");
404 ret = -1;
405 goto end;
406 default:
407 fprintf(stderr, "[error] Unknown attach return code %u\n",
408 be32toh(rp.status));
409 ret = -1;
410 goto end;
411 }
412 if (be32toh(rp.status) != LTTNG_VIEWER_ATTACH_OK) {
413 ret = -1;
414 goto end;
415 }
416
d81179de 417 ctx->session->stream_count += be32toh(rp.streams_count);
12a91e9d
JD
418 /*
419 * When the session is created but not started, we do an active wait
420 * until it starts. It allows the viewer to start processing the trace
421 * as soon as the session starts.
422 */
423 if (ctx->session->stream_count == 0) {
424 ret = 0;
425 goto end;
426 }
8fe4d0cf
JD
427 printf_verbose("Waiting for %u streams:\n",
428 be32toh(rp.streams_count));
12a91e9d
JD
429 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
430 ctx->session->stream_count);
431 for (i = 0; i < be32toh(rp.streams_count); i++) {
432 do {
433 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
434 } while (ret_len < 0 && errno == EINTR);
435 if (ret_len < 0) {
436 fprintf(stderr, "[error] Error receiving stream\n");
437 ret = ret_len;
438 goto error;
439 }
440 assert(ret_len == sizeof(stream));
441 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
442 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
443
444 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
445 be64toh(stream.id), stream.path_name,
446 stream.channel_name);
447 ctx->session->streams[i].id = be64toh(stream.id);
448 ctx->session->streams[i].session = ctx->session;
449
450 ctx->session->streams[i].first_read = 1;
451 ctx->session->streams[i].mmap_size = 0;
452
453 if (be32toh(stream.metadata_flag)) {
454 char *path;
455
456 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
457 if (!path) {
458 perror("strdup");
459 ret = -1;
460 goto error;
461 }
462 if (!mkdtemp(path)) {
463 perror("mkdtemp");
464 free(path);
465 ret = -1;
466 goto error;
467 }
468 ctx->session->streams[i].metadata_flag = 1;
469 snprintf(ctx->session->streams[i].path,
470 sizeof(ctx->session->streams[i].path),
471 "%s/%s", path,
472 stream.channel_name);
473 ret = open(ctx->session->streams[i].path,
474 O_WRONLY | O_CREAT | O_TRUNC,
475 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
476 if (ret < 0) {
477 perror("open");
478 free(path);
479 goto error;
480 }
481 ctx->session->streams[i].fd = ret;
482 free(path);
483 }
484 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
485 be64toh(stream.ctf_trace_id));
486 if (ret < 0) {
487 goto error;
488 }
489
490 }
491 ret = 0;
492
493end:
494error:
495 return ret;
496}
497
d81179de
JD
498static
499int ask_new_streams(struct lttng_live_ctx *ctx)
500{
501 int i, ret = 0;
502 uint64_t id;
503
504restart:
505 for (i = 0; i < ctx->session_ids->len; i++) {
506 id = g_array_index(ctx->session_ids, uint64_t, i);
507 ret = lttng_live_get_new_streams(ctx, id);
508 printf_verbose("Asking for new streams returns %d\n",
509 ret);
510 if (ret < 0) {
511 if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
512 printf_verbose("Session %" PRIu64 " closed\n",
513 id);
514 /*
515 * The streams have already been closed during
516 * the reading, so we only need to get rid of
517 * the trace in our internal table of sessions.
518 */
519 g_array_remove_index(ctx->session_ids, i);
520 /*
521 * We can't continue iterating on the g_array
522 * after a remove, we have to start again.
523 */
524 goto restart;
525 } else {
526 ret = -1;
527 goto end;
528 }
529 }
530 }
531
532end:
533 return ret;
534}
535
12a91e9d
JD
536static
537int get_data_packet(struct lttng_live_ctx *ctx,
538 struct ctf_stream_pos *pos,
539 struct lttng_live_viewer_stream *stream, uint64_t offset,
540 uint64_t len)
541{
542 struct lttng_viewer_cmd cmd;
543 struct lttng_viewer_get_packet rq;
544 struct lttng_viewer_trace_packet rp;
545 ssize_t ret_len;
546 int ret;
547
548 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
549 cmd.data_size = sizeof(rq);
550 cmd.cmd_version = 0;
551
552 memset(&rq, 0, sizeof(rq));
553 rq.stream_id = htobe64(stream->id);
554 /* Already in big endian. */
555 rq.offset = offset;
556 rq.len = htobe32(len);
557
558 do {
559 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
560 } while (ret_len < 0 && errno == EINTR);
561 if (ret_len < 0) {
562 fprintf(stderr, "[error] Error sending cmd\n");
563 ret = ret_len;
564 goto error;
565 }
566 assert(ret_len == sizeof(cmd));
567
568 do {
569 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
570 } while (ret_len < 0 && errno == EINTR);
571 if (ret_len < 0) {
572 fprintf(stderr, "[error] Error sending get_data_packet request\n");
573 ret = ret_len;
574 goto error;
575 }
576 assert(ret_len == sizeof(rq));
577
578 do {
579 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
580 } while (ret_len < 0 && errno == EINTR);
581 if (ret_len < 0) {
582 fprintf(stderr, "[error] Error receiving data response\n");
583 ret = ret_len;
584 goto error;
585 }
586 if (ret_len != sizeof(rp)) {
587 fprintf(stderr, "[error] get_data_packet: expected %" PRId64
588 ", received %" PRId64 "\n", ret_len,
589 sizeof(rp));
590 ret = -1;
591 goto error;
592 }
593
594 rp.flags = be32toh(rp.flags);
595
596 switch (be32toh(rp.status)) {
597 case LTTNG_VIEWER_GET_PACKET_OK:
598 len = be32toh(rp.len);
599 printf_verbose("get_data_packet: Ok, packet size : %" PRIu64
600 "\n", len);
601 break;
602 case LTTNG_VIEWER_GET_PACKET_RETRY:
603 printf_verbose("get_data_packet: retry\n");
604 ret = -1;
605 goto end;
606 case LTTNG_VIEWER_GET_PACKET_ERR:
607 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
608 printf_verbose("get_data_packet: new metadata needed\n");
609 ret = 0;
610 goto end;
611 }
d81179de 612 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
8fe4d0cf 613 printf_verbose("get_data_packet: new streams needed\n");
d81179de
JD
614 ret = ask_new_streams(ctx);
615 if (ret < 0)
616 goto error;
617 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
618 ctx->bt_ctx);
8fe4d0cf
JD
619 ret = 0;
620 goto end;
d81179de 621 }
12a91e9d
JD
622 fprintf(stderr, "[error] get_data_packet: error\n");
623 ret = -1;
624 goto end;
625 case LTTNG_VIEWER_GET_PACKET_EOF:
626 ret = -2;
627 goto error;
628 default:
629 printf_verbose("get_data_packet: unknown\n");
630 assert(0);
631 ret = -1;
632 goto end;
633 }
634
635 if (len <= 0) {
636 ret = -1;
637 goto end;
638 }
639
640 if (len > stream->mmap_size) {
641 uint64_t new_size;
642
643 new_size = max_t(uint64_t, len, stream->mmap_size << 1);
644 if (pos->base_mma) {
645 /* unmap old base */
646 ret = munmap_align(pos->base_mma);
647 if (ret) {
648 fprintf(stderr, "[error] Unable to unmap old base: %s.\n",
649 strerror(errno));
650 ret = -1;
651 goto error;
652 }
653 pos->base_mma = NULL;
654 }
655 pos->base_mma = mmap_align(new_size,
656 PROT_READ | PROT_WRITE,
657 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
658 if (pos->base_mma == MAP_FAILED) {
659 fprintf(stderr, "[error] mmap error %s.\n",
660 strerror(errno));
661 pos->base_mma = NULL;
662 ret = -1;
663 goto error;
664 }
665
666 stream->mmap_size = new_size;
667 printf_verbose("Expanding stream mmap size to %" PRIu64 " bytes\n",
668 stream->mmap_size);
669 }
670
671 do {
672 ret_len = recv(ctx->control_sock,
673 mmap_align_addr(pos->base_mma), len,
674 MSG_WAITALL);
675 } while (ret_len < 0 && errno == EINTR);
676 if (ret_len < 0) {
677 fprintf(stderr, "[error] Error receiving trace packet\n");
678 ret = ret_len;
679 goto error;
680 }
681 assert(ret_len == len);
682
683end:
684error:
685 return ret;
686}
687
688/*
689 * Return number of metadata bytes written or a negative value on error.
690 */
691static
692int get_new_metadata(struct lttng_live_ctx *ctx,
693 struct lttng_live_viewer_stream *viewer_stream,
694 uint64_t *metadata_len)
695{
696 uint64_t len = 0;
697 int ret;
698 struct lttng_viewer_cmd cmd;
699 struct lttng_viewer_get_metadata rq;
700 struct lttng_viewer_metadata_packet rp;
701 struct lttng_live_viewer_stream *metadata_stream;
702 char *data = NULL;
703 ssize_t ret_len;
704
705 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
706 cmd.data_size = sizeof(rq);
707 cmd.cmd_version = 0;
708
709 metadata_stream = viewer_stream->ctf_trace->metadata_stream;
710 rq.stream_id = htobe64(metadata_stream->id);
711
712 do {
713 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
714 } while (ret_len < 0 && errno == EINTR);
715 if (ret_len < 0) {
716 fprintf(stderr, "[error] Error sending cmd\n");
717 ret = ret_len;
718 goto error;
719 }
720 assert(ret_len == sizeof(cmd));
721
722 do {
723 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
724 } while (ret_len < 0 && errno == EINTR);
725 if (ret_len < 0) {
726 fprintf(stderr, "[error] Error sending get_metadata request\n");
727 ret = ret_len;
728 goto error;
729 }
730 assert(ret_len == sizeof(rq));
731
732 do {
733 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
734 } while (ret_len < 0 && errno == EINTR);
735 if (ret_len < 0) {
736 fprintf(stderr, "[error] Error receiving metadata response\n");
737 ret = ret_len;
738 goto error;
739 }
740 assert(ret_len == sizeof(rp));
741
742 switch (be32toh(rp.status)) {
743 case LTTNG_VIEWER_METADATA_OK:
744 printf_verbose("get_metadata : OK\n");
745 break;
746 case LTTNG_VIEWER_NO_NEW_METADATA:
747 printf_verbose("get_metadata : NO NEW\n");
748 ret = -1;
749 goto end;
750 case LTTNG_VIEWER_METADATA_ERR:
751 printf_verbose("get_metadata : ERR\n");
752 ret = -1;
753 goto end;
754 default:
755 printf_verbose("get_metadata : UNKNOWN\n");
756 ret = -1;
757 goto end;
758 }
759
760 len = be64toh(rp.len);
761 printf_verbose("Writing %" PRIu64" bytes to metadata\n", len);
762 if (len <= 0) {
763 ret = -1;
764 goto end;
765 }
766
767 data = zmalloc(len);
768 if (!data) {
769 perror("relay data zmalloc");
770 ret = -1;
771 goto error;
772 }
773 do {
774 ret_len = recv(ctx->control_sock, data, len, MSG_WAITALL);
775 } while (ret_len < 0 && errno == EINTR);
776 if (ret_len < 0) {
777 fprintf(stderr, "[error] Error receiving trace packet\n");
778 ret = ret_len;
779 free(data);
780 goto error;
781 }
782 assert(ret_len == len);
783
784 do {
785 ret_len = write(metadata_stream->fd, data, len);
786 } while (ret_len < 0 && errno == EINTR);
787 if (ret_len < 0) {
788 free(data);
789 ret = ret_len;
790 goto error;
791 }
792 assert(ret_len == len);
793
794 free(data);
795
796 *metadata_len = len;
797 ret = 0;
798end:
799error:
800 return ret;
801}
802
803/*
804 * Get one index for a stream.
805 *
806 * Returns 0 on success or a negative value on error.
807 */
808static
809int get_next_index(struct lttng_live_ctx *ctx,
810 struct lttng_live_viewer_stream *viewer_stream,
811 struct packet_index *index)
812{
813 struct lttng_viewer_cmd cmd;
814 struct lttng_viewer_get_next_index rq;
815 struct lttng_viewer_index rp;
816 int ret;
817 uint64_t metadata_len;
818 ssize_t ret_len;
819
820 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
821 cmd.data_size = sizeof(rq);
822 cmd.cmd_version = 0;
823
824 memset(&rq, 0, sizeof(rq));
825 rq.stream_id = htobe64(viewer_stream->id);
826
827retry:
828 do {
829 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
830 } while (ret_len < 0 && errno == EINTR);
831 if (ret_len < 0) {
832 fprintf(stderr, "[error] Error sending cmd\n");
833 ret = ret_len;
834 goto error;
835 }
836 assert(ret_len == sizeof(cmd));
837
838 do {
839 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
840 } while (ret_len < 0 && errno == EINTR);
841 if (ret_len < 0) {
842 fprintf(stderr, "[error] Error sending get_next_index request\n");
843 ret = ret_len;
844 goto error;
845 }
846 assert(ret_len == sizeof(rq));
847
848 do {
849 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
850 } while (ret_len < 0 && errno == EINTR);
851 if (ret_len < 0) {
852 fprintf(stderr, "[error] Error receiving index response\n");
853 ret = ret_len;
854 goto error;
855 }
856 assert(ret_len == sizeof(rp));
857
858 rp.flags = be32toh(rp.flags);
859
860 switch (be32toh(rp.status)) {
861 case LTTNG_VIEWER_INDEX_INACTIVE:
862 printf_verbose("get_next_index: inactive\n");
863 memset(index, 0, sizeof(struct packet_index));
864 index->timestamp_end = be64toh(rp.timestamp_end);
865 break;
866 case LTTNG_VIEWER_INDEX_OK:
867 printf_verbose("get_next_index: Ok, need metadata update : %u\n",
868 rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA);
869 index->offset = be64toh(rp.offset);
870 index->packet_size = be64toh(rp.packet_size);
871 index->content_size = be64toh(rp.content_size);
872 index->timestamp_begin = be64toh(rp.timestamp_begin);
873 index->timestamp_end = be64toh(rp.timestamp_end);
874 index->events_discarded = be64toh(rp.events_discarded);
875
876 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
877 printf_verbose("get_next_index: new metadata needed\n");
878 ret = get_new_metadata(ctx, viewer_stream,
879 &metadata_len);
880 if (ret < 0) {
881 goto error;
882 }
883 }
d81179de
JD
884 if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
885 ret = ask_new_streams(ctx);
886 if (ret < 0)
887 goto error;
888 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
889 ctx->bt_ctx);
890 }
12a91e9d
JD
891 break;
892 case LTTNG_VIEWER_INDEX_RETRY:
893 printf_verbose("get_next_index: retry\n");
71ba9228 894 sleep(0.1);
12a91e9d
JD
895 goto retry;
896 case LTTNG_VIEWER_INDEX_HUP:
897 printf_verbose("get_next_index: stream hung up\n");
898 viewer_stream->id = -1ULL;
899 viewer_stream->fd = -1;
900 index->offset = EOF;
d81179de 901 ctx->session->stream_count--;
12a91e9d
JD
902 break;
903 case LTTNG_VIEWER_INDEX_ERR:
904 fprintf(stderr, "[error] get_next_index: error\n");
905 ret = -1;
906 goto error;
907 default:
908 fprintf(stderr, "[error] get_next_index: unkwown value\n");
909 ret = -1;
910 goto error;
911 }
912
913 ret = 0;
914
915error:
916 return ret;
917}
918
d81179de 919static
12a91e9d
JD
920void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
921 int whence)
922{
923 struct ctf_stream_pos *pos;
924 struct ctf_file_stream *file_stream;
925 struct packet_index packet_index;
926 struct lttng_live_viewer_stream *viewer_stream;
927 struct lttng_live_session *session;
928 int ret;
929
930retry:
931 pos = ctf_pos(stream_pos);
932 file_stream = container_of(pos, struct ctf_file_stream, pos);
933 viewer_stream = (struct lttng_live_viewer_stream *) pos->priv;
934 session = viewer_stream->session;
935
936 printf_verbose("get_next_index for stream %" PRIu64 "\n", viewer_stream->id);
937 ret = get_next_index(session->ctx, viewer_stream, &packet_index);
938 if (ret < 0) {
939 pos->offset = EOF;
940 fprintf(stderr, "[error] get_next_index failed\n");
941 return;
942 }
943
944 pos->packet_size = packet_index.packet_size;
945 pos->content_size = packet_index.content_size;
946 pos->mmap_base_offset = 0;
947 if (packet_index.offset == EOF) {
948 pos->offset = EOF;
949 } else {
950 pos->offset = 0;
951 }
952
953 if (packet_index.content_size == 0) {
954 file_stream->parent.cycles_timestamp = packet_index.timestamp_end;
955 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
956 &file_stream->parent, packet_index.timestamp_end);
957 } else {
12a91e9d 958 file_stream->parent.cycles_timestamp = packet_index.timestamp_begin;
6c7065a7
JD
959 file_stream->parent.real_timestamp = ctf_get_real_timestamp(
960 &file_stream->parent, packet_index.timestamp_begin);
12a91e9d
JD
961 }
962
963 if (pos->packet_size == 0 || pos->offset == EOF) {
964 goto end;
965 }
966
967 printf_verbose("get_data_packet for stream %" PRIu64 "\n",
968 viewer_stream->id);
969 ret = get_data_packet(session->ctx, pos, viewer_stream,
970 be64toh(packet_index.offset),
971 packet_index.packet_size / CHAR_BIT);
972 if (ret == -2) {
973 goto retry;
974 } else if (ret < 0) {
975 pos->offset = EOF;
976 fprintf(stderr, "[error] get_data_packet failed\n");
977 return;
978 }
979
980 printf_verbose("Index received : packet_size : %" PRIu64
981 ", offset %" PRIu64 ", content_size %" PRIu64
982 ", timestamp_end : %" PRIu64 "\n",
983 packet_index.packet_size, packet_index.offset,
984 packet_index.content_size, packet_index.timestamp_end);
985
986 /* update trace_packet_header and stream_packet_context */
987 if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) {
988 /* Read packet header */
989 ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p);
990 if (ret) {
991 pos->offset = EOF;
992 fprintf(stderr, "[error] trace packet header read failed\n");
993 goto end;
994 }
995 }
996 if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) {
997 /* Read packet context */
998 ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p);
999 if (ret) {
1000 pos->offset = EOF;
1001 fprintf(stderr, "[error] stream packet context read failed\n");
1002 goto end;
1003 }
1004 }
1005 pos->data_offset = pos->offset;
1006
1007end:
1008 return;
1009}
1010
d81179de
JD
1011int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
1012{
1013 struct lttng_viewer_cmd cmd;
1014 struct lttng_viewer_create_session_response resp;
1015 int ret;
1016 ssize_t ret_len;
1017
1018 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
1019 cmd.data_size = 0;
1020 cmd.cmd_version = 0;
1021
1022 do {
1023 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
1024 } while (ret_len < 0 && errno == EINTR);
1025 if (ret_len < 0) {
1026 fprintf(stderr, "[error] Error sending cmd\n");
1027 ret = ret_len;
1028 goto error;
1029 }
1030 assert(ret_len == sizeof(cmd));
1031
1032 do {
1033 ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0);
1034 } while (ret_len < 0 && errno == EINTR);
1035 if (ret_len < 0) {
1036 fprintf(stderr, "[error] Error receiving create session reply\n");
1037 ret = ret_len;
1038 goto error;
1039 }
1040 assert(ret_len == sizeof(resp));
1041
1042 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
1043 fprintf(stderr, "[error] Error creating viewer session\n");
1044 ret = -1;
1045 goto error;
1046 }
1047 ret = 0;
1048
1049error:
1050 return ret;
1051}
1052
1053static
1054int del_traces(gpointer key, gpointer value, gpointer user_data)
12a91e9d
JD
1055{
1056 struct bt_context *bt_ctx = user_data;
1057 struct lttng_live_ctf_trace *trace = value;
1058 int ret;
1059
1060 ret = bt_context_remove_trace(bt_ctx, trace->trace_id);
1061 if (ret < 0)
1062 fprintf(stderr, "[error] removing trace from context\n");
1063
1064 /* remove the key/value pair from the HT. */
1065 return 1;
1066}
1067
d81179de
JD
1068static
1069void add_traces(gpointer key, gpointer value, gpointer user_data)
12a91e9d
JD
1070{
1071 int i, ret, total_metadata = 0;
1072 uint64_t metadata_len;
1073 struct bt_context *bt_ctx = user_data;
1074 struct lttng_live_ctf_trace *trace = value;
1075 struct lttng_live_viewer_stream *stream;
1076 struct bt_mmap_stream *new_mmap_stream;
1077 struct bt_mmap_stream_list mmap_list;
1078 struct lttng_live_ctx *ctx = NULL;
1079
d81179de
JD
1080 /*
1081 * We don't know how many streams we will receive for a trace, so
1082 * once we are done receiving the traces, we add all the traces
1083 * received to the bt_context.
1084 * We can receive streams during the attach command or the
1085 * get_new_streams, so we have to make sure not to add multiple
1086 * times the same traces.
1087 * If a trace is already in the context, we just skip this function.
1088 */
1089 if (trace->in_use)
1090 return;
1091
12a91e9d
JD
1092 BT_INIT_LIST_HEAD(&mmap_list.head);
1093
1094 for (i = 0; i < trace->streams->len; i++) {
1095 stream = g_ptr_array_index(trace->streams, i);
1096 ctx = stream->session->ctx;
1097
1098 if (!stream->metadata_flag) {
1099 new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream));
1100 new_mmap_stream->priv = (void *) stream;
1101 new_mmap_stream->fd = -1;
1102 bt_list_add(&new_mmap_stream->list, &mmap_list.head);
1103 } else {
1104 /* Get all possible metadata before starting */
1105 do {
1106 ret = get_new_metadata(ctx, stream,
1107 &metadata_len);
1108 if (ret == 0) {
1109 total_metadata += metadata_len;
1110 }
1111 } while (ret == 0 || total_metadata == 0);
1112 trace->metadata_fp = fopen(stream->path, "r");
1113 }
1114 }
1115
1116 if (!trace->metadata_fp) {
1117 fprintf(stderr, "[error] No metadata stream opened\n");
1118 goto end_free;
1119 }
1120
1121 ret = bt_context_add_trace(bt_ctx, NULL, "ctf",
1122 ctf_live_packet_seek, &mmap_list, trace->metadata_fp);
1123 if (ret < 0) {
1124 fprintf(stderr, "[error] Error adding trace\n");
1125 goto end_free;
1126 }
d81179de
JD
1127
1128 if (bt_ctx->current_iterator) {
1129 struct bt_trace_descriptor *td;
1130 struct bt_trace_handle *handle;
1131
1132 handle = (struct bt_trace_handle *) g_hash_table_lookup(
1133 bt_ctx->trace_handles,
1134 (gpointer) (unsigned long) ret);
1135 td = handle->td;
1136 bt_iter_add_trace(bt_ctx->current_iterator, td);
1137 }
1138
12a91e9d 1139 trace->trace_id = ret;
d81179de 1140 trace->in_use = 1;
12a91e9d
JD
1141
1142 goto end;
1143
1144end_free:
1145 bt_context_put(bt_ctx);
1146end:
1147 return;
1148}
1149
d81179de 1150int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
12a91e9d 1151{
d81179de
JD
1152 struct lttng_viewer_cmd cmd;
1153 struct lttng_viewer_new_streams_request rq;
1154 struct lttng_viewer_new_streams_response rp;
1155 struct lttng_viewer_stream stream;
1156 int ret, i;
1157 ssize_t ret_len;
1158 uint32_t stream_count;
12a91e9d 1159
d81179de
JD
1160 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1161 cmd.data_size = sizeof(rq);
1162 cmd.cmd_version = 0;
1163
1164 memset(&rq, 0, sizeof(rq));
1165 rq.session_id = htobe64(id);
1166
1167 do {
1168 ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
1169 } while (ret_len < 0 && errno == EINTR);
1170 if (ret_len < 0) {
1171 fprintf(stderr, "[error] Error sending cmd\n");
1172 ret = ret_len;
1173 goto error;
1174 }
1175 assert(ret_len == sizeof(cmd));
1176
1177 do {
1178 ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
1179 } while (ret_len < 0 && errno == EINTR);
1180 if (ret_len < 0) {
1181 fprintf(stderr, "[error] Error sending get_new_streams request\n");
1182 ret = ret_len;
1183 goto error;
1184 }
1185 assert(ret_len == sizeof(rq));
1186
1187 do {
1188 ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
1189 } while (ret_len < 0 && errno == EINTR);
1190 if (ret_len < 0) {
1191 fprintf(stderr, "[error] Error receiving get_new_streams response\n");
1192 ret = ret_len;
1193 goto error;
1194 }
1195 assert(ret_len == sizeof(rp));
1196
1197 switch(be32toh(rp.status)) {
1198 case LTTNG_VIEWER_NEW_STREAMS_OK:
1199 break;
1200 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1201 ret = 0;
1202 goto end;
1203 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1204 ret = -LTTNG_VIEWER_NEW_STREAMS_HUP;
1205 goto end;
1206 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1207 fprintf(stderr, "[error] get_new_streams error\n");
1208 ret = -1;
1209 goto end;
1210 default:
1211 fprintf(stderr, "[error] Unknown return code %u\n",
1212 be32toh(rp.status));
1213 ret = -1;
12a91e9d
JD
1214 goto end;
1215 }
1216
d81179de
JD
1217 stream_count = be32toh(rp.streams_count);
1218 ctx->session->stream_count += stream_count;
12a91e9d 1219 /*
d81179de
JD
1220 * When the session is created but not started, we do an active wait
1221 * until it starts. It allows the viewer to start processing the trace
1222 * as soon as the session starts.
12a91e9d 1223 */
d81179de
JD
1224 if (ctx->session->stream_count == 0) {
1225 ret = 0;
1226 goto end;
1227 }
1228 printf_verbose("Waiting for %" PRIu64 " streams:\n",
1229 ctx->session->stream_count);
1230 ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
1231 ctx->session->stream_count);
1232 for (i = 0; i < stream_count; i++) {
12a91e9d 1233 do {
d81179de
JD
1234 ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
1235 } while (ret_len < 0 && errno == EINTR);
1236 if (ret_len < 0) {
1237 fprintf(stderr, "[error] Error receiving stream\n");
1238 ret = ret_len;
1239 goto error;
1240 }
1241 assert(ret_len == sizeof(stream));
1242 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1243 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1244
1245 printf_verbose(" stream %" PRIu64 " : %s/%s\n",
1246 be64toh(stream.id), stream.path_name,
1247 stream.channel_name);
1248 ctx->session->streams[i].id = be64toh(stream.id);
1249 ctx->session->streams[i].session = ctx->session;
1250
1251 ctx->session->streams[i].first_read = 1;
1252 ctx->session->streams[i].mmap_size = 0;
1253
1254 if (be32toh(stream.metadata_flag)) {
1255 char *path;
1256
1257 path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
1258 if (!path) {
1259 perror("strdup");
1260 ret = -1;
1261 goto error;
1262 }
1263 if (!mkdtemp(path)) {
1264 perror("mkdtemp");
1265 free(path);
1266 ret = -1;
1267 goto error;
1268 }
1269 ctx->session->streams[i].metadata_flag = 1;
1270 snprintf(ctx->session->streams[i].path,
1271 sizeof(ctx->session->streams[i].path),
1272 "%s/%s", path,
1273 stream.channel_name);
1274 ret = open(ctx->session->streams[i].path,
1275 O_WRONLY | O_CREAT | O_TRUNC,
1276 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
12a91e9d 1277 if (ret < 0) {
d81179de
JD
1278 perror("open");
1279 free(path);
1280 goto error;
12a91e9d 1281 }
d81179de
JD
1282 ctx->session->streams[i].fd = ret;
1283 free(path);
1284 }
1285 ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
1286 be64toh(stream.ctf_trace_id));
1287 if (ret < 0) {
1288 goto error;
1289 }
1290
1291 }
1292 ret = 0;
1293
1294end:
1295error:
1296 return ret;
1297}
12a91e9d 1298
d81179de
JD
1299void lttng_live_read(struct lttng_live_ctx *ctx)
1300{
1301 int ret, i;
1302#if 0
1303 struct bt_ctf_iter *iter;
1304 const struct bt_ctf_event *event;
1305 struct bt_iter_pos begin_pos;
1306 struct bt_trace_descriptor *td_write;
1307 struct bt_format *fmt_write;
1308 struct ctf_text_stream_pos *sout;
1309#endif
1310 uint64_t id;
12a91e9d 1311
d81179de
JD
1312 ctx->bt_ctx = bt_context_create();
1313 if (!ctx->bt_ctx) {
1314 fprintf(stderr, "[error] bt_context_create allocation\n");
1315 goto end;
1316 }
1317
1318#if 0
1319 fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
1320 if (!fmt_write) {
1321 fprintf(stderr, "[error] ctf-text error\n");
1322 goto end;
1323 }
1324
1325 td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
1326 if (!td_write) {
1327 fprintf(stderr, "[error] Error opening output trace\n");
1328 goto end_free;
1329 }
1330
1331 sout = container_of(td_write, struct ctf_text_stream_pos,
1332 trace_descriptor);
1333 if (!sout->parent.event_cb)
1334 goto end_free;
1335#endif
1336
1337 ret = lttng_live_create_viewer_session(ctx);
1338 if (ret < 0) {
1339 goto end_free;
1340 }
1341
1342 for (i = 0; i < ctx->session_ids->len; i++) {
1343 id = g_array_index(ctx->session_ids, uint64_t, i);
1344 printf_verbose("Attaching to session %lu\n", id);
1345 ret = lttng_live_attach_session(ctx, id);
1346 printf_verbose("Attaching session returns %d\n", ret);
1347 if (ret < 0) {
1348 if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
1349 fprintf(stderr, "[error] Unknown session ID\n");
1350 }
1351 goto end_free;
1352 }
1353 }
1354
1355 /*
1356 * As long as the session is active, we try to get new streams.
1357 */
1358#if 0
1359 for (;;) {
1360 int flags;
1361#endif
1362
1363 while (!ctx->session->stream_count) {
1364 if (ctx->session_ids->len == 0)
1365 goto end_free;
1366 ret = ask_new_streams(ctx);
1367 if (ret < 0)
1368 goto end_free;
1369 }
1370
1371 g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
1372 ctx->bt_ctx);
1373
1374#if 0
1375 begin_pos.type = BT_SEEK_BEGIN;
1376 iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
1377 if (!iter) {
1378 fprintf(stderr, "[error] Iterator creation error\n");
1379 goto end;
1380 }
1381
1382 for (;;) {
1383 event = bt_ctf_iter_read_event_flags(iter, &flags);
1384 if (!(flags & BT_ITER_FLAG_RETRY)) {
1385 if (!event) {
1386 /* End of trace */
1387 break;
1388 }
1389 ret = sout->parent.event_cb(&sout->parent,
1390 event->parent->stream);
1391 if (ret) {
1392 fprintf(stderr, "[error] Writing "
1393 "event failed.\n");
1394 goto end_free;
1395 }
1396 }
1397 ret = bt_iter_next(bt_ctf_get_iter(iter));
1398 if (ret < 0) {
1399 goto end_free;
1400 }
1401 }
1402 bt_ctf_iter_destroy(iter);
1403#endif
1404 ret = check_requirements(ctx->bt_ctx);
8fe4d0cf 1405 if (ret < 0 && !valid_trace) {
12a91e9d
JD
1406 fprintf(stderr, "[error] some mandatory contexts "
1407 "were missing, exiting.\n");
1408 goto end;
1409 }
1410
1411 if (!opt_textdump) {
1412 pthread_create(&display_thread, NULL, ncurses_display,
1413 (void *) NULL);
1414 pthread_create(&timer_thread, NULL, refresh_thread,
1415 (void *) NULL);
1416 }
d81179de
JD
1417 iter_trace(ctx->bt_ctx);
1418 g_hash_table_foreach_remove(ctx->session->ctf_traces,
1419 del_traces, ctx->bt_ctx);
1420 ctx->session->stream_count = 0;
1421#if 0
1422 }
1423#endif
12a91e9d
JD
1424
1425end_free:
d81179de 1426 bt_context_put(ctx->bt_ctx);
12a91e9d
JD
1427end:
1428 return;
1429}
This page took 0.07584 seconds and 4 git commands to generate.