Fix: relayd: rotation failure for multi-domain session
lttng-tools.git: src/bin/lttng-relayd/live.cpp
1 /*
2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
4 * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10 #define _LGPL_SOURCE
11 #include <fcntl.h>
12 #include <getopt.h>
13 #include <grp.h>
14 #include <inttypes.h>
15 #include <limits.h>
16 #include <pthread.h>
17 #include <signal.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <sys/mman.h>
22 #include <sys/mount.h>
23 #include <sys/resource.h>
24 #include <sys/socket.h>
25 #include <sys/stat.h>
26 #include <sys/types.h>
27 #include <sys/wait.h>
28 #include <unistd.h>
29 #include <urcu/futex.h>
30 #include <urcu/rculist.h>
31 #include <urcu/uatomic.h>
32 #include <string>
33
34 #include <common/common.h>
35 #include <common/compat/endian.h>
36 #include <common/compat/poll.h>
37 #include <common/compat/socket.h>
38 #include <common/defaults.h>
39 #include <common/fd-tracker/utils.h>
40 #include <common/fs-handle.h>
41 #include <common/futex.h>
42 #include <common/index/index.h>
43 #include <common/sessiond-comm/inet.h>
44 #include <common/sessiond-comm/relayd.h>
45 #include <common/sessiond-comm/sessiond-comm.h>
46 #include <common/uri.h>
47 #include <common/utils.h>
48 #include <lttng/lttng.h>
49
50 #include "cmd.h"
51 #include "connection.h"
52 #include "ctf-trace.h"
53 #include "health-relayd.h"
54 #include "live.h"
55 #include "lttng-relayd.h"
56 #include "session.h"
57 #include "stream.h"
58 #include "testpoint.h"
59 #include "utils.h"
60 #include "viewer-session.h"
61 #include "viewer-stream.h"
62
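/*
 * Initial capacity of the flat session array built in viewer_list_sessions();
 * the buffer is grown by doubling when more sessions exist.
 */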
63 #define SESSION_BUF_DEFAULT_COUNT 16
64
65 static struct lttng_uri *live_uri;
66
67 /*
68 * This pipe is used to inform the worker thread that a command is queued and
69 * ready to be processed.
70 */
71 static int live_conn_pipe[2] = { -1, -1 };
72
73 /* Shared between threads */
74 static int live_dispatch_thread_exit;
75
76 static pthread_t live_listener_thread;
77 static pthread_t live_dispatcher_thread;
78 static pthread_t live_worker_thread;
79
80 /*
81 * Relay command queue.
82 *
83 * The live_thread_listener and live_thread_dispatcher communicate with this
84 * queue.
85 */
86 static struct relay_conn_queue viewer_conn_queue;
87
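/*
 * Monotonic counter used to assign a unique id to each new viewer session;
 * incremented under the lock below (see viewer_connect()).
 */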
88 static uint64_t last_relay_viewer_session_id;
89 static pthread_mutex_t last_relay_viewer_session_id_lock =
90 PTHREAD_MUTEX_INITIALIZER;
91
92 static
93 const char *lttng_viewer_command_str(lttng_viewer_command cmd)
94 {
95 switch (cmd) {
96 case LTTNG_VIEWER_CONNECT:
97 return "CONNECT";
98 case LTTNG_VIEWER_LIST_SESSIONS:
99 return "LIST_SESSIONS";
100 case LTTNG_VIEWER_ATTACH_SESSION:
101 return "ATTACH_SESSION";
102 case LTTNG_VIEWER_GET_NEXT_INDEX:
103 return "GET_NEXT_INDEX";
104 case LTTNG_VIEWER_GET_PACKET:
105 return "GET_PACKET";
106 case LTTNG_VIEWER_GET_METADATA:
107 return "GET_METADATA";
108 case LTTNG_VIEWER_GET_NEW_STREAMS:
109 return "GET_NEW_STREAMS";
110 case LTTNG_VIEWER_CREATE_SESSION:
111 return "CREATE_SESSION";
112 case LTTNG_VIEWER_DETACH_SESSION:
113 return "DETACH_SESSION";
114 default:
115 abort();
116 }
117 }
118
119 /*
120 * Cleanup the daemon
121 */
122 static
123 void cleanup_relayd_live(void)
124 {
125 DBG("Cleaning up");
126
127 free(live_uri);
128 }
129
130 /*
131 * Receive a request using a given socket into a caller-allocated destination
132 * buffer of the given size.
133 *
134 * Return the size of the received message or else a negative value on error,
135 * with errno being set by the recvmsg() syscall.
136 */
137 static
138 ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size)
139 {
140 ssize_t ret;
141
142 ret = sock->ops->recvmsg(sock, buf, size, 0);
143 if (ret < 0 || ret != size) {
144 if (ret == 0) {
145 /* Orderly shutdown. Not necessary to print an error. */
146 DBG("Socket %d did an orderly shutdown", sock->fd);
147 } else {
148 ERR("Relay failed to receive request.");
149 }
150 ret = -1;
151 }
152
153 return ret;
154 }
155
156 /*
157 * Send a response using a given socket from a caller-allocated source buffer
158 * of the given size.
159 *
160 * Return the size of the sent message or else a negative value on error, with
161 * errno being set by the sendmsg() syscall.
162 */
163 static
164 ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size)
165 {
166 ssize_t ret;
167
168 ret = sock->ops->sendmsg(sock, buf, size, 0);
169 if (ret < 0) {
170 ERR("Relayd failed to send response.");
171 }
172
173 return ret;
174 }
175
176 /*
177 * Atomically check if new streams got added in one of the attached sessions,
178 * and reset the flag to 0.
179 *
180 * Returns 1 if new streams got added, 0 if nothing changed, a negative value
181 * on error.
182 */
183 static
184 int check_new_streams(struct relay_connection *conn)
185 {
186 struct relay_session *session;
187 unsigned long current_val;
188 int ret = 0;
189
190 if (!conn->viewer_session) {
191 goto end;
192 }
193 rcu_read_lock();
194 cds_list_for_each_entry_rcu(session,
195 &conn->viewer_session->session_list,
196 viewer_session_node) {
197 if (!session_get(session)) {
198 continue;
199 }
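/*
 * uatomic_cmpxchg() returns the previous value of new_streams: a
 * result of 1 means the flag was set and has now been atomically
 * consumed (reset to 0) by this check.
 */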
200 current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
201 ret = current_val;
202 session_put(session);
203 if (ret == 1) {
204 goto end;
205 }
206 }
207 end:
208 rcu_read_unlock();
209 return ret;
210 }
211
212 /*
213 * Send viewer streams to the given socket. The ignore_sent_flag indicates if
214 * this function should ignore the sent flag or not.
215 *
216 * Return 0 on success or else a negative value.
217 */
218 static
219 ssize_t send_viewer_streams(struct lttcomm_sock *sock,
220 uint64_t session_id, unsigned int ignore_sent_flag)
221 {
222 ssize_t ret;
223 struct lttng_ht_iter iter;
224 struct relay_viewer_stream *vstream;
225
226 rcu_read_lock();
227
228 cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
229 stream_n.node) {
230 struct ctf_trace *ctf_trace;
231 struct lttng_viewer_stream send_stream = {};
232
233 health_code_update();
234
235 if (!viewer_stream_get(vstream)) {
236 continue;
237 }
238
239 pthread_mutex_lock(&vstream->stream->lock);
240 /* Ignore if not the same session. */
241 if (vstream->stream->trace->session->id != session_id ||
242 (!ignore_sent_flag && vstream->sent_flag)) {
243 pthread_mutex_unlock(&vstream->stream->lock);
244 viewer_stream_put(vstream);
245 continue;
246 }
247
248 ctf_trace = vstream->stream->trace;
249 send_stream.id = htobe64(vstream->stream->stream_handle);
250 send_stream.ctf_trace_id = htobe64(ctf_trace->id);
251 send_stream.metadata_flag = htobe32(
252 vstream->stream->is_metadata);
253 if (lttng_strncpy(send_stream.path_name, vstream->path_name,
254 sizeof(send_stream.path_name))) {
255 pthread_mutex_unlock(&vstream->stream->lock);
256 viewer_stream_put(vstream);
257 ret = -1; /* Error. */
258 goto end_unlock;
259 }
260 if (lttng_strncpy(send_stream.channel_name,
261 vstream->channel_name,
262 sizeof(send_stream.channel_name))) {
263 pthread_mutex_unlock(&vstream->stream->lock);
264 viewer_stream_put(vstream);
265 ret = -1; /* Error. */
266 goto end_unlock;
267 }
268
269 DBG("Sending stream %" PRIu64 " to viewer",
270 vstream->stream->stream_handle);
271 vstream->sent_flag = 1;
272 pthread_mutex_unlock(&vstream->stream->lock);
273
274 ret = send_response(sock, &send_stream, sizeof(send_stream));
275 viewer_stream_put(vstream);
276 if (ret < 0) {
277 goto end_unlock;
278 }
279 }
280
281 ret = 0;
282
283 end_unlock:
284 rcu_read_unlock();
285 return ret;
286 }
287
288 /*
289 * Create every viewer stream possible for the given session with the seek
290 * type. Three counters *can* be returned, which are, in order: the total number
291 * of viewer streams of the session, the number of unsent streams and the
292 * number of streams created. Those counters can be NULL and thus will be ignored.
293 *
294 * The session must be locked to ensure that we see either none or all initial
295 * streams for a session, but no intermediate state.
296 *
297 * Return 0 on success or else a negative value.
298 */
299 static int make_viewer_streams(struct relay_session *relay_session,
300 struct relay_viewer_session *viewer_session,
301 enum lttng_viewer_seek seek_t,
302 uint32_t *nb_total,
303 uint32_t *nb_unsent,
304 uint32_t *nb_created,
305 bool *closed)
306 {
307 int ret;
308 struct lttng_ht_iter iter;
309 struct ctf_trace *ctf_trace;
310 struct relay_stream *relay_stream = NULL;
311
312 LTTNG_ASSERT(relay_session);
313 ASSERT_LOCKED(relay_session->lock);
314
315 if (relay_session->connection_closed) {
316 *closed = true;
317 }
318
319 /*
320 * Create viewer streams for relay streams that are ready to be
321 * used, for the given session id only.
322 */
323 rcu_read_lock();
324 cds_lfht_for_each_entry (relay_session->ctf_traces_ht->ht, &iter.iter,
325 ctf_trace, node.node) {
326 bool trace_has_metadata_stream = false;
327
328 health_code_update();
329
330 if (!ctf_trace_get(ctf_trace)) {
331 continue;
332 }
333
334 /*
335 * Iterate over all the streams of the trace to see if we have a
336 * metadata stream.
337 */
338 cds_list_for_each_entry_rcu(relay_stream,
339 &ctf_trace->stream_list, stream_node)
340 {
341 bool is_metadata_stream;
342
343 pthread_mutex_lock(&relay_stream->lock);
344 is_metadata_stream = relay_stream->is_metadata;
345 pthread_mutex_unlock(&relay_stream->lock);
346
347 if (is_metadata_stream) {
348 trace_has_metadata_stream = true;
349 break;
350 }
351 }
352
353 relay_stream = NULL;
354
355 /*
356 * If there is no metadata stream in this trace at the moment
357 * and we never sent one to the viewer, skip the trace. We
358 * accept that the viewer will not see this trace at all.
359 */
360 if (!trace_has_metadata_stream &&
361 !ctf_trace->metadata_stream_sent_to_viewer) {
362 ctf_trace_put(ctf_trace);
363 continue;
364 }
365
366 cds_list_for_each_entry_rcu(relay_stream,
367 &ctf_trace->stream_list, stream_node)
368 {
369 struct relay_viewer_stream *viewer_stream;
370
371 if (!stream_get(relay_stream)) {
372 continue;
373 }
374
375 pthread_mutex_lock(&relay_stream->lock);
376 /*
377 * stream published is protected by the session lock.
378 */
379 if (!relay_stream->published) {
380 goto next;
381 }
382 viewer_stream = viewer_stream_get_by_id(
383 relay_stream->stream_handle);
384 if (!viewer_stream) {
385 struct lttng_trace_chunk *viewer_stream_trace_chunk = NULL;
386
387 /*
388 * Save that we sent the metadata stream to the
389 * viewer. So that we know what trace the viewer
390 * is aware of.
391 */
392 if (relay_stream->is_metadata) {
393 ctf_trace->metadata_stream_sent_to_viewer = true;
394 }
395
396 /*
397 * If a rotation is ongoing, use a copy of the
398 * relay stream's chunk to ensure the stream
399 * files exist.
400 *
401 * Otherwise, the viewer session's current trace
402 * chunk can be used safely.
403 */
404 if ((relay_stream->ongoing_rotation.is_set ||
405 relay_session->ongoing_rotation) &&
406 relay_stream->trace_chunk) {
407 viewer_stream_trace_chunk = lttng_trace_chunk_copy(
408 relay_stream->trace_chunk);
409 if (!viewer_stream_trace_chunk) {
410 ret = -1;
411 ctf_trace_put(ctf_trace);
412 goto error_unlock;
413 }
414 } else {
415 /*
416 * Transition the viewer session into the newest trace chunk available.
417 */
418 if (!lttng_trace_chunk_ids_equal(viewer_session->current_trace_chunk,
419 relay_stream->trace_chunk)) {
420
421 ret = viewer_session_set_trace_chunk_copy(
422 viewer_session,
423 relay_stream->trace_chunk);
424 if (ret) {
425 ret = -1;
426 ctf_trace_put(ctf_trace);
427 goto error_unlock;
428 }
429 }
430
431 if (relay_stream->trace_chunk) {
432 /*
433 * If the corresponding relay
434 * stream's trace chunk is set,
435 * the viewer stream will be
436 * created under it.
437 *
438 * Note that a relay stream can
439 * have a NULL output trace
440 * chunk (for instance, after a
441 * clear against a stopped
442 * session).
443 */
444 const bool reference_acquired = lttng_trace_chunk_get(
445 viewer_session->current_trace_chunk);
446
447 LTTNG_ASSERT(reference_acquired);
448 viewer_stream_trace_chunk =
449 viewer_session->current_trace_chunk;
450 }
451 }
452
453 viewer_stream = viewer_stream_create(
454 relay_stream,
455 viewer_stream_trace_chunk,
456 seek_t);
457 lttng_trace_chunk_put(viewer_stream_trace_chunk);
458 viewer_stream_trace_chunk = NULL;
459 if (!viewer_stream) {
460 ret = -1;
461 ctf_trace_put(ctf_trace);
462 goto error_unlock;
463 }
464
465 if (nb_created) {
466 /* Update number of created stream counter. */
467 (*nb_created)++;
468 }
469 /*
470 * Ensure a self-reference is preserved even
471 * after we have put our local reference.
472 */
473 if (!viewer_stream_get(viewer_stream)) {
474 ERR("Unable to get self-reference on viewer stream, logic error.");
475 abort();
476 }
477 } else {
478 if (!viewer_stream->sent_flag && nb_unsent) {
479 /* Update number of unsent stream counter. */
480 (*nb_unsent)++;
481 }
482 }
483 /* Update number of total stream counter. */
484 if (nb_total) {
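/*
 * A stream still counts toward the total while it is open or,
 * once closed, while data remains to be sent to the viewer:
 * unsent metadata for a metadata stream, or data packets whose
 * sequence numbers have not yet caught up to the last network
 * sequence number.
 */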
485 if (relay_stream->is_metadata) {
486 if (!relay_stream->closed ||
487 relay_stream->metadata_received >
488 viewer_stream->metadata_sent) {
489 (*nb_total)++;
490 }
491 } else {
492 if (!relay_stream->closed ||
493 !(((int64_t)(relay_stream->prev_data_seq -
494 relay_stream->last_net_seq_num)) >=
495 0)) {
496 (*nb_total)++;
497 }
498 }
499 }
500 /* Put local reference. */
501 viewer_stream_put(viewer_stream);
502 next:
503 pthread_mutex_unlock(&relay_stream->lock);
504 stream_put(relay_stream);
505 }
506 relay_stream = NULL;
507 ctf_trace_put(ctf_trace);
508 }
509
510 ret = 0;
511
512 error_unlock:
513 rcu_read_unlock();
514
515 if (relay_stream) {
516 pthread_mutex_unlock(&relay_stream->lock);
517 stream_put(relay_stream);
518 }
519
520 return ret;
521 }
522
523 int relayd_live_stop(void)
524 {
525 /* Stop dispatch thread */
526 CMM_STORE_SHARED(live_dispatch_thread_exit, 1);
527 futex_nto1_wake(&viewer_conn_queue.futex);
528 return 0;
529 }
530
531 /*
532 * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
533 */
534 static
535 int create_named_thread_poll_set(struct lttng_poll_event *events,
536 int size, const char *name)
537 {
538 int ret;
539
540 if (events == NULL || size == 0) {
541 ret = -1;
542 goto error;
543 }
544
545 ret = fd_tracker_util_poll_create(the_fd_tracker,
546 name, events, 1, LTTNG_CLOEXEC);
547 if (ret) {
548 PERROR("Failed to create \"%s\" poll file descriptor", name);
549 goto error;
550 }
551
552 /* Add quit pipe */
553 ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
554 if (ret < 0) {
555 goto error;
556 }
557
558 return 0;
559
560 error:
561 return ret;
562 }
563
564 /*
565 * Check if the thread quit pipe was triggered.
566 *
567 * Return 1 if it was triggered, else 0.
568 */
569 static
570 int check_thread_quit_pipe(int fd, uint32_t events)
571 {
572 if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
573 return 1;
574 }
575
576 return 0;
577 }
578
579 static
580 int create_sock(void *data, int *out_fd)
581 {
582 int ret;
583 struct lttcomm_sock *sock = (lttcomm_sock *) data;
584
585 ret = lttcomm_create_sock(sock);
586 if (ret < 0) {
587 goto end;
588 }
589
590 *out_fd = sock->fd;
591 end:
592 return ret;
593 }
594
595 static
596 int close_sock(void *data, int *in_fd)
597 {
598 struct lttcomm_sock *sock = (lttcomm_sock *) data;
599
600 return sock->ops->close(sock);
601 }
602
603 static int accept_sock(void *data, int *out_fd)
604 {
605 int ret = 0;
606 /* Socks is an array of in_sock, out_sock. */
607 struct lttcomm_sock **socks = (lttcomm_sock **) data;
608 struct lttcomm_sock *in_sock = socks[0];
609
610 socks[1] = in_sock->ops->accept(in_sock);
611 if (!socks[1]) {
612 ret = -1;
613 goto end;
614 }
615 *out_fd = socks[1]->fd;
616 end:
617 return ret;
618 }
619
620 static
621 struct lttcomm_sock *accept_live_sock(struct lttcomm_sock *listening_sock,
622 const char *name)
623 {
624 int out_fd, ret;
625 struct lttcomm_sock *socks[2] = { listening_sock, NULL };
626 struct lttcomm_sock *new_sock = NULL;
627
628 ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &out_fd,
629 (const char **) &name, 1, accept_sock, &socks);
630 if (ret) {
631 goto end;
632 }
633 new_sock = socks[1];
634 DBG("%s accepted, socket %d", name, new_sock->fd);
635 end:
636 return new_sock;
637 }
638
639 /*
640 * Create and init socket from uri.
641 */
642 static
643 struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name)
644 {
645 int ret, sock_fd;
646 struct lttcomm_sock *sock = NULL;
647 char uri_str[LTTNG_PATH_MAX];
648 char *formated_name = NULL;
649
650 sock = lttcomm_alloc_sock_from_uri(uri);
651 if (sock == NULL) {
652 ERR("Allocating socket");
653 goto error;
654 }
655
656 /*
657 * Don't fail to create the socket if the name can't be built as it is
658 * only used for debugging purposes.
659 */
660 ret = uri_to_str_url(uri, uri_str, sizeof(uri_str));
661 uri_str[sizeof(uri_str) - 1] = '\0';
662 if (ret >= 0) {
663 ret = asprintf(&formated_name, "%s socket @ %s", name,
664 uri_str);
665 if (ret < 0) {
666 formated_name = NULL;
667 }
668 }
669
670 ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd,
671 (const char **) (formated_name ? &formated_name : NULL),
672 1, create_sock, sock);
673 if (ret) {
674 PERROR("Failed to create \"%s\" socket",
675 formated_name ?: "Unknown");
676 goto error;
677 }
678 DBG("Listening on %s socket %d", name, sock->fd);
679
680 ret = sock->ops->bind(sock);
681 if (ret < 0) {
682 PERROR("Failed to bind lttng-live socket");
683 goto error;
684 }
685
686 ret = sock->ops->listen(sock, -1);
687 if (ret < 0) {
688 goto error;
689 }
690 
691
692 free(formated_name);
693 return sock;
694
695 error:
696 if (sock) {
697 lttcomm_destroy_sock(sock);
698 }
699 free(formated_name);
700 return NULL;
701 }
702
703 /*
704 * This thread listens for new viewer connections on the network.
705 */
706 static
707 void *thread_listener(void *data)
708 {
709 int i, ret, pollfd, err = -1;
710 uint32_t revents, nb_fd;
711 struct lttng_poll_event events;
712 struct lttcomm_sock *live_control_sock;
713
714 DBG("[thread] Relay live listener started");
715
716 rcu_register_thread();
717 health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_LISTENER);
718
719 health_code_update();
720
721 live_control_sock = init_socket(live_uri, "Live listener");
722 if (!live_control_sock) {
723 goto error_sock_control;
724 }
725
726 /* Pass 2 as size here for the thread quit pipe and control sockets. */
727 ret = create_named_thread_poll_set(&events, 2,
728 "Live listener thread epoll");
729 if (ret < 0) {
730 goto error_create_poll;
731 }
732
733 /* Add the control socket */
734 ret = lttng_poll_add(&events, live_control_sock->fd, LPOLLIN | LPOLLRDHUP);
735 if (ret < 0) {
736 goto error_poll_add;
737 }
738
739 lttng_relay_notify_ready();
740
741 if (testpoint(relayd_thread_live_listener)) {
742 goto error_testpoint;
743 }
744
745 while (1) {
746 health_code_update();
747
748 DBG("Listener accepting live viewers connections");
749
750 restart:
751 health_poll_entry();
752 ret = lttng_poll_wait(&events, -1);
753 health_poll_exit();
754 if (ret < 0) {
755 /*
756 * Restart interrupted system call.
757 */
758 if (errno == EINTR) {
759 goto restart;
760 }
761 goto error;
762 }
763 nb_fd = ret;
764
765 DBG("Relay new viewer connection received");
766 for (i = 0; i < nb_fd; i++) {
767 health_code_update();
768
769 /* Fetch once the poll data */
770 revents = LTTNG_POLL_GETEV(&events, i);
771 pollfd = LTTNG_POLL_GETFD(&events, i);
772
773 /* Thread quit pipe has been closed. Killing thread. */
774 ret = check_thread_quit_pipe(pollfd, revents);
775 if (ret) {
776 err = 0;
777 goto exit;
778 }
779
780 if (revents & LPOLLIN) {
781 /*
782 * A new connection is requested, therefore a
783 * viewer connection is allocated in this
784 * thread, enqueued to a global queue and
785 * dequeued (and freed) in the worker thread.
786 */
787 int val = 1;
788 struct relay_connection *new_conn;
789 struct lttcomm_sock *newsock;
790
791 newsock = accept_live_sock(live_control_sock,
792 "Live socket to client");
793 if (!newsock) {
794 PERROR("accepting control sock");
795 goto error;
796 }
797 DBG("Relay viewer connection accepted socket %d", newsock->fd);
798
799 ret = setsockopt(newsock->fd, SOL_SOCKET, SO_REUSEADDR, &val,
800 sizeof(val));
801 if (ret < 0) {
802 PERROR("setsockopt inet");
803 lttcomm_destroy_sock(newsock);
804 goto error;
805 }
806 new_conn = connection_create(newsock, RELAY_CONNECTION_UNKNOWN);
807 if (!new_conn) {
808 lttcomm_destroy_sock(newsock);
809 goto error;
810 }
811 /* Ownership assumed by the connection. */
812 newsock = NULL;
813
814 /* Enqueue request for the dispatcher thread. */
815 cds_wfcq_head_ptr_t head;
816 head.h = &viewer_conn_queue.head;
817 cds_wfcq_enqueue(head, &viewer_conn_queue.tail,
818 &new_conn->qnode);
819
820 /*
821 * Wake the dispatch queue futex.
822 * Implicit memory barrier with the
823 * exchange in cds_wfcq_enqueue.
824 */
825 futex_nto1_wake(&viewer_conn_queue.futex);
826 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
827 ERR("socket poll error");
828 goto error;
829 } else {
830 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
831 goto error;
832 }
833 }
834 }
835
836 exit:
837 error:
838 error_poll_add:
839 error_testpoint:
840 (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
841 error_create_poll:
842 if (live_control_sock->fd >= 0) {
843 int sock_fd = live_control_sock->fd;
844
845 ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker,
846 &sock_fd, 1, close_sock,
847 live_control_sock);
848 if (ret) {
849 PERROR("close");
850 }
851 live_control_sock->fd = -1;
852 }
853 lttcomm_destroy_sock(live_control_sock);
854 error_sock_control:
855 if (err) {
856 health_error();
857 DBG("Live viewer listener thread exited with error");
858 }
859 health_unregister(health_relayd);
860 rcu_unregister_thread();
861 DBG("Live viewer listener thread cleanup complete");
862 if (lttng_relay_stop_threads()) {
863 ERR("Error stopping threads");
864 }
865 return NULL;
866 }
867
868 /*
869 * This thread dispatches incoming viewer requests to the worker thread.
870 */
871 static
872 void *thread_dispatcher(void *data)
873 {
874 int err = -1;
875 ssize_t ret;
876 struct cds_wfcq_node *node;
877 struct relay_connection *conn = NULL;
878
879 DBG("[thread] Live viewer relay dispatcher started");
880
881 health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_DISPATCHER);
882
883 if (testpoint(relayd_thread_live_dispatcher)) {
884 goto error_testpoint;
885 }
886
887 health_code_update();
888
889 for (;;) {
890 health_code_update();
891
892 /* Atomically prepare the queue futex */
893 futex_nto1_prepare(&viewer_conn_queue.futex);
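/*
 * The futex_nto1_* helpers implement an N-producer/single-consumer
 * wakeup: the listener thread calls futex_nto1_wake() after
 * enqueueing a connection, which lifts the wait below.
 */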
894
895 if (CMM_LOAD_SHARED(live_dispatch_thread_exit)) {
896 break;
897 }
898
899 do {
900 health_code_update();
901
902 /* Dequeue commands */
903 node = cds_wfcq_dequeue_blocking(&viewer_conn_queue.head,
904 &viewer_conn_queue.tail);
905 if (node == NULL) {
906 DBG("Woken up but nothing in the live-viewer "
907 "relay command queue");
908 /* Continue thread execution */
909 break;
910 }
911 conn = caa_container_of(node, struct relay_connection, qnode);
912 DBG("Dispatching viewer request waiting on sock %d",
913 conn->sock->fd);
914
915 /*
916 * Inform worker thread of the new request. This
917 * call is blocking so we can be assured that
918 * the data will be read at some point in time
919 * or wait to the end of the world :)
920 */
921 ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn));
922 if (ret < 0) {
923 PERROR("write conn pipe");
924 connection_put(conn);
925 goto error;
926 }
927 } while (node != NULL);
928
929 /* Futex wait on queue. Blocking call on futex() */
930 health_poll_entry();
931 futex_nto1_wait(&viewer_conn_queue.futex);
932 health_poll_exit();
933 }
934
935 /* Normal exit, no error */
936 err = 0;
937
938 error:
939 error_testpoint:
940 if (err) {
941 health_error();
942 ERR("Health error occurred in %s", __func__);
943 }
944 health_unregister(health_relayd);
945 DBG("Live viewer dispatch thread dying");
946 if (lttng_relay_stop_threads()) {
947 ERR("Error stopping threads");
948 }
949 return NULL;
950 }
951
952 /*
953 * Establish connection with the viewer and check the versions.
954 *
955 * Return 0 on success or else negative value.
956 */
957 static
958 int viewer_connect(struct relay_connection *conn)
959 {
960 int ret;
961 struct lttng_viewer_connect reply, msg;
962
963 conn->version_check_done = 1;
964
965 health_code_update();
966
967 ret = recv_request(conn->sock, &msg, sizeof(msg));
968 if (ret < 0) {
969 goto end;
970 }
971
972 health_code_update();
973
974 memset(&reply, 0, sizeof(reply));
975 reply.major = RELAYD_VERSION_COMM_MAJOR;
976 reply.minor = RELAYD_VERSION_COMM_MINOR;
977
978 /* Major versions must be the same */
979 if (reply.major != be32toh(msg.major)) {
980 DBG("Incompatible major versions ([relayd] %u vs [client] %u)",
981 reply.major, be32toh(msg.major));
982 ret = -1;
983 goto end;
984 }
985
986 conn->major = reply.major;
987 /* We adapt to the lowest compatible version */
988 if (reply.minor <= be32toh(msg.minor)) {
989 conn->minor = reply.minor;
990 } else {
991 conn->minor = be32toh(msg.minor);
992 }
993
994 if (be32toh(msg.type) == LTTNG_VIEWER_CLIENT_COMMAND) {
995 conn->type = RELAY_VIEWER_COMMAND;
996 } else if (be32toh(msg.type) == LTTNG_VIEWER_CLIENT_NOTIFICATION) {
997 conn->type = RELAY_VIEWER_NOTIFICATION;
998 } else {
999 ERR("Unknown connection type : %u", be32toh(msg.type));
1000 ret = -1;
1001 goto end;
1002 }
1003
1004 reply.major = htobe32(reply.major);
1005 reply.minor = htobe32(reply.minor);
1006 if (conn->type == RELAY_VIEWER_COMMAND) {
1007 /*
1008 * Increment outside of htobe64 macro, because the argument can
1009 * be used more than once within the macro, and thus the
1010 * operation may be undefined.
1011 */
1012 pthread_mutex_lock(&last_relay_viewer_session_id_lock);
1013 last_relay_viewer_session_id++;
1014 pthread_mutex_unlock(&last_relay_viewer_session_id_lock);
1015 reply.viewer_session_id = htobe64(last_relay_viewer_session_id);
1016 }
1017
1018 health_code_update();
1019
1020 ret = send_response(conn->sock, &reply, sizeof(reply));
1021 if (ret < 0) {
1022 goto end;
1023 }
1024
1025 health_code_update();
1026
1027 DBG("Version check done using protocol %u.%u", conn->major, conn->minor);
1028 ret = 0;
1029
1030 end:
1031 return ret;
1032 }
1033
1034 /*
1035 * Send the viewer the list of current sessions.
1036 * We need to create a copy of the hash table content because otherwise
1037 * we cannot assume the number of entries stays the same between getting
1038 * the number of HT elements and iteration over the HT.
1039 *
1040 * Return 0 on success or else a negative value.
1041 */
1042 static
1043 int viewer_list_sessions(struct relay_connection *conn)
1044 {
1045 int ret = 0;
1046 struct lttng_viewer_list_sessions session_list;
1047 struct lttng_ht_iter iter;
1048 struct relay_session *session;
1049 struct lttng_viewer_session *send_session_buf = NULL;
1050 uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT;
1051 uint32_t count = 0;
1052
1053 send_session_buf = (lttng_viewer_session *) zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf));
1054 if (!send_session_buf) {
1055 return -1;
1056 }
1057
1058 rcu_read_lock();
1059 cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session,
1060 session_n.node) {
1061 struct lttng_viewer_session *send_session;
1062
1063 health_code_update();
1064
1065 pthread_mutex_lock(&session->lock);
1066 if (session->connection_closed) {
1067 /* Skip closed session */
1068 goto next_session;
1069 }
1070
1071 if (count >= buf_count) {
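/*
 * Grow the reply array geometrically (doubling) so building the
 * session list stays amortized linear in the number of sessions.
 */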
1072 struct lttng_viewer_session *newbuf;
1073 uint32_t new_buf_count = buf_count << 1;
1074
1075 newbuf = (lttng_viewer_session *) realloc(send_session_buf,
1076 new_buf_count * sizeof(*send_session_buf));
1077 if (!newbuf) {
1078 ret = -1;
1079 goto break_loop;
1080 }
1081 send_session_buf = newbuf;
1082 buf_count = new_buf_count;
1083 }
1084 send_session = &send_session_buf[count];
1085 if (lttng_strncpy(send_session->session_name,
1086 session->session_name,
1087 sizeof(send_session->session_name))) {
1088 ret = -1;
1089 goto break_loop;
1090 }
1091 if (lttng_strncpy(send_session->hostname, session->hostname,
1092 sizeof(send_session->hostname))) {
1093 ret = -1;
1094 goto break_loop;
1095 }
1096 send_session->id = htobe64(session->id);
1097 send_session->live_timer = htobe32(session->live_timer);
1098 if (session->viewer_attached) {
1099 send_session->clients = htobe32(1);
1100 } else {
1101 send_session->clients = htobe32(0);
1102 }
1103 send_session->streams = htobe32(session->stream_count);
1104 count++;
1105 next_session:
1106 pthread_mutex_unlock(&session->lock);
1107 continue;
1108 break_loop:
1109 pthread_mutex_unlock(&session->lock);
1110 break;
1111 }
1112 rcu_read_unlock();
1113 if (ret < 0) {
1114 goto end_free;
1115 }
1116
1117 session_list.sessions_count = htobe32(count);
1118
1119 health_code_update();
1120
1121 ret = send_response(conn->sock, &session_list, sizeof(session_list));
1122 if (ret < 0) {
1123 goto end_free;
1124 }
1125
1126 health_code_update();
1127
1128 ret = send_response(conn->sock, send_session_buf,
1129 count * sizeof(*send_session_buf));
1130 if (ret < 0) {
1131 goto end_free;
1132 }
1133 health_code_update();
1134
1135 ret = 0;
1136 end_free:
1137 free(send_session_buf);
1138 return ret;
1139 }
1140
1141 /*
1142 * Send the viewer the list of new streams for a given session.
1143 */
1144 static
1145 int viewer_get_new_streams(struct relay_connection *conn)
1146 {
1147 int ret, send_streams = 0;
1148 uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0;
1149 struct lttng_viewer_new_streams_request request;
1150 struct lttng_viewer_new_streams_response response;
1151 struct relay_session *session = NULL;
1152 uint64_t session_id;
1153 bool closed = false;
1154
1155 LTTNG_ASSERT(conn);
1156
1157 health_code_update();
1158
1159 /* Receive the request from the connected client. */
1160 ret = recv_request(conn->sock, &request, sizeof(request));
1161 if (ret < 0) {
1162 goto error;
1163 }
1164 session_id = be64toh(request.session_id);
1165
1166 health_code_update();
1167
1168 memset(&response, 0, sizeof(response));
1169
1170 session = session_get_by_id(session_id);
1171 if (!session) {
1172 DBG("Relay session %" PRIu64 " not found", session_id);
1173 response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
1174 goto send_reply;
1175 }
1176
1177 if (!viewer_session_is_attached(conn->viewer_session, session)) {
1178 response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
1179 goto send_reply;
1180 }
1181
1182 /*
1183 * For any new stream, create it with LTTNG_VIEWER_SEEK_BEGINNING since,
1184 * at this point, the client is already attached to the session. Any
1185 * initial stream will have been created with the seek type used at attach
1186 * time (for now, most readers use LTTNG_VIEWER_SEEK_LAST on attach).
1187 * Otherwise, any event happening in a new stream between the attach and
1188 * a call to viewer_get_new_streams would be "lost" (never received) from
1189 * the viewer's point of view.
1190 */
1191 pthread_mutex_lock(&session->lock);
1192 /*
1193 * If a session rotation is ongoing, do not attempt to open any
1194 * stream, because the chunk can be in an intermediate state
1195 * due to directory renaming.
1196 */
1197 if (session->ongoing_rotation) {
1198 DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
1199 response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW);
1200 goto send_reply_unlock;
1201 }
1202 ret = make_viewer_streams(session,
1203 conn->viewer_session,
1204 LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent,
1205 &nb_created, &closed);
1206 if (ret < 0) {
1207 goto error_unlock_session;
1208 }
1209 send_streams = 1;
1210 response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
1211
1212 /* Only send back the newly created streams with the unsent ones. */
1213 nb_streams = nb_created + nb_unsent;
1214 response.streams_count = htobe32(nb_streams);
1215
1216 /*
1217 * If the session is closed, HUP when there are no more streams
1218 * with data.
1219 */
1220 if (closed && nb_total == 0) {
1221 send_streams = 0;
1222 response.streams_count = 0;
1223 response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
1224 goto send_reply_unlock;
1225 }
1226 send_reply_unlock:
1227 pthread_mutex_unlock(&session->lock);
1228
1229 send_reply:
1230 health_code_update();
1231 ret = send_response(conn->sock, &response, sizeof(response));
1232 if (ret < 0) {
1233 goto end_put_session;
1234 }
1235 health_code_update();
1236
1237 /*
1238 * Unknown or empty session, just return gracefully, the viewer
1239 * knows what is happening.
1240 */
1241 if (!send_streams || !nb_streams) {
1242 ret = 0;
1243 goto end_put_session;
1244 }
1245
1246 /*
1247 * Send stream and *DON'T* ignore the sent flag so every viewer
1248 * streams that were not sent from that point will be sent to
1249 * the viewer.
1250 */
1251 ret = send_viewer_streams(conn->sock, session_id, 0);
1252 if (ret < 0) {
1253 goto end_put_session;
1254 }
1255
1256 end_put_session:
1257 if (session) {
1258 session_put(session);
1259 }
1260 error:
1261 return ret;
1262 error_unlock_session:
1263 pthread_mutex_unlock(&session->lock);
1264 session_put(session);
1265 return ret;
1266 }
1267
1268 /*
1269 * Attach the viewer to a session and send it the list of streams.
1270 */
1271 static
1272 int viewer_attach_session(struct relay_connection *conn)
1273 {
1274 int send_streams = 0;
1275 ssize_t ret;
1276 uint32_t nb_streams = 0;
1277 enum lttng_viewer_seek seek_type;
1278 struct lttng_viewer_attach_session_request request;
1279 struct lttng_viewer_attach_session_response response;
1280 struct relay_session *session = NULL;
1281 enum lttng_viewer_attach_return_code viewer_attach_status;
1282 bool closed = false;
1283 uint64_t session_id;
1284
1285 LTTNG_ASSERT(conn);
1286
1287 health_code_update();
1288
1289 /* Receive the request from the connected client. */
1290 ret = recv_request(conn->sock, &request, sizeof(request));
1291 if (ret < 0) {
1292 goto error;
1293 }
1294
1295 session_id = be64toh(request.session_id);
1296 health_code_update();
1297
1298 memset(&response, 0, sizeof(response));
1299
1300 if (!conn->viewer_session) {
1301 DBG("Client trying to attach before creating a live viewer session");
1302 response.status = htobe32(LTTNG_VIEWER_ATTACH_NO_SESSION);
1303 goto send_reply;
1304 }
1305
1306 session = session_get_by_id(session_id);
1307 if (!session) {
1308 DBG("Relay session %" PRIu64 " not found", session_id);
1309 response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
1310 goto send_reply;
1311 }
1312 DBG("Attach session ID %" PRIu64 " received", session_id);
1313
1314 pthread_mutex_lock(&session->lock);
1315 if (session->live_timer == 0) {
1316 DBG("Not live session");
1317 response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
1318 goto send_reply;
1319 }
1320
1321 send_streams = 1;
1322 viewer_attach_status = viewer_session_attach(conn->viewer_session,
1323 session);
1324 if (viewer_attach_status != LTTNG_VIEWER_ATTACH_OK) {
1325 response.status = htobe32(viewer_attach_status);
1326 goto send_reply;
1327 }
1328
1329 switch (be32toh(request.seek)) {
1330 case LTTNG_VIEWER_SEEK_BEGINNING:
1331 case LTTNG_VIEWER_SEEK_LAST:
1332 response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
1333 seek_type = (lttng_viewer_seek) be32toh(request.seek);
1334 break;
1335 default:
1336 ERR("Wrong seek parameter");
1337 response.status = htobe32(LTTNG_VIEWER_ATTACH_SEEK_ERR);
1338 send_streams = 0;
1339 goto send_reply;
1340 }
1341
1342 /*
1343 * If a session rotation is ongoing, do not attempt to open any
1344 * stream, because the chunk can be in an intermediate state
1345 * due to directory renaming.
1346 */
1347 if (session->ongoing_rotation) {
1348 DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
1349 send_streams = 0;
1350 goto send_reply;
1351 }
1352
1353 ret = make_viewer_streams(session,
1354 conn->viewer_session, seek_type,
1355 &nb_streams, NULL, NULL, &closed);
1356 if (ret < 0) {
1357 goto end_put_session;
1358 }
1359 pthread_mutex_unlock(&session->lock);
1360 session_put(session);
1361 session = NULL;
1362
1363 response.streams_count = htobe32(nb_streams);
1364 /*
1365 * If the session is closed when the viewer is attaching, it
1366 * means some of the streams may have been concurrently removed,
1367 * so we don't allow the viewer to attach, even if there are
1368 * streams available.
1369 */
1370 if (closed) {
1371 send_streams = 0;
1372 response.streams_count = 0;
1373 response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
1374 goto send_reply;
1375 }
1376
1377 send_reply:
1378 health_code_update();
1379 ret = send_response(conn->sock, &response, sizeof(response));
1380 if (ret < 0) {
1381 goto end_put_session;
1382 }
1383 health_code_update();
1384
1385 /*
1386 * Unknown or empty session, just return gracefully, the viewer
1387 * knows what is happening.
1388 */
1389 if (!send_streams || !nb_streams) {
1390 ret = 0;
1391 goto end_put_session;
1392 }
1393
1394 /* Send stream and ignore the sent flag. */
1395 ret = send_viewer_streams(conn->sock, session_id, 1);
1396 if (ret < 0) {
1397 goto end_put_session;
1398 }
1399
1400 end_put_session:
1401 if (session) {
1402 pthread_mutex_unlock(&session->lock);
1403 session_put(session);
1404 }
1405 error:
1406 return ret;
1407 }
1408
1409 /*
1410 * Open the index file if needed for the given vstream.
1411 *
1412 * If an index file is successfully opened, the vstream will set it as its
1413 * current index file.
1414 *
1415 * Return 0 on success, a negative value on error (-ENOENT if not ready yet).
1416 *
1417 * Called with rstream lock held.
1418 */
1419 static int try_open_index(struct relay_viewer_stream *vstream,
1420 struct relay_stream *rstream)
1421 {
1422 int ret = 0;
1423 const uint32_t connection_major = rstream->trace->session->major;
1424 const uint32_t connection_minor = rstream->trace->session->minor;
1425 enum lttng_trace_chunk_status chunk_status;
1426
1427 if (vstream->index_file) {
1428 goto end;
1429 }
1430
1431 /*
1432 * Only open the index file once at least one index is ready and the stream has a trace chunk.
1433 */
1434 if (rstream->index_received_seqcount == 0 ||
1435 !vstream->stream_file.trace_chunk) {
1436 ret = -ENOENT;
1437 goto end;
1438 }
1439
1440 chunk_status = lttng_index_file_create_from_trace_chunk_read_only(
1441 vstream->stream_file.trace_chunk, rstream->path_name,
1442 rstream->channel_name, rstream->tracefile_size,
1443 vstream->current_tracefile_id,
1444 lttng_to_index_major(connection_major, connection_minor),
1445 lttng_to_index_minor(connection_major, connection_minor),
1446 true, &vstream->index_file);
1447 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1448 if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) {
1449 ret = -ENOENT;
1450 } else {
1451 ret = -1;
1452 }
1453 }
1454
1455 end:
1456 return ret;
1457 }
1458
1459 /*
1460 * Check the status of the index for the given stream. This function
1461 * updates the index structure if needed and can put (close) the vstream
1462 * in the HUP situation.
1463 *
1464 * Return 0 means that we can proceed with the index. A value of 1 means
1465 * that the index has been updated and is ready to be sent to the
1466 * client. A negative value indicates an error that can't be handled.
1467 *
1468 * Called with rstream lock held.
1469 */
1470 static int check_index_status(struct relay_viewer_stream *vstream,
1471 struct relay_stream *rstream, struct ctf_trace *trace,
1472 struct lttng_viewer_index *index)
1473 {
1474 int ret;
1475
1476 DBG("Check index status: index_received_seqcount %" PRIu64 " "
1477 "index_sent_seqcount %" PRIu64 " "
1478 "for stream %" PRIu64,
1479 rstream->index_received_seqcount,
1480 vstream->index_sent_seqcount,
1481 vstream->stream->stream_handle);
1482 if ((trace->session->connection_closed || rstream->closed)
1483 && rstream->index_received_seqcount
1484 == vstream->index_sent_seqcount) {
1485 /*
1486 * Last index sent and session connection or relay
1487 * stream are closed.
1488 */
1489 index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
1490 goto hup;
1491 } else if (rstream->beacon_ts_end != -1ULL &&
1492 (rstream->index_received_seqcount == 0 ||
1493 (vstream->index_sent_seqcount != 0 &&
1494 rstream->index_received_seqcount
1495 <= vstream->index_sent_seqcount))) {
1496 /*
1497 * We've received a synchronization beacon and the last index
1498 * available has been sent, the index for now is inactive.
1499 *
1500 * In this case, we have received a beacon which allows us to
1501 * inform the client of a time interval during which we can
1502 * guarantee that there are no events to read (and never will
1503 * be).
1504 *
1505 * The sent seqcount can grow higher than the received seqcount on
1506 * clear because the rotation performed by clear will push
1507 * the index_sent_seqcount ahead (see
1508 * viewer_stream_sync_tracefile_array_tail) and skip over
1509 * packet sequence numbers.
1510 */
1511 index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
1512 index->timestamp_end = htobe64(rstream->beacon_ts_end);
1513 index->stream_id = htobe64(rstream->ctf_stream_id);
1514 DBG("Check index status: inactive with beacon, for stream %" PRIu64,
1515 vstream->stream->stream_handle);
1516 goto index_ready;
1517 } else if (rstream->index_received_seqcount == 0 ||
1518 (vstream->index_sent_seqcount != 0 &&
1519 rstream->index_received_seqcount
1520 <= vstream->index_sent_seqcount)) {
1521 /*
1522 * This checks whether received <= sent seqcount. In
1523 * this case, we have not received a beacon. Therefore,
1524 * we can only ask the client to retry later.
1525 *
1526 * The sent seqcount can grow higher than the received seqcount on
1527 * clear because the rotation performed by clear will push
1528 * the index_sent_seqcount ahead (see
1529 * viewer_stream_sync_tracefile_array_tail) and skip over
1530 * packet sequence numbers.
1531 */
1532 index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
1533 DBG("Check index status: retry for stream %" PRIu64,
1534 vstream->stream->stream_handle);
1535 goto index_ready;
1536 } else if (!tracefile_array_seq_in_file(rstream->tfa,
1537 vstream->current_tracefile_id,
1538 vstream->index_sent_seqcount)) {
1539 /*
1540 * The next index we want to send cannot be read either
1541 * because we need to perform a rotation, or due to
1542 * the producer having overwritten its trace file.
1543 */
1544 DBG("Viewer stream %" PRIu64 " rotation",
1545 vstream->stream->stream_handle);
1546 ret = viewer_stream_rotate(vstream);
1547 if (ret == 1) {
1548 /* EOF across entire stream. */
1549 index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
1550 goto hup;
1551 }
1552 /*
1553 * If we have been pushed due to overwrite, it
1554 * necessarily means there is data that can be read in
1555 * the stream. If we rotated because we reached the end
1556 * of a tracefile, it means the following tracefile
1557 * needs to contain at least one index, else we would
1558 * have already returned LTTNG_VIEWER_INDEX_RETRY to the
1559 * viewer. The updated index_sent_seqcount needs to
1560 * point to a readable index entry now.
1561 *
1562 * In the case where we "rotate" on a single file, we
1563 * can end up in a case where the requested index is
1564 * still unavailable.
1565 */
1566 if (rstream->tracefile_count == 1 &&
1567 !tracefile_array_seq_in_file(
1568 rstream->tfa,
1569 vstream->current_tracefile_id,
1570 vstream->index_sent_seqcount)) {
1571 index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
1572 DBG("Check index status: retry: "
1573 "tracefile array sequence number %" PRIu64
1574 " not in file for stream %" PRIu64,
1575 vstream->index_sent_seqcount,
1576 vstream->stream->stream_handle);
1577 goto index_ready;
1578 }
1579 LTTNG_ASSERT(tracefile_array_seq_in_file(rstream->tfa,
1580 vstream->current_tracefile_id,
1581 vstream->index_sent_seqcount));
1582 }
1583 /* ret == 0 means successful so we continue. */
1584 ret = 0;
1585 return ret;
1586
1587 hup:
1588 viewer_stream_put(vstream);
1589 index_ready:
1590 return 1;
1591 }
1592
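/*
 * Rotate a viewer stream onto a new trace chunk: swap the stream file's
 * chunk reference, realign the viewer stream on the tail of the tracefile
 * array, and close the currently opened stream and index files so they are
 * reopened within the new chunk on the next read.
 */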
1593 static
1594 void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream,
1595 struct lttng_trace_chunk *new_trace_chunk)
1596 {
1597 lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
1598
1599 if (new_trace_chunk) {
1600 const bool acquired_reference = lttng_trace_chunk_get(
1601 new_trace_chunk);
1602
1603 LTTNG_ASSERT(acquired_reference);
1604 }
1605
1606 vstream->stream_file.trace_chunk = new_trace_chunk;
1607 viewer_stream_sync_tracefile_array_tail(vstream);
1608 viewer_stream_close_files(vstream);
1609 }
1610
1611 /*
1612 * Send the next index for a stream.
1613 *
1614 * Return 0 on success or else a negative value.
1615 */
1616 static
1617 int viewer_get_next_index(struct relay_connection *conn)
1618 {
1619 int ret;
1620 struct lttng_viewer_get_next_index request_index;
1621 struct lttng_viewer_index viewer_index;
1622 struct ctf_packet_index packet_index;
1623 struct relay_viewer_stream *vstream = NULL;
1624 struct relay_stream *rstream = NULL;
1625 struct ctf_trace *ctf_trace = NULL;
1626 struct relay_viewer_stream *metadata_viewer_stream = NULL;
1627 bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind;
1628 uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL;
1629 enum lttng_trace_chunk_status status;
1630
1631 LTTNG_ASSERT(conn);
1632
1633 memset(&viewer_index, 0, sizeof(viewer_index));
1634 health_code_update();
1635
1636 ret = recv_request(conn->sock, &request_index, sizeof(request_index));
1637 if (ret < 0) {
1638 goto end;
1639 }
1640 health_code_update();
1641
1642 vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
1643 if (!vstream) {
1644 DBG("Client requested index of unknown stream id %" PRIu64,
1645 (uint64_t) be64toh(request_index.stream_id));
1646 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
1647 goto send_reply;
1648 }
1649
1650 /* Use back. ref. Protected by refcounts. */
1651 rstream = vstream->stream;
1652 ctf_trace = rstream->trace;
1653
1654 /* metadata_viewer_stream may be NULL. */
1655 metadata_viewer_stream =
1656 ctf_trace_get_viewer_metadata_stream(ctf_trace);
1657
1658 /*
1659 * Hold the session lock to protect against concurrent changes
1660 * to the chunk files (e.g. rename done by clear), which are
1661 * protected by the session ongoing rotation state. Those are
1662 * synchronized with the session lock.
1663 */
1664 pthread_mutex_lock(&rstream->trace->session->lock);
1665 pthread_mutex_lock(&rstream->lock);
1666
1667 /*
1668 * The viewer should not ask for index on metadata stream.
1669 */
1670 if (rstream->is_metadata) {
1671 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
1672 goto send_reply;
1673 }
1674
1675 if (rstream->ongoing_rotation.is_set) {
1676 /* Rotation is ongoing, try again later. */
1677 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
1678 goto send_reply;
1679 }
1680
1681 if (rstream->trace->session->ongoing_rotation) {
1682 /* Rotation is ongoing, try again later. */
1683 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
1684 goto send_reply;
1685 }
1686
1687 /*
1688 * Transition the viewer session into the newest trace chunk available.
1689 */
1690 if (!lttng_trace_chunk_ids_equal(
1691 conn->viewer_session->current_trace_chunk,
1692 rstream->trace_chunk)) {
1693 DBG("Relay stream and viewer chunk ids differ");
1694
1695 ret = viewer_session_set_trace_chunk_copy(
1696 conn->viewer_session,
1697 rstream->trace_chunk);
1698 if (ret) {
1699 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
1700 goto send_reply;
1701 }
1702 }
1703
1704 /*
1705 * Transition the viewer stream into the latest trace chunk available.
1706 *
1707 * Note that the stream must _not_ rotate in one precise condition:
1708 * the relay stream has rotated to a NULL trace chunk and the viewer
1709 * stream is consuming the trace chunk that was active just before
1710 * that rotation to NULL.
1711 *
1712 * This allows clients to consume all the packets of a trace chunk
1713 * after a session's destruction.
1714 */
1715 if (vstream->stream_file.trace_chunk) {
1716 status = lttng_trace_chunk_get_id(
1717 vstream->stream_file.trace_chunk,
1718 &stream_file_chunk_id);
1719 LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK);
1720 }
1721 if (conn->viewer_session->current_trace_chunk) {
1722 status = lttng_trace_chunk_get_id(
1723 conn->viewer_session->current_trace_chunk,
1724 &viewer_session_chunk_id);
1725 LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK);
1726 }
1727
1728 viewer_stream_and_session_in_same_chunk = lttng_trace_chunk_ids_equal(
1729 conn->viewer_session->current_trace_chunk,
1730 vstream->stream_file.trace_chunk);
1731 viewer_stream_one_rotation_behind = rstream->completed_rotation_count ==
1732 vstream->last_seen_rotation_count + 1;
1733
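/*
 * Decision sketch: rotate the viewer stream onto the viewer session's
 * current chunk unless (1) both already point to the same chunk, or
 * (2) the viewer stream is exactly one rotation behind a relay stream
 * whose chunk was rotated out to NULL (session being destroyed), in
 * which case the remaining packets of the previous chunk must stay
 * readable.
 */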
1734 if (viewer_stream_and_session_in_same_chunk) {
1735 DBG("Transition to latest chunk check (%s -> %s): Same chunk, no need to rotate",
1736 vstream->stream_file.trace_chunk ?
1737 std::to_string(stream_file_chunk_id).c_str() :
1738 "None",
1739 conn->viewer_session->current_trace_chunk ?
1740 std::to_string(viewer_session_chunk_id).c_str() :
1741 "None");
1742 } else if (viewer_stream_one_rotation_behind && !rstream->trace_chunk) {
1743 DBG("Transition to latest chunk check (%s -> %s): One chunk behind relay stream which is being destroyed, no need to rotate",
1744 vstream->stream_file.trace_chunk ?
1745 std::to_string(stream_file_chunk_id).c_str() :
1746 "None",
1747 conn->viewer_session->current_trace_chunk ?
1748 std::to_string(viewer_session_chunk_id).c_str() :
1749 "None");
1750 } else {
1751 DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream",
1752 vstream->stream_file.trace_chunk ?
1753 std::to_string(stream_file_chunk_id).c_str() :
1754 "None",
1755 conn->viewer_session->current_trace_chunk ?
1756 std::to_string(viewer_session_chunk_id).c_str() :
1757 "None");
1758
1759 viewer_stream_rotate_to_trace_chunk(vstream,
1760 conn->viewer_session->current_trace_chunk);
1761 vstream->last_seen_rotation_count =
1762 rstream->completed_rotation_count;
1763 }
1764
1765 ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
1766 if (ret < 0) {
1767 goto error_put;
1768 } else if (ret == 1) {
1769 /*
1770 * We have no index to send and check_index_status has populated
1771 * viewer_index's status.
1772 */
1773 goto send_reply;
1774 }
1775 /* At this point, ret is 0 thus we will be able to read the index. */
1776 LTTNG_ASSERT(!ret);
1777
1778 /* Try to open an index if one is needed for that stream. */
1779 ret = try_open_index(vstream, rstream);
1780 if (ret == -ENOENT) {
1781 if (rstream->closed) {
1782 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
1783 goto send_reply;
1784 } else {
1785 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
1786 goto send_reply;
1787 }
1788 }
1789 if (ret < 0) {
1790 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
1791 goto send_reply;
1792 }
1793
1794 /*
1795 * vstream->stream_file.handle may be NULL if it has been closed
1796 * by tracefile rotation, or if we are at the beginning of the
1797 * stream. We open the data stream file here to protect against
1798 * overwrite caused by tracefile rotation (in association with
1799 * unlink performed before overwrite).
1800 */
1801 if (!vstream->stream_file.handle) {
1802 char file_path[LTTNG_PATH_MAX];
1803 struct fs_handle *fs_handle;
1804
1805 ret = utils_stream_file_path(rstream->path_name,
1806 rstream->channel_name, rstream->tracefile_size,
1807 vstream->current_tracefile_id, NULL, file_path,
1808 sizeof(file_path));
1809 if (ret < 0) {
1810 goto error_put;
1811 }
1812
1813 /*
1814 * It is possible that the file we are trying to open is
1815 * missing if the stream has been closed (application exits with
1816 * per-pid buffers) and a clear command has been performed.
1817 */
1818 status = lttng_trace_chunk_open_fs_handle(
1819 vstream->stream_file.trace_chunk,
1820 file_path, O_RDONLY, 0, &fs_handle, true);
1821 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1822 if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE &&
1823 rstream->closed) {
1824 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
1825 goto send_reply;
1826 }
1827 PERROR("Failed to open trace file for viewer stream");
1828 goto error_put;
1829 }
1830 vstream->stream_file.handle = fs_handle;
1831 }
1832
1833 ret = check_new_streams(conn);
1834 if (ret < 0) {
1835 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
1836 goto send_reply;
1837 } else if (ret == 1) {
1838 viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
1839 }
1840
1841 ret = lttng_index_file_read(vstream->index_file, &packet_index);
1842 if (ret) {
1843 ERR("Relay error reading index file");
1844 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
1845 goto send_reply;
1846 } else {
1847 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
1848 vstream->index_sent_seqcount++;
1849 }
1850
1851 /*
1852 * Indexes are stored in big endian, no need to switch before sending.
1853 */
1854 DBG("Sending viewer index for stream %" PRIu64 " offset %" PRIu64,
1855 rstream->stream_handle,
1856 (uint64_t) be64toh(packet_index.offset));
1857 viewer_index.offset = packet_index.offset;
1858 viewer_index.packet_size = packet_index.packet_size;
1859 viewer_index.content_size = packet_index.content_size;
1860 viewer_index.timestamp_begin = packet_index.timestamp_begin;
1861 viewer_index.timestamp_end = packet_index.timestamp_end;
1862 viewer_index.events_discarded = packet_index.events_discarded;
1863 viewer_index.stream_id = packet_index.stream_id;
1864
1865 send_reply:
1866 if (rstream) {
1867 pthread_mutex_unlock(&rstream->lock);
1868 pthread_mutex_unlock(&rstream->trace->session->lock);
1869 }
1870
1871 if (metadata_viewer_stream) {
1872 pthread_mutex_lock(&metadata_viewer_stream->stream->lock);
1873 DBG("get next index metadata check: recv %" PRIu64
1874 " sent %" PRIu64,
1875 metadata_viewer_stream->stream->metadata_received,
1876 metadata_viewer_stream->metadata_sent);
1877 if (!metadata_viewer_stream->stream->metadata_received ||
1878 metadata_viewer_stream->stream->metadata_received >
1879 metadata_viewer_stream->metadata_sent) {
1880 viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
1881 }
1882 pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
1883 }
1884
1885 viewer_index.flags = htobe32(viewer_index.flags);
1886 health_code_update();
1887
1888 ret = send_response(conn->sock, &viewer_index, sizeof(viewer_index));
1889 if (ret < 0) {
1890 goto end;
1891 }
1892 health_code_update();
1893
1894 if (vstream) {
1895 DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
1896 vstream->index_sent_seqcount,
1897 vstream->stream->stream_handle);
1898 }
1899 end:
1900 if (metadata_viewer_stream) {
1901 viewer_stream_put(metadata_viewer_stream);
1902 }
1903 if (vstream) {
1904 viewer_stream_put(vstream);
1905 }
1906 return ret;
1907
1908 error_put:
1909 pthread_mutex_unlock(&rstream->lock);
1910 pthread_mutex_unlock(&rstream->trace->session->lock);
1911 if (metadata_viewer_stream) {
1912 viewer_stream_put(metadata_viewer_stream);
1913 }
1914 viewer_stream_put(vstream);
1915 return ret;
1916 }
1917
1918 /*
1919 * Send the requested packet of a stream to the viewer.
1920 *
1921 * Return 0 on success or else a negative value.
1922 */
1923 static
1924 int viewer_get_packet(struct relay_connection *conn)
1925 {
1926 int ret;
1927 off_t lseek_ret;
1928 char *reply = NULL;
1929 struct lttng_viewer_get_packet get_packet_info;
1930 struct lttng_viewer_trace_packet reply_header;
1931 struct relay_viewer_stream *vstream = NULL;
1932 uint32_t reply_size = sizeof(reply_header);
1933 uint32_t packet_data_len = 0;
1934 ssize_t read_len;
1935 uint64_t stream_id;
1936
1937 health_code_update();
1938
1939 ret = recv_request(conn->sock, &get_packet_info,
1940 sizeof(get_packet_info));
1941 if (ret < 0) {
1942 goto end;
1943 }
1944 health_code_update();
1945
1946 /* From this point on, the error label can be reached. */
1947 memset(&reply_header, 0, sizeof(reply_header));
1948 stream_id = (uint64_t) be64toh(get_packet_info.stream_id);
1949
1950 vstream = viewer_stream_get_by_id(stream_id);
1951 if (!vstream) {
1952 DBG("Client requested packet of unknown stream id %" PRIu64,
1953 stream_id);
1954 reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
1955 goto send_reply_nolock;
1956 } else {
1957 packet_data_len = be32toh(get_packet_info.len);
1958 reply_size += packet_data_len;
1959 }
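/*
 * The reply buffer holds the reply header immediately followed by
 * the raw packet payload read from the stream file.
 */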
1960
1961 reply = (char *) zmalloc(reply_size);
1962 if (!reply) {
1963 PERROR("packet reply zmalloc");
1964 reply_size = sizeof(reply_header);
1965 goto error;
1966 }
1967
1968 pthread_mutex_lock(&vstream->stream->lock);
1969 lseek_ret = fs_handle_seek(vstream->stream_file.handle,
1970 be64toh(get_packet_info.offset), SEEK_SET);
1971 if (lseek_ret < 0) {
1972 PERROR("Failed to seek file system handle of viewer stream %" PRIu64
1973 " to offset %" PRIu64,
1974 stream_id,
1975 (uint64_t) be64toh(get_packet_info.offset));
1976 goto error;
1977 }
1978 read_len = fs_handle_read(vstream->stream_file.handle,
1979 reply + sizeof(reply_header), packet_data_len);
1980 if (read_len < packet_data_len) {
1981 PERROR("Failed to read from file system handle of viewer stream id %" PRIu64
1982 ", offset: %" PRIu64,
1983 stream_id,
1984 (uint64_t) be64toh(get_packet_info.offset));
1985 goto error;
1986 }
1987 reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
1988 reply_header.len = htobe32(packet_data_len);
1989 goto send_reply;
1990
1991 error:
1992 /* No payload to send on error. */
1993 reply_size = sizeof(reply_header);
1994 reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
1995
1996 send_reply:
1997 if (vstream) {
1998 pthread_mutex_unlock(&vstream->stream->lock);
1999 }
2000 send_reply_nolock:
2001
2002 health_code_update();
2003
2004 if (reply) {
2005 memcpy(reply, &reply_header, sizeof(reply_header));
2006 ret = send_response(conn->sock, reply, reply_size);
2007 } else {
2008 /* No reply to send. */
2009 ret = send_response(conn->sock, &reply_header,
2010 reply_size);
2011 }
2012
2013 health_code_update();
2014 if (ret < 0) {
2015 PERROR("sendmsg of packet data failed");
2016 goto end_free;
2017 }
2018
2019 DBG("Sent %u bytes for stream %" PRIu64, reply_size, stream_id);
2020
2021 end_free:
2022 free(reply);
2023 end:
2024 if (vstream) {
2025 viewer_stream_put(vstream);
2026 }
2027 return ret;
2028 }
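
On the client side, the request decoded by recv_request() above would be built along these lines. This is a hypothetical sketch, assuming the three-field layout of struct lttng_viewer_get_packet (stream_id, offset, len) from lttng-viewer-abi.h; the offset and length typically come straight from a previously received index reply.

    #include <endian.h>
    #include <stdint.h>
    #include <string.h>

    /* Assumed mirror of struct lttng_viewer_get_packet (lttng-viewer-abi.h). */
    struct get_packet_wire {
    	uint64_t stream_id;
    	uint64_t offset;
    	uint32_t len;
    } __attribute__((packed));

    /* Build a GET_PACKET request; the server reads these fields back with
     * be64toh()/be32toh() as shown above. */
    static void fill_get_packet(struct get_packet_wire *req, uint64_t stream_id,
    		uint64_t offset, uint32_t packet_size_bytes)
    {
    	memset(req, 0, sizeof(*req));
    	req->stream_id = htobe64(stream_id);
    	req->offset = htobe64(offset);
    	req->len = htobe32(packet_size_bytes);
    }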
2029
2030 /*
2031 * Send the session's metadata
2032 *
2033 * Return 0 on success else a negative value.
2034 */
2035 static
2036 int viewer_get_metadata(struct relay_connection *conn)
2037 {
2038 int ret = 0;
2039 int fd = -1;
2040 ssize_t read_len;
2041 uint64_t len = 0;
2042 char *data = NULL;
2043 struct lttng_viewer_get_metadata request;
2044 struct lttng_viewer_metadata_packet reply;
2045 struct relay_viewer_stream *vstream = NULL;
2046
2047 LTTNG_ASSERT(conn);
2048
2049 health_code_update();
2050
2051 ret = recv_request(conn->sock, &request, sizeof(request));
2052 if (ret < 0) {
2053 goto end;
2054 }
2055 health_code_update();
2056
2057 memset(&reply, 0, sizeof(reply));
2058
2059 vstream = viewer_stream_get_by_id(be64toh(request.stream_id));
2060 if (!vstream) {
2061 /*
2062 * The metadata stream can be closed by a CLOSE command
2063 * just before we attach. It can also be closed by
2064 * per-pid tracing during tracing. Therefore, it is
2065 * possible that we cannot find this viewer stream.
2066 * Reply back to the client with an error if we cannot
2067 * find it.
2068 */
2069 DBG("Client requested metadata of unknown stream id %" PRIu64,
2070 (uint64_t) be64toh(request.stream_id));
2071 reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
2072 goto send_reply;
2073 }
2074 pthread_mutex_lock(&vstream->stream->lock);
2075 if (!vstream->stream->is_metadata) {
2076 ERR("Invalid metadata stream");
2077 goto error;
2078 }
2079
2080 if (vstream->metadata_sent >= vstream->stream->metadata_received) {
2081 /*
2082 * The live viewers expect to receive a NO_NEW_METADATA
2083 * status before a stream disappears, otherwise they abort the
2084 * entire live connection when receiving an error status.
2085 *
2086 * The clear feature resets metadata_sent to 0 until the
2087 * same metadata is received again.
2088 */
2089 reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
2090 /*
2091 * The live viewer considers a closed 0 byte metadata stream as
2092 * an error.
2093 */
2094 if (vstream->metadata_sent > 0) {
2095 if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) {
2096 /* Release ownership for the viewer metadata stream. */
2097 viewer_stream_put(vstream);
2098 }
2099 vstream->stream->no_new_metadata_notified = true;
2100 }
2101 goto send_reply;
2102 }
2103
2104 if (vstream->stream->trace_chunk &&
2105 !lttng_trace_chunk_ids_equal(
2106 conn->viewer_session->current_trace_chunk,
2107 vstream->stream->trace_chunk)) {
2108 /* A rotation has occurred on the relay stream. */
2109 DBG("Metadata relay stream and viewer chunk ids differ");
2110
2111 ret = viewer_session_set_trace_chunk_copy(
2112 conn->viewer_session,
2113 vstream->stream->trace_chunk);
2114 if (ret) {
2115 reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
2116 goto send_reply;
2117 }
2118 }
2119
2120 if (conn->viewer_session->current_trace_chunk &&
2121 !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk,
2122 vstream->stream_file.trace_chunk)) {
2123 bool acquired_reference;
2124
2125 DBG("Viewer session and viewer stream chunk differ: "
2126 "vsession chunk %p vstream chunk %p",
2127 conn->viewer_session->current_trace_chunk,
2128 vstream->stream_file.trace_chunk);
2129 lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
2130 acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk);
2131 LTTNG_ASSERT(acquired_reference);
2132 vstream->stream_file.trace_chunk =
2133 conn->viewer_session->current_trace_chunk;
2134 viewer_stream_close_files(vstream);
2135 }
2136
2137 len = vstream->stream->metadata_received - vstream->metadata_sent;
2138
2139 if (!vstream->stream_file.trace_chunk) {
2140 reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
2141 len = 0;
2142 goto send_reply;
2143 } else if (vstream->stream_file.trace_chunk &&
2144 !vstream->stream_file.handle && len > 0) {
2145 /*
2146 * Either this is the first time the metadata file is read, or a
2147 * rotation of the corresponding relay stream has occurred.
2148 */
2149 struct fs_handle *fs_handle;
2150 char file_path[LTTNG_PATH_MAX];
2151 enum lttng_trace_chunk_status status;
2152 struct relay_stream *rstream = vstream->stream;
2153
2154 ret = utils_stream_file_path(rstream->path_name,
2155 rstream->channel_name, rstream->tracefile_size,
2156 vstream->current_tracefile_id, NULL, file_path,
2157 sizeof(file_path));
2158 if (ret < 0) {
2159 goto error;
2160 }
2161
2162 /*
2163 * It is possible that the metadata file we are trying to open is
2164 * missing if the stream has been closed (application exits with
2165 * per-pid buffers) and a clear command has been performed.
2166 */
2167 status = lttng_trace_chunk_open_fs_handle(
2168 vstream->stream_file.trace_chunk,
2169 file_path, O_RDONLY, 0, &fs_handle, true);
2170 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
2171 if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) {
2172 reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
2173 len = 0;
2174 if (vstream->stream->closed) {
2175 viewer_stream_put(vstream);
2176 }
2177 goto send_reply;
2178 }
2179 PERROR("Failed to open metadata file for viewer stream");
2180 goto error;
2181 }
2182 vstream->stream_file.handle = fs_handle;
2183
2184 if (vstream->metadata_sent != 0) {
2185 /*
2186 * The client does not expect to receive metadata
2187 * it has already received; metadata files in successive
2188 * chunks must be a strict superset of one another.
2189 *
2190 * Skip the first `metadata_sent` bytes to ensure
2191 * they are not sent a second time to the client.
2192 *
2193 * Barring a block layer error or an internal error,
2194 * this seek should not fail as
2195 * `vstream->stream->metadata_received` is reset when
2196 * a relay stream is rotated. If this is reached, it is
2197 * safe to assume that
2198 * `metadata_received` > `metadata_sent`.
2199 */
2200 const off_t seek_ret = fs_handle_seek(fs_handle,
2201 vstream->metadata_sent, SEEK_SET);
2202
2203 if (seek_ret < 0) {
2204 PERROR("Failed to seek metadata viewer stream file to `sent` position: pos = %" PRId64,
2205 vstream->metadata_sent);
2206 reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
2207 goto send_reply;
2208 }
2209 }
2210 }
2211
2212 reply.len = htobe64(len);
2213 data = (char *) zmalloc(len);
2214 if (!data) {
2215 PERROR("viewer metadata zmalloc");
2216 goto error;
2217 }
2218
2219 fd = fs_handle_get_fd(vstream->stream_file.handle);
2220 if (fd < 0) {
2221 ERR("Failed to restore viewer stream file system handle");
2222 goto error;
2223 }
2224 read_len = lttng_read(fd, data, len);
2225 fs_handle_put_fd(vstream->stream_file.handle);
2226 fd = -1;
2227 if (read_len < len) {
2228 if (read_len < 0) {
2229 PERROR("Failed to read metadata file");
2230 goto error;
2231 } else {
2232 /*
2233 * A clear has been performed which prevents the relay
2234 * from sending `len` bytes of metadata.
2235 *
2236 * It is important not to send any metadata if we
2237 * couldn't read all the available metadata in one shot:
2238 * sending partial metadata can cause the client to
2239 * attempt to parse an incomplete (incoherent) metadata
2240 * stream, which would result in an error.
2241 */
2242 const off_t seek_ret = fs_handle_seek(
2243 vstream->stream_file.handle, -read_len,
2244 SEEK_CUR);
2245
2246 DBG("Failed to read metadata: requested = %" PRIu64 ", got = %zd",
2247 len, read_len);
2248 read_len = 0;
2249 len = 0;
2250 if (seek_ret < 0) {
2251 PERROR("Failed to restore metadata file position after partial read");
2252 ret = -1;
2253 goto error;
2254 }
2255 }
2256 }
2257 vstream->metadata_sent += read_len;
2258 reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);
2259
2260 goto send_reply;
2261
2262 error:
2263 reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
2264
2265 send_reply:
2266 health_code_update();
2267 if (vstream) {
2268 pthread_mutex_unlock(&vstream->stream->lock);
2269 }
2270 ret = send_response(conn->sock, &reply, sizeof(reply));
2271 if (ret < 0) {
2272 goto end_free;
2273 }
2274 health_code_update();
2275
2276 if (len > 0) {
2277 ret = send_response(conn->sock, data, len);
2278 if (ret < 0) {
2279 goto end_free;
2280 }
2281 }
2282
2283 DBG("Sent %" PRIu64 " bytes of metadata for stream %" PRIu64, len,
2284 (uint64_t) be64toh(request.stream_id));
2285
2286 DBG("Metadata sent");
2287
2288 end_free:
2289 free(data);
2290 end:
2291 if (vstream) {
2292 viewer_stream_put(vstream);
2293 }
2294 return ret;
2295 }
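
The partial-read handling above implements an "all or nothing" policy: if fewer than `len` bytes can be read (e.g. after a clear), the file position is rewound so the same bytes can be re-read later, and no partial metadata ever reaches the client. Below is a generic sketch of that pattern using plain POSIX calls; relayd itself goes through fs_handle and lttng_read(), which also retries on EINTR.

    #include <sys/types.h>
    #include <unistd.h>

    /* Read exactly len bytes, or rewind and report nothing read. */
    static ssize_t read_all_or_rewind(int fd, char *buf, size_t len)
    {
    	ssize_t read_len = read(fd, buf, len);

    	if (read_len > 0 && (size_t) read_len < len) {
    		/* Undo the partial read; the caller retries once the
    		 * remaining data has been flushed to the file. */
    		if (lseek(fd, -read_len, SEEK_CUR) < 0) {
    			return -1;
    		}
    		read_len = 0;
    	}
    	return read_len;
    }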
2296
2297 /*
2298 * Create a viewer session.
2299 *
2300 * Return 0 on success or else a negative value.
2301 */
2302 static
2303 int viewer_create_session(struct relay_connection *conn)
2304 {
2305 int ret;
2306 struct lttng_viewer_create_session_response resp;
2307
2308 memset(&resp, 0, sizeof(resp));
2309 resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
2310 conn->viewer_session = viewer_session_create();
2311 if (!conn->viewer_session) {
2312 ERR("Allocation viewer session");
2313 resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_ERR);
2314 goto send_reply;
2315 }
2316
2317 send_reply:
2318 health_code_update();
2319 ret = send_response(conn->sock, &resp, sizeof(resp));
2320 if (ret < 0) {
2321 goto end;
2322 }
2323 health_code_update();
2324 ret = 0;
2325
2326 end:
2327 return ret;
2328 }
2329
2330 /*
2331 * Detach a viewer session.
2332 *
2333 * Return 0 on success or else a negative value.
2334 */
2335 static
2336 int viewer_detach_session(struct relay_connection *conn)
2337 {
2338 int ret;
2339 struct lttng_viewer_detach_session_response response;
2340 struct lttng_viewer_detach_session_request request;
2341 struct relay_session *session = NULL;
2342 uint64_t viewer_session_to_close;
2343
2344 LTTNG_ASSERT(conn);
2345
2346 health_code_update();
2347
2348 /* Receive the request from the connected client. */
2349 ret = recv_request(conn->sock, &request, sizeof(request));
2350 if (ret < 0) {
2351 goto end;
2352 }
2353 viewer_session_to_close = be64toh(request.session_id);
2354
2355 if (!conn->viewer_session) {
2356 DBG("Client trying to detach before creating a live viewer session");
2357 response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_ERR);
2358 goto send_reply;
2359 }
2360
2361 health_code_update();
2362
2363 memset(&response, 0, sizeof(response));
2364 DBG("Detaching from session ID %" PRIu64, viewer_session_to_close);
2365
2366 session = session_get_by_id(be64toh(request.session_id));
2367 if (!session) {
2368 DBG("Relay session %" PRIu64 " not found",
2369 (uint64_t) be64toh(request.session_id));
2370 response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_UNK);
2371 goto send_reply;
2372 }
2373
2374 ret = viewer_session_is_attached(conn->viewer_session, session);
2375 if (ret != 1) {
2376 DBG("Not attached to this session");
2377 response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_ERR);
2378 goto send_reply_put;
2379 }
2380
2381 viewer_session_close_one_session(conn->viewer_session, session);
2382 response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_OK);
2383 DBG("Session %" PRIu64 " detached.", viewer_session_to_close);
2384
2385 send_reply_put:
2386 session_put(session);
2387
2388 send_reply:
2389 health_code_update();
2390 ret = send_response(conn->sock, &response, sizeof(response));
2391 if (ret < 0) {
2392 goto end;
2393 }
2394 health_code_update();
2395 ret = 0;
2396
2397 end:
2398 return ret;
2399 }
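
The matching client request is a single big-endian session id, answered by one of the three statuses set above (OK, UNK for an unknown session, ERR otherwise). A hypothetical client-side sketch, assuming single-field request/response layouts per lttng-viewer-abi.h:

    #include <endian.h>
    #include <stdint.h>
    #include <string.h>

    /* Assumed mirrors of the detach request/response (lttng-viewer-abi.h). */
    struct detach_request_wire {
    	uint64_t session_id;
    } __attribute__((packed));

    struct detach_response_wire {
    	uint32_t status;
    } __attribute__((packed));

    static void fill_detach_request(struct detach_request_wire *req,
    		uint64_t session_id)
    {
    	memset(req, 0, sizeof(*req));
    	req->session_id = htobe64(session_id);	/* server reads with be64toh() */
    }

    /* Returns the host-order status: OK, UNK or ERR per the server code above. */
    static uint32_t decode_detach_status(const struct detach_response_wire *resp)
    {
    	return be32toh(resp->status);
    }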
2400
2401 /*
2402 * live_relay_unknown_command: send -1 if received unknown command
2403 */
2404 static
2405 void live_relay_unknown_command(struct relay_connection *conn)
2406 {
2407 struct lttcomm_relayd_generic_reply reply;
2408
2409 memset(&reply, 0, sizeof(reply));
2410 reply.ret_code = htobe32(LTTNG_ERR_UNK);
2411 (void) send_response(conn->sock, &reply, sizeof(reply));
2412 }
2413
2414 /*
2415 * Process the commands received on the control socket
2416 */
2417 static
2418 int process_control(struct lttng_viewer_cmd *recv_hdr,
2419 struct relay_connection *conn)
2420 {
2421 int ret = 0;
2422 lttng_viewer_command cmd =
2423 (lttng_viewer_command) be32toh(recv_hdr->cmd);
2424
2425 /*
2426 * Make sure we've done the version check before any command other than
2427 * a new client connection.
2428 */
2429 if (cmd != LTTNG_VIEWER_CONNECT && !conn->version_check_done) {
2430 ERR("Viewer on connection %d requested %s command before version check",
2431 conn->sock->fd, lttng_viewer_command_str(cmd));
2432 ret = -1;
2433 goto end;
2434 }
2435
2436 DBG("Processing %s viewer command from connection %d",
2437 lttng_viewer_command_str(cmd), conn->sock->fd);
2438
2439 switch (cmd) {
2440 case LTTNG_VIEWER_CONNECT:
2441 ret = viewer_connect(conn);
2442 break;
2443 case LTTNG_VIEWER_LIST_SESSIONS:
2444 ret = viewer_list_sessions(conn);
2445 break;
2446 case LTTNG_VIEWER_ATTACH_SESSION:
2447 ret = viewer_attach_session(conn);
2448 break;
2449 case LTTNG_VIEWER_GET_NEXT_INDEX:
2450 ret = viewer_get_next_index(conn);
2451 break;
2452 case LTTNG_VIEWER_GET_PACKET:
2453 ret = viewer_get_packet(conn);
2454 break;
2455 case LTTNG_VIEWER_GET_METADATA:
2456 ret = viewer_get_metadata(conn);
2457 break;
2458 case LTTNG_VIEWER_GET_NEW_STREAMS:
2459 ret = viewer_get_new_streams(conn);
2460 break;
2461 case LTTNG_VIEWER_CREATE_SESSION:
2462 ret = viewer_create_session(conn);
2463 break;
2464 case LTTNG_VIEWER_DETACH_SESSION:
2465 ret = viewer_detach_session(conn);
2466 break;
2467 default:
2468 ERR("Received unknown viewer command (%u)",
2469 be32toh(recv_hdr->cmd));
2470 live_relay_unknown_command(conn);
2471 ret = -1;
2472 goto end;
2473 }
2474
2475 end:
2476 return ret;
2477 }
2478
2479 static
2480 void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
2481 {
2482 int ret;
2483
2484 (void) lttng_poll_del(events, pollfd);
2485
2486 ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &pollfd, 1,
2487 fd_tracker_util_close_fd, NULL);
2488 if (ret < 0) {
2489 ERR("Closing pollfd %d", pollfd);
2490 }
2491 }
2492
2493 /*
2494 * This thread does the actual work
2495 */
2496 static
2497 void *thread_worker(void *data)
2498 {
2499 int ret, err = -1;
2500 uint32_t nb_fd;
2501 struct lttng_poll_event events;
2502 struct lttng_ht *viewer_connections_ht;
2503 struct lttng_ht_iter iter;
2504 struct lttng_viewer_cmd recv_hdr;
2505 struct relay_connection *destroy_conn;
2506
2507 DBG("[thread] Live viewer relay worker started");
2508
2509 rcu_register_thread();
2510
2511 health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_WORKER);
2512
2513 if (testpoint(relayd_thread_live_worker)) {
2514 goto error_testpoint;
2515 }
2516
2517 /* Table of connections indexed by socket. */
2518 viewer_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2519 if (!viewer_connections_ht) {
2520 goto viewer_connections_ht_error;
2521 }
2522
2523 ret = create_named_thread_poll_set(&events, 2,
2524 "Live viewer worker thread epoll");
2525 if (ret < 0) {
2526 goto error_poll_create;
2527 }
2528
2529 ret = lttng_poll_add(&events, live_conn_pipe[0], LPOLLIN | LPOLLRDHUP);
2530 if (ret < 0) {
2531 goto error;
2532 }
2533
2534 restart:
2535 while (1) {
2536 int i;
2537
2538 health_code_update();
2539
2540 /* Infinite blocking call, waiting for transmission */
2541 DBG3("Relayd live viewer worker thread polling...");
2542 health_poll_entry();
2543 ret = lttng_poll_wait(&events, -1);
2544 health_poll_exit();
2545 if (ret < 0) {
2546 /*
2547 * Restart interrupted system call.
2548 */
2549 if (errno == EINTR) {
2550 goto restart;
2551 }
2552 goto error;
2553 }
2554
2555 nb_fd = ret;
2556
2557 /*
2558 * Process control. The control connection is prioritised so we don't
2559 * starve it with high throughput tracing data on the data
2560 * connection.
2561 */
2562 for (i = 0; i < nb_fd; i++) {
2563 /* Fetch once the poll data */
2564 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
2565 int pollfd = LTTNG_POLL_GETFD(&events, i);
2566
2567 health_code_update();
2568
2569 /* Thread quit pipe has been closed. Killing thread. */
2570 ret = check_thread_quit_pipe(pollfd, revents);
2571 if (ret) {
2572 err = 0;
2573 goto exit;
2574 }
2575
2576 /* Inspect the relay conn pipe for new connection. */
2577 if (pollfd == live_conn_pipe[0]) {
2578 if (revents & LPOLLIN) {
2579 struct relay_connection *conn;
2580
2581 ret = lttng_read(live_conn_pipe[0],
2582 &conn, sizeof(conn));
2583 if (ret < 0) {
2584 goto error;
2585 }
2586 ret = lttng_poll_add(&events,
2587 conn->sock->fd,
2588 LPOLLIN | LPOLLRDHUP);
2589 if (ret) {
2590 ERR("Failed to add new live connection file descriptor to poll set");
2591 goto error;
2592 }
2593 connection_ht_add(viewer_connections_ht, conn);
2594 DBG("Connection socket %d added to poll", conn->sock->fd);
2595 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
2596 ERR("Relay live pipe error");
2597 goto error;
2598 } else {
2599 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
2600 goto error;
2601 }
2602 } else {
2603 /* Connection activity. */
2604 struct relay_connection *conn;
2605
2606 conn = connection_get_by_sock(viewer_connections_ht, pollfd);
2607 if (!conn) {
2608 continue;
2609 }
2610
2611 if (revents & LPOLLIN) {
2612 ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr,
2613 sizeof(recv_hdr), 0);
2614 if (ret <= 0) {
2615 /* Connection closed. */
2616 cleanup_connection_pollfd(&events, pollfd);
2617 /* Put "create" ownership reference. */
2618 connection_put(conn);
2619 DBG("Viewer control conn closed with %d", pollfd);
2620 } else {
2621 ret = process_control(&recv_hdr, conn);
2622 if (ret < 0) {
2623 /* Clear the session on error. */
2624 cleanup_connection_pollfd(&events, pollfd);
2625 /* Put "create" ownership reference. */
2626 connection_put(conn);
2627 DBG("Viewer connection closed with %d", pollfd);
2628 }
2629 }
2630 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
2631 cleanup_connection_pollfd(&events, pollfd);
2632 /* Put "create" ownership reference. */
2633 connection_put(conn);
2634 } else {
2635 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
2636 connection_put(conn);
2637 goto error;
2638 }
2639 /* Put local "get_by_sock" reference. */
2640 connection_put(conn);
2641 }
2642 }
2643 }
2644
2645 exit:
2646 error:
2647 (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
2648
2649 /* Cleanup remaining connection object. */
2650 rcu_read_lock();
2651 cds_lfht_for_each_entry(viewer_connections_ht->ht, &iter.iter,
2652 destroy_conn,
2653 sock_n.node) {
2654 health_code_update();
2655 connection_put(destroy_conn);
2656 }
2657 rcu_read_unlock();
2658 error_poll_create:
2659 lttng_ht_destroy(viewer_connections_ht);
2660 viewer_connections_ht_error:
2661 /* Close relay conn pipes */
2662 (void) fd_tracker_util_pipe_close(the_fd_tracker, live_conn_pipe);
2663 if (err) {
2664 DBG("Viewer worker thread exited with error");
2665 }
2666 DBG("Viewer worker thread cleanup complete");
2667 error_testpoint:
2668 if (err) {
2669 health_error();
2670 ERR("Health error occurred in %s", __func__);
2671 }
2672 health_unregister(health_relayd);
2673 if (lttng_relay_stop_threads()) {
2674 ERR("Error stopping threads");
2675 }
2676 rcu_unregister_thread();
2677 return NULL;
2678 }
2679
2680 /*
2681 * Create the live connection pipe used to wake the live worker thread.
2682 * Closed in cleanup().
2683 */
2684 static int create_conn_pipe(void)
2685 {
2686 return fd_tracker_util_pipe_open_cloexec(the_fd_tracker,
2687 "Live connection pipe", live_conn_pipe);
2688 }
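
This pipe carries raw relay_connection pointers: the dispatcher thread writes a pointer (transferring the "create" ownership reference) and thread_worker() reads it back with lttng_read(), as seen above. A minimal sketch of the writer side, using plain write() where the real dispatcher would use lttng_write():

    #include <unistd.h>

    struct relay_connection;	/* opaque here; defined in connection.h */

    /* Hand a connection over to the worker thread through the pipe. The
     * pointer itself is the message; the worker adopts the ownership
     * reference once it reads sizeof(conn) bytes. */
    static int enqueue_conn_for_worker(int pipe_wfd, struct relay_connection *conn)
    {
    	const ssize_t ret = write(pipe_wfd, &conn, sizeof(conn));

    	return ret == (ssize_t) sizeof(conn) ? 0 : -1;
    }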
2689
2690 int relayd_live_join(void)
2691 {
2692 int ret, retval = 0;
2693 void *status;
2694
2695 ret = pthread_join(live_listener_thread, &status);
2696 if (ret) {
2697 errno = ret;
2698 PERROR("pthread_join live listener");
2699 retval = -1;
2700 }
2701
2702 ret = pthread_join(live_worker_thread, &status);
2703 if (ret) {
2704 errno = ret;
2705 PERROR("pthread_join live worker");
2706 retval = -1;
2707 }
2708
2709 ret = pthread_join(live_dispatcher_thread, &status);
2710 if (ret) {
2711 errno = ret;
2712 PERROR("pthread_join live dispatcher");
2713 retval = -1;
2714 }
2715
2716 cleanup_relayd_live();
2717
2718 return retval;
2719 }
2720
2721 /*
2722 * Create and start the live viewer threads (listener, dispatcher, worker).
2723 */
2724 int relayd_live_create(struct lttng_uri *uri)
2725 {
2726 int ret = 0, retval = 0;
2727 void *status;
2728 int is_root;
2729
2730 if (!uri) {
2731 retval = -1;
2732 goto exit_init_data;
2733 }
2734 live_uri = uri;
2735
2736 /* Check if daemon is UID = 0 */
2737 is_root = !getuid();
2738
2739 if (!is_root) {
2740 if (live_uri->port < 1024) {
2741 ERR("Need to be root to use ports < 1024");
2742 retval = -1;
2743 goto exit_init_data;
2744 }
2745 }
2746
2747 /* Setup the live connection pipe. */
2748 if (create_conn_pipe()) {
2749 retval = -1;
2750 goto exit_init_data;
2751 }
2752
2753 /* Init relay command queue. */
2754 cds_wfcq_init(&viewer_conn_queue.head, &viewer_conn_queue.tail);
2755
2756 /* Set up max poll set size */
2757 if (lttng_poll_set_max_size()) {
2758 retval = -1;
2759 goto exit_init_data;
2760 }
2761
2762 /* Setup the dispatcher thread */
2763 ret = pthread_create(&live_dispatcher_thread, default_pthread_attr(),
2764 thread_dispatcher, (void *) NULL);
2765 if (ret) {
2766 errno = ret;
2767 PERROR("pthread_create viewer dispatcher");
2768 retval = -1;
2769 goto exit_dispatcher_thread;
2770 }
2771
2772 /* Setup the worker thread */
2773 ret = pthread_create(&live_worker_thread, default_pthread_attr(),
2774 thread_worker, NULL);
2775 if (ret) {
2776 errno = ret;
2777 PERROR("pthread_create viewer worker");
2778 retval = -1;
2779 goto exit_worker_thread;
2780 }
2781
2782 /* Setup the listener thread */
2783 ret = pthread_create(&live_listener_thread, default_pthread_attr(),
2784 thread_listener, (void *) NULL);
2785 if (ret) {
2786 errno = ret;
2787 PERROR("pthread_create viewer listener");
2788 retval = -1;
2789 goto exit_listener_thread;
2790 }
2791
2792 /*
2793 * All OK, started all threads.
2794 */
2795 return retval;
2796
2797 /*
2798 * Join on the live_listener_thread should anything be added after
2799 * the live_listener thread's creation.
2800 */
2801
2802 exit_listener_thread:
2803
2804 ret = pthread_join(live_worker_thread, &status);
2805 if (ret) {
2806 errno = ret;
2807 PERROR("pthread_join live worker");
2808 retval = -1;
2809 }
2810 exit_worker_thread:
2811
2812 ret = pthread_join(live_dispatcher_thread, &status);
2813 if (ret) {
2814 errno = ret;
2815 PERROR("pthread_join live dispatcher");
2816 retval = -1;
2817 }
2818 exit_dispatcher_thread:
2819
2820 exit_init_data:
2821 cleanup_relayd_live();
2822
2823 return retval;
2824 }