Docs: relayd: live: clarify ownership of vstream after viewer release
[lttng-tools.git] / src / bin / lttng-relayd / live.cpp
CommitLineData
d3e2ba59 1/*
ab5be9fa
MJ
2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
4 * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
d3e2ba59 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
d3e2ba59 7 *
d3e2ba59
JD
8 */
9
6c1c0768 10#define _LGPL_SOURCE
8bb66c3c 11#include <fcntl.h>
d3e2ba59
JD
12#include <getopt.h>
13#include <grp.h>
8bb66c3c 14#include <inttypes.h>
d3e2ba59
JD
15#include <limits.h>
16#include <pthread.h>
17#include <signal.h>
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <sys/mman.h>
22#include <sys/mount.h>
23#include <sys/resource.h>
24#include <sys/socket.h>
25#include <sys/stat.h>
26#include <sys/types.h>
27#include <sys/wait.h>
8bb66c3c 28#include <unistd.h>
d3e2ba59 29#include <urcu/futex.h>
7591bab1 30#include <urcu/rculist.h>
8bb66c3c 31#include <urcu/uatomic.h>
bb1dcf01 32#include <string>
d3e2ba59 33
d3e2ba59 34#include <common/common.h>
8bb66c3c 35#include <common/compat/endian.h>
d3e2ba59
JD
36#include <common/compat/poll.h>
37#include <common/compat/socket.h>
38#include <common/defaults.h>
8bb66c3c
JG
39#include <common/fd-tracker/utils.h>
40#include <common/fs-handle.h>
d3e2ba59 41#include <common/futex.h>
2f8f53af 42#include <common/index/index.h>
d3e2ba59
JD
43#include <common/sessiond-comm/inet.h>
44#include <common/sessiond-comm/relayd.h>
8bb66c3c 45#include <common/sessiond-comm/sessiond-comm.h>
d3e2ba59
JD
46#include <common/uri.h>
47#include <common/utils.h>
8bb66c3c 48#include <lttng/lttng.h>
d3e2ba59
JD
49
50#include "cmd.h"
8bb66c3c
JG
51#include "connection.h"
52#include "ctf-trace.h"
53#include "health-relayd.h"
d3e2ba59
JD
54#include "live.h"
55#include "lttng-relayd.h"
2a174661 56#include "session.h"
8bb66c3c
JG
57#include "stream.h"
58#include "testpoint.h"
59#include "utils.h"
7591bab1 60#include "viewer-session.h"
8bb66c3c 61#include "viewer-stream.h"
7591bab1
MD
62
/*
 * Initial capacity, in entries, of the session array built by
 * viewer_list_sessions(); the array is doubled whenever it fills up.
 */
#define SESSION_BUF_DEFAULT_COUNT 16

/*
 * URI the live listener socket is created from (see thread_listener());
 * released by cleanup_relayd_live().
 */
static struct lttng_uri *live_uri;

/*
 * This pipe is used to inform the worker thread that a command is queued and
 * ready to be processed. thread_dispatcher() writes relay_connection
 * pointers to live_conn_pipe[1].
 */
static int live_conn_pipe[2] = { -1, -1 };

/*
 * Shared between threads: set by relayd_live_stop() and polled by
 * thread_dispatcher() to know when to exit.
 */
static int live_dispatch_thread_exit;

/* Thread handles; presumably created/joined elsewhere in this file. */
static pthread_t live_listener_thread;
static pthread_t live_dispatcher_thread;
static pthread_t live_worker_thread;

/*
 * Relay command queue.
 *
 * thread_listener() enqueues new viewer connections on this queue and
 * thread_dispatcher() dequeues them.
 */
static struct relay_conn_queue viewer_conn_queue;

/*
 * Last viewer session id handed out by viewer_connect(); incremented while
 * holding last_relay_viewer_session_id_lock.
 */
static uint64_t last_relay_viewer_session_id;
static pthread_mutex_t last_relay_viewer_session_id_lock =
		PTHREAD_MUTEX_INITIALIZER;
d3e2ba59 91
7201079f
FD
92static
93const char *lttng_viewer_command_str(lttng_viewer_command cmd)
94{
95 switch (cmd) {
96 case LTTNG_VIEWER_CONNECT:
97 return "CONNECT";
98 case LTTNG_VIEWER_LIST_SESSIONS:
99 return "LIST_SESSIONS";
100 case LTTNG_VIEWER_ATTACH_SESSION:
101 return "ATTACH_SESSION";
102 case LTTNG_VIEWER_GET_NEXT_INDEX:
103 return "GET_NEXT_INDEX";
104 case LTTNG_VIEWER_GET_PACKET:
105 return "GET_PACKET";
106 case LTTNG_VIEWER_GET_METADATA:
107 return "GET_METADATA";
108 case LTTNG_VIEWER_GET_NEW_STREAMS:
109 return "GET_NEW_STREAMS";
110 case LTTNG_VIEWER_CREATE_SESSION:
111 return "CREATE_SESSION";
112 case LTTNG_VIEWER_DETACH_SESSION:
113 return "DETACH_SESSION";
114 default:
115 abort();
116 }
117}
118
d3e2ba59
JD
119/*
120 * Cleanup the daemon
121 */
122static
178a0557 123void cleanup_relayd_live(void)
d3e2ba59
JD
124{
125 DBG("Cleaning up");
126
d3e2ba59
JD
127 free(live_uri);
128}
129
2f8f53af
DG
130/*
131 * Receive a request buffer using a given socket, destination allocated buffer
132 * of length size.
133 *
134 * Return the size of the received message or else a negative value on error
135 * with errno being set by recvmsg() syscall.
136 */
137static
138ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size)
139{
140 ssize_t ret;
141
2f8f53af
DG
142 ret = sock->ops->recvmsg(sock, buf, size, 0);
143 if (ret < 0 || ret != size) {
144 if (ret == 0) {
145 /* Orderly shutdown. Not necessary to print an error. */
146 DBG("Socket %d did an orderly shutdown", sock->fd);
147 } else {
148 ERR("Relay failed to receive request.");
149 }
150 ret = -1;
151 }
152
153 return ret;
154}
155
156/*
157 * Send a response buffer using a given socket, source allocated buffer of
158 * length size.
159 *
160 * Return the size of the sent message or else a negative value on error with
161 * errno being set by sendmsg() syscall.
162 */
163static
164ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size)
165{
166 ssize_t ret;
167
2f8f53af
DG
168 ret = sock->ops->sendmsg(sock, buf, size, 0);
169 if (ret < 0) {
170 ERR("Relayd failed to send response.");
171 }
172
173 return ret;
174}
175
176/*
f04a971b
JD
177 * Atomically check if new streams got added in one of the sessions attached
178 * and reset the flag to 0.
2f8f53af
DG
179 *
180 * Returns 1 if new streams got added, 0 if nothing changed, a negative value
181 * on error.
182 */
183static
f04a971b 184int check_new_streams(struct relay_connection *conn)
2f8f53af 185{
2f8f53af 186 struct relay_session *session;
f04a971b
JD
187 unsigned long current_val;
188 int ret = 0;
2f8f53af 189
f04a971b
JD
190 if (!conn->viewer_session) {
191 goto end;
192 }
7591bab1
MD
193 rcu_read_lock();
194 cds_list_for_each_entry_rcu(session,
195 &conn->viewer_session->session_list,
196 viewer_session_node) {
197 if (!session_get(session)) {
198 continue;
199 }
f04a971b
JD
200 current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
201 ret = current_val;
7591bab1 202 session_put(session);
f04a971b
JD
203 if (ret == 1) {
204 goto end;
205 }
2f8f53af 206 }
f04a971b 207end:
7591bab1 208 rcu_read_unlock();
2f8f53af
DG
209 return ret;
210}
211
212/*
213 * Send viewer streams to the given socket. The ignore_sent_flag indicates if
214 * this function should ignore the sent flag or not.
215 *
216 * Return 0 on success or else a negative value.
217 */
218static
219ssize_t send_viewer_streams(struct lttcomm_sock *sock,
c06fdd95 220 uint64_t session_id, unsigned int ignore_sent_flag)
2f8f53af
DG
221{
222 ssize_t ret;
2f8f53af
DG
223 struct lttng_ht_iter iter;
224 struct relay_viewer_stream *vstream;
225
2f8f53af
DG
226 rcu_read_lock();
227
228 cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
229 stream_n.node) {
2a174661 230 struct ctf_trace *ctf_trace;
aaabc543 231 struct lttng_viewer_stream send_stream = {};
2a174661 232
2f8f53af
DG
233 health_code_update();
234
7591bab1
MD
235 if (!viewer_stream_get(vstream)) {
236 continue;
237 }
238
239 pthread_mutex_lock(&vstream->stream->lock);
2f8f53af 240 /* Ignore if not the same session. */
c06fdd95 241 if (vstream->stream->trace->session->id != session_id ||
2f8f53af 242 (!ignore_sent_flag && vstream->sent_flag)) {
7591bab1
MD
243 pthread_mutex_unlock(&vstream->stream->lock);
244 viewer_stream_put(vstream);
2f8f53af
DG
245 continue;
246 }
247
7591bab1
MD
248 ctf_trace = vstream->stream->trace;
249 send_stream.id = htobe64(vstream->stream->stream_handle);
2a174661 250 send_stream.ctf_trace_id = htobe64(ctf_trace->id);
7591bab1
MD
251 send_stream.metadata_flag = htobe32(
252 vstream->stream->is_metadata);
f8f011fb
MD
253 if (lttng_strncpy(send_stream.path_name, vstream->path_name,
254 sizeof(send_stream.path_name))) {
255 pthread_mutex_unlock(&vstream->stream->lock);
256 viewer_stream_put(vstream);
257 ret = -1; /* Error. */
258 goto end_unlock;
259 }
260 if (lttng_strncpy(send_stream.channel_name,
261 vstream->channel_name,
262 sizeof(send_stream.channel_name))) {
263 pthread_mutex_unlock(&vstream->stream->lock);
264 viewer_stream_put(vstream);
265 ret = -1; /* Error. */
266 goto end_unlock;
267 }
2f8f53af 268
7591bab1
MD
269 DBG("Sending stream %" PRIu64 " to viewer",
270 vstream->stream->stream_handle);
271 vstream->sent_flag = 1;
272 pthread_mutex_unlock(&vstream->stream->lock);
273
2f8f53af 274 ret = send_response(sock, &send_stream, sizeof(send_stream));
7591bab1 275 viewer_stream_put(vstream);
2f8f53af
DG
276 if (ret < 0) {
277 goto end_unlock;
278 }
2f8f53af
DG
279 }
280
281 ret = 0;
282
283end_unlock:
284 rcu_read_unlock();
285 return ret;
286}
287
288/*
289 * Create every viewer stream possible for the given session with the seek
290 * type. Three counters *can* be return which are in order the total amount of
291 * viewer stream of the session, the number of unsent stream and the number of
292 * stream created. Those counters can be NULL and thus will be ignored.
293 *
c06fdd95
JG
294 * session must be locked to ensure that we see either none or all initial
295 * streams for a session, but no intermediate state..
296 *
2f8f53af
DG
297 * Return 0 on success or else a negative value.
298 */
9edaf114
JG
299static int make_viewer_streams(struct relay_session *relay_session,
300 struct relay_viewer_session *viewer_session,
b66a15d1
JG
301 enum lttng_viewer_seek seek_t,
302 uint32_t *nb_total,
303 uint32_t *nb_unsent,
304 uint32_t *nb_created,
305 bool *closed)
2f8f53af
DG
306{
307 int ret;
2f8f53af 308 struct lttng_ht_iter iter;
2a174661 309 struct ctf_trace *ctf_trace;
9edaf114 310 struct relay_stream *relay_stream = NULL;
2f8f53af 311
a0377dfe 312 LTTNG_ASSERT(relay_session);
9edaf114 313 ASSERT_LOCKED(relay_session->lock);
2f8f53af 314
9edaf114 315 if (relay_session->connection_closed) {
bddf80e4
MD
316 *closed = true;
317 }
318
2f8f53af 319 /*
7591bab1
MD
320 * Create viewer streams for relay streams that are ready to be
321 * used for a the given session id only.
2f8f53af 322 */
2a174661 323 rcu_read_lock();
9edaf114
JG
324 cds_lfht_for_each_entry (relay_session->ctf_traces_ht->ht, &iter.iter,
325 ctf_trace, node.node) {
123ed7c2 326 bool trace_has_metadata_stream = false;
2f8f53af
DG
327
328 health_code_update();
329
7591bab1 330 if (!ctf_trace_get(ctf_trace)) {
2f8f53af
DG
331 continue;
332 }
333
123ed7c2
FD
334 /*
335 * Iterate over all the streams of the trace to see if we have a
336 * metadata stream.
337 */
9edaf114
JG
338 cds_list_for_each_entry_rcu(relay_stream,
339 &ctf_trace->stream_list, stream_node)
123ed7c2 340 {
9edaf114
JG
341 bool is_metadata_stream;
342
343 pthread_mutex_lock(&relay_stream->lock);
344 is_metadata_stream = relay_stream->is_metadata;
345 pthread_mutex_unlock(&relay_stream->lock);
346
347 if (is_metadata_stream) {
123ed7c2
FD
348 trace_has_metadata_stream = true;
349 break;
350 }
351 }
352
9edaf114
JG
353 relay_stream = NULL;
354
123ed7c2
FD
355 /*
356 * If there is no metadata stream in this trace at the moment
357 * and we never sent one to the viewer, skip the trace. We
358 * accept that the viewer will not see this trace at all.
359 */
360 if (!trace_has_metadata_stream &&
361 !ctf_trace->metadata_stream_sent_to_viewer) {
3d7708ff
JG
362 ctf_trace_put(ctf_trace);
363 continue;
123ed7c2
FD
364 }
365
9edaf114
JG
366 cds_list_for_each_entry_rcu(relay_stream,
367 &ctf_trace->stream_list, stream_node)
368 {
369 struct relay_viewer_stream *viewer_stream;
2a174661 370
9edaf114 371 if (!stream_get(relay_stream)) {
2a174661
DG
372 continue;
373 }
9edaf114
JG
374
375 pthread_mutex_lock(&relay_stream->lock);
7591bab1 376 /*
d812ecb9 377 * stream published is protected by the session lock.
7591bab1 378 */
9edaf114 379 if (!relay_stream->published) {
7591bab1
MD
380 goto next;
381 }
9edaf114
JG
382 viewer_stream = viewer_stream_get_by_id(
383 relay_stream->stream_handle);
384 if (!viewer_stream) {
80516611 385 struct lttng_trace_chunk *viewer_stream_trace_chunk = NULL;
9edaf114 386
123ed7c2
FD
387 /*
388 * Save that we sent the metadata stream to the
389 * viewer. So that we know what trace the viewer
390 * is aware of.
391 */
9edaf114
JG
392 if (relay_stream->is_metadata) {
393 ctf_trace->metadata_stream_sent_to_viewer = true;
394 }
395
396 /*
397 * If a rotation is ongoing, use a copy of the
398 * relay stream's chunk to ensure the stream
399 * files exist.
400 *
401 * Otherwise, the viewer session's current trace
402 * chunk can be used safely.
403 */
404 if ((relay_stream->ongoing_rotation.is_set ||
405 relay_session->ongoing_rotation) &&
406 relay_stream->trace_chunk) {
407 viewer_stream_trace_chunk = lttng_trace_chunk_copy(
408 relay_stream->trace_chunk);
409 if (!viewer_stream_trace_chunk) {
410 ret = -1;
411 ctf_trace_put(ctf_trace);
412 goto error_unlock;
413 }
414 } else {
80516611
JG
415 /*
416 * Transition the viewer session into the newest trace chunk available.
417 */
418 if (!lttng_trace_chunk_ids_equal(viewer_session->current_trace_chunk,
419 relay_stream->trace_chunk)) {
420
421 ret = viewer_session_set_trace_chunk_copy(
422 viewer_session,
423 relay_stream->trace_chunk);
424 if (ret) {
425 ret = -1;
426 ctf_trace_put(ctf_trace);
427 goto error_unlock;
428 }
429 }
9edaf114 430
87250ba1
JG
431 if (relay_stream->trace_chunk) {
432 /*
433 * If the corresponding relay
434 * stream's trace chunk is set,
435 * the viewer stream will be
436 * created under it.
437 *
438 * Note that a relay stream can
439 * have a NULL output trace
440 * chunk (for instance, after a
441 * clear against a stopped
442 * session).
443 */
444 const bool reference_acquired = lttng_trace_chunk_get(
445 viewer_session->current_trace_chunk);
446
447 LTTNG_ASSERT(reference_acquired);
448 viewer_stream_trace_chunk =
449 viewer_session->current_trace_chunk;
450 }
123ed7c2 451 }
9edaf114
JG
452
453 viewer_stream = viewer_stream_create(
454 relay_stream,
455 viewer_stream_trace_chunk,
456 seek_t);
457 lttng_trace_chunk_put(viewer_stream_trace_chunk);
458 viewer_stream_trace_chunk = NULL;
459 if (!viewer_stream) {
2a174661 460 ret = -1;
7591bab1 461 ctf_trace_put(ctf_trace);
2a174661
DG
462 goto error_unlock;
463 }
2a174661
DG
464
465 if (nb_created) {
466 /* Update number of created stream counter. */
467 (*nb_created)++;
468 }
2229a09c
MD
469 /*
470 * Ensure a self-reference is preserved even
471 * after we have put our local reference.
472 */
9edaf114 473 if (!viewer_stream_get(viewer_stream)) {
862d3a3b
MD
474 ERR("Unable to get self-reference on viewer stream, logic error.");
475 abort();
476 }
7591bab1 477 } else {
9edaf114 478 if (!viewer_stream->sent_flag && nb_unsent) {
7591bab1
MD
479 /* Update number of unsent stream counter. */
480 (*nb_unsent)++;
481 }
2f8f53af 482 }
2a174661
DG
483 /* Update number of total stream counter. */
484 if (nb_total) {
9edaf114
JG
485 if (relay_stream->is_metadata) {
486 if (!relay_stream->closed ||
487 relay_stream->metadata_received >
488 viewer_stream->metadata_sent) {
2229a09c
MD
489 (*nb_total)++;
490 }
491 } else {
9edaf114
JG
492 if (!relay_stream->closed ||
493 !(((int64_t)(relay_stream->prev_data_seq -
494 relay_stream->last_net_seq_num)) >=
495 0)) {
2229a09c
MD
496 (*nb_total)++;
497 }
498 }
2f8f53af 499 }
2229a09c 500 /* Put local reference. */
9edaf114 501 viewer_stream_put(viewer_stream);
7591bab1 502 next:
9edaf114
JG
503 pthread_mutex_unlock(&relay_stream->lock);
504 stream_put(relay_stream);
2f8f53af 505 }
9edaf114 506 relay_stream = NULL;
7591bab1 507 ctf_trace_put(ctf_trace);
2f8f53af
DG
508 }
509
510 ret = 0;
511
512error_unlock:
2a174661 513 rcu_read_unlock();
80516611 514
9edaf114
JG
515 if (relay_stream) {
516 pthread_mutex_unlock(&relay_stream->lock);
517 stream_put(relay_stream);
518 }
519
2f8f53af
DG
520 return ret;
521}
522
b4aacfdc 523int relayd_live_stop(void)
d3e2ba59 524{
b4aacfdc 525 /* Stop dispatch thread */
d3e2ba59 526 CMM_STORE_SHARED(live_dispatch_thread_exit, 1);
58eb9381 527 futex_nto1_wake(&viewer_conn_queue.futex);
b4aacfdc 528 return 0;
d3e2ba59
JD
529}
530
d3e2ba59
JD
531/*
532 * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
533 */
534static
23f940ff
JG
535int create_named_thread_poll_set(struct lttng_poll_event *events,
536 int size, const char *name)
d3e2ba59
JD
537{
538 int ret;
539
540 if (events == NULL || size == 0) {
541 ret = -1;
542 goto error;
543 }
544
23f940ff
JG
545 ret = fd_tracker_util_poll_create(the_fd_tracker,
546 name, events, 1, LTTNG_CLOEXEC);
ad36f3a7
JG
547 if (ret) {
548 PERROR("Failed to create \"%s\" poll file descriptor", name);
549 goto error;
550 }
d3e2ba59
JD
551
552 /* Add quit pipe */
bcf4a440 553 ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
d3e2ba59
JD
554 if (ret < 0) {
555 goto error;
556 }
557
558 return 0;
559
560error:
561 return ret;
562}
563
564/*
565 * Check if the thread quit pipe was triggered.
566 *
567 * Return 1 if it was triggered else 0;
568 */
569static
bcf4a440 570int check_thread_quit_pipe(int fd, uint32_t events)
d3e2ba59 571{
bcf4a440 572 if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
d3e2ba59
JD
573 return 1;
574 }
575
576 return 0;
577}
578
8855795d
JG
579static
580int create_sock(void *data, int *out_fd)
581{
582 int ret;
ac497a37 583 struct lttcomm_sock *sock = (lttcomm_sock *) data;
8855795d
JG
584
585 ret = lttcomm_create_sock(sock);
586 if (ret < 0) {
587 goto end;
588 }
589
590 *out_fd = sock->fd;
591end:
592 return ret;
593}
594
595static
596int close_sock(void *data, int *in_fd)
597{
ac497a37 598 struct lttcomm_sock *sock = (lttcomm_sock *) data;
8855795d
JG
599
600 return sock->ops->close(sock);
601}
602
29555a78
JG
603static int accept_sock(void *data, int *out_fd)
604{
605 int ret = 0;
606 /* Socks is an array of in_sock, out_sock. */
ac497a37 607 struct lttcomm_sock **socks = (lttcomm_sock **) data;
29555a78
JG
608 struct lttcomm_sock *in_sock = socks[0];
609
610 socks[1] = in_sock->ops->accept(in_sock);
611 if (!socks[1]) {
612 ret = -1;
613 goto end;
614 }
615 *out_fd = socks[1]->fd;
616end:
617 return ret;
618}
619
620static
621struct lttcomm_sock *accept_live_sock(struct lttcomm_sock *listening_sock,
622 const char *name)
623{
624 int out_fd, ret;
625 struct lttcomm_sock *socks[2] = { listening_sock, NULL };
626 struct lttcomm_sock *new_sock = NULL;
627
628 ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &out_fd,
629 (const char **) &name, 1, accept_sock, &socks);
630 if (ret) {
631 goto end;
632 }
633 new_sock = socks[1];
634 DBG("%s accepted, socket %d", name, new_sock->fd);
635end:
636 return new_sock;
637}
638
d3e2ba59
JD
639/*
640 * Create and init socket from uri.
641 */
642static
8855795d 643struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name)
d3e2ba59 644{
8855795d 645 int ret, sock_fd;
d3e2ba59 646 struct lttcomm_sock *sock = NULL;
8855795d
JG
647 char uri_str[LTTNG_PATH_MAX];
648 char *formated_name = NULL;
d3e2ba59
JD
649
650 sock = lttcomm_alloc_sock_from_uri(uri);
651 if (sock == NULL) {
652 ERR("Allocating socket");
653 goto error;
654 }
655
8855795d
JG
656 /*
657 * Don't fail to create the socket if the name can't be built as it is
658 * only used for debugging purposes.
659 */
660 ret = uri_to_str_url(uri, uri_str, sizeof(uri_str));
661 uri_str[sizeof(uri_str) - 1] = '\0';
662 if (ret >= 0) {
663 ret = asprintf(&formated_name, "%s socket @ %s", name,
664 uri_str);
665 if (ret < 0) {
666 formated_name = NULL;
667 }
d3e2ba59 668 }
8855795d
JG
669
670 ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd,
671 (const char **) (formated_name ? &formated_name : NULL),
672 1, create_sock, sock);
d546eeec
JG
673 if (ret) {
674 PERROR("Failed to create \"%s\" socket",
675 formated_name ?: "Unknown");
676 goto error;
677 }
8855795d 678 DBG("Listening on %s socket %d", name, sock->fd);
d3e2ba59
JD
679
680 ret = sock->ops->bind(sock);
681 if (ret < 0) {
2288467f 682 PERROR("Failed to bind lttng-live socket");
d3e2ba59
JD
683 goto error;
684 }
685
686 ret = sock->ops->listen(sock, -1);
687 if (ret < 0) {
688 goto error;
689
690 }
691
d546eeec 692 free(formated_name);
d3e2ba59
JD
693 return sock;
694
695error:
696 if (sock) {
697 lttcomm_destroy_sock(sock);
698 }
d546eeec 699 free(formated_name);
d3e2ba59
JD
700 return NULL;
701}
702
/*
 * This thread manages the listening for new connections on the network.
 *
 * It creates the live control socket from live_uri, then loops on a poll
 * set holding the thread quit pipe and that socket. Each accepted viewer
 * connection is wrapped in a relay_connection, enqueued on
 * viewer_conn_queue and the dispatcher is woken. Any error, or the quit
 * pipe firing, tears the thread down and asks all relayd threads to stop.
 */
static
void *thread_listener(void *data)
{
	int i, ret, pollfd, err = -1;
	uint32_t revents, nb_fd;
	struct lttng_poll_event events;
	struct lttcomm_sock *live_control_sock;

	DBG("[thread] Relay live listener started");

	rcu_register_thread();
	health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_LISTENER);

	health_code_update();

	live_control_sock = init_socket(live_uri, "Live listener");
	if (!live_control_sock) {
		goto error_sock_control;
	}

	/* Pass 2 as size here for the thread quit pipe and control sockets. */
	ret = create_named_thread_poll_set(&events, 2,
			"Live listener thread epoll");
	if (ret < 0) {
		/* The poll set is not cleaned on this path. */
		goto error_create_poll;
	}

	/* Add the control socket */
	ret = lttng_poll_add(&events, live_control_sock->fd, LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		goto error_poll_add;
	}

	lttng_relay_notify_ready();

	if (testpoint(relayd_thread_live_listener)) {
		goto error_testpoint;
	}

	while (1) {
		health_code_update();

		DBG("Listener accepting live viewers connections");

restart:
		health_poll_entry();
		ret = lttng_poll_wait(&events, -1);
		health_poll_exit();
		if (ret < 0) {
			/*
			 * Restart interrupted system call.
			 */
			if (errno == EINTR) {
				goto restart;
			}
			goto error;
		}
		nb_fd = ret;

		DBG("Relay new viewer connection received");
		for (i = 0; i < nb_fd; i++) {
			health_code_update();

			/* Fetch once the poll data */
			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			/* Thread quit pipe has been closed. Killing thread. */
			ret = check_thread_quit_pipe(pollfd, revents);
			if (ret) {
				/* Requested shutdown: not an error. */
				err = 0;
				goto exit;
			}

			if (revents & LPOLLIN) {
				/*
				 * A new connection is requested, therefore a
				 * viewer connection is allocated in this
				 * thread, enqueued to a global queue and
				 * dequeued (and freed) in the worker thread.
				 */
				int val = 1;
				struct relay_connection *new_conn;
				struct lttcomm_sock *newsock;

				newsock = accept_live_sock(live_control_sock,
						"Live socket to client");
				if (!newsock) {
					PERROR("accepting control sock");
					goto error;
				}
				DBG("Relay viewer connection accepted socket %d", newsock->fd);

				ret = setsockopt(newsock->fd, SOL_SOCKET, SO_REUSEADDR, &val,
						sizeof(val));
				if (ret < 0) {
					PERROR("setsockopt inet");
					/*
					 * NOTE(review): newsock's fd was opened through
					 * the fd-tracker in accept_live_sock(); unlike
					 * the control socket teardown below, it is not
					 * closed via fd_tracker_close_unsuspendable_fd()
					 * here. Confirm lttcomm_destroy_sock() does not
					 * leak the descriptor.
					 */
					lttcomm_destroy_sock(newsock);
					goto error;
				}
				new_conn = connection_create(newsock, RELAY_CONNECTION_UNKNOWN);
				if (!new_conn) {
					/* NOTE(review): same descriptor concern as above. */
					lttcomm_destroy_sock(newsock);
					goto error;
				}
				/* Ownership assumed by the connection. */
				newsock = NULL;

				/* Enqueue request for the dispatcher thread. */
				cds_wfcq_head_ptr_t head;
				head.h = &viewer_conn_queue.head;
				cds_wfcq_enqueue(head, &viewer_conn_queue.tail,
						&new_conn->qnode);

				/*
				 * Wake the dispatch queue futex.
				 * Implicit memory barrier with the
				 * exchange in cds_wfcq_enqueue.
				 */
				futex_nto1_wake(&viewer_conn_queue.futex);
			} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
				ERR("socket poll error");
				goto error;
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				goto error;
			}
		}
	}

	/*
	 * Teardown: labels are ordered so that each failure point only releases
	 * what had been set up before it.
	 */
exit:
error:
error_poll_add:
error_testpoint:
	(void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
error_create_poll:
	if (live_control_sock->fd >= 0) {
		int sock_fd = live_control_sock->fd;

		ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker,
				&sock_fd, 1, close_sock,
				live_control_sock);
		if (ret) {
			PERROR("close");
		}
		live_control_sock->fd = -1;
	}
	lttcomm_destroy_sock(live_control_sock);
error_sock_control:
	if (err) {
		health_error();
		DBG("Live viewer listener thread exited with error");
	}
	health_unregister(health_relayd);
	rcu_unregister_thread();
	DBG("Live viewer listener thread cleanup complete");
	/* Whatever the exit reason, bring the other relayd threads down too. */
	if (lttng_relay_stop_threads()) {
		ERR("Error stopping threads");
	}
	return NULL;
}
867
/*
 * This thread manages the dispatching of the requests to worker threads.
 *
 * It dequeues the relay_connection objects enqueued on viewer_conn_queue by
 * thread_listener() and writes each connection pointer to live_conn_pipe[1]
 * for the worker thread. When the queue is empty it sleeps on the queue
 * futex; it exits once live_dispatch_thread_exit is set (see
 * relayd_live_stop()).
 */
static
void *thread_dispatcher(void *data)
{
	int err = -1;
	ssize_t ret;
	struct cds_wfcq_node *node;
	struct relay_connection *conn = NULL;

	DBG("[thread] Live viewer relay dispatcher started");

	health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_DISPATCHER);

	if (testpoint(relayd_thread_live_dispatcher)) {
		goto error_testpoint;
	}

	health_code_update();

	for (;;) {
		health_code_update();

		/*
		 * Atomically prepare the queue futex. This is done before the
		 * exit flag and the queue are checked, presumably so that a wake
		 * issued after those checks is not missed by the wait below.
		 */
		futex_nto1_prepare(&viewer_conn_queue.futex);

		if (CMM_LOAD_SHARED(live_dispatch_thread_exit)) {
			break;
		}

		do {
			health_code_update();

			/* Dequeue commands */
			node = cds_wfcq_dequeue_blocking(&viewer_conn_queue.head,
					&viewer_conn_queue.tail);
			if (node == NULL) {
				DBG("Woken up but nothing in the live-viewer "
						"relay command queue");
				/* Continue thread execution */
				break;
			}
			conn = caa_container_of(node, struct relay_connection, qnode);
			DBG("Dispatching viewer request waiting on sock %d",
					conn->sock->fd);

			/*
			 * Inform worker thread of the new request. This
			 * call is blocking so we can be assured that
			 * the data will be read at some point in time
			 * or wait to the end of the world :)
			 *
			 * The connection pointer itself is written to the pipe.
			 * If the write fails, the reference is dropped here.
			 */
			ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn));
			if (ret < 0) {
				PERROR("write conn pipe");
				connection_put(conn);
				goto error;
			}
		} while (node != NULL);

		/* Futex wait on queue. Blocking call on futex() */
		health_poll_entry();
		futex_nto1_wait(&viewer_conn_queue.futex);
		health_poll_exit();
	}

	/* Normal exit, no error */
	err = 0;

error:
error_testpoint:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(health_relayd);
	DBG("Live viewer dispatch thread dying");
	/* Whatever the exit reason, bring the other relayd threads down too. */
	if (lttng_relay_stop_threads()) {
		ERR("Error stopping threads");
	}
	return NULL;
}
951
952/*
953 * Establish connection with the viewer and check the versions.
954 *
955 * Return 0 on success or else negative value.
956 */
957static
58eb9381 958int viewer_connect(struct relay_connection *conn)
d3e2ba59
JD
959{
960 int ret;
961 struct lttng_viewer_connect reply, msg;
962
58eb9381 963 conn->version_check_done = 1;
d3e2ba59 964
eea7556c
MD
965 health_code_update();
966
58eb9381 967 ret = recv_request(conn->sock, &msg, sizeof(msg));
2f8f53af 968 if (ret < 0) {
d3e2ba59
JD
969 goto end;
970 }
971
eea7556c
MD
972 health_code_update();
973
f46b2ce6 974 memset(&reply, 0, sizeof(reply));
d3e2ba59
JD
975 reply.major = RELAYD_VERSION_COMM_MAJOR;
976 reply.minor = RELAYD_VERSION_COMM_MINOR;
977
978 /* Major versions must be the same */
979 if (reply.major != be32toh(msg.major)) {
2f8f53af
DG
980 DBG("Incompatible major versions ([relayd] %u vs [client] %u)",
981 reply.major, be32toh(msg.major));
72180669 982 ret = -1;
d3e2ba59
JD
983 goto end;
984 }
985
58eb9381 986 conn->major = reply.major;
d3e2ba59
JD
987 /* We adapt to the lowest compatible version */
988 if (reply.minor <= be32toh(msg.minor)) {
58eb9381 989 conn->minor = reply.minor;
d3e2ba59 990 } else {
58eb9381 991 conn->minor = be32toh(msg.minor);
d3e2ba59
JD
992 }
993
c4e361a4 994 if (be32toh(msg.type) == LTTNG_VIEWER_CLIENT_COMMAND) {
58eb9381 995 conn->type = RELAY_VIEWER_COMMAND;
c4e361a4 996 } else if (be32toh(msg.type) == LTTNG_VIEWER_CLIENT_NOTIFICATION) {
58eb9381 997 conn->type = RELAY_VIEWER_NOTIFICATION;
d3e2ba59
JD
998 } else {
999 ERR("Unknown connection type : %u", be32toh(msg.type));
1000 ret = -1;
1001 goto end;
1002 }
1003
1004 reply.major = htobe32(reply.major);
1005 reply.minor = htobe32(reply.minor);
58eb9381 1006 if (conn->type == RELAY_VIEWER_COMMAND) {
93b4787b 1007 /*
7591bab1
MD
1008 * Increment outside of htobe64 macro, because the argument can
1009 * be used more than once within the macro, and thus the
1010 * operation may be undefined.
93b4787b 1011 */
7591bab1 1012 pthread_mutex_lock(&last_relay_viewer_session_id_lock);
93b4787b 1013 last_relay_viewer_session_id++;
7591bab1 1014 pthread_mutex_unlock(&last_relay_viewer_session_id_lock);
93b4787b 1015 reply.viewer_session_id = htobe64(last_relay_viewer_session_id);
d3e2ba59 1016 }
eea7556c
MD
1017
1018 health_code_update();
1019
58eb9381 1020 ret = send_response(conn->sock, &reply, sizeof(reply));
d3e2ba59 1021 if (ret < 0) {
2f8f53af 1022 goto end;
d3e2ba59
JD
1023 }
1024
eea7556c
MD
1025 health_code_update();
1026
58eb9381 1027 DBG("Version check done using protocol %u.%u", conn->major, conn->minor);
d3e2ba59
JD
1028 ret = 0;
1029
1030end:
1031 return ret;
1032}
1033
1034/*
1035 * Send the viewer the list of current sessions.
7591bab1
MD
1036 * We need to create a copy of the hash table content because otherwise
1037 * we cannot assume the number of entries stays the same between getting
1038 * the number of HT elements and iteration over the HT.
d3e2ba59
JD
1039 *
1040 * Return 0 on success or else a negative value.
1041 */
1042static
58eb9381 1043int viewer_list_sessions(struct relay_connection *conn)
d3e2ba59 1044{
5f10c6b1 1045 int ret = 0;
d3e2ba59 1046 struct lttng_viewer_list_sessions session_list;
d3e2ba59 1047 struct lttng_ht_iter iter;
d3e2ba59 1048 struct relay_session *session;
7591bab1
MD
1049 struct lttng_viewer_session *send_session_buf = NULL;
1050 uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT;
1051 uint32_t count = 0;
d3e2ba59 1052
ac497a37 1053 send_session_buf = (lttng_viewer_session *) zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf));
7591bab1
MD
1054 if (!send_session_buf) {
1055 return -1;
d3e2ba59
JD
1056 }
1057
7591bab1
MD
1058 rcu_read_lock();
1059 cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session,
2f8f53af 1060 session_n.node) {
7591bab1 1061 struct lttng_viewer_session *send_session;
d3e2ba59 1062
eea7556c
MD
1063 health_code_update();
1064
d995f382 1065 pthread_mutex_lock(&session->lock);
7e0a4379
JR
1066 if (session->connection_closed) {
1067 /* Skip closed session */
d995f382
JG
1068 goto next_session;
1069 }
7e0a4379 1070
7591bab1
MD
1071 if (count >= buf_count) {
1072 struct lttng_viewer_session *newbuf;
1073 uint32_t new_buf_count = buf_count << 1;
eea7556c 1074
ac497a37 1075 newbuf = (lttng_viewer_session *) realloc(send_session_buf,
7591bab1
MD
1076 new_buf_count * sizeof(*send_session_buf));
1077 if (!newbuf) {
1078 ret = -1;
d995f382 1079 goto break_loop;
7591bab1
MD
1080 }
1081 send_session_buf = newbuf;
1082 buf_count = new_buf_count;
6763619c 1083 }
7591bab1 1084 send_session = &send_session_buf[count];
bfc3fb17
MD
1085 if (lttng_strncpy(send_session->session_name,
1086 session->session_name,
1087 sizeof(send_session->session_name))) {
1088 ret = -1;
d995f382 1089 goto break_loop;
bfc3fb17
MD
1090 }
1091 if (lttng_strncpy(send_session->hostname, session->hostname,
1092 sizeof(send_session->hostname))) {
1093 ret = -1;
d995f382 1094 goto break_loop;
bfc3fb17 1095 }
7591bab1
MD
1096 send_session->id = htobe64(session->id);
1097 send_session->live_timer = htobe32(session->live_timer);
1098 if (session->viewer_attached) {
1099 send_session->clients = htobe32(1);
1100 } else {
1101 send_session->clients = htobe32(0);
d227d5bd 1102 }
7591bab1
MD
1103 send_session->streams = htobe32(session->stream_count);
1104 count++;
d995f382
JG
1105 next_session:
1106 pthread_mutex_unlock(&session->lock);
1107 continue;
1108 break_loop:
1109 pthread_mutex_unlock(&session->lock);
1110 break;
7591bab1
MD
1111 }
1112 rcu_read_unlock();
5f10c6b1
JG
1113 if (ret < 0) {
1114 goto end_free;
1115 }
d227d5bd 1116
7591bab1 1117 session_list.sessions_count = htobe32(count);
d227d5bd 1118
7591bab1 1119 health_code_update();
d227d5bd 1120
7591bab1
MD
1121 ret = send_response(conn->sock, &session_list, sizeof(session_list));
1122 if (ret < 0) {
1123 goto end_free;
d227d5bd 1124 }
d227d5bd 1125
7591bab1 1126 health_code_update();
d227d5bd 1127
7591bab1
MD
1128 ret = send_response(conn->sock, send_session_buf,
1129 count * sizeof(*send_session_buf));
1130 if (ret < 0) {
1131 goto end_free;
d227d5bd 1132 }
7591bab1 1133 health_code_update();
d227d5bd 1134
7591bab1
MD
1135 ret = 0;
1136end_free:
1137 free(send_session_buf);
1138 return ret;
d227d5bd
JD
1139}
1140
80e8027a 1141/*
7591bab1 1142 * Send the viewer the list of current streams.
80e8027a
JD
1143 */
1144static
58eb9381 1145int viewer_get_new_streams(struct relay_connection *conn)
80e8027a
JD
1146{
1147 int ret, send_streams = 0;
7591bab1 1148 uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0;
80e8027a
JD
1149 struct lttng_viewer_new_streams_request request;
1150 struct lttng_viewer_new_streams_response response;
b66a15d1 1151 struct relay_session *session = NULL;
6763619c 1152 uint64_t session_id;
bddf80e4 1153 bool closed = false;
80e8027a 1154
a0377dfe 1155 LTTNG_ASSERT(conn);
80e8027a 1156
80e8027a
JD
1157 health_code_update();
1158
2f8f53af 1159 /* Receive the request from the connected client. */
58eb9381 1160 ret = recv_request(conn->sock, &request, sizeof(request));
2f8f53af 1161 if (ret < 0) {
80e8027a
JD
1162 goto error;
1163 }
6763619c 1164 session_id = be64toh(request.session_id);
80e8027a
JD
1165
1166 health_code_update();
1167
53efb85a
MD
1168 memset(&response, 0, sizeof(response));
1169
7591bab1 1170 session = session_get_by_id(session_id);
2f8f53af 1171 if (!session) {
6763619c 1172 DBG("Relay session %" PRIu64 " not found", session_id);
c4e361a4 1173 response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
80e8027a
JD
1174 goto send_reply;
1175 }
1176
7591bab1 1177 if (!viewer_session_is_attached(conn->viewer_session, session)) {
c4e361a4 1178 response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
80e8027a
JD
1179 goto send_reply;
1180 }
1181
c876d657
JR
1182 /*
1183 * For any new stream, create it with LTTNG_VIEWER_SEEK_BEGINNING since
1184 * that at this point the client is already attached to the session.Aany
1185 * initial stream will have been created with the seek type at attach
1186 * time (for now most readers use the LTTNG_VIEWER_SEEK_LAST on attach).
1187 * Otherwise any event happening in a new stream between the attach and
1188 * a call to viewer_get_new_streams will be "lost" (never received) from
1189 * the viewer's point of view.
1190 */
c06fdd95 1191 pthread_mutex_lock(&session->lock);
8cd15f6a
MD
1192 /*
1193 * If a session rotation is ongoing, do not attempt to open any
1194 * stream, because the chunk can be in an intermediate state
1195 * due to directory renaming.
1196 */
1197 if (session->ongoing_rotation) {
1198 DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
1199 response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW);
1200 goto send_reply_unlock;
1201 }
b66a15d1 1202 ret = make_viewer_streams(session,
9edaf114 1203 conn->viewer_session,
c876d657 1204 LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent,
bddf80e4 1205 &nb_created, &closed);
2f8f53af 1206 if (ret < 0) {
b66a15d1 1207 goto error_unlock_session;
2f8f53af 1208 }
3d0bbd40
JG
1209 send_streams = 1;
1210 response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
b66a15d1 1211
2f8f53af
DG
1212 /* Only send back the newly created streams with the unsent ones. */
1213 nb_streams = nb_created + nb_unsent;
80e8027a
JD
1214 response.streams_count = htobe32(nb_streams);
1215
4479f682 1216 /*
2229a09c
MD
1217 * If the session is closed, HUP when there are no more streams
1218 * with data.
4479f682 1219 */
bddf80e4 1220 if (closed && nb_total == 0) {
4479f682 1221 send_streams = 0;
bddf80e4 1222 response.streams_count = 0;
4479f682 1223 response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
3d0bbd40 1224 goto send_reply_unlock;
4479f682 1225 }
3d0bbd40
JG
1226send_reply_unlock:
1227 pthread_mutex_unlock(&session->lock);
4479f682 1228
80e8027a
JD
1229send_reply:
1230 health_code_update();
58eb9381 1231 ret = send_response(conn->sock, &response, sizeof(response));
80e8027a 1232 if (ret < 0) {
7591bab1 1233 goto end_put_session;
80e8027a
JD
1234 }
1235 health_code_update();
1236
1237 /*
7591bab1
MD
1238 * Unknown or empty session, just return gracefully, the viewer
1239 * knows what is happening.
80e8027a
JD
1240 */
1241 if (!send_streams || !nb_streams) {
1242 ret = 0;
7591bab1 1243 goto end_put_session;
80e8027a
JD
1244 }
1245
2f8f53af 1246 /*
7591bab1
MD
1247 * Send stream and *DON'T* ignore the sent flag so every viewer
1248 * streams that were not sent from that point will be sent to
1249 * the viewer.
2f8f53af 1250 */
c06fdd95 1251 ret = send_viewer_streams(conn->sock, session_id, 0);
2f8f53af 1252 if (ret < 0) {
7591bab1 1253 goto end_put_session;
80e8027a
JD
1254 }
1255
7591bab1
MD
1256end_put_session:
1257 if (session) {
1258 session_put(session);
1259 }
80e8027a
JD
1260error:
1261 return ret;
b66a15d1
JG
1262error_unlock_session:
1263 pthread_mutex_unlock(&session->lock);
1264 session_put(session);
1265 return ret;
80e8027a
JD
1266}
1267
d3e2ba59
JD
1268/*
1269 * Send the viewer the list of current sessions.
1270 */
1271static
58eb9381 1272int viewer_attach_session(struct relay_connection *conn)
d3e2ba59 1273{
2f8f53af
DG
1274 int send_streams = 0;
1275 ssize_t ret;
80e8027a 1276 uint32_t nb_streams = 0;
2f8f53af 1277 enum lttng_viewer_seek seek_type;
d3e2ba59
JD
1278 struct lttng_viewer_attach_session_request request;
1279 struct lttng_viewer_attach_session_response response;
7591bab1 1280 struct relay_session *session = NULL;
dbd6665b 1281 enum lttng_viewer_attach_return_code viewer_attach_status;
bddf80e4 1282 bool closed = false;
c06fdd95 1283 uint64_t session_id;
d3e2ba59 1284
a0377dfe 1285 LTTNG_ASSERT(conn);
d3e2ba59 1286
eea7556c
MD
1287 health_code_update();
1288
2f8f53af 1289 /* Receive the request from the connected client. */
58eb9381 1290 ret = recv_request(conn->sock, &request, sizeof(request));
2f8f53af 1291 if (ret < 0) {
d3e2ba59
JD
1292 goto error;
1293 }
1294
c06fdd95 1295 session_id = be64toh(request.session_id);
eea7556c
MD
1296 health_code_update();
1297
53efb85a
MD
1298 memset(&response, 0, sizeof(response));
1299
c3b7390b
JD
1300 if (!conn->viewer_session) {
1301 DBG("Client trying to attach before creating a live viewer session");
1302 response.status = htobe32(LTTNG_VIEWER_ATTACH_NO_SESSION);
1303 goto send_reply;
1304 }
1305
c06fdd95 1306 session = session_get_by_id(session_id);
2f8f53af 1307 if (!session) {
c06fdd95 1308 DBG("Relay session %" PRIu64 " not found", session_id);
c4e361a4 1309 response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
d3e2ba59
JD
1310 goto send_reply;
1311 }
c06fdd95 1312 DBG("Attach session ID %" PRIu64 " received", session_id);
d3e2ba59 1313
c06fdd95 1314 pthread_mutex_lock(&session->lock);
7591bab1 1315 if (session->live_timer == 0) {
d3e2ba59 1316 DBG("Not live session");
c4e361a4 1317 response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
d3e2ba59 1318 goto send_reply;
7591bab1
MD
1319 }
1320
1321 send_streams = 1;
dbd6665b
JG
1322 viewer_attach_status = viewer_session_attach(conn->viewer_session,
1323 session);
1324 if (viewer_attach_status != LTTNG_VIEWER_ATTACH_OK) {
1325 response.status = htobe32(viewer_attach_status);
7591bab1 1326 goto send_reply;
d3e2ba59
JD
1327 }
1328
1329 switch (be32toh(request.seek)) {
c4e361a4
JD
1330 case LTTNG_VIEWER_SEEK_BEGINNING:
1331 case LTTNG_VIEWER_SEEK_LAST:
7591bab1 1332 response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
ac497a37 1333 seek_type = (lttng_viewer_seek) be32toh(request.seek);
d3e2ba59
JD
1334 break;
1335 default:
1336 ERR("Wrong seek parameter");
c4e361a4 1337 response.status = htobe32(LTTNG_VIEWER_ATTACH_SEEK_ERR);
d3e2ba59
JD
1338 send_streams = 0;
1339 goto send_reply;
1340 }
1341
8cd15f6a
MD
1342 /*
1343 * If a session rotation is ongoing, do not attempt to open any
1344 * stream, because the chunk can be in an intermediate state
1345 * due to directory renaming.
1346 */
1347 if (session->ongoing_rotation) {
1348 DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
1349 send_streams = 0;
1350 goto send_reply;
1351 }
1352
b66a15d1 1353 ret = make_viewer_streams(session,
9edaf114 1354 conn->viewer_session, seek_type,
b66a15d1 1355 &nb_streams, NULL, NULL, &closed);
2f8f53af 1356 if (ret < 0) {
7591bab1 1357 goto end_put_session;
d3e2ba59 1358 }
c06fdd95
JG
1359 pthread_mutex_unlock(&session->lock);
1360 session_put(session);
1361 session = NULL;
1362
1363 response.streams_count = htobe32(nb_streams);
bddf80e4
MD
1364 /*
1365 * If the session is closed when the viewer is attaching, it
1366 * means some of the streams may have been concurrently removed,
1367 * so we don't allow the viewer to attach, even if there are
1368 * streams available.
1369 */
1370 if (closed) {
1371 send_streams = 0;
1372 response.streams_count = 0;
06079f15 1373 response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
bddf80e4
MD
1374 goto send_reply;
1375 }
1376
d3e2ba59 1377send_reply:
eea7556c 1378 health_code_update();
58eb9381 1379 ret = send_response(conn->sock, &response, sizeof(response));
d3e2ba59 1380 if (ret < 0) {
7591bab1 1381 goto end_put_session;
d3e2ba59 1382 }
eea7556c 1383 health_code_update();
d3e2ba59
JD
1384
1385 /*
7591bab1
MD
1386 * Unknown or empty session, just return gracefully, the viewer
1387 * knows what is happening.
d3e2ba59 1388 */
157df586 1389 if (!send_streams || !nb_streams) {
d3e2ba59 1390 ret = 0;
7591bab1 1391 goto end_put_session;
d3e2ba59
JD
1392 }
1393
2f8f53af 1394 /* Send stream and ignore the sent flag. */
c06fdd95 1395 ret = send_viewer_streams(conn->sock, session_id, 1);
2f8f53af 1396 if (ret < 0) {
7591bab1 1397 goto end_put_session;
d3e2ba59 1398 }
d3e2ba59 1399
7591bab1
MD
1400end_put_session:
1401 if (session) {
c06fdd95 1402 pthread_mutex_unlock(&session->lock);
7591bab1
MD
1403 session_put(session);
1404 }
4a9daf17
JD
1405error:
1406 return ret;
1407}
1408
878c34cf
DG
1409/*
1410 * Open the index file if needed for the given vstream.
1411 *
f8f3885c
MD
1412 * If an index file is successfully opened, the vstream will set it as its
1413 * current index file.
878c34cf
DG
1414 *
1415 * Return 0 on success, a negative value on error (-ENOENT if not ready yet).
7591bab1
MD
1416 *
1417 * Called with rstream lock held.
878c34cf
DG
1418 */
1419static int try_open_index(struct relay_viewer_stream *vstream,
1420 struct relay_stream *rstream)
1421{
1422 int ret = 0;
ebb29c10
JG
1423 const uint32_t connection_major = rstream->trace->session->major;
1424 const uint32_t connection_minor = rstream->trace->session->minor;
3ff5c5db 1425 enum lttng_trace_chunk_status chunk_status;
878c34cf 1426
f8f3885c 1427 if (vstream->index_file) {
878c34cf
DG
1428 goto end;
1429 }
1430
1431 /*
7591bab1 1432 * First time, we open the index file and at least one index is ready.
878c34cf 1433 */
80516611
JG
1434 if (rstream->index_received_seqcount == 0 ||
1435 !vstream->stream_file.trace_chunk) {
878c34cf
DG
1436 ret = -ENOENT;
1437 goto end;
1438 }
80516611 1439
3ff5c5db 1440 chunk_status = lttng_index_file_create_from_trace_chunk_read_only(
b66a15d1 1441 vstream->stream_file.trace_chunk, rstream->path_name,
ebb29c10
JG
1442 rstream->channel_name, rstream->tracefile_size,
1443 vstream->current_tracefile_id,
1444 lttng_to_index_major(connection_major, connection_minor),
3ff5c5db
MD
1445 lttng_to_index_minor(connection_major, connection_minor),
1446 true, &vstream->index_file);
1447 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1448 if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) {
1449 ret = -ENOENT;
1450 } else {
1451 ret = -1;
1452 }
878c34cf
DG
1453 }
1454
1455end:
1456 return ret;
1457}
1458
1459/*
7591bab1
MD
1460 * Check the status of the index for the given stream. This function
1461 * updates the index structure if needed and can put (close) the vstream
1462 * in the HUP situation.
1463 *
1464 * Return 0 means that we can proceed with the index. A value of 1 means
1465 * that the index has been updated and is ready to be sent to the
1466 * client. A negative value indicates an error that can't be handled.
878c34cf 1467 *
7591bab1 1468 * Called with rstream lock held.
878c34cf
DG
1469 */
1470static int check_index_status(struct relay_viewer_stream *vstream,
1471 struct relay_stream *rstream, struct ctf_trace *trace,
1472 struct lttng_viewer_index *index)
1473{
1474 int ret;
1475
68c40154
MD
1476 DBG("Check index status: index_received_seqcount %" PRIu64 " "
1477 "index_sent_seqcount %" PRIu64 " "
1478 "for stream %" PRIu64,
1479 rstream->index_received_seqcount,
1480 vstream->index_sent_seqcount,
1481 vstream->stream->stream_handle);
797bc362 1482 if ((trace->session->connection_closed || rstream->closed)
a44ca2ca
MD
1483 && rstream->index_received_seqcount
1484 == vstream->index_sent_seqcount) {
797bc362
MD
1485 /*
1486 * Last index sent and session connection or relay
1487 * stream are closed.
1488 */
7591bab1
MD
1489 index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
1490 goto hup;
1491 } else if (rstream->beacon_ts_end != -1ULL &&
b0d240a2
MD
1492 (rstream->index_received_seqcount == 0 ||
1493 (vstream->index_sent_seqcount != 0 &&
a44ca2ca 1494 rstream->index_received_seqcount
b0d240a2 1495 <= vstream->index_sent_seqcount))) {
7591bab1
MD
1496 /*
1497 * We've received a synchronization beacon and the last index
1498 * available has been sent, the index for now is inactive.
1499 *
1500 * In this case, we have received a beacon which allows us to
1501 * inform the client of a time interval during which we can
1502 * guarantee that there are no events to read (and never will
1503 * be).
b0d240a2
MD
1504 *
1505 * The sent seqcount can grow higher than receive seqcount on
1506 * clear because the rotation performed by clear will push
1507 * the index_sent_seqcount ahead (see
1508 * viewer_stream_sync_tracefile_array_tail) and skip over
1509 * packet sequence numbers.
7591bab1
MD
1510 */
1511 index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
1512 index->timestamp_end = htobe64(rstream->beacon_ts_end);
1513 index->stream_id = htobe64(rstream->ctf_stream_id);
68c40154
MD
1514 DBG("Check index status: inactive with beacon, for stream %" PRIu64,
1515 vstream->stream->stream_handle);
7591bab1 1516 goto index_ready;
b0d240a2
MD
1517 } else if (rstream->index_received_seqcount == 0 ||
1518 (vstream->index_sent_seqcount != 0 &&
1519 rstream->index_received_seqcount
1520 <= vstream->index_sent_seqcount)) {
7591bab1 1521 /*
b0d240a2 1522 * This checks whether received <= sent seqcount. In
a44ca2ca
MD
1523 * this case, we have not received a beacon. Therefore,
1524 * we can only ask the client to retry later.
b0d240a2
MD
1525 *
1526 * The sent seqcount can grow higher than receive seqcount on
1527 * clear because the rotation performed by clear will push
1528 * the index_sent_seqcount ahead (see
1529 * viewer_stream_sync_tracefile_array_tail) and skip over
1530 * packet sequence numbers.
7591bab1
MD
1531 */
1532 index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
68c40154
MD
1533 DBG("Check index status: retry for stream %" PRIu64,
1534 vstream->stream->stream_handle);
7591bab1 1535 goto index_ready;
a44ca2ca
MD
1536 } else if (!tracefile_array_seq_in_file(rstream->tfa,
1537 vstream->current_tracefile_id,
1538 vstream->index_sent_seqcount)) {
7591bab1 1539 /*
a44ca2ca
MD
1540 * The next index we want to send cannot be read either
1541 * because we need to perform a rotation, or due to
1542 * the producer having overwritten its trace file.
7591bab1 1543 */
a44ca2ca 1544 DBG("Viewer stream %" PRIu64 " rotation",
7591bab1
MD
1545 vstream->stream->stream_handle);
1546 ret = viewer_stream_rotate(vstream);
b0d240a2 1547 if (ret == 1) {
7591bab1
MD
1548 /* EOF across entire stream. */
1549 index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
1550 goto hup;
1551 }
7591bab1 1552 /*
a44ca2ca
MD
1553 * If we have been pushed due to overwrite, it
1554 * necessarily means there is data that can be read in
1555 * the stream. If we rotated because we reached the end
1556 * of a tracefile, it means the following tracefile
1557 * needs to contain at least one index, else we would
1558 * have already returned LTTNG_VIEWER_INDEX_RETRY to the
1559 * viewer. The updated index_sent_seqcount needs to
1560 * point to a readable index entry now.
1561 *
1562 * In the case where we "rotate" on a single file, we
1563 * can end up in a case where the requested index is
1564 * still unavailable.
7591bab1 1565 */
a44ca2ca
MD
1566 if (rstream->tracefile_count == 1 &&
1567 !tracefile_array_seq_in_file(
1568 rstream->tfa,
1569 vstream->current_tracefile_id,
1570 vstream->index_sent_seqcount)) {
1571 index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
68c40154
MD
1572 DBG("Check index status: retry: "
1573 "tracefile array sequence number %" PRIu64
1574 " not in file for stream %" PRIu64,
1575 vstream->index_sent_seqcount,
1576 vstream->stream->stream_handle);
a44ca2ca 1577 goto index_ready;
878c34cf 1578 }
a0377dfe 1579 LTTNG_ASSERT(tracefile_array_seq_in_file(rstream->tfa,
a44ca2ca
MD
1580 vstream->current_tracefile_id,
1581 vstream->index_sent_seqcount));
878c34cf 1582 }
a44ca2ca
MD
1583 /* ret == 0 means successful so we continue. */
1584 ret = 0;
878c34cf
DG
1585 return ret;
1586
1587hup:
7591bab1 1588 viewer_stream_put(vstream);
878c34cf
DG
1589index_ready:
1590 return 1;
1591}
1592
80516611
JG
1593static
1594void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream,
1595 struct lttng_trace_chunk *new_trace_chunk)
1596{
1597 lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
1598
1599 if (new_trace_chunk) {
1600 const bool acquired_reference = lttng_trace_chunk_get(
1601 new_trace_chunk);
1602
a0377dfe 1603 LTTNG_ASSERT(acquired_reference);
80516611
JG
1604 }
1605
1606 vstream->stream_file.trace_chunk = new_trace_chunk;
1607 viewer_stream_sync_tracefile_array_tail(vstream);
1608 viewer_stream_close_files(vstream);
1609}
1610
d3e2ba59
JD
1611/*
1612 * Send the next index for a stream.
1613 *
1614 * Return 0 on success or else a negative value.
1615 */
1616static
58eb9381 1617int viewer_get_next_index(struct relay_connection *conn)
d3e2ba59
JD
1618{
1619 int ret;
1620 struct lttng_viewer_get_next_index request_index;
1621 struct lttng_viewer_index viewer_index;
50adc264 1622 struct ctf_packet_index packet_index;
7591bab1
MD
1623 struct relay_viewer_stream *vstream = NULL;
1624 struct relay_stream *rstream = NULL;
1625 struct ctf_trace *ctf_trace = NULL;
1626 struct relay_viewer_stream *metadata_viewer_stream = NULL;
bb1dcf01
MD
1627 bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind;
1628 uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL;
1629 enum lttng_trace_chunk_status status;
d3e2ba59 1630
a0377dfe 1631 LTTNG_ASSERT(conn);
d3e2ba59 1632
7591bab1 1633 memset(&viewer_index, 0, sizeof(viewer_index));
eea7556c 1634 health_code_update();
2f8f53af 1635
58eb9381 1636 ret = recv_request(conn->sock, &request_index, sizeof(request_index));
2f8f53af 1637 if (ret < 0) {
d3e2ba59
JD
1638 goto end;
1639 }
eea7556c 1640 health_code_update();
d3e2ba59 1641
7591bab1 1642 vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
6763619c 1643 if (!vstream) {
a44ca2ca 1644 DBG("Client requested index of unknown stream id %" PRIu64,
9b9f9f94 1645 (uint64_t) be64toh(request_index.stream_id));
f1883937
MD
1646 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
1647 goto send_reply;
2a174661
DG
1648 }
1649
7591bab1
MD
1650 /* Use back. ref. Protected by refcounts. */
1651 rstream = vstream->stream;
1652 ctf_trace = rstream->trace;
d3e2ba59 1653
7591bab1
MD
1654 /* metadata_viewer_stream may be NULL. */
1655 metadata_viewer_stream =
1656 ctf_trace_get_viewer_metadata_stream(ctf_trace);
2a174661 1657
b6ae2a95
MD
1658 /*
1659 * Hold the session lock to protect against concurrent changes
1660 * to the chunk files (e.g. rename done by clear), which are
1661 * protected by the session ongoing rotation state. Those are
1662 * synchronized with the session lock.
1663 */
1664 pthread_mutex_lock(&rstream->trace->session->lock);
7591bab1 1665 pthread_mutex_lock(&rstream->lock);
d3e2ba59
JD
1666
1667 /*
1668 * The viewer should not ask for index on metadata stream.
1669 */
7591bab1 1670 if (rstream->is_metadata) {
c4e361a4 1671 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
d3e2ba59
JD
1672 goto send_reply;
1673 }
1674
b0d240a2
MD
1675 if (rstream->ongoing_rotation.is_set) {
1676 /* Rotation is ongoing, try again later. */
1677 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
1678 goto send_reply;
1679 }
1680
1681 if (rstream->trace->session->ongoing_rotation) {
1682 /* Rotation is ongoing, try again later. */
1683 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
1684 goto send_reply;
1685 }
1686
80516611
JG
1687 /*
1688 * Transition the viewer session into the newest trace chunk available.
1689 */
1690 if (!lttng_trace_chunk_ids_equal(
ad8bec24
JG
1691 conn->viewer_session->current_trace_chunk,
1692 rstream->trace_chunk)) {
9a9c8637 1693 DBG("Relay stream and viewer chunk ids differ");
b0d240a2 1694
ad8bec24
JG
1695 ret = viewer_session_set_trace_chunk_copy(
1696 conn->viewer_session,
1697 rstream->trace_chunk);
1698 if (ret) {
b0d240a2
MD
1699 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
1700 goto send_reply;
1701 }
b0d240a2 1702 }
b0d240a2 1703
80516611
JG
1704 /*
1705 * Transition the viewer stream into the latest trace chunk available.
1706 *
1707 * Note that the stream must _not_ rotate in one precise condition:
1708 * the relay stream has rotated to a NULL trace chunk and the viewer
1709 * stream is consuming the trace chunk that was active just before
1710 * that rotation to NULL.
1711 *
1712 * This allows clients to consume all the packets of a trace chunk
1713 * after a session's destruction.
1714 */
bb1dcf01
MD
1715 if (vstream->stream_file.trace_chunk) {
1716 status = lttng_trace_chunk_get_id(
1717 vstream->stream_file.trace_chunk,
1718 &stream_file_chunk_id);
1719 LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK);
1720 }
1721 if (conn->viewer_session->current_trace_chunk) {
1722 status = lttng_trace_chunk_get_id(
b0d240a2 1723 conn->viewer_session->current_trace_chunk,
bb1dcf01
MD
1724 &viewer_session_chunk_id);
1725 LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK);
1726 }
1727
1728 viewer_stream_and_session_in_same_chunk = lttng_trace_chunk_ids_equal(
1729 conn->viewer_session->current_trace_chunk,
1730 vstream->stream_file.trace_chunk);
1731 viewer_stream_one_rotation_behind = rstream->completed_rotation_count ==
1732 vstream->last_seen_rotation_count + 1;
1733
1734 if (viewer_stream_and_session_in_same_chunk) {
1735 DBG("Transition to latest chunk check (%s -> %s): Same chunk, no need to rotate",
1736 vstream->stream_file.trace_chunk ?
1737 std::to_string(stream_file_chunk_id).c_str() :
1738 "None",
1739 conn->viewer_session->current_trace_chunk ?
1740 std::to_string(viewer_session_chunk_id).c_str() :
1741 "None");
1742 } else if (viewer_stream_one_rotation_behind && !rstream->trace_chunk) {
1743 DBG("Transition to latest chunk check (%s -> %s): One chunk behind relay stream which is being destroyed, no need to rotate",
1744 vstream->stream_file.trace_chunk ?
1745 std::to_string(stream_file_chunk_id).c_str() :
1746 "None",
1747 conn->viewer_session->current_trace_chunk ?
1748 std::to_string(viewer_session_chunk_id).c_str() :
1749 "None");
1750 } else {
1751 DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream",
1752 vstream->stream_file.trace_chunk ?
1753 std::to_string(stream_file_chunk_id).c_str() :
1754 "None",
1755 conn->viewer_session->current_trace_chunk ?
1756 std::to_string(viewer_session_chunk_id).c_str() :
1757 "None");
1758
80516611
JG
1759 viewer_stream_rotate_to_trace_chunk(vstream,
1760 conn->viewer_session->current_trace_chunk);
1761 vstream->last_seen_rotation_count =
1762 rstream->completed_rotation_count;
d3e2ba59
JD
1763 }
1764
878c34cf 1765 ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
878c34cf 1766 if (ret < 0) {
7591bab1 1767 goto error_put;
878c34cf
DG
1768 } else if (ret == 1) {
1769 /*
7591bab1
MD
1770 * We have no index to send and check_index_status has populated
1771 * viewer_index's status.
878c34cf 1772 */
d3e2ba59
JD
1773 goto send_reply;
1774 }
7591bab1 1775 /* At this point, ret is 0 thus we will be able to read the index. */
a0377dfe 1776 LTTNG_ASSERT(!ret);
d3e2ba59 1777
b0d240a2
MD
1778 /* Try to open an index if one is needed for that stream. */
1779 ret = try_open_index(vstream, rstream);
1780 if (ret == -ENOENT) {
1781 if (rstream->closed) {
1782 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
1783 goto send_reply;
1784 } else {
1785 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
1786 goto send_reply;
1787 }
1788 }
1789 if (ret < 0) {
1790 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
1791 goto send_reply;
1792 }
1793
7591bab1
MD
1794 /*
1795 * vstream->stream_fd may be NULL if it has been closed by
1796 * tracefile rotation, or if we are at the beginning of the
1797 * stream. We open the data stream file here to protect against
1798 * overwrite caused by tracefile rotation (in association with
1799 * unlink performed before overwrite).
1800 */
8bb66c3c 1801 if (!vstream->stream_file.handle) {
ebb29c10 1802 char file_path[LTTNG_PATH_MAX];
8bb66c3c 1803 struct fs_handle *fs_handle;
ebb29c10
JG
1804
1805 ret = utils_stream_file_path(rstream->path_name,
1806 rstream->channel_name, rstream->tracefile_size,
1807 vstream->current_tracefile_id, NULL, file_path,
1808 sizeof(file_path));
7591bab1
MD
1809 if (ret < 0) {
1810 goto error_put;
1811 }
ebb29c10 1812
3ff5c5db
MD
1813 /*
1814 * It is possible the the file we are trying to open is
1815 * missing if the stream has been closed (application exits with
1816 * per-pid buffers) and a clear command has been performed.
1817 */
8bb66c3c 1818 status = lttng_trace_chunk_open_fs_handle(
ebb29c10 1819 vstream->stream_file.trace_chunk,
8bb66c3c 1820 file_path, O_RDONLY, 0, &fs_handle, true);
ebb29c10 1821 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
b0d240a2
MD
1822 if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE &&
1823 rstream->closed) {
1824 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
1825 goto send_reply;
1826 }
ebb29c10 1827 PERROR("Failed to open trace file for viewer stream");
7591bab1
MD
1828 goto error_put;
1829 }
8bb66c3c 1830 vstream->stream_file.handle = fs_handle;
d3e2ba59
JD
1831 }
1832
f04a971b 1833 ret = check_new_streams(conn);
4a9daf17 1834 if (ret < 0) {
7591bab1
MD
1835 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
1836 goto send_reply;
4a9daf17
JD
1837 } else if (ret == 1) {
1838 viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
1839 }
1840
f8f3885c
MD
1841 ret = lttng_index_file_read(vstream->index_file, &packet_index);
1842 if (ret) {
8bb66c3c 1843 ERR("Relay error reading index file");
7591bab1 1844 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
6b6b9a5a 1845 goto send_reply;
d3e2ba59 1846 } else {
c4e361a4 1847 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
a44ca2ca 1848 vstream->index_sent_seqcount++;
d3e2ba59
JD
1849 }
1850
1851 /*
1852 * Indexes are stored in big endian, no need to switch before sending.
1853 */
7591bab1
MD
1854 DBG("Sending viewer index for stream %" PRIu64 " offset %" PRIu64,
1855 rstream->stream_handle,
9b9f9f94 1856 (uint64_t) be64toh(packet_index.offset));
d3e2ba59
JD
1857 viewer_index.offset = packet_index.offset;
1858 viewer_index.packet_size = packet_index.packet_size;
1859 viewer_index.content_size = packet_index.content_size;
1860 viewer_index.timestamp_begin = packet_index.timestamp_begin;
1861 viewer_index.timestamp_end = packet_index.timestamp_end;
1862 viewer_index.events_discarded = packet_index.events_discarded;
1863 viewer_index.stream_id = packet_index.stream_id;
1864
1865send_reply:
f1883937
MD
1866 if (rstream) {
1867 pthread_mutex_unlock(&rstream->lock);
b6ae2a95 1868 pthread_mutex_unlock(&rstream->trace->session->lock);
f1883937 1869 }
7591bab1
MD
1870
1871 if (metadata_viewer_stream) {
1872 pthread_mutex_lock(&metadata_viewer_stream->stream->lock);
1873 DBG("get next index metadata check: recv %" PRIu64
1874 " sent %" PRIu64,
1875 metadata_viewer_stream->stream->metadata_received,
1876 metadata_viewer_stream->metadata_sent);
1877 if (!metadata_viewer_stream->stream->metadata_received ||
1878 metadata_viewer_stream->stream->metadata_received >
1879 metadata_viewer_stream->metadata_sent) {
1880 viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
1881 }
1882 pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
1883 }
1884
d3e2ba59 1885 viewer_index.flags = htobe32(viewer_index.flags);
eea7556c 1886 health_code_update();
2f8f53af 1887
58eb9381 1888 ret = send_response(conn->sock, &viewer_index, sizeof(viewer_index));
d3e2ba59 1889 if (ret < 0) {
7591bab1 1890 goto end;
d3e2ba59 1891 }
eea7556c 1892 health_code_update();
d3e2ba59 1893
9237e6a1
MD
1894 if (vstream) {
1895 DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
a44ca2ca 1896 vstream->index_sent_seqcount,
9237e6a1
MD
1897 vstream->stream->stream_handle);
1898 }
d3e2ba59 1899end:
7591bab1
MD
1900 if (metadata_viewer_stream) {
1901 viewer_stream_put(metadata_viewer_stream);
1902 }
1903 if (vstream) {
1904 viewer_stream_put(vstream);
1905 }
1906 return ret;
1907
1908error_put:
1909 pthread_mutex_unlock(&rstream->lock);
b6ae2a95 1910 pthread_mutex_unlock(&rstream->trace->session->lock);
7591bab1
MD
1911 if (metadata_viewer_stream) {
1912 viewer_stream_put(metadata_viewer_stream);
1913 }
1914 viewer_stream_put(vstream);
d3e2ba59
JD
1915 return ret;
1916}
1917
1918/*
1919 * Send the next index for a stream
1920 *
1921 * Return 0 on success or else a negative value.
1922 */
1923static
58eb9381 1924int viewer_get_packet(struct relay_connection *conn)
d3e2ba59 1925{
b6025e94 1926 int ret;
a5df8828 1927 off_t lseek_ret;
553b2adb 1928 char *reply = NULL;
d3e2ba59 1929 struct lttng_viewer_get_packet get_packet_info;
553b2adb 1930 struct lttng_viewer_trace_packet reply_header;
7591bab1 1931 struct relay_viewer_stream *vstream = NULL;
553b2adb 1932 uint32_t reply_size = sizeof(reply_header);
b6025e94
JR
1933 uint32_t packet_data_len = 0;
1934 ssize_t read_len;
8bb66c3c 1935 uint64_t stream_id;
d3e2ba59 1936
eea7556c 1937 health_code_update();
2f8f53af 1938
7591bab1
MD
1939 ret = recv_request(conn->sock, &get_packet_info,
1940 sizeof(get_packet_info));
2f8f53af 1941 if (ret < 0) {
d3e2ba59
JD
1942 goto end;
1943 }
eea7556c 1944 health_code_update();
d3e2ba59 1945
0233a6a5 1946 /* From this point on, the error label can be reached. */
553b2adb 1947 memset(&reply_header, 0, sizeof(reply_header));
8bb66c3c 1948 stream_id = (uint64_t) be64toh(get_packet_info.stream_id);
0233a6a5 1949
8bb66c3c 1950 vstream = viewer_stream_get_by_id(stream_id);
7591bab1 1951 if (!vstream) {
a44ca2ca 1952 DBG("Client requested packet of unknown stream id %" PRIu64,
8bb66c3c 1953 stream_id);
553b2adb
JG
1954 reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
1955 goto send_reply_nolock;
b6025e94
JR
1956 } else {
1957 packet_data_len = be32toh(get_packet_info.len);
553b2adb 1958 reply_size += packet_data_len;
d3e2ba59 1959 }
2a174661 1960
ac497a37 1961 reply = (char *) zmalloc(reply_size);
553b2adb
JG
1962 if (!reply) {
1963 PERROR("packet reply zmalloc");
1964 reply_size = sizeof(reply_header);
d3e2ba59
JD
1965 goto error;
1966 }
1967
b6025e94 1968 pthread_mutex_lock(&vstream->stream->lock);
8bb66c3c 1969 lseek_ret = fs_handle_seek(vstream->stream_file.handle,
ebb29c10 1970 be64toh(get_packet_info.offset), SEEK_SET);
a5df8828 1971 if (lseek_ret < 0) {
8bb66c3c
JG
1972 PERROR("Failed to seek file system handle of viewer stream %" PRIu64
1973 " to offset %" PRIu64,
1974 stream_id,
ebb29c10 1975 (uint64_t) be64toh(get_packet_info.offset));
7591bab1 1976 goto error;
d3e2ba59 1977 }
8bb66c3c 1978 read_len = fs_handle_read(vstream->stream_file.handle,
ebb29c10 1979 reply + sizeof(reply_header), packet_data_len);
b6025e94 1980 if (read_len < packet_data_len) {
8bb66c3c
JG
1981 PERROR("Failed to read from file system handle of viewer stream id %" PRIu64
1982 ", offset: %" PRIu64,
1983 stream_id,
9b9f9f94 1984 (uint64_t) be64toh(get_packet_info.offset));
7591bab1 1985 goto error;
d3e2ba59 1986 }
553b2adb
JG
1987 reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
1988 reply_header.len = htobe32(packet_data_len);
d3e2ba59
JD
1989 goto send_reply;
1990
1991error:
f28ecb10
JG
1992 /* No payload to send on error. */
1993 reply_size = sizeof(reply_header);
553b2adb 1994 reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
d3e2ba59
JD
1995
1996send_reply:
7591bab1
MD
1997 if (vstream) {
1998 pthread_mutex_unlock(&vstream->stream->lock);
1999 }
2000send_reply_nolock:
eea7556c
MD
2001
2002 health_code_update();
2f8f53af 2003
553b2adb
JG
2004 if (reply) {
2005 memcpy(reply, &reply_header, sizeof(reply_header));
2006 ret = send_response(conn->sock, reply, reply_size);
2007 } else {
2008 /* No reply to send. */
2009 ret = send_response(conn->sock, &reply_header,
2010 reply_size);
2011 }
2012
b6025e94 2013 health_code_update();
d3e2ba59 2014 if (ret < 0) {
b6025e94 2015 PERROR("sendmsg of packet data failed");
7591bab1 2016 goto end_free;
d3e2ba59 2017 }
d3e2ba59 2018
8bb66c3c 2019 DBG("Sent %u bytes for stream %" PRIu64, reply_size, stream_id);
d3e2ba59 2020
7591bab1 2021end_free:
553b2adb 2022 free(reply);
d3e2ba59 2023end:
7591bab1
MD
2024 if (vstream) {
2025 viewer_stream_put(vstream);
2026 }
d3e2ba59
JD
2027 return ret;
2028}
2029
2030/*
2031 * Send the session's metadata
2032 *
2033 * Return 0 on success else a negative value.
2034 */
2035static
58eb9381 2036int viewer_get_metadata(struct relay_connection *conn)
d3e2ba59
JD
2037{
2038 int ret = 0;
8bb66c3c 2039 int fd = -1;
d3e2ba59
JD
2040 ssize_t read_len;
2041 uint64_t len = 0;
2042 char *data = NULL;
2043 struct lttng_viewer_get_metadata request;
2044 struct lttng_viewer_metadata_packet reply;
7591bab1 2045 struct relay_viewer_stream *vstream = NULL;
d3e2ba59 2046
a0377dfe 2047 LTTNG_ASSERT(conn);
d3e2ba59 2048
eea7556c 2049 health_code_update();
2f8f53af 2050
58eb9381 2051 ret = recv_request(conn->sock, &request, sizeof(request));
2f8f53af 2052 if (ret < 0) {
d3e2ba59
JD
2053 goto end;
2054 }
eea7556c 2055 health_code_update();
d3e2ba59 2056
53efb85a
MD
2057 memset(&reply, 0, sizeof(reply));
2058
7591bab1
MD
2059 vstream = viewer_stream_get_by_id(be64toh(request.stream_id));
2060 if (!vstream) {
3b463131
MD
2061 /*
2062 * The metadata stream can be closed by a CLOSE command
2063 * just before we attach. It can also be closed by
2064 * per-pid tracing during tracing. Therefore, it is
2065 * possible that we cannot find this viewer stream.
2066 * Reply back to the client with an error if we cannot
2067 * find it.
2068 */
a44ca2ca 2069 DBG("Client requested metadata of unknown stream id %" PRIu64,
9b9f9f94 2070 (uint64_t) be64toh(request.stream_id));
3b463131 2071 reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
7591bab1 2072 goto send_reply;
d3e2ba59 2073 }
7591bab1
MD
2074 pthread_mutex_lock(&vstream->stream->lock);
2075 if (!vstream->stream->is_metadata) {
2076 ERR("Invalid metadata stream");
6763619c
JD
2077 goto error;
2078 }
2079
b0d240a2 2080 if (vstream->metadata_sent >= vstream->stream->metadata_received) {
94f73d08
MD
2081 /*
2082 * The live viewers expect to receive a NO_NEW_METADATA
2083 * status before a stream disappears, otherwise they abort the
2084 * entire live connection when receiving an error status.
b0d240a2
MD
2085 *
2086 * Clear feature resets the metadata_sent to 0 until the
2087 * same metadata is received again.
94f73d08 2088 */
c4e361a4 2089 reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
94f73d08
MD
2090 /*
2091 * The live viewer considers a closed 0 byte metadata stream as
2092 * an error.
2093 */
2094 if (vstream->metadata_sent > 0) {
e8b269fa 2095 if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) {
8f141dbd
JG
2096 /*
2097 * Release ownership for the viewer metadata
2098 * stream. Note that this reference is the
2099 * viewer's reference. The vstream still exists
2100 * until the end of the function as
2101 * viewer_stream_get_by_id() took a reference.
2102 */
94f73d08
MD
2103 viewer_stream_put(vstream);
2104 }
8f141dbd 2105
e8b269fa 2106 vstream->stream->no_new_metadata_notified = true;
94f73d08 2107 }
d3e2ba59
JD
2108 goto send_reply;
2109 }
2110
ad8bec24
JG
2111 if (vstream->stream->trace_chunk &&
2112 !lttng_trace_chunk_ids_equal(
2113 conn->viewer_session->current_trace_chunk,
2114 vstream->stream->trace_chunk)) {
2115 /* A rotation has occurred on the relay stream. */
2116 DBG("Metadata relay stream and viewer chunk ids differ");
2117
2118 ret = viewer_session_set_trace_chunk_copy(
2119 conn->viewer_session,
2120 vstream->stream->trace_chunk);
2121 if (ret) {
2122 reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
2123 goto send_reply;
2124 }
2125 }
2126
87250ba1 2127 if (conn->viewer_session->current_trace_chunk &&
d8f644d9
MD
2128 !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk,
2129 vstream->stream_file.trace_chunk)) {
ad8bec24
JG
2130 bool acquired_reference;
2131
2132 DBG("Viewer session and viewer stream chunk differ: "
2133 "vsession chunk %p vstream chunk %p",
2134 conn->viewer_session->current_trace_chunk,
2135 vstream->stream_file.trace_chunk);
2136 lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
2137 acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk);
a0377dfe 2138 LTTNG_ASSERT(acquired_reference);
ad8bec24
JG
2139 vstream->stream_file.trace_chunk =
2140 conn->viewer_session->current_trace_chunk;
2141 viewer_stream_close_files(vstream);
2142 }
2143
b0d240a2
MD
2144 len = vstream->stream->metadata_received - vstream->metadata_sent;
2145
87250ba1
JG
2146 if (!vstream->stream_file.trace_chunk) {
2147 reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
2148 len = 0;
2149 goto send_reply;
2150 } else if (vstream->stream_file.trace_chunk &&
2151 !vstream->stream_file.handle && len > 0) {
2152 /*
2153 * Either this is the first time the metadata file is read, or a
2154 * rotation of the corresponding relay stream has occurred.
2155 */
8bb66c3c 2156 struct fs_handle *fs_handle;
ebb29c10
JG
2157 char file_path[LTTNG_PATH_MAX];
2158 enum lttng_trace_chunk_status status;
2159 struct relay_stream *rstream = vstream->stream;
2160
2161 ret = utils_stream_file_path(rstream->path_name,
2162 rstream->channel_name, rstream->tracefile_size,
2163 vstream->current_tracefile_id, NULL, file_path,
2164 sizeof(file_path));
d3e2ba59
JD
2165 if (ret < 0) {
2166 goto error;
2167 }
ebb29c10 2168
3ff5c5db
MD
2169 /*
2170 * It is possible the the metadata file we are trying to open is
2171 * missing if the stream has been closed (application exits with
2172 * per-pid buffers) and a clear command has been performed.
2173 */
8bb66c3c 2174 status = lttng_trace_chunk_open_fs_handle(
ebb29c10 2175 vstream->stream_file.trace_chunk,
8bb66c3c 2176 file_path, O_RDONLY, 0, &fs_handle, true);
ebb29c10 2177 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
b0d240a2
MD
2178 if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) {
2179 reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
2180 len = 0;
2181 if (vstream->stream->closed) {
2182 viewer_stream_put(vstream);
2183 }
2184 goto send_reply;
2185 }
ebb29c10 2186 PERROR("Failed to open metadata file for viewer stream");
d3e2ba59
JD
2187 goto error;
2188 }
8bb66c3c 2189 vstream->stream_file.handle = fs_handle;
ad8bec24
JG
2190
2191 if (vstream->metadata_sent != 0) {
2192 /*
2193 * The client does not expect to receive any metadata
2194 * it has received and metadata files in successive
2195 * chunks must be a strict superset of one another.
2196 *
2197 * Skip the first `metadata_sent` bytes to ensure
2198 * they are not sent a second time to the client.
2199 *
2200 * Baring a block layer error or an internal error,
2201 * this seek should not fail as
2202 * `vstream->stream->metadata_received` is reset when
2203 * a relay stream is rotated. If this is reached, it is
2204 * safe to assume that
2205 * `metadata_received` > `metadata_sent`.
2206 */
2207 const off_t seek_ret = fs_handle_seek(fs_handle,
2208 vstream->metadata_sent, SEEK_SET);
2209
2210 if (seek_ret < 0) {
2211 PERROR("Failed to seek metadata viewer stream file to `sent` position: pos = %" PRId64,
2212 vstream->metadata_sent);
2213 reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
2214 goto send_reply;
2215 }
2216 }
d3e2ba59
JD
2217 }
2218
2219 reply.len = htobe64(len);
ac497a37 2220 data = (char *) zmalloc(len);
d3e2ba59
JD
2221 if (!data) {
2222 PERROR("viewer metadata zmalloc");
2223 goto error;
2224 }
2225
8bb66c3c
JG
2226 fd = fs_handle_get_fd(vstream->stream_file.handle);
2227 if (fd < 0) {
2228 ERR("Failed to restore viewer stream file system handle");
2229 goto error;
2230 }
2231 read_len = lttng_read(fd, data, len);
2232 fs_handle_put_fd(vstream->stream_file.handle);
2233 fd = -1;
6cd525e8 2234 if (read_len < len) {
ad8bec24
JG
2235 if (read_len < 0) {
2236 PERROR("Failed to read metadata file");
2237 goto error;
2238 } else {
2239 /*
2240 * A clear has been performed which prevents the relay
2241 * from sending `len` bytes of metadata.
2242 *
2243 * It is important not to send any metadata if we
2244 * couldn't read all the available metadata in one shot:
2245 * sending partial metadata can cause the client to
2246 * attempt to parse an incomplete (incoherent) metadata
2247 * stream, which would result in an error.
2248 */
2249 const off_t seek_ret = fs_handle_seek(
2250 vstream->stream_file.handle, -read_len,
2251 SEEK_CUR);
2252
cfa1e2c2 2253 DBG("Failed to read metadata: requested = %" PRIu64 ", got = %zd",
ad8bec24
JG
2254 len, read_len);
2255 read_len = 0;
2256 len = 0;
2257 if (seek_ret < 0) {
2258 PERROR("Failed to restore metadata file position after partial read");
2259 ret = -1;
2260 goto error;
2261 }
2262 }
d3e2ba59 2263 }
7591bab1 2264 vstream->metadata_sent += read_len;
c4e361a4 2265 reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);
7591bab1 2266
d3e2ba59
JD
2267 goto send_reply;
2268
2269error:
c4e361a4 2270 reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
d3e2ba59
JD
2271
2272send_reply:
eea7556c 2273 health_code_update();
7591bab1
MD
2274 if (vstream) {
2275 pthread_mutex_unlock(&vstream->stream->lock);
2276 }
58eb9381 2277 ret = send_response(conn->sock, &reply, sizeof(reply));
d3e2ba59 2278 if (ret < 0) {
7591bab1 2279 goto end_free;
d3e2ba59 2280 }
eea7556c 2281 health_code_update();
d3e2ba59
JD
2282
2283 if (len > 0) {
58eb9381 2284 ret = send_response(conn->sock, data, len);
d3e2ba59 2285 if (ret < 0) {
7591bab1 2286 goto end_free;
d3e2ba59
JD
2287 }
2288 }
2289
2290 DBG("Sent %" PRIu64 " bytes of metadata for stream %" PRIu64, len,
9b9f9f94 2291 (uint64_t) be64toh(request.stream_id));
d3e2ba59
JD
2292
2293 DBG("Metadata sent");
2294
7591bab1 2295end_free:
d3e2ba59 2296 free(data);
d3e2ba59 2297end:
7591bab1
MD
2298 if (vstream) {
2299 viewer_stream_put(vstream);
2300 }
d3e2ba59
JD
2301 return ret;
2302}
2303
c3b7390b
JD
2304/*
2305 * Create a viewer session.
2306 *
2307 * Return 0 on success or else a negative value.
2308 */
2309static
2310int viewer_create_session(struct relay_connection *conn)
2311{
2312 int ret;
2313 struct lttng_viewer_create_session_response resp;
2314
53efb85a 2315 memset(&resp, 0, sizeof(resp));
c3b7390b 2316 resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
7591bab1 2317 conn->viewer_session = viewer_session_create();
c3b7390b
JD
2318 if (!conn->viewer_session) {
2319 ERR("Allocation viewer session");
2320 resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_ERR);
2321 goto send_reply;
2322 }
c3b7390b
JD
2323
2324send_reply:
2325 health_code_update();
2326 ret = send_response(conn->sock, &resp, sizeof(resp));
2327 if (ret < 0) {
2328 goto end;
2329 }
2330 health_code_update();
2331 ret = 0;
2332
2333end:
2334 return ret;
2335}
2336
d62023be
JD
2337/*
2338 * Detach a viewer session.
2339 *
2340 * Return 0 on success or else a negative value.
2341 */
2342static
2343int viewer_detach_session(struct relay_connection *conn)
2344{
2345 int ret;
2346 struct lttng_viewer_detach_session_response response;
2347 struct lttng_viewer_detach_session_request request;
2348 struct relay_session *session = NULL;
2349 uint64_t viewer_session_to_close;
2350
a0377dfe 2351 LTTNG_ASSERT(conn);
d62023be
JD
2352
2353 health_code_update();
2354
2355 /* Receive the request from the connected client. */
2356 ret = recv_request(conn->sock, &request, sizeof(request));
2357 if (ret < 0) {
2358 goto end;
2359 }
2360 viewer_session_to_close = be64toh(request.session_id);
2361
2362 if (!conn->viewer_session) {
2363 DBG("Client trying to detach before creating a live viewer session");
2364 response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_ERR);
2365 goto send_reply;
2366 }
2367
2368 health_code_update();
2369
2370 memset(&response, 0, sizeof(response));
2371 DBG("Detaching from session ID %" PRIu64, viewer_session_to_close);
2372
2373 session = session_get_by_id(be64toh(request.session_id));
2374 if (!session) {
2375 DBG("Relay session %" PRIu64 " not found",
9b9f9f94 2376 (uint64_t) be64toh(request.session_id));
d62023be
JD
2377 response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_UNK);
2378 goto send_reply;
2379 }
2380
2381 ret = viewer_session_is_attached(conn->viewer_session, session);
2382 if (ret != 1) {
2383 DBG("Not attached to this session");
2384 response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_ERR);
2385 goto send_reply_put;
2386 }
2387
2388 viewer_session_close_one_session(conn->viewer_session, session);
2389 response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_OK);
2390 DBG("Session %" PRIu64 " detached.", viewer_session_to_close);
2391
2392send_reply_put:
2393 session_put(session);
2394
2395send_reply:
2396 health_code_update();
2397 ret = send_response(conn->sock, &response, sizeof(response));
2398 if (ret < 0) {
2399 goto end;
2400 }
2401 health_code_update();
2402 ret = 0;
2403
2404end:
2405 return ret;
2406}
c3b7390b 2407
d3e2ba59
JD
2408/*
2409 * live_relay_unknown_command: send -1 if received unknown command
2410 */
2411static
58eb9381 2412void live_relay_unknown_command(struct relay_connection *conn)
d3e2ba59
JD
2413{
2414 struct lttcomm_relayd_generic_reply reply;
d3e2ba59 2415
53efb85a 2416 memset(&reply, 0, sizeof(reply));
d3e2ba59 2417 reply.ret_code = htobe32(LTTNG_ERR_UNK);
58eb9381 2418 (void) send_response(conn->sock, &reply, sizeof(reply));
d3e2ba59
JD
2419}
2420
2421/*
2422 * Process the commands received on the control socket
2423 */
2424static
2425int process_control(struct lttng_viewer_cmd *recv_hdr,
58eb9381 2426 struct relay_connection *conn)
d3e2ba59
JD
2427{
2428 int ret = 0;
7201079f
FD
2429 lttng_viewer_command cmd =
2430 (lttng_viewer_command) be32toh(recv_hdr->cmd);
2f8f53af
DG
2431
2432 /*
7201079f
FD
2433 * Make sure we've done the version check before any command other then
2434 * a new client connection.
2f8f53af 2435 */
7201079f
FD
2436 if (cmd != LTTNG_VIEWER_CONNECT && !conn->version_check_done) {
2437 ERR("Viewer on connection %d requested %s command before version check",
2438 conn->sock->fd, lttng_viewer_command_str(cmd));
2f8f53af
DG
2439 ret = -1;
2440 goto end;
2441 }
d3e2ba59 2442
7201079f
FD
2443 DBG("Processing %s viewer command from connection %d",
2444 lttng_viewer_command_str(cmd), conn->sock->fd);
2445
2446 switch (cmd) {
c4e361a4 2447 case LTTNG_VIEWER_CONNECT:
58eb9381 2448 ret = viewer_connect(conn);
d3e2ba59 2449 break;
c4e361a4 2450 case LTTNG_VIEWER_LIST_SESSIONS:
58eb9381 2451 ret = viewer_list_sessions(conn);
d3e2ba59 2452 break;
c4e361a4 2453 case LTTNG_VIEWER_ATTACH_SESSION:
58eb9381 2454 ret = viewer_attach_session(conn);
d3e2ba59 2455 break;
c4e361a4 2456 case LTTNG_VIEWER_GET_NEXT_INDEX:
58eb9381 2457 ret = viewer_get_next_index(conn);
d3e2ba59 2458 break;
c4e361a4 2459 case LTTNG_VIEWER_GET_PACKET:
58eb9381 2460 ret = viewer_get_packet(conn);
d3e2ba59 2461 break;
c4e361a4 2462 case LTTNG_VIEWER_GET_METADATA:
58eb9381 2463 ret = viewer_get_metadata(conn);
d3e2ba59 2464 break;
c4e361a4 2465 case LTTNG_VIEWER_GET_NEW_STREAMS:
58eb9381 2466 ret = viewer_get_new_streams(conn);
80e8027a 2467 break;
c3b7390b
JD
2468 case LTTNG_VIEWER_CREATE_SESSION:
2469 ret = viewer_create_session(conn);
2470 break;
d62023be
JD
2471 case LTTNG_VIEWER_DETACH_SESSION:
2472 ret = viewer_detach_session(conn);
2473 break;
d3e2ba59 2474 default:
7591bab1
MD
2475 ERR("Received unknown viewer command (%u)",
2476 be32toh(recv_hdr->cmd));
58eb9381 2477 live_relay_unknown_command(conn);
d3e2ba59
JD
2478 ret = -1;
2479 goto end;
2480 }
2481
2482end:
2483 return ret;
2484}
2485
2486static
58eb9381 2487void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
d3e2ba59
JD
2488{
2489 int ret;
2490
58eb9381 2491 (void) lttng_poll_del(events, pollfd);
d3e2ba59 2492
29555a78
JG
2493 ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &pollfd, 1,
2494 fd_tracker_util_close_fd, NULL);
d3e2ba59
JD
2495 if (ret < 0) {
2496 ERR("Closing pollfd %d", pollfd);
2497 }
2498}
2499
d3e2ba59
JD
2500/*
2501 * This thread does the actual work
2502 */
2503static
2504void *thread_worker(void *data)
2505{
2506 int ret, err = -1;
2507 uint32_t nb_fd;
d3e2ba59 2508 struct lttng_poll_event events;
7591bab1 2509 struct lttng_ht *viewer_connections_ht;
d3e2ba59
JD
2510 struct lttng_ht_iter iter;
2511 struct lttng_viewer_cmd recv_hdr;
302d8906 2512 struct relay_connection *destroy_conn;
d3e2ba59
JD
2513
2514 DBG("[thread] Live viewer relay worker started");
2515
2516 rcu_register_thread();
2517
eea7556c
MD
2518 health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_WORKER);
2519
9b5e0863
MD
2520 if (testpoint(relayd_thread_live_worker)) {
2521 goto error_testpoint;
2522 }
2523
d3e2ba59 2524 /* table of connections indexed on socket */
7591bab1
MD
2525 viewer_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2526 if (!viewer_connections_ht) {
2527 goto viewer_connections_ht_error;
d3e2ba59
JD
2528 }
2529
35bc1f58
JG
2530 ret = create_named_thread_poll_set(&events, 2,
2531 "Live viewer worker thread epoll");
d3e2ba59
JD
2532 if (ret < 0) {
2533 goto error_poll_create;
2534 }
2535
58eb9381 2536 ret = lttng_poll_add(&events, live_conn_pipe[0], LPOLLIN | LPOLLRDHUP);
d3e2ba59
JD
2537 if (ret < 0) {
2538 goto error;
2539 }
2540
2541restart:
2542 while (1) {
2543 int i;
2544
eea7556c
MD
2545 health_code_update();
2546
d3e2ba59
JD
2547 /* Infinite blocking call, waiting for transmission */
2548 DBG3("Relayd live viewer worker thread polling...");
eea7556c 2549 health_poll_entry();
d3e2ba59 2550 ret = lttng_poll_wait(&events, -1);
eea7556c 2551 health_poll_exit();
d3e2ba59
JD
2552 if (ret < 0) {
2553 /*
2554 * Restart interrupted system call.
2555 */
2556 if (errno == EINTR) {
2557 goto restart;
2558 }
2559 goto error;
2560 }
2561
2562 nb_fd = ret;
2563
2564 /*
2565 * Process control. The control connection is prioritised so we don't
2566 * starve it with high throughput tracing data on the data
2567 * connection.
2568 */
2569 for (i = 0; i < nb_fd; i++) {
2570 /* Fetch once the poll data */
2571 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
2572 int pollfd = LTTNG_POLL_GETFD(&events, i);
2573
eea7556c
MD
2574 health_code_update();
2575
d3e2ba59 2576 /* Thread quit pipe has been closed. Killing thread. */
bcf4a440 2577 ret = check_thread_quit_pipe(pollfd, revents);
d3e2ba59
JD
2578 if (ret) {
2579 err = 0;
2580 goto exit;
2581 }
2582
7591bab1 2583 /* Inspect the relay conn pipe for new connection. */
58eb9381 2584 if (pollfd == live_conn_pipe[0]) {
03e43155 2585 if (revents & LPOLLIN) {
302d8906
JG
2586 struct relay_connection *conn;
2587
7591bab1
MD
2588 ret = lttng_read(live_conn_pipe[0],
2589 &conn, sizeof(conn));
d3e2ba59
JD
2590 if (ret < 0) {
2591 goto error;
2592 }
73039936
FD
2593 ret = lttng_poll_add(&events,
2594 conn->sock->fd,
58eb9381 2595 LPOLLIN | LPOLLRDHUP);
73039936
FD
2596 if (ret) {
2597 ERR("Failed to add new live connection file descriptor to poll set");
2598 goto error;
2599 }
7591bab1
MD
2600 connection_ht_add(viewer_connections_ht, conn);
2601 DBG("Connection socket %d added to poll", conn->sock->fd);
03e43155
MD
2602 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
2603 ERR("Relay live pipe error");
2604 goto error;
2605 } else {
2606 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
2607 goto error;
d3e2ba59 2608 }
58eb9381 2609 } else {
7591bab1 2610 /* Connection activity. */
302d8906
JG
2611 struct relay_connection *conn;
2612
7591bab1
MD
2613 conn = connection_get_by_sock(viewer_connections_ht, pollfd);
2614 if (!conn) {
2615 continue;
2616 }
58eb9381 2617
03e43155 2618 if (revents & LPOLLIN) {
58eb9381
DG
2619 ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr,
2620 sizeof(recv_hdr), 0);
d3e2ba59 2621 if (ret <= 0) {
7591bab1 2622 /* Connection closed. */
58eb9381 2623 cleanup_connection_pollfd(&events, pollfd);
7591bab1
MD
2624 /* Put "create" ownership reference. */
2625 connection_put(conn);
58eb9381 2626 DBG("Viewer control conn closed with %d", pollfd);
d3e2ba59 2627 } else {
58eb9381 2628 ret = process_control(&recv_hdr, conn);
d3e2ba59
JD
2629 if (ret < 0) {
2630 /* Clear the session on error. */
58eb9381 2631 cleanup_connection_pollfd(&events, pollfd);
7591bab1
MD
2632 /* Put "create" ownership reference. */
2633 connection_put(conn);
d3e2ba59
JD
2634 DBG("Viewer connection closed with %d", pollfd);
2635 }
2636 }
03e43155
MD
2637 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
2638 cleanup_connection_pollfd(&events, pollfd);
2639 /* Put "create" ownership reference. */
2640 connection_put(conn);
2641 } else {
2642 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
2643 connection_put(conn);
2644 goto error;
d3e2ba59 2645 }
7591bab1
MD
2646 /* Put local "get_by_sock" reference. */
2647 connection_put(conn);
d3e2ba59
JD
2648 }
2649 }
2650 }
2651
2652exit:
2653error:
35bc1f58 2654 (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
d3e2ba59 2655
71efa8ef 2656 /* Cleanup remaining connection object. */
d3e2ba59 2657 rcu_read_lock();
7591bab1 2658 cds_lfht_for_each_entry(viewer_connections_ht->ht, &iter.iter,
302d8906 2659 destroy_conn,
58eb9381 2660 sock_n.node) {
eea7556c 2661 health_code_update();
7591bab1 2662 connection_put(destroy_conn);
d3e2ba59
JD
2663 }
2664 rcu_read_unlock();
2665error_poll_create:
7591bab1
MD
2666 lttng_ht_destroy(viewer_connections_ht);
2667viewer_connections_ht_error:
58eb9381 2668 /* Close relay conn pipes */
87bcbe91 2669 (void) fd_tracker_util_pipe_close(the_fd_tracker, live_conn_pipe);
d3e2ba59
JD
2670 if (err) {
2671 DBG("Viewer worker thread exited with error");
2672 }
2673 DBG("Viewer worker thread cleanup complete");
9b5e0863 2674error_testpoint:
eea7556c
MD
2675 if (err) {
2676 health_error();
2677 ERR("Health error occurred in %s", __func__);
2678 }
2679 health_unregister(health_relayd);
b4aacfdc
MD
2680 if (lttng_relay_stop_threads()) {
2681 ERR("Error stopping threads");
178a0557 2682 }
d3e2ba59
JD
2683 rcu_unregister_thread();
2684 return NULL;
2685}
2686
2687/*
2688 * Create the relay command pipe to wake thread_manage_apps.
2689 * Closed in cleanup().
2690 */
58eb9381 2691static int create_conn_pipe(void)
d3e2ba59 2692{
87bcbe91
JG
2693 return fd_tracker_util_pipe_open_cloexec(the_fd_tracker,
2694 "Live connection pipe", live_conn_pipe);
178a0557 2695}
d3e2ba59 2696
178a0557 2697int relayd_live_join(void)
d3e2ba59 2698{
178a0557 2699 int ret, retval = 0;
d3e2ba59
JD
2700 void *status;
2701
d3e2ba59 2702 ret = pthread_join(live_listener_thread, &status);
178a0557
MD
2703 if (ret) {
2704 errno = ret;
d3e2ba59 2705 PERROR("pthread_join live listener");
178a0557 2706 retval = -1;
d3e2ba59
JD
2707 }
2708
2709 ret = pthread_join(live_worker_thread, &status);
178a0557
MD
2710 if (ret) {
2711 errno = ret;
d3e2ba59 2712 PERROR("pthread_join live worker");
178a0557 2713 retval = -1;
d3e2ba59
JD
2714 }
2715
2716 ret = pthread_join(live_dispatcher_thread, &status);
178a0557
MD
2717 if (ret) {
2718 errno = ret;
d3e2ba59 2719 PERROR("pthread_join live dispatcher");
178a0557 2720 retval = -1;
d3e2ba59
JD
2721 }
2722
178a0557 2723 cleanup_relayd_live();
d3e2ba59 2724
178a0557 2725 return retval;
d3e2ba59
JD
2726}
2727
2728/*
2729 * main
2730 */
7591bab1 2731int relayd_live_create(struct lttng_uri *uri)
d3e2ba59 2732{
178a0557 2733 int ret = 0, retval = 0;
d3e2ba59
JD
2734 void *status;
2735 int is_root;
2736
178a0557
MD
2737 if (!uri) {
2738 retval = -1;
2739 goto exit_init_data;
2740 }
d3e2ba59
JD
2741 live_uri = uri;
2742
d3e2ba59
JD
2743 /* Check if daemon is UID = 0 */
2744 is_root = !getuid();
2745
2746 if (!is_root) {
2747 if (live_uri->port < 1024) {
2748 ERR("Need to be root to use ports < 1024");
178a0557
MD
2749 retval = -1;
2750 goto exit_init_data;
d3e2ba59
JD
2751 }
2752 }
2753
2754 /* Setup the thread apps communication pipe. */
178a0557
MD
2755 if (create_conn_pipe()) {
2756 retval = -1;
2757 goto exit_init_data;
d3e2ba59
JD
2758 }
2759
2760 /* Init relay command queue. */
8bdee6e2 2761 cds_wfcq_init(&viewer_conn_queue.head, &viewer_conn_queue.tail);
d3e2ba59
JD
2762
2763 /* Set up max poll set size */
25b397f9
MD
2764 if (lttng_poll_set_max_size()) {
2765 retval = -1;
2766 goto exit_init_data;
2767 }
d3e2ba59
JD
2768
2769 /* Setup the dispatcher thread */
1a1a34b4 2770 ret = pthread_create(&live_dispatcher_thread, default_pthread_attr(),
d3e2ba59 2771 thread_dispatcher, (void *) NULL);
178a0557
MD
2772 if (ret) {
2773 errno = ret;
d3e2ba59 2774 PERROR("pthread_create viewer dispatcher");
178a0557
MD
2775 retval = -1;
2776 goto exit_dispatcher_thread;
d3e2ba59
JD
2777 }
2778
2779 /* Setup the worker thread */
1a1a34b4 2780 ret = pthread_create(&live_worker_thread, default_pthread_attr(),
7591bab1 2781 thread_worker, NULL);
178a0557
MD
2782 if (ret) {
2783 errno = ret;
d3e2ba59 2784 PERROR("pthread_create viewer worker");
178a0557
MD
2785 retval = -1;
2786 goto exit_worker_thread;
d3e2ba59
JD
2787 }
2788
2789 /* Setup the listener thread */
1a1a34b4 2790 ret = pthread_create(&live_listener_thread, default_pthread_attr(),
d3e2ba59 2791 thread_listener, (void *) NULL);
178a0557
MD
2792 if (ret) {
2793 errno = ret;
d3e2ba59 2794 PERROR("pthread_create viewer listener");
178a0557
MD
2795 retval = -1;
2796 goto exit_listener_thread;
d3e2ba59
JD
2797 }
2798
178a0557
MD
2799 /*
2800 * All OK, started all threads.
2801 */
2802 return retval;
2803
9911d21b
JG
2804 /*
2805 * Join on the live_listener_thread should anything be added after
2806 * the live_listener thread's creation.
2807 */
d3e2ba59 2808
178a0557 2809exit_listener_thread:
d3e2ba59 2810
d3e2ba59 2811 ret = pthread_join(live_worker_thread, &status);
178a0557
MD
2812 if (ret) {
2813 errno = ret;
d3e2ba59 2814 PERROR("pthread_join live worker");
178a0557 2815 retval = -1;
d3e2ba59 2816 }
178a0557 2817exit_worker_thread:
d3e2ba59 2818
d3e2ba59 2819 ret = pthread_join(live_dispatcher_thread, &status);
178a0557
MD
2820 if (ret) {
2821 errno = ret;
d3e2ba59 2822 PERROR("pthread_join live dispatcher");
178a0557 2823 retval = -1;
d3e2ba59 2824 }
178a0557 2825exit_dispatcher_thread:
d3e2ba59 2826
178a0557
MD
2827exit_init_data:
2828 cleanup_relayd_live();
d3e2ba59 2829
178a0557 2830 return retval;
d3e2ba59 2831}
This page took 0.214766 seconds and 4 git commands to generate.