Docs: relayd: live: clarify ownership of vstream after viewer release
src/bin/lttng-relayd/live.cpp

/*
 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
 * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 *
 */

#define _LGPL_SOURCE
#include <fcntl.h>
#include <getopt.h>
#include <grp.h>
#include <inttypes.h>
#include <limits.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/mount.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <urcu/futex.h>
#include <urcu/rculist.h>
#include <urcu/uatomic.h>
#include <string>

#include <common/common.h>
#include <common/compat/endian.h>
#include <common/compat/poll.h>
#include <common/compat/socket.h>
#include <common/defaults.h>
#include <common/fd-tracker/utils.h>
#include <common/fs-handle.h>
#include <common/futex.h>
#include <common/index/index.h>
#include <common/sessiond-comm/inet.h>
#include <common/sessiond-comm/relayd.h>
#include <common/sessiond-comm/sessiond-comm.h>
#include <common/uri.h>
#include <common/utils.h>
#include <lttng/lttng.h>

#include "cmd.h"
#include "connection.h"
#include "ctf-trace.h"
#include "health-relayd.h"
#include "live.h"
#include "lttng-relayd.h"
#include "session.h"
#include "stream.h"
#include "testpoint.h"
#include "utils.h"
#include "viewer-session.h"
#include "viewer-stream.h"

#define SESSION_BUF_DEFAULT_COUNT 16

static struct lttng_uri *live_uri;

/*
 * This pipe is used to inform the worker thread that a command is queued and
 * ready to be processed.
 */
static int live_conn_pipe[2] = { -1, -1 };
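
/*
 * Illustrative sketch (not part of the daemon's logic): the dispatcher
 * publishes a connection to the worker by writing the pointer value itself
 * through the pipe, and the worker recovers it with a read of the same size.
 * Names mirror thread_dispatcher() further down; only the handshake is shown.
 *
 *   struct relay_connection *conn = ...;
 *   (void) lttng_write(live_conn_pipe[1], &conn, sizeof(conn));  // dispatcher
 *
 *   struct relay_connection *received;
 *   (void) lttng_read(live_conn_pipe[0], &received, sizeof(received)); // worker
 */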

/* Shared between threads */
static int live_dispatch_thread_exit;

static pthread_t live_listener_thread;
static pthread_t live_dispatcher_thread;
static pthread_t live_worker_thread;

/*
 * Relay command queue.
 *
 * The live_thread_listener and live_thread_dispatcher communicate with this
 * queue.
 */
static struct relay_conn_queue viewer_conn_queue;
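
/*
 * Illustrative sketch (assumes the urcu wfcqueue API and the futex helpers
 * used below): the listener enqueues and wakes, the dispatcher prepares the
 * futex, drains the queue, then waits. This condenses thread_listener() and
 * thread_dispatcher() further down.
 *
 *   // producer (listener)
 *   cds_wfcq_head_ptr_t head;
 *   head.h = &viewer_conn_queue.head;
 *   cds_wfcq_enqueue(head, &viewer_conn_queue.tail, &new_conn->qnode);
 *   futex_nto1_wake(&viewer_conn_queue.futex);
 *
 *   // consumer (dispatcher)
 *   futex_nto1_prepare(&viewer_conn_queue.futex);
 *   while ((node = cds_wfcq_dequeue_blocking(&viewer_conn_queue.head,
 *           &viewer_conn_queue.tail))) {
 *       // handle caa_container_of(node, struct relay_connection, qnode)
 *   }
 *   futex_nto1_wait(&viewer_conn_queue.futex);
 */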

static uint64_t last_relay_viewer_session_id;
static pthread_mutex_t last_relay_viewer_session_id_lock =
        PTHREAD_MUTEX_INITIALIZER;

static
const char *lttng_viewer_command_str(lttng_viewer_command cmd)
{
    switch (cmd) {
    case LTTNG_VIEWER_CONNECT:
        return "CONNECT";
    case LTTNG_VIEWER_LIST_SESSIONS:
        return "LIST_SESSIONS";
    case LTTNG_VIEWER_ATTACH_SESSION:
        return "ATTACH_SESSION";
    case LTTNG_VIEWER_GET_NEXT_INDEX:
        return "GET_NEXT_INDEX";
    case LTTNG_VIEWER_GET_PACKET:
        return "GET_PACKET";
    case LTTNG_VIEWER_GET_METADATA:
        return "GET_METADATA";
    case LTTNG_VIEWER_GET_NEW_STREAMS:
        return "GET_NEW_STREAMS";
    case LTTNG_VIEWER_CREATE_SESSION:
        return "CREATE_SESSION";
    case LTTNG_VIEWER_DETACH_SESSION:
        return "DETACH_SESSION";
    default:
        abort();
    }
}

/*
 * Cleanup the daemon
 */
static
void cleanup_relayd_live(void)
{
    DBG("Cleaning up");

    free(live_uri);
}

/*
 * Receive a request of `size` bytes on the given socket, into a
 * caller-allocated buffer.
 *
 * Return the size of the received message or else a negative value on error,
 * with errno being set by the recvmsg() syscall.
 */
static
ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size)
{
    ssize_t ret;

    ret = sock->ops->recvmsg(sock, buf, size, 0);
    if (ret < 0 || ret != size) {
        if (ret == 0) {
            /* Orderly shutdown. Not necessary to print an error. */
            DBG("Socket %d did an orderly shutdown", sock->fd);
        } else {
            ERR("Relay failed to receive request.");
        }
        ret = -1;
    }

    return ret;
}

/*
 * Send a response of `size` bytes on the given socket, from a
 * caller-allocated buffer.
 *
 * Return the size of the sent message or else a negative value on error,
 * with errno being set by the sendmsg() syscall.
 */
static
ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size)
{
    ssize_t ret;

    ret = sock->ops->sendmsg(sock, buf, size, 0);
    if (ret < 0) {
        ERR("Relayd failed to send response.");
    }

    return ret;
}
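
/*
 * Illustrative usage sketch (hypothetical handler body; it mirrors
 * viewer_connect() below): every command handler follows the same
 * fixed-size, big-endian request/response exchange built on the two
 * helpers above.
 *
 *   struct lttng_viewer_connect msg;
 *   if (recv_request(conn->sock, &msg, sizeof(msg)) < 0) {
 *       // bail out; the peer hung up or the read failed
 *   }
 *
 *   struct lttng_viewer_connect reply = {};
 *   reply.major = htobe32(RELAYD_VERSION_COMM_MAJOR);
 *   (void) send_response(conn->sock, &reply, sizeof(reply));
 */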

/*
 * Atomically check if new streams got added in one of the sessions attached
 * and reset the flag to 0.
 *
 * Returns 1 if new streams got added, 0 if nothing changed, a negative value
 * on error.
 */
static
int check_new_streams(struct relay_connection *conn)
{
    struct relay_session *session;
    unsigned long current_val;
    int ret = 0;

    if (!conn->viewer_session) {
        goto end;
    }
    rcu_read_lock();
    cds_list_for_each_entry_rcu(session,
            &conn->viewer_session->session_list,
            viewer_session_node) {
        if (!session_get(session)) {
            continue;
        }
        current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
        ret = current_val;
        session_put(session);
        if (ret == 1) {
            /* New streams found, stop iterating. */
            break;
        }
    }
    rcu_read_unlock();
end:
    return ret;
}
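
/*
 * Illustrative note (sketch): uatomic_cmpxchg(&x, old, new) atomically sets
 * x to `new` only if it currently equals `old`, and always returns the value
 * x held before the call. Hence, for the new_streams flag:
 *
 *   current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
 *   // current_val == 1: the flag was set and we consumed (reset) it
 *   // current_val == 0: no new streams; the flag is left untouched
 */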

/*
 * Send viewer streams to the given socket. The ignore_sent_flag indicates if
 * this function should ignore the sent flag or not.
 *
 * Return 0 on success or else a negative value.
 */
static
ssize_t send_viewer_streams(struct lttcomm_sock *sock,
        uint64_t session_id, unsigned int ignore_sent_flag)
{
    ssize_t ret;
    struct lttng_ht_iter iter;
    struct relay_viewer_stream *vstream;

    rcu_read_lock();

    cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
            stream_n.node) {
        struct ctf_trace *ctf_trace;
        struct lttng_viewer_stream send_stream = {};

        health_code_update();

        if (!viewer_stream_get(vstream)) {
            continue;
        }

        pthread_mutex_lock(&vstream->stream->lock);
        /* Ignore if not the same session. */
        if (vstream->stream->trace->session->id != session_id ||
                (!ignore_sent_flag && vstream->sent_flag)) {
            pthread_mutex_unlock(&vstream->stream->lock);
            viewer_stream_put(vstream);
            continue;
        }

        ctf_trace = vstream->stream->trace;
        send_stream.id = htobe64(vstream->stream->stream_handle);
        send_stream.ctf_trace_id = htobe64(ctf_trace->id);
        send_stream.metadata_flag = htobe32(
                vstream->stream->is_metadata);
        if (lttng_strncpy(send_stream.path_name, vstream->path_name,
                sizeof(send_stream.path_name))) {
            pthread_mutex_unlock(&vstream->stream->lock);
            viewer_stream_put(vstream);
            ret = -1; /* Error. */
            goto end_unlock;
        }
        if (lttng_strncpy(send_stream.channel_name,
                vstream->channel_name,
                sizeof(send_stream.channel_name))) {
            pthread_mutex_unlock(&vstream->stream->lock);
            viewer_stream_put(vstream);
            ret = -1; /* Error. */
            goto end_unlock;
        }

        DBG("Sending stream %" PRIu64 " to viewer",
                vstream->stream->stream_handle);
        vstream->sent_flag = 1;
        pthread_mutex_unlock(&vstream->stream->lock);

        ret = send_response(sock, &send_stream, sizeof(send_stream));
        viewer_stream_put(vstream);
        if (ret < 0) {
            goto end_unlock;
        }
    }

    ret = 0;

end_unlock:
    rcu_read_unlock();
    return ret;
}

/*
 * Create every viewer stream possible for the given session with the seek
 * type. Three counters *can* be returned, which are, in order: the total
 * number of viewer streams of the session, the number of unsent streams and
 * the number of streams created. Those counters can be NULL and thus will be
 * ignored.
 *
 * The session must be locked to ensure that we see either none or all
 * initial streams for a session, but no intermediate state.
 *
 * Return 0 on success or else a negative value.
 */
static int make_viewer_streams(struct relay_session *relay_session,
        struct relay_viewer_session *viewer_session,
        enum lttng_viewer_seek seek_t,
        uint32_t *nb_total,
        uint32_t *nb_unsent,
        uint32_t *nb_created,
        bool *closed)
{
    int ret;
    struct lttng_ht_iter iter;
    struct ctf_trace *ctf_trace;
    struct relay_stream *relay_stream = NULL;

    LTTNG_ASSERT(relay_session);
    ASSERT_LOCKED(relay_session->lock);

    if (relay_session->connection_closed) {
        *closed = true;
    }

    /*
     * Create viewer streams for relay streams that are ready to be
     * used for the given session id only.
     */
    rcu_read_lock();
    cds_lfht_for_each_entry (relay_session->ctf_traces_ht->ht, &iter.iter,
            ctf_trace, node.node) {
        bool trace_has_metadata_stream = false;

        health_code_update();

        if (!ctf_trace_get(ctf_trace)) {
            continue;
        }

        /*
         * Iterate over all the streams of the trace to see if we have a
         * metadata stream.
         */
        cds_list_for_each_entry_rcu(relay_stream,
                &ctf_trace->stream_list, stream_node)
        {
            bool is_metadata_stream;

            pthread_mutex_lock(&relay_stream->lock);
            is_metadata_stream = relay_stream->is_metadata;
            pthread_mutex_unlock(&relay_stream->lock);

            if (is_metadata_stream) {
                trace_has_metadata_stream = true;
                break;
            }
        }

        relay_stream = NULL;

        /*
         * If there is no metadata stream in this trace at the moment
         * and we never sent one to the viewer, skip the trace. We
         * accept that the viewer will not see this trace at all.
         */
        if (!trace_has_metadata_stream &&
                !ctf_trace->metadata_stream_sent_to_viewer) {
            ctf_trace_put(ctf_trace);
            continue;
        }

        cds_list_for_each_entry_rcu(relay_stream,
                &ctf_trace->stream_list, stream_node)
        {
            struct relay_viewer_stream *viewer_stream;

            if (!stream_get(relay_stream)) {
                continue;
            }

            pthread_mutex_lock(&relay_stream->lock);
            /*
             * A stream's 'published' state is protected by the
             * session lock.
             */
            if (!relay_stream->published) {
                goto next;
            }
            viewer_stream = viewer_stream_get_by_id(
                    relay_stream->stream_handle);
            if (!viewer_stream) {
                struct lttng_trace_chunk *viewer_stream_trace_chunk = NULL;

                /*
                 * Record that we sent the metadata stream to the
                 * viewer, so that we know which trace the viewer
                 * is aware of.
                 */
                if (relay_stream->is_metadata) {
                    ctf_trace->metadata_stream_sent_to_viewer = true;
                }

                /*
                 * If a rotation is ongoing, use a copy of the
                 * relay stream's chunk to ensure the stream
                 * files exist.
                 *
                 * Otherwise, the viewer session's current trace
                 * chunk can be used safely.
                 */
                if ((relay_stream->ongoing_rotation.is_set ||
                        relay_session->ongoing_rotation) &&
                        relay_stream->trace_chunk) {
                    viewer_stream_trace_chunk = lttng_trace_chunk_copy(
                            relay_stream->trace_chunk);
                    if (!viewer_stream_trace_chunk) {
                        ret = -1;
                        ctf_trace_put(ctf_trace);
                        goto error_unlock;
                    }
                } else {
                    /*
                     * Transition the viewer session into the
                     * newest trace chunk available.
                     */
                    if (!lttng_trace_chunk_ids_equal(viewer_session->current_trace_chunk,
                            relay_stream->trace_chunk)) {
                        ret = viewer_session_set_trace_chunk_copy(
                                viewer_session,
                                relay_stream->trace_chunk);
                        if (ret) {
                            ret = -1;
                            ctf_trace_put(ctf_trace);
                            goto error_unlock;
                        }
                    }

                    if (relay_stream->trace_chunk) {
                        /*
                         * If the corresponding relay
                         * stream's trace chunk is set,
                         * the viewer stream will be
                         * created under it.
                         *
                         * Note that a relay stream can
                         * have a NULL output trace
                         * chunk (for instance, after a
                         * clear against a stopped
                         * session).
                         */
                        const bool reference_acquired = lttng_trace_chunk_get(
                                viewer_session->current_trace_chunk);

                        LTTNG_ASSERT(reference_acquired);
                        viewer_stream_trace_chunk =
                                viewer_session->current_trace_chunk;
                    }
                }

                viewer_stream = viewer_stream_create(
                        relay_stream,
                        viewer_stream_trace_chunk,
                        seek_t);
                lttng_trace_chunk_put(viewer_stream_trace_chunk);
                viewer_stream_trace_chunk = NULL;
                if (!viewer_stream) {
                    ret = -1;
                    ctf_trace_put(ctf_trace);
                    goto error_unlock;
                }

                if (nb_created) {
                    /* Update number of created stream counter. */
                    (*nb_created)++;
                }
                /*
                 * Ensure a self-reference is preserved even
                 * after we have put our local reference.
                 */
                if (!viewer_stream_get(viewer_stream)) {
                    ERR("Unable to get self-reference on viewer stream, logic error.");
                    abort();
                }
            } else {
                if (!viewer_stream->sent_flag && nb_unsent) {
                    /* Update number of unsent stream counter. */
                    (*nb_unsent)++;
                }
            }
            /* Update number of total stream counter. */
            if (nb_total) {
                if (relay_stream->is_metadata) {
                    if (!relay_stream->closed ||
                            relay_stream->metadata_received >
                            viewer_stream->metadata_sent) {
                        (*nb_total)++;
                    }
                } else {
                    if (!relay_stream->closed ||
                            !(((int64_t)(relay_stream->prev_data_seq -
                            relay_stream->last_net_seq_num)) >=
                            0)) {
                        (*nb_total)++;
                    }
                }
            }
            /* Put local reference. */
            viewer_stream_put(viewer_stream);
        next:
            pthread_mutex_unlock(&relay_stream->lock);
            stream_put(relay_stream);
        }
        relay_stream = NULL;
        ctf_trace_put(ctf_trace);
    }

    ret = 0;

error_unlock:
    rcu_read_unlock();

    if (relay_stream) {
        pthread_mutex_unlock(&relay_stream->lock);
        stream_put(relay_stream);
    }

    return ret;
}

int relayd_live_stop(void)
{
    /* Stop dispatch thread */
    CMM_STORE_SHARED(live_dispatch_thread_exit, 1);
    futex_nto1_wake(&viewer_conn_queue.futex);
    return 0;
}

/*
 * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
 */
static
int create_named_thread_poll_set(struct lttng_poll_event *events,
        int size, const char *name)
{
    int ret;

    if (events == NULL || size == 0) {
        ret = -1;
        goto error;
    }

    ret = fd_tracker_util_poll_create(the_fd_tracker,
            name, events, 1, LTTNG_CLOEXEC);
    if (ret) {
        PERROR("Failed to create \"%s\" poll file descriptor", name);
        goto error;
    }

    /* Add quit pipe */
    ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
    if (ret < 0) {
        goto error;
    }

    return 0;

error:
    return ret;
}

/*
 * Check if the thread quit pipe was triggered.
 *
 * Return 1 if it was triggered, else 0.
 */
static
int check_thread_quit_pipe(int fd, uint32_t events)
{
    if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
        return 1;
    }

    return 0;
}

static
int create_sock(void *data, int *out_fd)
{
    int ret;
    struct lttcomm_sock *sock = (lttcomm_sock *) data;

    ret = lttcomm_create_sock(sock);
    if (ret < 0) {
        goto end;
    }

    *out_fd = sock->fd;
end:
    return ret;
}

static
int close_sock(void *data, int *in_fd)
{
    struct lttcomm_sock *sock = (lttcomm_sock *) data;

    return sock->ops->close(sock);
}

static int accept_sock(void *data, int *out_fd)
{
    int ret = 0;
    /* socks is an array of two sockets: { in_sock, out_sock }. */
    struct lttcomm_sock **socks = (lttcomm_sock **) data;
    struct lttcomm_sock *in_sock = socks[0];

    socks[1] = in_sock->ops->accept(in_sock);
    if (!socks[1]) {
        ret = -1;
        goto end;
    }
    *out_fd = socks[1]->fd;
end:
    return ret;
}

static
struct lttcomm_sock *accept_live_sock(struct lttcomm_sock *listening_sock,
        const char *name)
{
    int out_fd, ret;
    struct lttcomm_sock *socks[2] = { listening_sock, NULL };
    struct lttcomm_sock *new_sock = NULL;

    ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &out_fd,
            (const char **) &name, 1, accept_sock, &socks);
    if (ret) {
        goto end;
    }
    new_sock = socks[1];
    DBG("%s accepted, socket %d", name, new_sock->fd);
end:
    return new_sock;
}

/*
 * Create and init socket from uri.
 */
static
struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name)
{
    int ret, sock_fd;
    struct lttcomm_sock *sock = NULL;
    char uri_str[LTTNG_PATH_MAX];
    char *formated_name = NULL;

    sock = lttcomm_alloc_sock_from_uri(uri);
    if (sock == NULL) {
        ERR("Allocating socket");
        goto error;
    }

    /*
     * Don't fail to create the socket if the name can't be built as it is
     * only used for debugging purposes.
     */
    ret = uri_to_str_url(uri, uri_str, sizeof(uri_str));
    uri_str[sizeof(uri_str) - 1] = '\0';
    if (ret >= 0) {
        ret = asprintf(&formated_name, "%s socket @ %s", name,
                uri_str);
        if (ret < 0) {
            formated_name = NULL;
        }
    }

    ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd,
            (const char **) (formated_name ? &formated_name : NULL),
            1, create_sock, sock);
    if (ret) {
        PERROR("Failed to create \"%s\" socket",
                formated_name ?: "Unknown");
        goto error;
    }
    DBG("Listening on %s socket %d", name, sock->fd);

    ret = sock->ops->bind(sock);
    if (ret < 0) {
        PERROR("Failed to bind lttng-live socket");
        goto error;
    }

    ret = sock->ops->listen(sock, -1);
    if (ret < 0) {
        goto error;
    }

    free(formated_name);
    return sock;

error:
    if (sock) {
        lttcomm_destroy_sock(sock);
    }
    free(formated_name);
    return NULL;
}

/*
 * This thread manages listening for new connections on the network.
 */
static
void *thread_listener(void *data)
{
    int i, ret, pollfd, err = -1;
    uint32_t revents, nb_fd;
    struct lttng_poll_event events;
    struct lttcomm_sock *live_control_sock;

    DBG("[thread] Relay live listener started");

    rcu_register_thread();
    health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_LISTENER);

    health_code_update();

    live_control_sock = init_socket(live_uri, "Live listener");
    if (!live_control_sock) {
        goto error_sock_control;
    }

    /* Pass 2 as size here for the thread quit pipe and control socket. */
    ret = create_named_thread_poll_set(&events, 2,
            "Live listener thread epoll");
    if (ret < 0) {
        goto error_create_poll;
    }

    /* Add the control socket */
    ret = lttng_poll_add(&events, live_control_sock->fd, LPOLLIN | LPOLLRDHUP);
    if (ret < 0) {
        goto error_poll_add;
    }

    lttng_relay_notify_ready();

    if (testpoint(relayd_thread_live_listener)) {
        goto error_testpoint;
    }

    while (1) {
        health_code_update();

        DBG("Listener accepting live viewers connections");

restart:
        health_poll_entry();
        ret = lttng_poll_wait(&events, -1);
        health_poll_exit();
        if (ret < 0) {
            /*
             * Restart interrupted system call.
             */
            if (errno == EINTR) {
                goto restart;
            }
            goto error;
        }
        nb_fd = ret;

        DBG("Relay new viewer connection received");
        for (i = 0; i < nb_fd; i++) {
            health_code_update();

            /* Fetch once the poll data */
            revents = LTTNG_POLL_GETEV(&events, i);
            pollfd = LTTNG_POLL_GETFD(&events, i);

            /* Thread quit pipe has been closed. Killing thread. */
            ret = check_thread_quit_pipe(pollfd, revents);
            if (ret) {
                err = 0;
                goto exit;
            }

            if (revents & LPOLLIN) {
                /*
                 * A new connection is requested, therefore a
                 * viewer connection is allocated in this
                 * thread, enqueued to a global queue and
                 * dequeued (and freed) in the worker thread.
                 */
                int val = 1;
                struct relay_connection *new_conn;
                struct lttcomm_sock *newsock;

                newsock = accept_live_sock(live_control_sock,
                        "Live socket to client");
                if (!newsock) {
                    PERROR("accepting control sock");
                    goto error;
                }
                DBG("Relay viewer connection accepted socket %d", newsock->fd);

                ret = setsockopt(newsock->fd, SOL_SOCKET, SO_REUSEADDR, &val,
                        sizeof(val));
                if (ret < 0) {
                    PERROR("setsockopt inet");
                    lttcomm_destroy_sock(newsock);
                    goto error;
                }
                new_conn = connection_create(newsock, RELAY_CONNECTION_UNKNOWN);
                if (!new_conn) {
                    lttcomm_destroy_sock(newsock);
                    goto error;
                }
                /* Ownership assumed by the connection. */
                newsock = NULL;

                /* Enqueue request for the dispatcher thread. */
                cds_wfcq_head_ptr_t head;
                head.h = &viewer_conn_queue.head;
                cds_wfcq_enqueue(head, &viewer_conn_queue.tail,
                        &new_conn->qnode);

                /*
                 * Wake the dispatch queue futex.
                 * Implicit memory barrier with the
                 * exchange in cds_wfcq_enqueue.
                 */
                futex_nto1_wake(&viewer_conn_queue.futex);
            } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
                ERR("socket poll error");
                goto error;
            } else {
                ERR("Unexpected poll events %u for sock %d", revents, pollfd);
                goto error;
            }
        }
    }

exit:
error:
error_poll_add:
error_testpoint:
    (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
error_create_poll:
    if (live_control_sock->fd >= 0) {
        int sock_fd = live_control_sock->fd;

        ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker,
                &sock_fd, 1, close_sock,
                live_control_sock);
        if (ret) {
            PERROR("close");
        }
        live_control_sock->fd = -1;
    }
    lttcomm_destroy_sock(live_control_sock);
error_sock_control:
    if (err) {
        health_error();
        DBG("Live viewer listener thread exited with error");
    }
    health_unregister(health_relayd);
    rcu_unregister_thread();
    DBG("Live viewer listener thread cleanup complete");
    if (lttng_relay_stop_threads()) {
        ERR("Error stopping threads");
    }
    return NULL;
}

/*
 * This thread manages the dispatching of the requests to worker threads
 */
static
void *thread_dispatcher(void *data)
{
    int err = -1;
    ssize_t ret;
    struct cds_wfcq_node *node;
    struct relay_connection *conn = NULL;

    DBG("[thread] Live viewer relay dispatcher started");

    health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_DISPATCHER);

    if (testpoint(relayd_thread_live_dispatcher)) {
        goto error_testpoint;
    }

    health_code_update();

    for (;;) {
        health_code_update();

        /* Atomically prepare the queue futex */
        futex_nto1_prepare(&viewer_conn_queue.futex);

        if (CMM_LOAD_SHARED(live_dispatch_thread_exit)) {
            break;
        }

        do {
            health_code_update();

            /* Dequeue commands */
            node = cds_wfcq_dequeue_blocking(&viewer_conn_queue.head,
                    &viewer_conn_queue.tail);
            if (node == NULL) {
                DBG("Woken up but nothing in the live-viewer "
                        "relay command queue");
                /* Continue thread execution */
                break;
            }
            conn = caa_container_of(node, struct relay_connection, qnode);
            DBG("Dispatching viewer request waiting on sock %d",
                    conn->sock->fd);

            /*
             * Inform worker thread of the new request. This
             * call is blocking so we can be assured that
             * the data will be read at some point in time
             * or wait to the end of the world :)
             */
            ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn));
            if (ret < 0) {
                PERROR("write conn pipe");
                connection_put(conn);
                goto error;
            }
        } while (node != NULL);

        /* Futex wait on queue. Blocking call on futex() */
        health_poll_entry();
        futex_nto1_wait(&viewer_conn_queue.futex);
        health_poll_exit();
    }

    /* Normal exit, no error */
    err = 0;

error:
error_testpoint:
    if (err) {
        health_error();
        ERR("Health error occurred in %s", __func__);
    }
    health_unregister(health_relayd);
    DBG("Live viewer dispatch thread dying");
    if (lttng_relay_stop_threads()) {
        ERR("Error stopping threads");
    }
    return NULL;
}

/*
 * Establish connection with the viewer and check the versions.
 *
 * Return 0 on success or else negative value.
 */
static
int viewer_connect(struct relay_connection *conn)
{
    int ret;
    struct lttng_viewer_connect reply, msg;

    conn->version_check_done = 1;

    health_code_update();

    ret = recv_request(conn->sock, &msg, sizeof(msg));
    if (ret < 0) {
        goto end;
    }

    health_code_update();

    memset(&reply, 0, sizeof(reply));
    reply.major = RELAYD_VERSION_COMM_MAJOR;
    reply.minor = RELAYD_VERSION_COMM_MINOR;

    /* Major versions must be the same */
    if (reply.major != be32toh(msg.major)) {
        DBG("Incompatible major versions ([relayd] %u vs [client] %u)",
                reply.major, be32toh(msg.major));
        ret = -1;
        goto end;
    }

    conn->major = reply.major;
    /* We adapt to the lowest compatible version */
    if (reply.minor <= be32toh(msg.minor)) {
        conn->minor = reply.minor;
    } else {
        conn->minor = be32toh(msg.minor);
    }

    if (be32toh(msg.type) == LTTNG_VIEWER_CLIENT_COMMAND) {
        conn->type = RELAY_VIEWER_COMMAND;
    } else if (be32toh(msg.type) == LTTNG_VIEWER_CLIENT_NOTIFICATION) {
        conn->type = RELAY_VIEWER_NOTIFICATION;
    } else {
        ERR("Unknown connection type: %u", be32toh(msg.type));
        ret = -1;
        goto end;
    }

    reply.major = htobe32(reply.major);
    reply.minor = htobe32(reply.minor);
    if (conn->type == RELAY_VIEWER_COMMAND) {
        /*
         * Increment outside of htobe64 macro, because the argument can
         * be used more than once within the macro, and thus the
         * operation may be undefined.
         */
        pthread_mutex_lock(&last_relay_viewer_session_id_lock);
        last_relay_viewer_session_id++;
        pthread_mutex_unlock(&last_relay_viewer_session_id_lock);
        reply.viewer_session_id = htobe64(last_relay_viewer_session_id);
    }
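
    /*
     * Illustrative note (sketch): on some platforms htobe64() is a macro
     * that expands its argument several times, so an expression with a side
     * effect may apply that side effect more than once, e.g.:
     *
     *   reply.viewer_session_id = htobe64(last_relay_viewer_session_id++);
     *   // may increment the counter two or more times, hence the
     *   // separate, locked increment above.
     */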

    health_code_update();

    ret = send_response(conn->sock, &reply, sizeof(reply));
    if (ret < 0) {
        goto end;
    }

    health_code_update();

    DBG("Version check done using protocol %u.%u", conn->major, conn->minor);
    ret = 0;

end:
    return ret;
}

/*
 * Send the viewer the list of current sessions.
 * We need to create a copy of the hash table content because otherwise
 * we cannot assume the number of entries stays the same between getting
 * the number of HT elements and iteration over the HT.
 *
 * Return 0 on success or else a negative value.
 */
static
int viewer_list_sessions(struct relay_connection *conn)
{
    int ret = 0;
    struct lttng_viewer_list_sessions session_list;
    struct lttng_ht_iter iter;
    struct relay_session *session;
    struct lttng_viewer_session *send_session_buf = NULL;
    uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT;
    uint32_t count = 0;

    send_session_buf = (lttng_viewer_session *) zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf));
    if (!send_session_buf) {
        return -1;
    }

    rcu_read_lock();
    cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session,
            session_n.node) {
        struct lttng_viewer_session *send_session;

        health_code_update();

        pthread_mutex_lock(&session->lock);
        if (session->connection_closed) {
            /* Skip closed session */
            goto next_session;
        }

        if (count >= buf_count) {
            struct lttng_viewer_session *newbuf;
            uint32_t new_buf_count = buf_count << 1;

            newbuf = (lttng_viewer_session *) realloc(send_session_buf,
                    new_buf_count * sizeof(*send_session_buf));
            if (!newbuf) {
                ret = -1;
                goto break_loop;
            }
            send_session_buf = newbuf;
            buf_count = new_buf_count;
        }
        send_session = &send_session_buf[count];
        if (lttng_strncpy(send_session->session_name,
                session->session_name,
                sizeof(send_session->session_name))) {
            ret = -1;
            goto break_loop;
        }
        if (lttng_strncpy(send_session->hostname, session->hostname,
                sizeof(send_session->hostname))) {
            ret = -1;
            goto break_loop;
        }
        send_session->id = htobe64(session->id);
        send_session->live_timer = htobe32(session->live_timer);
        if (session->viewer_attached) {
            send_session->clients = htobe32(1);
        } else {
            send_session->clients = htobe32(0);
        }
        send_session->streams = htobe32(session->stream_count);
        count++;
    next_session:
        pthread_mutex_unlock(&session->lock);
        continue;
    break_loop:
        pthread_mutex_unlock(&session->lock);
        break;
    }
    rcu_read_unlock();
    if (ret < 0) {
        goto end_free;
    }

    session_list.sessions_count = htobe32(count);

    health_code_update();

    ret = send_response(conn->sock, &session_list, sizeof(session_list));
    if (ret < 0) {
        goto end_free;
    }

    health_code_update();

    ret = send_response(conn->sock, send_session_buf,
            count * sizeof(*send_session_buf));
    if (ret < 0) {
        goto end_free;
    }
    health_code_update();

    ret = 0;
end_free:
    free(send_session_buf);
    return ret;
}
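
/*
 * Illustrative note (sketch): the send buffer above grows geometrically
 * (buf_count << 1 doubles the capacity), so building the reply for N
 * sessions performs O(log N) reallocations rather than one per session;
 * for instance, 16 -> 32 -> 64 slots to hold 50 sessions.
 */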

/*
 * Send the viewer the list of new streams for a session it is attached to.
 */
static
int viewer_get_new_streams(struct relay_connection *conn)
{
    int ret, send_streams = 0;
    uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0;
    struct lttng_viewer_new_streams_request request;
    struct lttng_viewer_new_streams_response response;
    struct relay_session *session = NULL;
    uint64_t session_id;
    bool closed = false;

    LTTNG_ASSERT(conn);

    health_code_update();

    /* Receive the request from the connected client. */
    ret = recv_request(conn->sock, &request, sizeof(request));
    if (ret < 0) {
        goto error;
    }
    session_id = be64toh(request.session_id);

    health_code_update();

    memset(&response, 0, sizeof(response));

    session = session_get_by_id(session_id);
    if (!session) {
        DBG("Relay session %" PRIu64 " not found", session_id);
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
        goto send_reply;
    }

    if (!viewer_session_is_attached(conn->viewer_session, session)) {
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
        goto send_reply;
    }

    /*
     * For any new stream, create it with LTTNG_VIEWER_SEEK_BEGINNING since,
     * at this point, the client is already attached to the session. Any
     * initial stream will have been created with the seek type at attach
     * time (for now, most readers use LTTNG_VIEWER_SEEK_LAST on attach).
     * Otherwise, any event happening in a new stream between the attach and
     * a call to viewer_get_new_streams would be "lost" (never received)
     * from the viewer's point of view.
     */
    pthread_mutex_lock(&session->lock);
    /*
     * If a session rotation is ongoing, do not attempt to open any
     * stream, because the chunk can be in an intermediate state
     * due to directory renaming.
     */
    if (session->ongoing_rotation) {
        DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_NO_NEW);
        goto send_reply_unlock;
    }
    ret = make_viewer_streams(session,
            conn->viewer_session,
            LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent,
            &nb_created, &closed);
    if (ret < 0) {
        goto error_unlock_session;
    }
    send_streams = 1;
    response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);

    /* Only send back the newly created streams with the unsent ones. */
    nb_streams = nb_created + nb_unsent;
    response.streams_count = htobe32(nb_streams);

    /*
     * If the session is closed, HUP when there are no more streams
     * with data.
     */
    if (closed && nb_total == 0) {
        send_streams = 0;
        response.streams_count = 0;
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
        goto send_reply_unlock;
    }
send_reply_unlock:
    pthread_mutex_unlock(&session->lock);

send_reply:
    health_code_update();
    ret = send_response(conn->sock, &response, sizeof(response));
    if (ret < 0) {
        goto end_put_session;
    }
    health_code_update();

    /*
     * Unknown or empty session, just return gracefully, the viewer
     * knows what is happening.
     */
    if (!send_streams || !nb_streams) {
        ret = 0;
        goto end_put_session;
    }

    /*
     * Send the streams and *DON'T* ignore the sent flag so every viewer
     * stream that was not sent yet will be sent to the viewer.
     */
    ret = send_viewer_streams(conn->sock, session_id, 0);
    if (ret < 0) {
        goto end_put_session;
    }

end_put_session:
    if (session) {
        session_put(session);
    }
error:
    return ret;
error_unlock_session:
    pthread_mutex_unlock(&session->lock);
    session_put(session);
    return ret;
}

/*
 * Attach the viewer to a session and send it the list of streams.
 */
static
int viewer_attach_session(struct relay_connection *conn)
{
    int send_streams = 0;
    ssize_t ret;
    uint32_t nb_streams = 0;
    enum lttng_viewer_seek seek_type;
    struct lttng_viewer_attach_session_request request;
    struct lttng_viewer_attach_session_response response;
    struct relay_session *session = NULL;
    enum lttng_viewer_attach_return_code viewer_attach_status;
    bool closed = false;
    uint64_t session_id;

    LTTNG_ASSERT(conn);

    health_code_update();

    /* Receive the request from the connected client. */
    ret = recv_request(conn->sock, &request, sizeof(request));
    if (ret < 0) {
        goto error;
    }

    session_id = be64toh(request.session_id);
    health_code_update();

    memset(&response, 0, sizeof(response));

    if (!conn->viewer_session) {
        DBG("Client trying to attach before creating a live viewer session");
        response.status = htobe32(LTTNG_VIEWER_ATTACH_NO_SESSION);
        goto send_reply;
    }

    session = session_get_by_id(session_id);
    if (!session) {
        DBG("Relay session %" PRIu64 " not found", session_id);
        response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
        goto send_reply;
    }
    DBG("Attach session ID %" PRIu64 " received", session_id);

    pthread_mutex_lock(&session->lock);
    if (session->live_timer == 0) {
        DBG("Not a live session");
        response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
        goto send_reply;
    }

    send_streams = 1;
    viewer_attach_status = viewer_session_attach(conn->viewer_session,
            session);
    if (viewer_attach_status != LTTNG_VIEWER_ATTACH_OK) {
        response.status = htobe32(viewer_attach_status);
        goto send_reply;
    }

    switch (be32toh(request.seek)) {
    case LTTNG_VIEWER_SEEK_BEGINNING:
    case LTTNG_VIEWER_SEEK_LAST:
        response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
        seek_type = (lttng_viewer_seek) be32toh(request.seek);
        break;
    default:
        ERR("Wrong seek parameter");
        response.status = htobe32(LTTNG_VIEWER_ATTACH_SEEK_ERR);
        send_streams = 0;
        goto send_reply;
    }

    /*
     * If a session rotation is ongoing, do not attempt to open any
     * stream, because the chunk can be in an intermediate state
     * due to directory renaming.
     */
    if (session->ongoing_rotation) {
        DBG("Relay session %" PRIu64 " rotation ongoing", session_id);
        send_streams = 0;
        goto send_reply;
    }

    ret = make_viewer_streams(session,
            conn->viewer_session, seek_type,
            &nb_streams, NULL, NULL, &closed);
    if (ret < 0) {
        goto end_put_session;
    }
    pthread_mutex_unlock(&session->lock);
    session_put(session);
    session = NULL;

    response.streams_count = htobe32(nb_streams);
    /*
     * If the session is closed when the viewer is attaching, it
     * means some of the streams may have been concurrently removed,
     * so we don't allow the viewer to attach, even if there are
     * streams available.
     */
    if (closed) {
        send_streams = 0;
        response.streams_count = 0;
        response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
        goto send_reply;
    }

send_reply:
    health_code_update();
    ret = send_response(conn->sock, &response, sizeof(response));
    if (ret < 0) {
        goto end_put_session;
    }
    health_code_update();

    /*
     * Unknown or empty session, just return gracefully, the viewer
     * knows what is happening.
     */
    if (!send_streams || !nb_streams) {
        ret = 0;
        goto end_put_session;
    }

    /* Send stream and ignore the sent flag. */
    ret = send_viewer_streams(conn->sock, session_id, 1);
    if (ret < 0) {
        goto end_put_session;
    }

end_put_session:
    if (session) {
        pthread_mutex_unlock(&session->lock);
        session_put(session);
    }
error:
    return ret;
}

/*
 * Open the index file if needed for the given vstream.
 *
 * If an index file is successfully opened, the vstream will set it as its
 * current index file.
 *
 * Return 0 on success, a negative value on error (-ENOENT if not ready yet).
 *
 * Called with rstream lock held.
 */
static int try_open_index(struct relay_viewer_stream *vstream,
        struct relay_stream *rstream)
{
    int ret = 0;
    const uint32_t connection_major = rstream->trace->session->major;
    const uint32_t connection_minor = rstream->trace->session->minor;
    enum lttng_trace_chunk_status chunk_status;

    if (vstream->index_file) {
        goto end;
    }

    /*
     * Only open the index file once the first index has been received and
     * the stream's trace chunk is available.
     */
    if (rstream->index_received_seqcount == 0 ||
            !vstream->stream_file.trace_chunk) {
        ret = -ENOENT;
        goto end;
    }

    chunk_status = lttng_index_file_create_from_trace_chunk_read_only(
            vstream->stream_file.trace_chunk, rstream->path_name,
            rstream->channel_name, rstream->tracefile_size,
            vstream->current_tracefile_id,
            lttng_to_index_major(connection_major, connection_minor),
            lttng_to_index_minor(connection_major, connection_minor),
            true, &vstream->index_file);
    if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
        if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) {
            ret = -ENOENT;
        } else {
            ret = -1;
        }
    }

end:
    return ret;
}
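
/*
 * Illustrative sketch of the caller-side convention (mirrors
 * viewer_get_next_index() below): -ENOENT is not fatal; it maps to RETRY
 * while the relay stream is live, and to HUP once it is closed.
 *
 *   ret = try_open_index(vstream, rstream);
 *   if (ret == -ENOENT) {
 *       viewer_index.status = htobe32(rstream->closed ?
 *               LTTNG_VIEWER_INDEX_HUP : LTTNG_VIEWER_INDEX_RETRY);
 *   } else if (ret < 0) {
 *       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
 *   }
 */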

/*
 * Check the status of the index for the given stream. This function
 * updates the index structure if needed and can put (close) the vstream
 * in the HUP situation.
 *
 * A return value of 0 means that we can proceed with the index. A value of
 * 1 means that the index has been updated and is ready to be sent to the
 * client. A negative value indicates an error that can't be handled.
 *
 * Called with rstream lock held.
 */
static int check_index_status(struct relay_viewer_stream *vstream,
        struct relay_stream *rstream, struct ctf_trace *trace,
        struct lttng_viewer_index *index)
{
    int ret;

    DBG("Check index status: index_received_seqcount %" PRIu64 " "
            "index_sent_seqcount %" PRIu64 " "
            "for stream %" PRIu64,
            rstream->index_received_seqcount,
            vstream->index_sent_seqcount,
            vstream->stream->stream_handle);
    if ((trace->session->connection_closed || rstream->closed)
            && rstream->index_received_seqcount
            == vstream->index_sent_seqcount) {
        /*
         * Last index sent and session connection or relay
         * stream are closed.
         */
        index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
        goto hup;
    } else if (rstream->beacon_ts_end != -1ULL &&
            (rstream->index_received_seqcount == 0 ||
            (vstream->index_sent_seqcount != 0 &&
            rstream->index_received_seqcount
            <= vstream->index_sent_seqcount))) {
        /*
         * We've received a synchronization beacon and the last index
         * available has been sent, the index for now is inactive.
         *
         * In this case, we have received a beacon which allows us to
         * inform the client of a time interval during which we can
         * guarantee that there are no events to read (and never will
         * be).
         *
         * The sent seqcount can grow higher than the received seqcount
         * on clear because the rotation performed by clear will push
         * the index_sent_seqcount ahead (see
         * viewer_stream_sync_tracefile_array_tail) and skip over
         * packet sequence numbers.
         */
        index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
        index->timestamp_end = htobe64(rstream->beacon_ts_end);
        index->stream_id = htobe64(rstream->ctf_stream_id);
        DBG("Check index status: inactive with beacon, for stream %" PRIu64,
                vstream->stream->stream_handle);
        goto index_ready;
    } else if (rstream->index_received_seqcount == 0 ||
            (vstream->index_sent_seqcount != 0 &&
            rstream->index_received_seqcount
            <= vstream->index_sent_seqcount)) {
        /*
         * This checks whether received <= sent seqcount. In
         * this case, we have not received a beacon. Therefore,
         * we can only ask the client to retry later.
         *
         * The sent seqcount can grow higher than the received seqcount
         * on clear because the rotation performed by clear will push
         * the index_sent_seqcount ahead (see
         * viewer_stream_sync_tracefile_array_tail) and skip over
         * packet sequence numbers.
         */
        index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
        DBG("Check index status: retry for stream %" PRIu64,
                vstream->stream->stream_handle);
        goto index_ready;
    } else if (!tracefile_array_seq_in_file(rstream->tfa,
            vstream->current_tracefile_id,
            vstream->index_sent_seqcount)) {
        /*
         * The next index we want to send cannot be read either
         * because we need to perform a rotation, or due to
         * the producer having overwritten its trace file.
         */
        DBG("Viewer stream %" PRIu64 " rotation",
                vstream->stream->stream_handle);
        ret = viewer_stream_rotate(vstream);
        if (ret == 1) {
            /* EOF across entire stream. */
            index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
            goto hup;
        }
        /*
         * If we have been pushed due to overwrite, it
         * necessarily means there is data that can be read in
         * the stream. If we rotated because we reached the end
         * of a tracefile, it means the following tracefile
         * needs to contain at least one index, else we would
         * have already returned LTTNG_VIEWER_INDEX_RETRY to the
         * viewer. The updated index_sent_seqcount needs to
         * point to a readable index entry now.
         *
         * In the case where we "rotate" on a single file, we
         * can end up in a case where the requested index is
         * still unavailable.
         */
        if (rstream->tracefile_count == 1 &&
                !tracefile_array_seq_in_file(
                    rstream->tfa,
                    vstream->current_tracefile_id,
                    vstream->index_sent_seqcount)) {
            index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
            DBG("Check index status: retry: "
                    "tracefile array sequence number %" PRIu64
                    " not in file for stream %" PRIu64,
                    vstream->index_sent_seqcount,
                    vstream->stream->stream_handle);
            goto index_ready;
        }
        LTTNG_ASSERT(tracefile_array_seq_in_file(rstream->tfa,
                vstream->current_tracefile_id,
                vstream->index_sent_seqcount));
    }
    /* ret == 0 means successful so we continue. */
    ret = 0;
    return ret;

hup:
    viewer_stream_put(vstream);
index_ready:
    return 1;
}
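
/*
 * Illustrative sketch of the tri-state contract (mirrors the call in
 * viewer_get_next_index() below):
 *
 *   ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
 *   if (ret < 0) {
 *       // unrecoverable error
 *   } else if (ret == 1) {
 *       // viewer_index.status is already populated (HUP/RETRY/INACTIVE);
 *       // send the reply as-is. On HUP, check_index_status() has already
 *       // put (released) its vstream reference.
 *   } else {
 *       // ret == 0: proceed to read and send the next index
 *   }
 */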

static
void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream,
        struct lttng_trace_chunk *new_trace_chunk)
{
    lttng_trace_chunk_put(vstream->stream_file.trace_chunk);

    if (new_trace_chunk) {
        const bool acquired_reference = lttng_trace_chunk_get(
                new_trace_chunk);

        LTTNG_ASSERT(acquired_reference);
    }

    vstream->stream_file.trace_chunk = new_trace_chunk;
    viewer_stream_sync_tracefile_array_tail(vstream);
    viewer_stream_close_files(vstream);
}

/*
 * Send the next index for a stream.
 *
 * Return 0 on success or else a negative value.
 */
static
int viewer_get_next_index(struct relay_connection *conn)
{
    int ret;
    struct lttng_viewer_get_next_index request_index;
    struct lttng_viewer_index viewer_index;
    struct ctf_packet_index packet_index;
    struct relay_viewer_stream *vstream = NULL;
    struct relay_stream *rstream = NULL;
    struct ctf_trace *ctf_trace = NULL;
    struct relay_viewer_stream *metadata_viewer_stream = NULL;
    bool viewer_stream_and_session_in_same_chunk, viewer_stream_one_rotation_behind;
    uint64_t stream_file_chunk_id = -1ULL, viewer_session_chunk_id = -1ULL;
    enum lttng_trace_chunk_status status;

    LTTNG_ASSERT(conn);

    memset(&viewer_index, 0, sizeof(viewer_index));
    health_code_update();

    ret = recv_request(conn->sock, &request_index, sizeof(request_index));
    if (ret < 0) {
        goto end;
    }
    health_code_update();

    vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
    if (!vstream) {
        DBG("Client requested index of unknown stream id %" PRIu64,
                (uint64_t) be64toh(request_index.stream_id));
        viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
        goto send_reply;
    }

    /* Use the back-reference; protected by refcounts. */
    rstream = vstream->stream;
    ctf_trace = rstream->trace;

    /* metadata_viewer_stream may be NULL. */
    metadata_viewer_stream =
            ctf_trace_get_viewer_metadata_stream(ctf_trace);

    /*
     * Hold the session lock to protect against concurrent changes
     * to the chunk files (e.g. rename done by clear), which are
     * protected by the session ongoing rotation state. Those are
     * synchronized with the session lock.
     */
    pthread_mutex_lock(&rstream->trace->session->lock);
    pthread_mutex_lock(&rstream->lock);

    /*
     * The viewer should not ask for an index on a metadata stream.
     */
    if (rstream->is_metadata) {
        viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
        goto send_reply;
    }

    if (rstream->ongoing_rotation.is_set) {
        /* Rotation is ongoing, try again later. */
        viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
        goto send_reply;
    }

    if (rstream->trace->session->ongoing_rotation) {
        /* Rotation is ongoing, try again later. */
        viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
        goto send_reply;
    }

    /*
     * Transition the viewer session into the newest trace chunk available.
     */
    if (!lttng_trace_chunk_ids_equal(
            conn->viewer_session->current_trace_chunk,
            rstream->trace_chunk)) {
        DBG("Relay stream and viewer chunk ids differ");

        ret = viewer_session_set_trace_chunk_copy(
                conn->viewer_session,
                rstream->trace_chunk);
        if (ret) {
            viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
            goto send_reply;
        }
    }

    /*
     * Transition the viewer stream into the latest trace chunk available.
     *
     * Note that the stream must _not_ rotate in one precise condition:
     * the relay stream has rotated to a NULL trace chunk and the viewer
     * stream is consuming the trace chunk that was active just before
     * that rotation to NULL.
     *
     * This allows clients to consume all the packets of a trace chunk
     * after a session's destruction.
     */
    if (vstream->stream_file.trace_chunk) {
        status = lttng_trace_chunk_get_id(
                vstream->stream_file.trace_chunk,
                &stream_file_chunk_id);
        LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK);
    }
    if (conn->viewer_session->current_trace_chunk) {
        status = lttng_trace_chunk_get_id(
                conn->viewer_session->current_trace_chunk,
                &viewer_session_chunk_id);
        LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK);
    }

    viewer_stream_and_session_in_same_chunk = lttng_trace_chunk_ids_equal(
            conn->viewer_session->current_trace_chunk,
            vstream->stream_file.trace_chunk);
    viewer_stream_one_rotation_behind = rstream->completed_rotation_count ==
            vstream->last_seen_rotation_count + 1;

    if (viewer_stream_and_session_in_same_chunk) {
        DBG("Transition to latest chunk check (%s -> %s): Same chunk, no need to rotate",
                vstream->stream_file.trace_chunk ?
                        std::to_string(stream_file_chunk_id).c_str() :
                        "None",
                conn->viewer_session->current_trace_chunk ?
                        std::to_string(viewer_session_chunk_id).c_str() :
                        "None");
    } else if (viewer_stream_one_rotation_behind && !rstream->trace_chunk) {
        DBG("Transition to latest chunk check (%s -> %s): One chunk behind relay stream which is being destroyed, no need to rotate",
                vstream->stream_file.trace_chunk ?
                        std::to_string(stream_file_chunk_id).c_str() :
                        "None",
                conn->viewer_session->current_trace_chunk ?
                        std::to_string(viewer_session_chunk_id).c_str() :
                        "None");
    } else {
        DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream",
                vstream->stream_file.trace_chunk ?
                        std::to_string(stream_file_chunk_id).c_str() :
                        "None",
                conn->viewer_session->current_trace_chunk ?
                        std::to_string(viewer_session_chunk_id).c_str() :
                        "None");

        viewer_stream_rotate_to_trace_chunk(vstream,
                conn->viewer_session->current_trace_chunk);
        vstream->last_seen_rotation_count =
                rstream->completed_rotation_count;
    }

    ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
    if (ret < 0) {
        goto error_put;
    } else if (ret == 1) {
        /*
         * We have no index to send and check_index_status has populated
         * viewer_index's status.
         */
        goto send_reply;
    }
    /* At this point, ret is 0 thus we will be able to read the index. */
    LTTNG_ASSERT(!ret);

    /* Try to open an index if one is needed for that stream. */
    ret = try_open_index(vstream, rstream);
    if (ret == -ENOENT) {
        if (rstream->closed) {
            viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
            goto send_reply;
        } else {
            viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
            goto send_reply;
        }
    }
    if (ret < 0) {
        viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
        goto send_reply;
    }

    /*
     * vstream->stream_file.handle may be NULL if it has been closed by
     * tracefile rotation, or if we are at the beginning of the
     * stream. We open the data stream file here to protect against
     * overwrite caused by tracefile rotation (in association with
     * unlink performed before overwrite).
     */
    if (!vstream->stream_file.handle) {
        char file_path[LTTNG_PATH_MAX];
        struct fs_handle *fs_handle;

        ret = utils_stream_file_path(rstream->path_name,
                rstream->channel_name, rstream->tracefile_size,
                vstream->current_tracefile_id, NULL, file_path,
                sizeof(file_path));
        if (ret < 0) {
            goto error_put;
        }

        /*
         * It is possible that the file we are trying to open is
         * missing if the stream has been closed (application exits with
         * per-pid buffers) and a clear command has been performed.
         */
        status = lttng_trace_chunk_open_fs_handle(
                vstream->stream_file.trace_chunk,
                file_path, O_RDONLY, 0, &fs_handle, true);
        if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
            if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE &&
                    rstream->closed) {
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                goto send_reply;
            }
            PERROR("Failed to open trace file for viewer stream");
            goto error_put;
        }
        vstream->stream_file.handle = fs_handle;
    }

    ret = check_new_streams(conn);
    if (ret < 0) {
        viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
        goto send_reply;
    } else if (ret == 1) {
        viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
    }

    ret = lttng_index_file_read(vstream->index_file, &packet_index);
    if (ret) {
        ERR("Relay error reading index file");
        viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
        goto send_reply;
    } else {
        viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
        vstream->index_sent_seqcount++;
    }

    /*
     * Indexes are stored in big endian, no need to switch before sending.
     */
    DBG("Sending viewer index for stream %" PRIu64 " offset %" PRIu64,
            rstream->stream_handle,
            (uint64_t) be64toh(packet_index.offset));
    viewer_index.offset = packet_index.offset;
    viewer_index.packet_size = packet_index.packet_size;
    viewer_index.content_size = packet_index.content_size;
    viewer_index.timestamp_begin = packet_index.timestamp_begin;
    viewer_index.timestamp_end = packet_index.timestamp_end;
    viewer_index.events_discarded = packet_index.events_discarded;
    viewer_index.stream_id = packet_index.stream_id;

send_reply:
    if (rstream) {
        pthread_mutex_unlock(&rstream->lock);
        pthread_mutex_unlock(&rstream->trace->session->lock);
    }

    if (metadata_viewer_stream) {
        pthread_mutex_lock(&metadata_viewer_stream->stream->lock);
        DBG("get next index metadata check: recv %" PRIu64
                " sent %" PRIu64,
                metadata_viewer_stream->stream->metadata_received,
                metadata_viewer_stream->metadata_sent);
        if (!metadata_viewer_stream->stream->metadata_received ||
                metadata_viewer_stream->stream->metadata_received >
                metadata_viewer_stream->metadata_sent) {
            viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
        }
        pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
    }

    viewer_index.flags = htobe32(viewer_index.flags);
    health_code_update();

    ret = send_response(conn->sock, &viewer_index, sizeof(viewer_index));
    if (ret < 0) {
        goto end;
    }
    health_code_update();

    if (vstream) {
        DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
                vstream->index_sent_seqcount,
                vstream->stream->stream_handle);
    }
end:
    if (metadata_viewer_stream) {
        viewer_stream_put(metadata_viewer_stream);
    }
    if (vstream) {
        viewer_stream_put(vstream);
    }
    return ret;

error_put:
    pthread_mutex_unlock(&rstream->lock);
    pthread_mutex_unlock(&rstream->trace->session->lock);
    if (metadata_viewer_stream) {
        viewer_stream_put(metadata_viewer_stream);
    }
    viewer_stream_put(vstream);
    return ret;
}

/*
 * Send the requested packet of a stream.
 *
 * Return 0 on success or else a negative value.
 */
static
int viewer_get_packet(struct relay_connection *conn)
1925{
1926 int ret;
1927 off_t lseek_ret;
1928 char *reply = NULL;
1929 struct lttng_viewer_get_packet get_packet_info;
1930 struct lttng_viewer_trace_packet reply_header;
1931 struct relay_viewer_stream *vstream = NULL;
1932 uint32_t reply_size = sizeof(reply_header);
1933 uint32_t packet_data_len = 0;
1934 ssize_t read_len;
1935 uint64_t stream_id;
1936
1937 health_code_update();
1938
1939 ret = recv_request(conn->sock, &get_packet_info,
1940 sizeof(get_packet_info));
1941 if (ret < 0) {
1942 goto end;
1943 }
1944 health_code_update();
1945
1946 /* From this point on, the error label can be reached. */
1947 memset(&reply_header, 0, sizeof(reply_header));
1948 stream_id = (uint64_t) be64toh(get_packet_info.stream_id);
1949
1950 vstream = viewer_stream_get_by_id(stream_id);
1951 if (!vstream) {
1952 DBG("Client requested packet of unknown stream id %" PRIu64,
1953 stream_id);
1954 reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
1955 goto send_reply_nolock;
1956 } else {
1957 packet_data_len = be32toh(get_packet_info.len);
1958 reply_size += packet_data_len;
1959 }
1960
1961 reply = (char *) zmalloc(reply_size);
1962 if (!reply) {
1963 PERROR("packet reply zmalloc");
1964 reply_size = sizeof(reply_header);
1965 goto error;
1966 }
1967
1968 pthread_mutex_lock(&vstream->stream->lock);
1969 lseek_ret = fs_handle_seek(vstream->stream_file.handle,
1970 be64toh(get_packet_info.offset), SEEK_SET);
1971 if (lseek_ret < 0) {
1972 PERROR("Failed to seek file system handle of viewer stream %" PRIu64
1973 " to offset %" PRIu64,
1974 stream_id,
1975 (uint64_t) be64toh(get_packet_info.offset));
1976 goto error;
1977 }
1978 read_len = fs_handle_read(vstream->stream_file.handle,
1979 reply + sizeof(reply_header), packet_data_len);
1980 if (read_len < packet_data_len) {
1981 PERROR("Failed to read from file system handle of viewer stream id %" PRIu64
1982 ", offset: %" PRIu64,
1983 stream_id,
1984 (uint64_t) be64toh(get_packet_info.offset));
1985 goto error;
1986 }
1987 reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
1988 reply_header.len = htobe32(packet_data_len);
1989 goto send_reply;
1990
1991error:
1992 /* No payload to send on error. */
1993 reply_size = sizeof(reply_header);
1994 reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
1995
1996send_reply:
1997 if (vstream) {
1998 pthread_mutex_unlock(&vstream->stream->lock);
1999 }
2000send_reply_nolock:
2001
2002 health_code_update();
2003
2004 if (reply) {
2005 memcpy(reply, &reply_header, sizeof(reply_header));
2006 ret = send_response(conn->sock, reply, reply_size);
2007 } else {
2008 /* No packet payload to send; reply with the header only. */
2009 ret = send_response(conn->sock, &reply_header,
2010 reply_size);
2011 }
2012
2013 health_code_update();
2014 if (ret < 0) {
2015 PERROR("sendmsg of packet data failed");
2016 goto end_free;
2017 }
2018
2019 DBG("Sent %u bytes for stream %" PRIu64, reply_size, stream_id);
2020
2021end_free:
2022 free(reply);
2023end:
2024 if (vstream) {
2025 viewer_stream_put(vstream);
2026 }
2027 return ret;
2028}
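
/*
 * Illustrative sketch (excluded from the build): the client side of the
 * GET_PACKET exchange served above. Since the relay sends the reply
 * header and, on success, the packet payload in a single buffer, a
 * client can read the fixed-size header first and then the advertised
 * number of bytes. viewer_send()/viewer_recv() are hypothetical
 * transport helpers.
 */
#if 0
	struct lttng_viewer_get_packet rq;
	struct lttng_viewer_trace_packet rp;

	rq.stream_id = htobe64(stream_id);
	rq.offset = htobe64(offset);
	rq.len = htobe32(len);
	viewer_send(sock, &rq, sizeof(rq));

	viewer_recv(sock, &rp, sizeof(rp));
	if (be32toh(rp.status) == LTTNG_VIEWER_GET_PACKET_OK) {
		viewer_recv(sock, packet_buf, be32toh(rp.len));
	}
#endif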
2029
2030/*
2031 * Send the session's metadata
2032 *
2033 * Return 0 on success else a negative value.
2034 */
2035static
2036int viewer_get_metadata(struct relay_connection *conn)
2037{
2038 int ret = 0;
2039 int fd = -1;
2040 ssize_t read_len;
2041 uint64_t len = 0;
2042 char *data = NULL;
2043 struct lttng_viewer_get_metadata request;
2044 struct lttng_viewer_metadata_packet reply;
2045 struct relay_viewer_stream *vstream = NULL;
2046
2047 LTTNG_ASSERT(conn);
2048
2049 health_code_update();
2050
2051 ret = recv_request(conn->sock, &request, sizeof(request));
2052 if (ret < 0) {
2053 goto end;
2054 }
2055 health_code_update();
2056
2057 memset(&reply, 0, sizeof(reply));
2058
2059 vstream = viewer_stream_get_by_id(be64toh(request.stream_id));
2060 if (!vstream) {
2061 /*
2062 * The metadata stream can be closed by a CLOSE command
2063 * just before we attach. It can also be closed when a
2064 * per-PID application exits during tracing. It is
2065 * therefore possible that this viewer stream cannot be
2066 * found; reply to the client with an error in that
2067 * case.
2068 */
2069 DBG("Client requested metadata of unknown stream id %" PRIu64,
2070 (uint64_t) be64toh(request.stream_id));
2071 reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
2072 goto send_reply;
2073 }
2074 pthread_mutex_lock(&vstream->stream->lock);
2075 if (!vstream->stream->is_metadata) {
2076 ERR("Invalid metadata stream");
2077 goto error;
2078 }
2079
2080 if (vstream->metadata_sent >= vstream->stream->metadata_received) {
2081 /*
2082 * The live viewers expect to receive a NO_NEW_METADATA
2083 * status before a stream disappears, otherwise they abort the
2084 * entire live connection when receiving an error status.
2085 *
2086 * The clear feature resets metadata_sent to 0 until the
2087 * same metadata is received again.
2088 */
2089 reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
2090 /*
2091 * The live viewer considers a closed 0 byte metadata stream as
2092 * an error.
2093 */
2094 if (vstream->metadata_sent > 0) {
2095 if (vstream->stream->closed && vstream->stream->no_new_metadata_notified) {
2096 /*
2097 * Release ownership for the viewer metadata
2098 * stream. Note that this reference is the
2099 * viewer's reference. The vstream still exists
2100 * until the end of the function as
2101 * viewer_stream_get_by_id() took a reference.
2102 */
2103 viewer_stream_put(vstream);
2104 }
2105
2106 vstream->stream->no_new_metadata_notified = true;
2107 }
2108 goto send_reply;
2109 }
2110
2111 if (vstream->stream->trace_chunk &&
2112 !lttng_trace_chunk_ids_equal(
2113 conn->viewer_session->current_trace_chunk,
2114 vstream->stream->trace_chunk)) {
2115 /* A rotation has occurred on the relay stream. */
2116 DBG("Metadata relay stream and viewer chunk ids differ");
2117
2118 ret = viewer_session_set_trace_chunk_copy(
2119 conn->viewer_session,
2120 vstream->stream->trace_chunk);
2121 if (ret) {
2122 reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
2123 goto send_reply;
2124 }
2125 }
2126
2127 if (conn->viewer_session->current_trace_chunk &&
2128 !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk,
2129 vstream->stream_file.trace_chunk)) {
2130 bool acquired_reference;
2131
2132 DBG("Viewer session and viewer stream chunk differ: "
2133 "vsession chunk %p vstream chunk %p",
2134 conn->viewer_session->current_trace_chunk,
2135 vstream->stream_file.trace_chunk);
2136 lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
2137 acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk);
2138 LTTNG_ASSERT(acquired_reference);
2139 vstream->stream_file.trace_chunk =
2140 conn->viewer_session->current_trace_chunk;
2141 viewer_stream_close_files(vstream);
2142 }
2143
2144 len = vstream->stream->metadata_received - vstream->metadata_sent;
2145
2146 if (!vstream->stream_file.trace_chunk) {
2147 reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
2148 len = 0;
2149 goto send_reply;
2150 } else if (vstream->stream_file.trace_chunk &&
2151 !vstream->stream_file.handle && len > 0) {
2152 /*
2153 * Either this is the first time the metadata file is read, or a
2154 * rotation of the corresponding relay stream has occurred.
2155 */
2156 struct fs_handle *fs_handle;
2157 char file_path[LTTNG_PATH_MAX];
2158 enum lttng_trace_chunk_status status;
2159 struct relay_stream *rstream = vstream->stream;
2160
2161 ret = utils_stream_file_path(rstream->path_name,
2162 rstream->channel_name, rstream->tracefile_size,
2163 vstream->current_tracefile_id, NULL, file_path,
2164 sizeof(file_path));
2165 if (ret < 0) {
2166 goto error;
2167 }
2168
2169 /*
2170 * It is possible that the metadata file we are trying to open is
2171 * missing if the stream has been closed (application exits with
2172 * per-pid buffers) and a clear command has been performed.
2173 */
2174 status = lttng_trace_chunk_open_fs_handle(
2175 vstream->stream_file.trace_chunk,
2176 file_path, O_RDONLY, 0, &fs_handle, true);
2177 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
2178 if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) {
2179 reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
2180 len = 0;
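				/*
				 * As with the release above, this put
				 * drops the viewer's own reference; the
				 * reference taken by
				 * viewer_stream_get_by_id() keeps
				 * vstream alive until the end of this
				 * function.
				 */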
2181 if (vstream->stream->closed) {
2182 viewer_stream_put(vstream);
2183 }
2184 goto send_reply;
2185 }
2186 PERROR("Failed to open metadata file for viewer stream");
2187 goto error;
2188 }
2189 vstream->stream_file.handle = fs_handle;
2190
2191 if (vstream->metadata_sent != 0) {
2192 /*
2193 * The client does not expect to receive metadata it has
2194 * already received, and metadata files in successive
2195 * chunks must be a strict superset of one another.
2196 *
2197 * Skip the first `metadata_sent` bytes to ensure
2198 * they are not sent a second time to the client.
2199 *
2200 * Barring a block layer error or an internal error,
2201 * this seek should not fail as
2202 * `vstream->stream->metadata_received` is reset when
2203 * a relay stream is rotated. If this is reached, it is
2204 * safe to assume that
2205 * `metadata_received` > `metadata_sent`.
2206 */
2207 const off_t seek_ret = fs_handle_seek(fs_handle,
2208 vstream->metadata_sent, SEEK_SET);
2209
2210 if (seek_ret < 0) {
2211 PERROR("Failed to seek metadata viewer stream file to `sent` position: pos = %" PRId64,
2212 vstream->metadata_sent);
2213 reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
2214 goto send_reply;
2215 }
2216 }
2217 }
2218
2219 reply.len = htobe64(len);
2220 data = (char *) zmalloc(len);
2221 if (!data) {
2222 PERROR("viewer metadata zmalloc");
2223 goto error;
2224 }
2225
2226 fd = fs_handle_get_fd(vstream->stream_file.handle);
2227 if (fd < 0) {
2228 ERR("Failed to restore viewer stream file system handle");
2229 goto error;
2230 }
2231 read_len = lttng_read(fd, data, len);
2232 fs_handle_put_fd(vstream->stream_file.handle);
2233 fd = -1;
2234 if (read_len < 0 || (uint64_t) read_len < len) {
2235 if (read_len < 0) {
2236 PERROR("Failed to read metadata file");
2237 goto error;
2238 } else {
2239 /*
2240 * A clear has been performed which prevents the relay
2241 * from sending `len` bytes of metadata.
2242 *
2243 * It is important not to send any metadata if we
2244 * couldn't read all the available metadata in one shot:
2245 * sending partial metadata can cause the client to
2246 * attempt to parse an incomplete (incoherent) metadata
2247 * stream, which would result in an error.
2248 */
2249 const off_t seek_ret = fs_handle_seek(
2250 vstream->stream_file.handle, -read_len,
2251 SEEK_CUR);
2252
2253 DBG("Failed to read metadata: requested = %" PRIu64 ", got = %zd",
2254 len, read_len);
2255 read_len = 0;
2256 len = 0;
2257 if (seek_ret < 0) {
2258 PERROR("Failed to restore metadata file position after partial read");
2259 ret = -1;
2260 goto error;
2261 }
2262 }
2263 }
2264 vstream->metadata_sent += read_len;
2265 reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);
2266
2267 goto send_reply;
2268
2269error:
2270 reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
2271
2272send_reply:
2273 health_code_update();
2274 if (vstream) {
2275 pthread_mutex_unlock(&vstream->stream->lock);
2276 }
2277 ret = send_response(conn->sock, &reply, sizeof(reply));
2278 if (ret < 0) {
2279 goto end_free;
2280 }
2281 health_code_update();
2282
2283 if (len > 0) {
2284 ret = send_response(conn->sock, data, len);
2285 if (ret < 0) {
2286 goto end_free;
2287 }
2288 }
2289
2290 DBG("Sent %" PRIu64 " bytes of metadata for stream %" PRIu64, len,
2291 (uint64_t) be64toh(request.stream_id));
2292
2293 DBG("Metadata sent");
2294
2295end_free:
2296 free(data);
2297end:
2298 if (vstream) {
2299 viewer_stream_put(vstream);
2300 }
2301 return ret;
2302}
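
/*
 * Illustrative sketch (excluded from the build): since each
 * GET_METADATA reply only carries the `metadata_received -
 * metadata_sent` delta, a client typically loops until it gets the
 * LTTNG_VIEWER_NO_NEW_METADATA status. The send_get_metadata_request()
 * and append_metadata() helpers are hypothetical.
 */
#if 0
	for (;;) {
		struct lttng_viewer_metadata_packet rp;

		send_get_metadata_request(sock, stream_id);
		viewer_recv(sock, &rp, sizeof(rp));
		switch (be32toh(rp.status)) {
		case LTTNG_VIEWER_NO_NEW_METADATA:
			/* Fully caught up with the relay's metadata. */
			return 0;
		case LTTNG_VIEWER_METADATA_OK:
			/* A payload of `len` bytes follows the reply header. */
			append_metadata(sock, be64toh(rp.len));
			break;
		default:
			return -1;
		}
	}
#endif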
2303
2304/*
2305 * Create a viewer session.
2306 *
2307 * Return 0 on success or else a negative value.
2308 */
2309static
2310int viewer_create_session(struct relay_connection *conn)
2311{
2312 int ret;
2313 struct lttng_viewer_create_session_response resp;
2314
2315 memset(&resp, 0, sizeof(resp));
2316 resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
2317 conn->viewer_session = viewer_session_create();
2318 if (!conn->viewer_session) {
2319 ERR("Failed to allocate viewer session");
2320 resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_ERR);
2321 goto send_reply;
2322 }
2323
2324send_reply:
2325 health_code_update();
2326 ret = send_response(conn->sock, &resp, sizeof(resp));
2327 if (ret < 0) {
2328 goto end;
2329 }
2330 health_code_update();
2331 ret = 0;
2332
2333end:
2334 return ret;
2335}
2336
2337/*
2338 * Detach a viewer session.
2339 *
2340 * Return 0 on success or else a negative value.
2341 */
2342static
2343int viewer_detach_session(struct relay_connection *conn)
2344{
2345 int ret;
2346 struct lttng_viewer_detach_session_response response;
2347 struct lttng_viewer_detach_session_request request;
2348 struct relay_session *session = NULL;
2349 uint64_t viewer_session_to_close;
2350
2351 LTTNG_ASSERT(conn);
2352
2353 health_code_update();
2354
2355 /* Receive the request from the connected client. */
2356 ret = recv_request(conn->sock, &request, sizeof(request));
2357 if (ret < 0) {
2358 goto end;
2359 }
2360 viewer_session_to_close = be64toh(request.session_id);
2361
2362 memset(&response, 0, sizeof(response));
2363
2364 if (!conn->viewer_session) {
2365 DBG("Client trying to detach before creating a live viewer session");
2366 response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_ERR);
2367 goto send_reply;
2368 }
2369
2370 health_code_update();
2371 DBG("Detaching from session ID %" PRIu64, viewer_session_to_close);
2372
2373 session = session_get_by_id(viewer_session_to_close);
2374 if (!session) {
2375 DBG("Relay session %" PRIu64 " not found",
2376 viewer_session_to_close);
2377 response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_UNK);
2378 goto send_reply;
2379 }
2380
2381 ret = viewer_session_is_attached(conn->viewer_session, session);
2382 if (ret != 1) {
2383 DBG("Not attached to this session");
2384 response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_ERR);
2385 goto send_reply_put;
2386 }
2387
2388 viewer_session_close_one_session(conn->viewer_session, session);
2389 response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_OK);
2390 DBG("Session %" PRIu64 " detached.", viewer_session_to_close);
2391
2392send_reply_put:
2393 session_put(session);
2394
2395send_reply:
2396 health_code_update();
2397 ret = send_response(conn->sock, &response, sizeof(response));
2398 if (ret < 0) {
2399 goto end;
2400 }
2401 health_code_update();
2402 ret = 0;
2403
2404end:
2405 return ret;
2406}
2407
2408/*
2409 * live_relay_unknown_command: reply with LTTNG_ERR_UNK when an unknown command is received
2410 */
2411static
2412void live_relay_unknown_command(struct relay_connection *conn)
2413{
2414 struct lttcomm_relayd_generic_reply reply;
2415
2416 memset(&reply, 0, sizeof(reply));
2417 reply.ret_code = htobe32(LTTNG_ERR_UNK);
2418 (void) send_response(conn->sock, &reply, sizeof(reply));
2419}
2420
2421/*
2422 * Process the commands received on the control socket
2423 */
2424static
2425int process_control(struct lttng_viewer_cmd *recv_hdr,
2426 struct relay_connection *conn)
2427{
2428 int ret = 0;
2429 lttng_viewer_command cmd =
2430 (lttng_viewer_command) be32toh(recv_hdr->cmd);
2431
2432 /*
2433 * Make sure we've done the version check before any command other than
2434 * a new client connection.
2435 */
2436 if (cmd != LTTNG_VIEWER_CONNECT && !conn->version_check_done) {
2437 ERR("Viewer on connection %d requested %s command before version check",
2438 conn->sock->fd, lttng_viewer_command_str(cmd));
2439 ret = -1;
2440 goto end;
2441 }
2442
2443 DBG("Processing %s viewer command from connection %d",
2444 lttng_viewer_command_str(cmd), conn->sock->fd);
2445
2446 switch (cmd) {
2447 case LTTNG_VIEWER_CONNECT:
2448 ret = viewer_connect(conn);
2449 break;
2450 case LTTNG_VIEWER_LIST_SESSIONS:
2451 ret = viewer_list_sessions(conn);
2452 break;
2453 case LTTNG_VIEWER_ATTACH_SESSION:
2454 ret = viewer_attach_session(conn);
2455 break;
2456 case LTTNG_VIEWER_GET_NEXT_INDEX:
2457 ret = viewer_get_next_index(conn);
2458 break;
2459 case LTTNG_VIEWER_GET_PACKET:
2460 ret = viewer_get_packet(conn);
2461 break;
2462 case LTTNG_VIEWER_GET_METADATA:
2463 ret = viewer_get_metadata(conn);
2464 break;
2465 case LTTNG_VIEWER_GET_NEW_STREAMS:
2466 ret = viewer_get_new_streams(conn);
2467 break;
2468 case LTTNG_VIEWER_CREATE_SESSION:
2469 ret = viewer_create_session(conn);
2470 break;
2471 case LTTNG_VIEWER_DETACH_SESSION:
2472 ret = viewer_detach_session(conn);
2473 break;
2474 default:
2475 ERR("Received unknown viewer command (%u)",
2476 be32toh(recv_hdr->cmd));
2477 live_relay_unknown_command(conn);
2478 ret = -1;
2479 goto end;
2480 }
2481
2482end:
2483 return ret;
2484}
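
/*
 * Illustrative sketch (excluded from the build): every viewer command is
 * framed by a fixed-size lttng_viewer_cmd header, which thread_worker()
 * reads before handing the connection to process_control(); the
 * command-specific payload follows and is consumed by the matching
 * handler's recv_request(). The viewer_send() helper is hypothetical.
 */
#if 0
	struct lttng_viewer_cmd hdr;
	struct lttng_viewer_get_next_index rq;

	memset(&hdr, 0, sizeof(hdr));
	hdr.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
	viewer_send(sock, &hdr, sizeof(hdr));

	rq.stream_id = htobe64(stream_id);
	viewer_send(sock, &rq, sizeof(rq));
#endif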
2485
2486static
2487void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
2488{
2489 int ret;
2490
2491 (void) lttng_poll_del(events, pollfd);
2492
2493 ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &pollfd, 1,
2494 fd_tracker_util_close_fd, NULL);
2495 if (ret < 0) {
2496 ERR("Failed to close pollfd %d", pollfd);
2497 }
2498}
2499
2500/*
2501 * Worker thread: receives viewer commands and dispatches them to the handlers above
2502 */
2503static
2504void *thread_worker(void *data)
2505{
2506 int ret, err = -1;
2507 uint32_t nb_fd;
2508 struct lttng_poll_event events;
2509 struct lttng_ht *viewer_connections_ht;
2510 struct lttng_ht_iter iter;
2511 struct lttng_viewer_cmd recv_hdr;
2512 struct relay_connection *destroy_conn;
2513
2514 DBG("[thread] Live viewer relay worker started");
2515
2516 rcu_register_thread();
2517
2518 health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_WORKER);
2519
2520 if (testpoint(relayd_thread_live_worker)) {
2521 goto error_testpoint;
2522 }
2523
2524 /* Table of viewer connections indexed by socket fd. */
2525 viewer_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2526 if (!viewer_connections_ht) {
2527 goto viewer_connections_ht_error;
2528 }
2529
2530 ret = create_named_thread_poll_set(&events, 2,
2531 "Live viewer worker thread epoll");
2532 if (ret < 0) {
2533 goto error_poll_create;
2534 }
2535
2536 ret = lttng_poll_add(&events, live_conn_pipe[0], LPOLLIN | LPOLLRDHUP);
2537 if (ret < 0) {
2538 goto error;
2539 }
2540
2541restart:
2542 while (1) {
2543 int i;
2544
2545 health_code_update();
2546
2547 /* Infinite blocking call, waiting for transmission */
2548 DBG3("Relayd live viewer worker thread polling...");
2549 health_poll_entry();
2550 ret = lttng_poll_wait(&events, -1);
2551 health_poll_exit();
2552 if (ret < 0) {
2553 /*
2554 * Restart interrupted system call.
2555 */
2556 if (errno == EINTR) {
2557 goto restart;
2558 }
2559 goto error;
2560 }
2561
2562 nb_fd = ret;
2563
2564 /*
2565 * Process the ready file descriptors: new viewer
2566 * connections arriving on the connection pipe and
2567 * commands on existing viewer control connections.
2568 */
2569 for (i = 0; i < nb_fd; i++) {
2570 /* Fetch once the poll data */
2571 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
2572 int pollfd = LTTNG_POLL_GETFD(&events, i);
2573
2574 health_code_update();
2575
2576 /* Thread quit pipe has been closed. Killing thread. */
2577 ret = check_thread_quit_pipe(pollfd, revents);
2578 if (ret) {
2579 err = 0;
2580 goto exit;
2581 }
2582
2583 /* Inspect the relay conn pipe for new connection. */
2584 if (pollfd == live_conn_pipe[0]) {
2585 if (revents & LPOLLIN) {
2586 struct relay_connection *conn;
2587
2588 ret = lttng_read(live_conn_pipe[0],
2589 &conn, sizeof(conn));
2590 if (ret < 0) {
2591 goto error;
2592 }
2593 ret = lttng_poll_add(&events,
2594 conn->sock->fd,
2595 LPOLLIN | LPOLLRDHUP);
2596 if (ret) {
2597 ERR("Failed to add new live connection file descriptor to poll set");
2598 goto error;
2599 }
2600 connection_ht_add(viewer_connections_ht, conn);
2601 DBG("Connection socket %d added to poll", conn->sock->fd);
2602 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
2603 ERR("Relay live pipe error");
2604 goto error;
2605 } else {
2606 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
2607 goto error;
2608 }
2609 } else {
2610 /* Connection activity. */
2611 struct relay_connection *conn;
2612
2613 conn = connection_get_by_sock(viewer_connections_ht, pollfd);
2614 if (!conn) {
2615 continue;
2616 }
2617
2618 if (revents & LPOLLIN) {
2619 ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr,
2620 sizeof(recv_hdr), 0);
2621 if (ret <= 0) {
2622 /* Connection closed. */
2623 cleanup_connection_pollfd(&events, pollfd);
2624 /* Put "create" ownership reference. */
2625 connection_put(conn);
2626 DBG("Viewer control conn closed with %d", pollfd);
2627 } else {
2628 ret = process_control(&recv_hdr, conn);
2629 if (ret < 0) {
2630 /* Close the connection on error. */
2631 cleanup_connection_pollfd(&events, pollfd);
2632 /* Put "create" ownership reference. */
2633 connection_put(conn);
2634 DBG("Viewer connection closed with %d", pollfd);
2635 }
2636 }
2637 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
2638 cleanup_connection_pollfd(&events, pollfd);
2639 /* Put "create" ownership reference. */
2640 connection_put(conn);
2641 } else {
2642 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
2643 connection_put(conn);
2644 goto error;
2645 }
2646 /* Put local "get_by_sock" reference. */
2647 connection_put(conn);
2648 }
2649 }
2650 }
2651
2652exit:
2653error:
2654 (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
2655
2656 /* Cleanup remaining connection object. */
2657 rcu_read_lock();
2658 cds_lfht_for_each_entry(viewer_connections_ht->ht, &iter.iter,
2659 destroy_conn,
2660 sock_n.node) {
2661 health_code_update();
2662 connection_put(destroy_conn);
2663 }
2664 rcu_read_unlock();
2665error_poll_create:
2666 lttng_ht_destroy(viewer_connections_ht);
2667viewer_connections_ht_error:
2668 /* Close relay conn pipes */
2669 (void) fd_tracker_util_pipe_close(the_fd_tracker, live_conn_pipe);
2670 if (err) {
2671 DBG("Viewer worker thread exited with error");
2672 }
2673 DBG("Viewer worker thread cleanup complete");
2674error_testpoint:
2675 if (err) {
2676 health_error();
2677 ERR("Health error occurred in %s", __func__);
2678 }
2679 health_unregister(health_relayd);
2680 if (lttng_relay_stop_threads()) {
2681 ERR("Error stopping threads");
2682 }
2683 rcu_unregister_thread();
2684 return NULL;
2685}
2686
2687/*
2688 * Create the live connection pipe used to pass new viewer connections
2689 * to the worker thread. Closed by the worker thread on teardown.
2690 */
2691static int create_conn_pipe(void)
2692{
2693 return fd_tracker_util_pipe_open_cloexec(the_fd_tracker,
2694 "Live connection pipe", live_conn_pipe);
2695}
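
/*
 * Illustrative sketch (excluded from the build): the dispatcher hands a
 * connection to the worker by writing the pointer itself through the
 * pipe; thread_worker() reads it back with
 * lttng_read(live_conn_pipe[0], &conn, sizeof(conn)) above.
 * dequeue_viewer_connection() is a hypothetical stand-in for popping
 * viewer_conn_queue.
 */
#if 0
	struct relay_connection *conn;
	ssize_t ret;

	conn = dequeue_viewer_connection();
	ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn));
	if (ret < (ssize_t) sizeof(conn)) {
		PERROR("Failed to write viewer connection to the live connection pipe");
	}
#endif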
2696
2697int relayd_live_join(void)
2698{
2699 int ret, retval = 0;
2700 void *status;
2701
2702 ret = pthread_join(live_listener_thread, &status);
2703 if (ret) {
2704 errno = ret;
2705 PERROR("pthread_join live listener");
2706 retval = -1;
2707 }
2708
2709 ret = pthread_join(live_worker_thread, &status);
2710 if (ret) {
2711 errno = ret;
2712 PERROR("pthread_join live worker");
2713 retval = -1;
2714 }
2715
2716 ret = pthread_join(live_dispatcher_thread, &status);
2717 if (ret) {
2718 errno = ret;
2719 PERROR("pthread_join live dispatcher");
2720 retval = -1;
2721 }
2722
2723 cleanup_relayd_live();
2724
2725 return retval;
2726}
2727
2728/*
2729 * Entry point: create and start the live listener, dispatcher and worker threads.
2730 */
2731int relayd_live_create(struct lttng_uri *uri)
2732{
2733 int ret = 0, retval = 0;
2734 void *status;
2735 int is_root;
2736
2737 if (!uri) {
2738 retval = -1;
2739 goto exit_init_data;
2740 }
2741 live_uri = uri;
2742
2743 /* Check if daemon is UID = 0 */
2744 is_root = !getuid();
2745
2746 if (!is_root) {
2747 if (live_uri->port < 1024) {
2748 ERR("Need to be root to use ports < 1024");
2749 retval = -1;
2750 goto exit_init_data;
2751 }
2752 }
2753
2754 /* Set up the live connection pipe. */
2755 if (create_conn_pipe()) {
2756 retval = -1;
2757 goto exit_init_data;
2758 }
2759
2760 /* Init relay command queue. */
2761 cds_wfcq_init(&viewer_conn_queue.head, &viewer_conn_queue.tail);
2762
2763 /* Set up max poll set size */
2764 if (lttng_poll_set_max_size()) {
2765 retval = -1;
2766 goto exit_init_data;
2767 }
2768
2769 /* Setup the dispatcher thread */
2770 ret = pthread_create(&live_dispatcher_thread, default_pthread_attr(),
2771 thread_dispatcher, (void *) NULL);
2772 if (ret) {
2773 errno = ret;
2774 PERROR("pthread_create viewer dispatcher");
2775 retval = -1;
2776 goto exit_dispatcher_thread;
2777 }
2778
2779 /* Setup the worker thread */
2780 ret = pthread_create(&live_worker_thread, default_pthread_attr(),
2781 thread_worker, NULL);
2782 if (ret) {
2783 errno = ret;
2784 PERROR("pthread_create viewer worker");
2785 retval = -1;
2786 goto exit_worker_thread;
2787 }
2788
2789 /* Setup the listener thread */
2790 ret = pthread_create(&live_listener_thread, default_pthread_attr(),
2791 thread_listener, (void *) NULL);
2792 if (ret) {
2793 errno = ret;
2794 PERROR("pthread_create viewer listener");
2795 retval = -1;
2796 goto exit_listener_thread;
2797 }
2798
2799 /*
2800 * All OK, started all threads.
2801 */
2802 return retval;
2803
2804 /*
2805 * If any setup step is added after the creation of the live_listener
2806 * thread, a matching pthread_join() must be added here.
2807 */
2808
2809exit_listener_thread:
2810
2811 ret = pthread_join(live_worker_thread, &status);
2812 if (ret) {
2813 errno = ret;
2814 PERROR("pthread_join live worker");
2815 retval = -1;
2816 }
2817exit_worker_thread:
2818
2819 ret = pthread_join(live_dispatcher_thread, &status);
2820 if (ret) {
2821 errno = ret;
2822 PERROR("pthread_join live dispatcher");
2823 retval = -1;
2824 }
2825exit_dispatcher_thread:
2826
2827exit_init_data:
2828 cleanup_relayd_live();
2829
2830 return retval;
2831}