Clean-up: use a define for support thread count
[lttng-tools.git] / src/bin/lttng-sessiond/main.c
1 /*
2 * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2013 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _LGPL_SOURCE
21 #include <getopt.h>
22 #include <grp.h>
23 #include <limits.h>
24 #include <paths.h>
25 #include <pthread.h>
26 #include <signal.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <inttypes.h>
31 #include <sys/mman.h>
32 #include <sys/mount.h>
33 #include <sys/resource.h>
34 #include <sys/socket.h>
35 #include <sys/stat.h>
36 #include <sys/types.h>
37 #include <sys/wait.h>
38 #include <urcu/uatomic.h>
39 #include <unistd.h>
40 #include <ctype.h>
41
42 #include <common/common.h>
43 #include <common/compat/socket.h>
44 #include <common/compat/getenv.h>
45 #include <common/defaults.h>
46 #include <common/kernel-consumer/kernel-consumer.h>
47 #include <common/futex.h>
48 #include <common/relayd/relayd.h>
49 #include <common/utils.h>
50 #include <common/daemonize.h>
51 #include <common/config/session-config.h>
52
53 #include "lttng-sessiond.h"
54 #include "buffer-registry.h"
55 #include "channel.h"
56 #include "cmd.h"
57 #include "consumer.h"
58 #include "context.h"
59 #include "event.h"
60 #include "kernel.h"
61 #include "kernel-consumer.h"
62 #include "modprobe.h"
63 #include "shm.h"
64 #include "ust-ctl.h"
65 #include "ust-consumer.h"
66 #include "utils.h"
67 #include "fd-limit.h"
68 #include "health-sessiond.h"
69 #include "testpoint.h"
70 #include "ust-thread.h"
71 #include "agent-thread.h"
72 #include "save.h"
73 #include "load-session-thread.h"
74 #include "notification-thread.h"
75 #include "notification-thread-commands.h"
76 #include "rotation-thread.h"
77 #include "lttng-syscall.h"
78 #include "agent.h"
79 #include "ht-cleanup.h"
80 #include "sessiond-config.h"
81 #include "sessiond-timer.h"
82
83 static const char *help_msg =
84 #ifdef LTTNG_EMBED_HELP
85 #include <lttng-sessiond.8.h>
86 #else
87 NULL
88 #endif
89 ;
90
91 const char *progname;
92 static pid_t ppid; /* Parent PID for --sig-parent option */
93 static pid_t child_ppid; /* Internal parent PID used with daemonize. */
94 static int lockfile_fd = -1;
95
96 /* Set to 1 when a SIGUSR1 signal is received. */
97 static int recv_child_signal;
98
99 static struct lttng_kernel_tracer_version kernel_tracer_version;
100 static struct lttng_kernel_tracer_abi_version kernel_tracer_abi_version;
101
102 /*
103 * Consumer daemon specific control data. Every value not initialized here is
104 * set to 0 by the static definition.
105 */
106 static struct consumer_data kconsumer_data = {
107 .type = LTTNG_CONSUMER_KERNEL,
108 .err_sock = -1,
109 .cmd_sock = -1,
110 .channel_monitor_pipe = -1,
111 .channel_rotate_pipe = -1,
112 .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
113 .lock = PTHREAD_MUTEX_INITIALIZER,
114 .cond = PTHREAD_COND_INITIALIZER,
115 .cond_mutex = PTHREAD_MUTEX_INITIALIZER,
116 };
117 static struct consumer_data ustconsumer64_data = {
118 .type = LTTNG_CONSUMER64_UST,
119 .err_sock = -1,
120 .cmd_sock = -1,
121 .channel_monitor_pipe = -1,
122 .channel_rotate_pipe = -1,
123 .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
124 .lock = PTHREAD_MUTEX_INITIALIZER,
125 .cond = PTHREAD_COND_INITIALIZER,
126 .cond_mutex = PTHREAD_MUTEX_INITIALIZER,
127 };
128 static struct consumer_data ustconsumer32_data = {
129 .type = LTTNG_CONSUMER32_UST,
130 .err_sock = -1,
131 .cmd_sock = -1,
132 .channel_monitor_pipe = -1,
133 .channel_rotate_pipe = -1,
134 .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
135 .lock = PTHREAD_MUTEX_INITIALIZER,
136 .cond = PTHREAD_COND_INITIALIZER,
137 .cond_mutex = PTHREAD_MUTEX_INITIALIZER,
138 };
139
140 /* Command line options */
141 static const struct option long_options[] = {
142 { "client-sock", required_argument, 0, 'c' },
143 { "apps-sock", required_argument, 0, 'a' },
144 { "kconsumerd-cmd-sock", required_argument, 0, '\0' },
145 { "kconsumerd-err-sock", required_argument, 0, '\0' },
146 { "ustconsumerd32-cmd-sock", required_argument, 0, '\0' },
147 { "ustconsumerd32-err-sock", required_argument, 0, '\0' },
148 { "ustconsumerd64-cmd-sock", required_argument, 0, '\0' },
149 { "ustconsumerd64-err-sock", required_argument, 0, '\0' },
150 { "consumerd32-path", required_argument, 0, '\0' },
151 { "consumerd32-libdir", required_argument, 0, '\0' },
152 { "consumerd64-path", required_argument, 0, '\0' },
153 { "consumerd64-libdir", required_argument, 0, '\0' },
154 { "daemonize", no_argument, 0, 'd' },
155 { "background", no_argument, 0, 'b' },
156 { "sig-parent", no_argument, 0, 'S' },
157 { "help", no_argument, 0, 'h' },
158 { "group", required_argument, 0, 'g' },
159 { "version", no_argument, 0, 'V' },
160 { "quiet", no_argument, 0, 'q' },
161 { "verbose", no_argument, 0, 'v' },
162 { "verbose-consumer", no_argument, 0, '\0' },
163 { "no-kernel", no_argument, 0, '\0' },
164 { "pidfile", required_argument, 0, 'p' },
165 { "agent-tcp-port", required_argument, 0, '\0' },
166 { "config", required_argument, 0, 'f' },
167 { "load", required_argument, 0, 'l' },
168 { "kmod-probes", required_argument, 0, '\0' },
169 { "extra-kmod-probes", required_argument, 0, '\0' },
170 { NULL, 0, 0, 0 }
171 };
172
173 struct sessiond_config config;
174
175 /* Command line options to ignore from configuration file */
176 static const char *config_ignore_options[] = { "help", "version", "config" };
177
178 /* Shared between threads */
179 static int dispatch_thread_exit;
180
181 /* Sockets and FDs */
182 static int client_sock = -1;
183 static int apps_sock = -1;
184 int kernel_tracer_fd = -1;
185 static int kernel_poll_pipe[2] = { -1, -1 };
186
187 /*
188 * Quit pipe for all threads. This permits a single cancellation point
189 * for all threads when receiving an event on the pipe.
190 */
191 static int thread_quit_pipe[2] = { -1, -1 };
192
193 /*
194 * This pipe is used to inform the thread managing application communication
195 * that a command is queued and ready to be processed.
196 */
197 static int apps_cmd_pipe[2] = { -1, -1 };
198
199 int apps_cmd_notify_pipe[2] = { -1, -1 };
200
201 /* Pthread, Mutexes and Semaphores */
202 static pthread_t apps_thread;
203 static pthread_t apps_notify_thread;
204 static pthread_t reg_apps_thread;
205 static pthread_t client_thread;
206 static pthread_t kernel_thread;
207 static pthread_t dispatch_thread;
208 static pthread_t health_thread;
209 static pthread_t ht_cleanup_thread;
210 static pthread_t agent_reg_thread;
211 static pthread_t load_session_thread;
212 static pthread_t notification_thread;
213 static pthread_t rotation_thread;
214 static pthread_t timer_thread;
215
216 /*
217 * UST registration command queue. This queue is tied to a futex and uses an
218 * N-wakers / 1-waiter scheme implemented and detailed in futex.c/.h.
219 *
220 * The thread_registration_apps and thread_dispatch_ust_registration threads
221 * use this queue along with the wait/wake scheme. Down the line, the
222 * thread_manage_apps thread receives the new application sockets and monitors
223 * them for any I/O error or clean close that triggers an unregistration.
224 */
225 static struct ust_cmd_queue ust_cmd_queue;
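/*
 * A minimal sketch of the wake/wait pairing around this queue, based on the
 * call sites in this file (identifiers other than the futex_nto1_* and
 * cds_wfcq_* helpers are illustrative):
 *
 *   Any of the N wakers (registration thread):
 *       cds_wfcq_enqueue(&ust_cmd_queue.head, &ust_cmd_queue.tail,
 *               &ust_cmd->node);
 *       futex_nto1_wake(&ust_cmd_queue.futex);
 *
 *   The single waiter (dispatch thread main loop):
 *       futex_nto1_prepare(&ust_cmd_queue.futex);
 *       ... drain with cds_wfcq_dequeue_blocking() until empty ...
 *       futex_nto1_wait(&ust_cmd_queue.futex);
 */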
226
227 /*
228 * Pointer initialized before thread creation.
229 *
230 * This points to the tracing session list containing the session count and a
231 * mutex lock. The lock MUST be taken if you iterate over the list. The lock
232 * MUST NOT be taken if you call a public function in session.c.
233 *
234 * The lock is nested inside the structure: session_list_ptr->lock. Please use
235 * session_lock_list and session_unlock_list for lock acquisition.
236 */
237 static struct ltt_session_list *session_list_ptr;
238
239 int ust_consumerd64_fd = -1;
240 int ust_consumerd32_fd = -1;
241
242 static const char *module_proc_lttng = "/proc/lttng";
243
244 /*
245 * Consumer daemon state which is changed when spawning it, killing it or in
246 * case of a fatal error.
247 */
248 enum consumerd_state {
249 CONSUMER_STARTED = 1,
250 CONSUMER_STOPPED = 2,
251 CONSUMER_ERROR = 3,
252 };
253
254 /*
255 * This consumer daemon state is used to validate if a client command will be
256 * able to reach the consumer. If not, the client is informed. For instance,
257 * doing a "lttng start" when the consumer state is set to ERROR will return an
258 * error to the client.
259 *
260 * The following example shows a possible race condition of this scheme:
261 *
262 * consumer thread error happens
263 * client cmd arrives
264 * client cmd checks state -> still OK
265 * consumer thread exit, sets error
266 * client cmd try to talk to consumer
267 * ...
268 *
269 * However, since the consumer is a different daemon, we have no way of making
270 * sure the command will reach it safely even with this state flag. This is why
271 * we consider that up to the state validation during command processing, the
272 * command is safe. After that, we cannot guarantee the correctness of the
273 * client request vis-a-vis the consumer.
274 */
275 static enum consumerd_state ust_consumerd_state;
276 static enum consumerd_state kernel_consumerd_state;
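/*
 * Hedged illustration of the validation described above; the actual check
 * happens in the client command processing path (not shown in this excerpt),
 * and the error code below is an assumption:
 *
 *   if (uatomic_read(&kernel_consumerd_state) == CONSUMER_ERROR) {
 *           ret = LTTNG_ERR_NO_KERNCONSUMERD;  // assumed error code
 *           goto error;  // inform the client instead of reaching consumerd
 *   }
 *   // Past this check, a consumer death can still race with the command.
 */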
277
278 /* Set in main() with the current page size. */
279 long page_size;
280
281 /* Application health monitoring */
282 struct health_app *health_sessiond;
283
284 /* Am I root or not. */
285 int is_root; /* Set to 1 if the daemon is running as root */
286
287 const char * const config_section_name = "sessiond";
288
289 /* Load session thread information to operate. */
290 struct load_session_thread_data *load_info;
291
292 /* Notification thread handle. */
293 struct notification_thread_handle *notification_thread_handle;
294
295 /* Rotation thread handle. */
296 struct rotation_thread_handle *rotation_thread_handle;
297
298 /* Global hash tables */
299 struct lttng_ht *agent_apps_ht_by_sock = NULL;
300
301 /*
302 * The initialization of the session daemon is done in multiple phases.
303 *
304 * While all threads are launched near-simultaneously, only some of them
305 * are needed to ensure the session daemon can start to respond to client
306 * requests.
307 *
308 * There are two important guarantees that we wish to offer with respect
309 * to the initialisation of the session daemon:
310 * - When the daemonize/background launcher process exits, the sessiond
311 * is fully able to respond to client requests,
312 * - Auto-loaded sessions are visible to clients.
313 *
314 * In order to achieve this, a number of support threads have to be launched
315 * to allow the "client" thread to function properly. Moreover, since the
316 * "load session" thread needs the client thread, we must provide a way
317 * for the "load session" thread to know that the "client" thread is up
318 * and running.
319 *
320 * Hence, the support threads decrement the lttng_sessiond_ready counter
321 * while the "client" thread waits for it to reach 0. Once the "client" thread
322 * unblocks, it posts the message_thread_ready semaphore which allows the
323 * "load session" thread to progress.
324 *
325 * This implies that the "load session" thread is the last to be initialized
326 * and will explicitly call sessiond_signal_parents(), which signals the parents
327 * that the session daemon is fully initialized.
328 *
329 * The four (4) support threads are:
330 * - agent_thread
331 * - notification_thread
332 * - rotation_thread
333 * - health_thread
334 */
335 #define NR_LTTNG_SESSIOND_SUPPORT_THREADS 4
336 int lttng_sessiond_ready = NR_LTTNG_SESSIOND_SUPPORT_THREADS;
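/*
 * Sketch of both sides of the readiness countdown; the support-thread side
 * is exactly sessiond_notify_ready() below, while the waiting loop shown
 * here is an assumption (the real client thread may wait differently):
 *
 *   // In each of the 4 support threads, once operational:
 *   sessiond_notify_ready();
 *
 *   // In the client thread, before accepting commands:
 *   while (uatomic_read(&lttng_sessiond_ready) != 0) {
 *           caa_cpu_relax();  // placeholder for the real wait primitive
 *   }
 */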
337
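/*
 * Return 1 if the given poll event concerns the read end of the thread quit
 * pipe, in which case the calling thread should exit; 0 otherwise.
 */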
338 int sessiond_check_thread_quit_pipe(int fd, uint32_t events)
339 {
340 return (fd == thread_quit_pipe[0] && (events & LPOLLIN)) ? 1 : 0;
341 }
342
343 /* Notify parents that we are ready for cmd and health check */
344 LTTNG_HIDDEN
345 void sessiond_signal_parents(void)
346 {
347 /*
348 * Notify parent pid that we are ready to accept commands
349 * for client side. This ppid is the one from the
350 * external process that spawned us.
351 */
352 if (config.sig_parent) {
353 kill(ppid, SIGUSR1);
354 }
355
356 /*
357 * Notify the parent of the fork() process that we are
358 * ready.
359 */
360 if (config.daemonize || config.background) {
361 kill(child_ppid, SIGUSR1);
362 }
363 }
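/*
 * On the receiving end, the parent/launcher process installs a SIGUSR1
 * handler and waits for it before exiting; a sketch, assuming the handler
 * simply sets a flag analogous to recv_child_signal declared above:
 *
 *   static void sigusr1_handler(int signo)
 *   {
 *           CMM_STORE_SHARED(recv_child_signal, 1);
 *   }
 */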
364
365 LTTNG_HIDDEN
366 void sessiond_notify_ready(void)
367 {
368 /*
369 * The _return variant is used since the implied memory barriers are
370 * required.
371 */
372 (void) uatomic_sub_return(&lttng_sessiond_ready, 1);
373 }
374
375 static
376 int __sessiond_set_thread_pollset(struct lttng_poll_event *events, size_t size,
377 int *a_pipe)
378 {
379 int ret;
380
381 assert(events);
382
383 ret = lttng_poll_create(events, size, LTTNG_CLOEXEC);
384 if (ret < 0) {
385 goto error;
386 }
387
388 /* Add quit pipe */
389 ret = lttng_poll_add(events, a_pipe[0], LPOLLIN | LPOLLERR);
390 if (ret < 0) {
391 goto error;
392 }
393
394 return 0;
395
396 error:
397 return ret;
398 }
399
400 /*
401 * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
402 */
403 int sessiond_set_thread_pollset(struct lttng_poll_event *events, size_t size)
404 {
405 return __sessiond_set_thread_pollset(events, size, thread_quit_pipe);
406 }
407
408 /*
409 * Init thread quit pipe.
410 *
411 * Return -1 on error or 0 if all pipes are created.
412 */
413 static int __init_thread_quit_pipe(int *a_pipe)
414 {
415 int ret, i;
416
417 ret = pipe(a_pipe);
418 if (ret < 0) {
419 PERROR("thread quit pipe");
420 goto error;
421 }
422
423 for (i = 0; i < 2; i++) {
424 ret = fcntl(a_pipe[i], F_SETFD, FD_CLOEXEC);
425 if (ret < 0) {
426 PERROR("fcntl");
427 goto error;
428 }
429 }
430
431 error:
432 return ret;
433 }
434
435 static int init_thread_quit_pipe(void)
436 {
437 return __init_thread_quit_pipe(thread_quit_pipe);
438 }
439
440 /*
441 * Stop all threads by closing the thread quit pipe.
442 */
443 static void stop_threads(void)
444 {
445 int ret;
446
447 /* Stopping all threads */
448 DBG("Terminating all threads");
449 ret = notify_thread_pipe(thread_quit_pipe[1]);
450 if (ret < 0) {
451 ERR("write error on thread quit pipe");
452 }
453
454 /* Dispatch thread */
455 CMM_STORE_SHARED(dispatch_thread_exit, 1);
456 futex_nto1_wake(&ust_cmd_queue.futex);
457 }
458
459 /*
460 * Close all consumer sockets.
461 */
462 static void close_consumer_sockets(void)
463 {
464 int ret;
465
466 if (kconsumer_data.err_sock >= 0) {
467 ret = close(kconsumer_data.err_sock);
468 if (ret < 0) {
469 PERROR("kernel consumer err_sock close");
470 }
471 }
472 if (ustconsumer32_data.err_sock >= 0) {
473 ret = close(ustconsumer32_data.err_sock);
474 if (ret < 0) {
475 PERROR("UST consumerd32 err_sock close");
476 }
477 }
478 if (ustconsumer64_data.err_sock >= 0) {
479 ret = close(ustconsumer64_data.err_sock);
480 if (ret < 0) {
481 PERROR("UST consumerd64 err_sock close");
482 }
483 }
484 if (kconsumer_data.cmd_sock >= 0) {
485 ret = close(kconsumer_data.cmd_sock);
486 if (ret < 0) {
487 PERROR("kernel consumer cmd_sock close");
488 }
489 }
490 if (ustconsumer32_data.cmd_sock >= 0) {
491 ret = close(ustconsumer32_data.cmd_sock);
492 if (ret < 0) {
493 PERROR("UST consumerd32 cmd_sock close");
494 }
495 }
496 if (ustconsumer64_data.cmd_sock >= 0) {
497 ret = close(ustconsumer64_data.cmd_sock);
498 if (ret < 0) {
499 PERROR("UST consumerd64 cmd_sock close");
500 }
501 }
502 if (kconsumer_data.channel_monitor_pipe >= 0) {
503 ret = close(kconsumer_data.channel_monitor_pipe);
504 if (ret < 0) {
505 PERROR("kernel consumer channel monitor pipe close");
506 }
507 }
508 if (ustconsumer32_data.channel_monitor_pipe >= 0) {
509 ret = close(ustconsumer32_data.channel_monitor_pipe);
510 if (ret < 0) {
511 PERROR("UST consumerd32 channel monitor pipe close");
512 }
513 }
514 if (ustconsumer64_data.channel_monitor_pipe >= 0) {
515 ret = close(ustconsumer64_data.channel_monitor_pipe);
516 if (ret < 0) {
517 PERROR("UST consumerd64 channel monitor pipe close");
518 }
519 }
520 if (kconsumer_data.channel_rotate_pipe >= 0) {
521 ret = close(kconsumer_data.channel_rotate_pipe);
522 if (ret < 0) {
523 PERROR("kernel consumer channel rotate pipe close");
524 }
525 }
526 if (ustconsumer32_data.channel_rotate_pipe >= 0) {
527 ret = close(ustconsumer32_data.channel_rotate_pipe);
528 if (ret < 0) {
529 PERROR("UST consumerd32 channel rotate pipe close");
530 }
531 }
532 if (ustconsumer64_data.channel_rotate_pipe >= 0) {
533 ret = close(ustconsumer64_data.channel_rotate_pipe);
534 if (ret < 0) {
535 PERROR("UST consumerd64 channel rotate pipe close");
536 }
537 }
538 }
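/*
 * The repeated close pattern above could be factored through a small
 * helper; a sketch only, not used by the current code (note it also resets
 * the fd to -1, which the code above leaves unchanged):
 *
 *   static void close_consumer_fd(int *fd, const char *name)
 *   {
 *           int ret;
 *
 *           if (*fd < 0) {
 *                   return;
 *           }
 *           ret = close(*fd);
 *           if (ret < 0) {
 *                   PERROR("%s close", name);
 *           }
 *           *fd = -1;
 *   }
 *
 *   Usage: close_consumer_fd(&kconsumer_data.err_sock,
 *                   "kernel consumer err_sock");
 */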
539
540 /*
541 * Wait on consumer process termination.
542 *
543 * Must be called with the consumer data lock held or from a context
544 * ensuring no concurrent access to data (e.g: cleanup).
545 */
546 static void wait_consumer(struct consumer_data *consumer_data)
547 {
548 pid_t ret;
549 int status;
550
551 if (consumer_data->pid <= 0) {
552 return;
553 }
554
555 DBG("Waiting for complete teardown of consumerd (PID: %d)",
556 consumer_data->pid);
557 ret = waitpid(consumer_data->pid, &status, 0);
558 if (ret == -1) {
559 PERROR("consumerd waitpid pid: %d", consumer_data->pid);
560 } else if (!WIFEXITED(status)) {
561 ERR("consumerd termination with error: %d",
562 WEXITSTATUS(status));
563 }
564 consumer_data->pid = 0;
565 }
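/*
 * Reminder on the waitpid() status decoding used above: WIFEXITED(status)
 * is true for a normal exit, in which case WEXITSTATUS(status) holds the
 * exit code; a death by signal is reported through WIFSIGNALED()/WTERMSIG()
 * instead, e.g.:
 *
 *   if (WIFSIGNALED(status)) {
 *           ERR("consumerd killed by signal %d", WTERMSIG(status));
 *   }
 */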
566
567 /*
568 * Cleanup the session daemon's data structures.
569 */
570 static void sessiond_cleanup(void)
571 {
572 int ret;
573 struct ltt_session *sess, *stmp;
574
575 DBG("Cleanup sessiond");
576
577 /*
578 * Close the thread quit pipe. It has already done its job,
579 * since we are now called.
580 */
581 utils_close_pipe(thread_quit_pipe);
582
583 /*
584 * If config.pid_file_path.value is undefined, the default file will be
585 * wiped when removing the rundir.
586 */
587 if (config.pid_file_path.value) {
588 ret = remove(config.pid_file_path.value);
589 if (ret < 0) {
590 PERROR("remove pidfile %s", config.pid_file_path.value);
591 }
592 }
593
594 DBG("Removing sessiond and consumerd content of directory %s",
595 config.rundir.value);
596
597 /* sessiond */
598 DBG("Removing %s", config.pid_file_path.value);
599 (void) unlink(config.pid_file_path.value);
600
601 DBG("Removing %s", config.agent_port_file_path.value);
602 (void) unlink(config.agent_port_file_path.value);
603
604 /* kconsumerd */
605 DBG("Removing %s", kconsumer_data.err_unix_sock_path);
606 (void) unlink(kconsumer_data.err_unix_sock_path);
607
608 DBG("Removing directory %s", config.kconsumerd_path.value);
609 (void) rmdir(config.kconsumerd_path.value);
610
611 /* ust consumerd 32 */
612 DBG("Removing %s", config.consumerd32_err_unix_sock_path.value);
613 (void) unlink(config.consumerd32_err_unix_sock_path.value);
614
615 DBG("Removing directory %s", config.consumerd32_path.value);
616 (void) rmdir(config.consumerd32_path.value);
617
618 /* ust consumerd 64 */
619 DBG("Removing %s", config.consumerd64_err_unix_sock_path.value);
620 (void) unlink(config.consumerd64_err_unix_sock_path.value);
621
622 DBG("Removing directory %s", config.consumerd64_path.value);
623 (void) rmdir(config.consumerd64_path.value);
624
625 DBG("Cleaning up all sessions");
626
627 /* Destroy session list mutex */
628 if (session_list_ptr != NULL) {
629 pthread_mutex_destroy(&session_list_ptr->lock);
630
631 /* Clean up ALL sessions */
632 cds_list_for_each_entry_safe(sess, stmp,
633 &session_list_ptr->head, list) {
634 cmd_destroy_session(sess, kernel_poll_pipe[1],
635 notification_thread_handle);
636 }
637 }
638
639 wait_consumer(&kconsumer_data);
640 wait_consumer(&ustconsumer64_data);
641 wait_consumer(&ustconsumer32_data);
642
643 DBG("Cleaning up all agent apps");
644 agent_app_ht_clean();
645
646 DBG("Closing all UST sockets");
647 ust_app_clean_list();
648 buffer_reg_destroy_registries();
649
650 if (is_root && !config.no_kernel) {
651 DBG2("Closing kernel fd");
652 if (kernel_tracer_fd >= 0) {
653 ret = close(kernel_tracer_fd);
654 if (ret) {
655 PERROR("close");
656 }
657 }
658 DBG("Unloading kernel modules");
659 modprobe_remove_lttng_all();
660 free(syscall_table);
661 }
662
663 close_consumer_sockets();
664
665 if (load_info) {
666 load_session_destroy_data(load_info);
667 free(load_info);
668 }
669
670 /*
671 * We do NOT rmdir rundir because there are other processes
672 * using it, for instance lttng-relayd, which can start in
673 * parallel with this teardown.
674 */
675 }
676
677 /*
678 * Cleanup the daemon's option data structures.
679 */
680 static void sessiond_cleanup_options(void)
681 {
682 DBG("Cleaning up options");
683
684 sessiond_config_fini(&config);
685
686 run_as_destroy_worker();
687 }
688
689 /*
690 * Send data on a unix socket using the liblttsessiondcomm API.
691 *
692 * Return lttcomm error code.
693 */
694 static int send_unix_sock(int sock, void *buf, size_t len)
695 {
696 /* Check valid length */
697 if (len == 0) {
698 return -1;
699 }
700
701 return lttcomm_send_unix_sock(sock, buf, len);
702 }
703
704 /*
705 * Free memory of a command context structure.
706 */
707 static void clean_command_ctx(struct command_ctx **cmd_ctx)
708 {
709 DBG("Clean command context structure");
710 if (*cmd_ctx) {
711 if ((*cmd_ctx)->llm) {
712 free((*cmd_ctx)->llm);
713 }
714 if ((*cmd_ctx)->lsm) {
715 free((*cmd_ctx)->lsm);
716 }
717 free(*cmd_ctx);
718 *cmd_ctx = NULL;
719 }
720 }
721
722 /*
723 * Notify UST applications using the shm mmap futex.
724 */
725 static int notify_ust_apps(int active)
726 {
727 char *wait_shm_mmap;
728
729 DBG("Notifying applications of session daemon state: %d", active);
730
731 /* See shm.c for this call implying mmap, shm and futex calls */
732 wait_shm_mmap = shm_ust_get_mmap(config.wait_shm_path.value, is_root);
733 if (wait_shm_mmap == NULL) {
734 goto error;
735 }
736
737 /* Wake waiting process */
738 futex_wait_update((int32_t *) wait_shm_mmap, active);
739
740 /* Apps notified successfully */
741 return 0;
742
743 error:
744 return -1;
745 }
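/*
 * The wait shm starts with a single 32-bit futex word. Conceptually (see
 * futex_wait_update() in common/futex.c/.h for the real implementation):
 *
 *   sessiond:  *(int32_t *) wait_shm_mmap = active;  then FUTEX_WAKE all
 *   app side:  while the word is 0, FUTEX_WAIT on the same address
 *
 * so a transition from 0 to 1 releases every application blocked on the
 * word, while a value of 0 makes newly started applications wait.
 */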
746
747 /*
748 * Setup the outgoing data buffer for the response (llm) by allocating the
749 * right amount of memory and copying the original information from the lsm
750 * structure.
751 *
752 * Return 0 on success, negative value on error.
753 */
754 static int setup_lttng_msg(struct command_ctx *cmd_ctx,
755 const void *payload_buf, size_t payload_len,
756 const void *cmd_header_buf, size_t cmd_header_len)
757 {
758 int ret = 0;
759 const size_t header_len = sizeof(struct lttcomm_lttng_msg);
760 const size_t cmd_header_offset = header_len;
761 const size_t payload_offset = cmd_header_offset + cmd_header_len;
762 const size_t total_msg_size = header_len + cmd_header_len + payload_len;
763
764 cmd_ctx->llm = zmalloc(total_msg_size);
765
766 if (cmd_ctx->llm == NULL) {
767 PERROR("zmalloc");
768 ret = -ENOMEM;
769 goto end;
770 }
771
772 /* Copy common data */
773 cmd_ctx->llm->cmd_type = cmd_ctx->lsm->cmd_type;
774 cmd_ctx->llm->pid = cmd_ctx->lsm->domain.attr.pid;
775 cmd_ctx->llm->cmd_header_size = cmd_header_len;
776 cmd_ctx->llm->data_size = payload_len;
777 cmd_ctx->lttng_msg_size = total_msg_size;
778
779 /* Copy command header */
780 if (cmd_header_len) {
781 memcpy(((uint8_t *) cmd_ctx->llm) + cmd_header_offset, cmd_header_buf,
782 cmd_header_len);
783 }
784
785 /* Copy payload */
786 if (payload_len) {
787 memcpy(((uint8_t *) cmd_ctx->llm) + payload_offset, payload_buf,
788 payload_len);
789 }
790
791 end:
792 return ret;
793 }
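/*
 * Resulting layout of the message assembled above:
 *
 *   +------------------------------+ <- offset 0
 *   | struct lttcomm_lttng_msg     |    (header_len)
 *   +------------------------------+ <- cmd_header_offset
 *   | command header, optional     |    (cmd_header_len)
 *   +------------------------------+ <- payload_offset
 *   | payload, optional            |    (payload_len)
 *   +------------------------------+ <- total_msg_size
 */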
794
795 /*
796 * Version of setup_lttng_msg() without command header.
797 */
798 static int setup_lttng_msg_no_cmd_header(struct command_ctx *cmd_ctx,
799 void *payload_buf, size_t payload_len)
800 {
801 return setup_lttng_msg(cmd_ctx, payload_buf, payload_len, NULL, 0);
802 }
803 /*
804 * Update the kernel poll set with all channel fds available over all tracing
805 * sessions. Add the wakeup pipe at the end of the set.
806 */
807 static int update_kernel_poll(struct lttng_poll_event *events)
808 {
809 int ret;
810 struct ltt_session *session;
811 struct ltt_kernel_channel *channel;
812
813 DBG("Updating kernel poll set");
814
815 session_lock_list();
816 cds_list_for_each_entry(session, &session_list_ptr->head, list) {
817 session_lock(session);
818 if (session->kernel_session == NULL) {
819 session_unlock(session);
820 continue;
821 }
822
823 cds_list_for_each_entry(channel,
824 &session->kernel_session->channel_list.head, list) {
825 /* Add channel fd to the kernel poll set */
826 ret = lttng_poll_add(events, channel->fd, LPOLLIN | LPOLLRDNORM);
827 if (ret < 0) {
828 session_unlock(session);
829 goto error;
830 }
831 DBG("Channel fd %d added to kernel set", channel->fd);
832 }
833 session_unlock(session);
834 }
835 session_unlock_list();
836
837 return 0;
838
839 error:
840 session_unlock_list();
841 return -1;
842 }
843
844 /*
845 * Find the channel fd matching 'fd' over all tracing sessions. When found, check
846 * for new channel stream and send those stream fds to the kernel consumer.
847 *
848 * Useful for CPU hotplug feature.
849 */
850 static int update_kernel_stream(struct consumer_data *consumer_data, int fd)
851 {
852 int ret = 0;
853 struct ltt_session *session;
854 struct ltt_kernel_session *ksess;
855 struct ltt_kernel_channel *channel;
856
857 DBG("Updating kernel streams for channel fd %d", fd);
858
859 session_lock_list();
860 cds_list_for_each_entry(session, &session_list_ptr->head, list) {
861 session_lock(session);
862 if (session->kernel_session == NULL) {
863 session_unlock(session);
864 continue;
865 }
866 ksess = session->kernel_session;
867
868 cds_list_for_each_entry(channel,
869 &ksess->channel_list.head, list) {
870 struct lttng_ht_iter iter;
871 struct consumer_socket *socket;
872
873 if (channel->fd != fd) {
874 continue;
875 }
876 DBG("Channel found, updating kernel streams");
877 ret = kernel_open_channel_stream(channel);
878 if (ret < 0) {
879 goto error;
880 }
881 /* Update the stream global counter */
882 ksess->stream_count_global += ret;
883
884 /*
885 * Have we already sent fds to the consumer? If yes, it
886 * means that tracing is started so it is safe to send
887 * our updated stream fds.
888 */
889 if (ksess->consumer_fds_sent != 1
890 || ksess->consumer == NULL) {
891 ret = -1;
892 goto error;
893 }
894
895 rcu_read_lock();
896 cds_lfht_for_each_entry(ksess->consumer->socks->ht,
897 &iter.iter, socket, node.node) {
898 pthread_mutex_lock(socket->lock);
899 ret = kernel_consumer_send_channel_streams(socket,
900 channel, ksess,
901 session->output_traces ? 1 : 0);
902 pthread_mutex_unlock(socket->lock);
903 if (ret < 0) {
904 rcu_read_unlock();
905 goto error;
906 }
907 }
908 rcu_read_unlock();
909 }
910 session_unlock(session);
911 }
912 session_unlock_list();
913 return ret;
914
915 error:
916 session_unlock(session);
917 session_unlock_list();
918 return ret;
919 }
920
921 /*
922 * For each tracing session, update newly registered apps. The session list
923 * lock MUST be acquired before calling this.
924 */
925 static void update_ust_app(int app_sock)
926 {
927 struct ltt_session *sess, *stmp;
928
929 /* Consumer is in an ERROR state. Stop any application update. */
930 if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) {
931 /* Stop the update process since the consumer is dead. */
932 return;
933 }
934
935 /* For all tracing session(s) */
936 cds_list_for_each_entry_safe(sess, stmp, &session_list_ptr->head, list) {
937 struct ust_app *app;
938
939 session_lock(sess);
940 if (!sess->ust_session) {
941 goto unlock_session;
942 }
943
944 rcu_read_lock();
945 assert(app_sock >= 0);
946 app = ust_app_find_by_sock(app_sock);
947 if (app == NULL) {
948 /*
949 * The application may have unregistered in the
950 * meantime, so this is possible; simply stop
951 * the update.
952 */
953 DBG3("UST app update failed to find app sock %d",
954 app_sock);
955 goto unlock_rcu;
956 }
957 ust_app_global_update(sess->ust_session, app);
958 unlock_rcu:
959 rcu_read_unlock();
960 unlock_session:
961 session_unlock(sess);
962 }
963 }
964
965 /*
966 * This thread manages events coming from the kernel.
967 *
968 * Features supported in this thread:
969 * -) CPU Hotplug
970 */
971 static void *thread_manage_kernel(void *data)
972 {
973 int ret, i, pollfd, update_poll_flag = 1, err = -1;
974 uint32_t revents, nb_fd;
975 char tmp;
976 struct lttng_poll_event events;
977
978 DBG("[thread] Thread manage kernel started");
979
980 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_KERNEL);
981
982 /*
983 * The first step of the while loop cleans this structure, which could free
984 * non-NULL pointers, so initialize it before the loop.
985 */
986 lttng_poll_init(&events);
987
988 if (testpoint(sessiond_thread_manage_kernel)) {
989 goto error_testpoint;
990 }
991
992 health_code_update();
993
994 if (testpoint(sessiond_thread_manage_kernel_before_loop)) {
995 goto error_testpoint;
996 }
997
998 while (1) {
999 health_code_update();
1000
1001 if (update_poll_flag == 1) {
1002 /* Clean events object. We are about to populate it again. */
1003 lttng_poll_clean(&events);
1004
1005 ret = sessiond_set_thread_pollset(&events, 2);
1006 if (ret < 0) {
1007 goto error_poll_create;
1008 }
1009
1010 ret = lttng_poll_add(&events, kernel_poll_pipe[0], LPOLLIN);
1011 if (ret < 0) {
1012 goto error;
1013 }
1014
1015 /* This will add the available kernel channel if any. */
1016 ret = update_kernel_poll(&events);
1017 if (ret < 0) {
1018 goto error;
1019 }
1020 update_poll_flag = 0;
1021 }
1022
1023 DBG("Thread kernel polling");
1024
1025 /* Poll with an infinite timeout */
1026 restart:
1027 health_poll_entry();
1028 ret = lttng_poll_wait(&events, -1);
1029 DBG("Thread kernel return from poll on %d fds",
1030 LTTNG_POLL_GETNB(&events));
1031 health_poll_exit();
1032 if (ret < 0) {
1033 /*
1034 * Restart interrupted system call.
1035 */
1036 if (errno == EINTR) {
1037 goto restart;
1038 }
1039 goto error;
1040 } else if (ret == 0) {
1041 /* Should not happen since timeout is infinite */
1042 ERR("Return value of poll is 0 with an infinite timeout.\n"
1043 "This should not have happened! Continuing...");
1044 continue;
1045 }
1046
1047 nb_fd = ret;
1048
1049 for (i = 0; i < nb_fd; i++) {
1050 /* Fetch once the poll data */
1051 revents = LTTNG_POLL_GETEV(&events, i);
1052 pollfd = LTTNG_POLL_GETFD(&events, i);
1053
1054 health_code_update();
1055
1056 if (!revents) {
1057 /* No activity for this FD (poll implementation). */
1058 continue;
1059 }
1060
1061 /* Thread quit pipe has been closed. Killing thread. */
1062 ret = sessiond_check_thread_quit_pipe(pollfd, revents);
1063 if (ret) {
1064 err = 0;
1065 goto exit;
1066 }
1067
1068 /* Check for data on kernel pipe */
1069 if (revents & LPOLLIN) {
1070 if (pollfd == kernel_poll_pipe[0]) {
1071 (void) lttng_read(kernel_poll_pipe[0],
1072 &tmp, 1);
1073 /*
1074 * The return value is irrelevant here; any activity on this
1075 * pipe means an update is required anyway.
1076 */
1077 update_poll_flag = 1;
1078 continue;
1079 } else {
1080 /*
1081 * New CPU detected by the kernel. Adding kernel stream to
1082 * kernel session and updating the kernel consumer
1083 */
1084 ret = update_kernel_stream(&kconsumer_data, pollfd);
1085 if (ret < 0) {
1086 continue;
1087 }
1088 break;
1089 }
1090 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
1091 update_poll_flag = 1;
1092 continue;
1093 } else {
1094 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
1095 goto error;
1096 }
1097 }
1098 }
1099
1100 exit:
1101 error:
1102 lttng_poll_clean(&events);
1103 error_poll_create:
1104 error_testpoint:
1105 utils_close_pipe(kernel_poll_pipe);
1106 kernel_poll_pipe[0] = kernel_poll_pipe[1] = -1;
1107 if (err) {
1108 health_error();
1109 ERR("Health error occurred in %s", __func__);
1110 WARN("Kernel thread died unexpectedly. "
1111 "Kernel tracing can continue but CPU hotplug is disabled.");
1112 }
1113 health_unregister(health_sessiond);
1114 DBG("Kernel thread dying");
1115 return NULL;
1116 }
1117
1118 /*
1119 * Signal the consumer data's pthread condition to publish the consumer thread's state to any waiter.
1120 */
1121 static void signal_consumer_condition(struct consumer_data *data, int state)
1122 {
1123 pthread_mutex_lock(&data->cond_mutex);
1124
1125 /*
1126 * The state is set before signaling. It can be any value; it is the waiter's
1127 * job to correctly interpret this condition variable associated to the
1128 * consumer pthread_cond.
1129 *
1130 * A value of 0 means that the corresponding thread of the consumer data
1131 * was not started. 1 indicates that the thread has started and is ready
1132 * for action. A negative value means that there was an error during the
1133 * thread bootstrap.
1134 */
1135 data->consumer_thread_is_ready = state;
1136 (void) pthread_cond_signal(&data->cond);
1137
1138 pthread_mutex_unlock(&data->cond_mutex);
1139 }
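/*
 * The waiting side pairs with this signal as follows (sketch; the real
 * wait, including its timeout handling, lives in the consumer spawn path):
 *
 *   pthread_mutex_lock(&data->cond_mutex);
 *   while (data->consumer_thread_is_ready == 0) {
 *           pthread_cond_wait(&data->cond, &data->cond_mutex);
 *   }
 *   pthread_mutex_unlock(&data->cond_mutex);
 *   // A negative value then denotes a failed consumer bootstrap.
 */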
1140
1141 /*
1142 * This thread manages consumer errors sent back to the session daemon.
1143 */
1144 static void *thread_manage_consumer(void *data)
1145 {
1146 int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
1147 uint32_t revents, nb_fd;
1148 enum lttcomm_return_code code;
1149 struct lttng_poll_event events;
1150 struct consumer_data *consumer_data = data;
1151 struct consumer_socket *cmd_socket_wrapper = NULL;
1152
1153 DBG("[thread] Manage consumer started");
1154
1155 rcu_register_thread();
1156 rcu_thread_online();
1157
1158 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);
1159
1160 health_code_update();
1161
1162 /*
1163 * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
1164 * metadata_sock. Nothing more will be added to this poll set.
1165 */
1166 ret = sessiond_set_thread_pollset(&events, 3);
1167 if (ret < 0) {
1168 goto error_poll;
1169 }
1170
1171 /*
1172 * The error socket here is already in a listening state which was done
1173 * just before spawning this thread to avoid a race between the consumer
1174 * daemon exec trying to connect and the listen() call.
1175 */
1176 ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
1177 if (ret < 0) {
1178 goto error;
1179 }
1180
1181 health_code_update();
1182
1183 /* Infinite blocking call, waiting for transmission */
1184 restart:
1185 health_poll_entry();
1186
1187 if (testpoint(sessiond_thread_manage_consumer)) {
1188 goto error;
1189 }
1190
1191 ret = lttng_poll_wait(&events, -1);
1192 health_poll_exit();
1193 if (ret < 0) {
1194 /*
1195 * Restart interrupted system call.
1196 */
1197 if (errno == EINTR) {
1198 goto restart;
1199 }
1200 goto error;
1201 }
1202
1203 nb_fd = ret;
1204
1205 for (i = 0; i < nb_fd; i++) {
1206 /* Fetch once the poll data */
1207 revents = LTTNG_POLL_GETEV(&events, i);
1208 pollfd = LTTNG_POLL_GETFD(&events, i);
1209
1210 health_code_update();
1211
1212 if (!revents) {
1213 /* No activity for this FD (poll implementation). */
1214 continue;
1215 }
1216
1217 /* Thread quit pipe has been closed. Killing thread. */
1218 ret = sessiond_check_thread_quit_pipe(pollfd, revents);
1219 if (ret) {
1220 err = 0;
1221 goto exit;
1222 }
1223
1224 /* Event on the registration socket */
1225 if (pollfd == consumer_data->err_sock) {
1226 if (revents & LPOLLIN) {
1227 continue;
1228 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
1229 ERR("consumer err socket poll error");
1230 goto error;
1231 } else {
1232 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
1233 goto error;
1234 }
1235 }
1236 }
1237
1238 sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
1239 if (sock < 0) {
1240 goto error;
1241 }
1242
1243 /*
1244 * Set the CLOEXEC flag. Return code is useless because either way, the
1245 * show must go on.
1246 */
1247 (void) utils_set_fd_cloexec(sock);
1248
1249 health_code_update();
1250
1251 DBG2("Receiving code from consumer err_sock");
1252
1253 /* Getting status code from kconsumerd */
1254 ret = lttcomm_recv_unix_sock(sock, &code,
1255 sizeof(enum lttcomm_return_code));
1256 if (ret <= 0) {
1257 goto error;
1258 }
1259
1260 health_code_update();
1261 if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
1262 ERR("consumer error when waiting for SOCK_READY : %s",
1263 lttcomm_get_readable_code(-code));
1264 goto error;
1265 }
1266
1267 /* Connect both command and metadata sockets. */
1268 consumer_data->cmd_sock =
1269 lttcomm_connect_unix_sock(
1270 consumer_data->cmd_unix_sock_path);
1271 consumer_data->metadata_fd =
1272 lttcomm_connect_unix_sock(
1273 consumer_data->cmd_unix_sock_path);
1274 if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
1275 PERROR("consumer connect cmd socket");
1276 /* On error, signal condition and quit. */
1277 signal_consumer_condition(consumer_data, -1);
1278 goto error;
1279 }
1280
1281 consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
1282
1283 /* Create metadata socket lock. */
1284 consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
1285 if (consumer_data->metadata_sock.lock == NULL) {
1286 PERROR("zmalloc pthread mutex");
1287 goto error;
1288 }
1289 pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
1290
1291 DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock);
1292 DBG("Consumer metadata socket ready (fd: %d)",
1293 consumer_data->metadata_fd);
1294
1295 /*
1296 * Remove the consumerd error sock since we've established a connection.
1297 */
1298 ret = lttng_poll_del(&events, consumer_data->err_sock);
1299 if (ret < 0) {
1300 goto error;
1301 }
1302
1303 /* Add new accepted error socket. */
1304 ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
1305 if (ret < 0) {
1306 goto error;
1307 }
1308
1309 /* Add metadata socket that is successfully connected. */
1310 ret = lttng_poll_add(&events, consumer_data->metadata_fd,
1311 LPOLLIN | LPOLLRDHUP);
1312 if (ret < 0) {
1313 goto error;
1314 }
1315
1316 health_code_update();
1317
1318 /*
1319 * Transfer the write-end of the channel monitoring and rotate pipe
1320 * to the consumer by issuing a SET_CHANNEL_MONITOR_PIPE and
1321 * SET_CHANNEL_ROTATE_PIPE commands.
1322 */
1323 cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
1324 if (!cmd_socket_wrapper) {
1325 goto error;
1326 }
1327 cmd_socket_wrapper->lock = &consumer_data->lock;
1328
1329 ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
1330 consumer_data->channel_monitor_pipe);
1331 if (ret) {
1332 goto error;
1333 }
1334
1335 ret = consumer_send_channel_rotate_pipe(cmd_socket_wrapper,
1336 consumer_data->channel_rotate_pipe);
1337 if (ret) {
1338 goto error;
1339 }
1340
1341 /* Discard the socket wrapper as it is no longer needed. */
1342 consumer_destroy_socket(cmd_socket_wrapper);
1343 cmd_socket_wrapper = NULL;
1344
1345 /* The thread is completely initialized, signal that it is ready. */
1346 signal_consumer_condition(consumer_data, 1);
1347
1348 /* Infinite blocking call, waiting for transmission */
1349 restart_poll:
1350 while (1) {
1351 health_code_update();
1352
1353 /* Exit the thread because the thread quit pipe has been triggered. */
1354 if (should_quit) {
1355 /* Not a health error. */
1356 err = 0;
1357 goto exit;
1358 }
1359
1360 health_poll_entry();
1361 ret = lttng_poll_wait(&events, -1);
1362 health_poll_exit();
1363 if (ret < 0) {
1364 /*
1365 * Restart interrupted system call.
1366 */
1367 if (errno == EINTR) {
1368 goto restart_poll;
1369 }
1370 goto error;
1371 }
1372
1373 nb_fd = ret;
1374
1375 for (i = 0; i < nb_fd; i++) {
1376 /* Fetch once the poll data */
1377 revents = LTTNG_POLL_GETEV(&events, i);
1378 pollfd = LTTNG_POLL_GETFD(&events, i);
1379
1380 health_code_update();
1381
1382 if (!revents) {
1383 /* No activity for this FD (poll implementation). */
1384 continue;
1385 }
1386
1387 /*
1388 * Thread quit pipe has been triggered, flag that we should stop
1389 * but continue the current loop to handle potential data from
1390 * consumer.
1391 */
1392 should_quit = sessiond_check_thread_quit_pipe(pollfd, revents);
1393
1394 if (pollfd == sock) {
1395 /* Event on the consumerd socket */
1396 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
1397 && !(revents & LPOLLIN)) {
1398 ERR("consumer err socket second poll error");
1399 goto error;
1400 }
1401 health_code_update();
1402 /* Wait for any kconsumerd error */
1403 ret = lttcomm_recv_unix_sock(sock, &code,
1404 sizeof(enum lttcomm_return_code));
1405 if (ret <= 0) {
1406 ERR("consumer closed the command socket");
1407 goto error;
1408 }
1409
1410 ERR("consumer return code : %s",
1411 lttcomm_get_readable_code(-code));
1412
1413 goto exit;
1414 } else if (pollfd == consumer_data->metadata_fd) {
1415 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
1416 && !(revents & LPOLLIN)) {
1417 ERR("consumer err metadata socket second poll error");
1418 goto error;
1419 }
1420 /* UST metadata requests */
1421 ret = ust_consumer_metadata_request(
1422 &consumer_data->metadata_sock);
1423 if (ret < 0) {
1424 ERR("Handling metadata request");
1425 goto error;
1426 }
1427 }
1428 /* No need for an else branch; all FDs are tested prior. */
1429 }
1430 health_code_update();
1431 }
1432
1433 exit:
1434 error:
1435 /*
1436 * We lock here because we are about to close the sockets and some other
1437 * thread might be using them, so take exclusive access, which will abort
1438 * all other consumer commands issued by other threads.
1439 */
1440 pthread_mutex_lock(&consumer_data->lock);
1441
1442 /* Immediately set the consumerd state to stopped */
1443 if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
1444 uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR);
1445 } else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
1446 consumer_data->type == LTTNG_CONSUMER32_UST) {
1447 uatomic_set(&ust_consumerd_state, CONSUMER_ERROR);
1448 } else {
1449 /* Code flow error... */
1450 assert(0);
1451 }
1452
1453 if (consumer_data->err_sock >= 0) {
1454 ret = close(consumer_data->err_sock);
1455 if (ret) {
1456 PERROR("close");
1457 }
1458 consumer_data->err_sock = -1;
1459 }
1460 if (consumer_data->cmd_sock >= 0) {
1461 ret = close(consumer_data->cmd_sock);
1462 if (ret) {
1463 PERROR("close");
1464 }
1465 consumer_data->cmd_sock = -1;
1466 }
1467 if (consumer_data->metadata_sock.fd_ptr &&
1468 *consumer_data->metadata_sock.fd_ptr >= 0) {
1469 ret = close(*consumer_data->metadata_sock.fd_ptr);
1470 if (ret) {
1471 PERROR("close");
1472 }
1473 }
1474 if (sock >= 0) {
1475 ret = close(sock);
1476 if (ret) {
1477 PERROR("close");
1478 }
1479 }
1480
1481 unlink(consumer_data->err_unix_sock_path);
1482 unlink(consumer_data->cmd_unix_sock_path);
1483 pthread_mutex_unlock(&consumer_data->lock);
1484
1485 /* Cleanup metadata socket mutex. */
1486 if (consumer_data->metadata_sock.lock) {
1487 pthread_mutex_destroy(consumer_data->metadata_sock.lock);
1488 free(consumer_data->metadata_sock.lock);
1489 }
1490 lttng_poll_clean(&events);
1491
1492 if (cmd_socket_wrapper) {
1493 consumer_destroy_socket(cmd_socket_wrapper);
1494 }
1495 error_poll:
1496 if (err) {
1497 health_error();
1498 ERR("Health error occurred in %s", __func__);
1499 }
1500 health_unregister(health_sessiond);
1501 DBG("consumer thread cleanup completed");
1502
1503 rcu_thread_offline();
1504 rcu_unregister_thread();
1505
1506 return NULL;
1507 }
1508
1509 /*
1510 * This thread receives application command sockets (FDs) on the
1511 * apps_cmd_pipe and waits (polls) on them until they are closed
1512 * or an error occurs.
1513 *
1514 * At that point, it flushes the data (tracing and metadata) associated
1515 * with this application and tears down ust app sessions and other
1516 * associated data structures through ust_app_unregister().
1517 *
1518 * Note that this thread never sends commands to the applications
1519 * through the command sockets; it merely listens for hang-ups
1520 * and errors on those sockets and cleans up as they occur.
1521 */
1522 static void *thread_manage_apps(void *data)
1523 {
1524 int i, ret, pollfd, err = -1;
1525 ssize_t size_ret;
1526 uint32_t revents, nb_fd;
1527 struct lttng_poll_event events;
1528
1529 DBG("[thread] Manage application started");
1530
1531 rcu_register_thread();
1532 rcu_thread_online();
1533
1534 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_MANAGE);
1535
1536 if (testpoint(sessiond_thread_manage_apps)) {
1537 goto error_testpoint;
1538 }
1539
1540 health_code_update();
1541
1542 ret = sessiond_set_thread_pollset(&events, 2);
1543 if (ret < 0) {
1544 goto error_poll_create;
1545 }
1546
1547 ret = lttng_poll_add(&events, apps_cmd_pipe[0], LPOLLIN | LPOLLRDHUP);
1548 if (ret < 0) {
1549 goto error;
1550 }
1551
1552 if (testpoint(sessiond_thread_manage_apps_before_loop)) {
1553 goto error;
1554 }
1555
1556 health_code_update();
1557
1558 while (1) {
1559 DBG("Apps thread polling");
1560
1561 /* Infinite blocking call, waiting for transmission */
1562 restart:
1563 health_poll_entry();
1564 ret = lttng_poll_wait(&events, -1);
1565 DBG("Apps thread return from poll on %d fds",
1566 LTTNG_POLL_GETNB(&events));
1567 health_poll_exit();
1568 if (ret < 0) {
1569 /*
1570 * Restart interrupted system call.
1571 */
1572 if (errno == EINTR) {
1573 goto restart;
1574 }
1575 goto error;
1576 }
1577
1578 nb_fd = ret;
1579
1580 for (i = 0; i < nb_fd; i++) {
1581 /* Fetch once the poll data */
1582 revents = LTTNG_POLL_GETEV(&events, i);
1583 pollfd = LTTNG_POLL_GETFD(&events, i);
1584
1585 health_code_update();
1586
1587 if (!revents) {
1588 /* No activity for this FD (poll implementation). */
1589 continue;
1590 }
1591
1592 /* Thread quit pipe has been closed. Killing thread. */
1593 ret = sessiond_check_thread_quit_pipe(pollfd, revents);
1594 if (ret) {
1595 err = 0;
1596 goto exit;
1597 }
1598
1599 /* Inspect the apps cmd pipe */
1600 if (pollfd == apps_cmd_pipe[0]) {
1601 if (revents & LPOLLIN) {
1602 int sock;
1603
1604 /* Empty pipe */
1605 size_ret = lttng_read(apps_cmd_pipe[0], &sock, sizeof(sock));
1606 if (size_ret < sizeof(sock)) {
1607 PERROR("read apps cmd pipe");
1608 goto error;
1609 }
1610
1611 health_code_update();
1612
1613 /*
1614 * Since this is a command socket (write then read),
1615 * we only monitor the error events of the socket.
1616 */
1617 ret = lttng_poll_add(&events, sock,
1618 LPOLLERR | LPOLLHUP | LPOLLRDHUP);
1619 if (ret < 0) {
1620 goto error;
1621 }
1622
1623 DBG("Apps with sock %d added to poll set", sock);
1624 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
1625 ERR("Apps command pipe error");
1626 goto error;
1627 } else {
1628 ERR("Unknown poll events %u for sock %d", revents, pollfd);
1629 goto error;
1630 }
1631 } else {
1632 /*
1633 * At this point, we know that a registered application
1634 * triggered the event returned by poll_wait.
1635 */
1636 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
1637 /* Removing from the poll set */
1638 ret = lttng_poll_del(&events, pollfd);
1639 if (ret < 0) {
1640 goto error;
1641 }
1642
1643 /* Socket closed on remote end. */
1644 ust_app_unregister(pollfd);
1645 } else {
1646 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
1647 goto error;
1648 }
1649 }
1650
1651 health_code_update();
1652 }
1653 }
1654
1655 exit:
1656 error:
1657 lttng_poll_clean(&events);
1658 error_poll_create:
1659 error_testpoint:
1660 utils_close_pipe(apps_cmd_pipe);
1661 apps_cmd_pipe[0] = apps_cmd_pipe[1] = -1;
1662
1663 /*
1664 * We don't clean the UST app hash table here since already registered
1665 * applications can still be controlled so let them be until the session
1666 * daemon dies or the applications stop.
1667 */
1668
1669 if (err) {
1670 health_error();
1671 ERR("Health error occurred in %s", __func__);
1672 }
1673 health_unregister(health_sessiond);
1674 DBG("Application communication apps thread cleanup complete");
1675 rcu_thread_offline();
1676 rcu_unregister_thread();
1677 return NULL;
1678 }
1679
1680 /*
1681 * Send a socket to a thread. This is called from the dispatch UST registration
1682 * thread once all sockets are set for the application.
1683 *
1684 * The sock value can be invalid; we don't really care, the thread will handle
1685 * it and perform the necessary cleanup if so.
1686 *
1687 * On success, return 0; on error, return a negative value corresponding to
1688 * the errno of the failed write().
1689 */
1690 static int send_socket_to_thread(int fd, int sock)
1691 {
1692 ssize_t ret;
1693
1694 /*
1695 * It's possible that the FD was concurrently set to -1 just before this
1696 * call if the target thread is shutting down.
1697 */
1698 if (fd < 0) {
1699 ret = -EBADF;
1700 goto error;
1701 }
1702
1703 ret = lttng_write(fd, &sock, sizeof(sock));
1704 if (ret < sizeof(sock)) {
1705 PERROR("write apps pipe %d", fd);
1706 if (ret < 0) {
1707 ret = -errno;
1708 }
1709 goto error;
1710 }
1711
1712 /* All good. Don't send back the write positive ret value. */
1713 ret = 0;
1714 error:
1715 return (int) ret;
1716 }
1717
1718 /*
1719 * Sanitize the wait queue of the dispatch registration thread, i.e. remove
1720 * invalid nodes from it. This avoids memory leaks when the UST notify socket
1721 * is never received.
1722 */
1723 static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
1724 {
1725 int ret, nb_fd = 0, i;
1726 unsigned int fd_added = 0;
1727 struct lttng_poll_event events;
1728 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
1729
1730 assert(wait_queue);
1731
1732 lttng_poll_init(&events);
1733
1734 /* Just skip everything for an empty queue. */
1735 if (!wait_queue->count) {
1736 goto end;
1737 }
1738
1739 ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC);
1740 if (ret < 0) {
1741 goto error_create;
1742 }
1743
1744 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
1745 &wait_queue->head, head) {
1746 assert(wait_node->app);
1747 ret = lttng_poll_add(&events, wait_node->app->sock,
1748 LPOLLHUP | LPOLLERR);
1749 if (ret < 0) {
1750 goto error;
1751 }
1752
1753 fd_added = 1;
1754 }
1755
1756 if (!fd_added) {
1757 goto end;
1758 }
1759
1760 /*
1761 * Poll but don't block so we can quickly identify the faulty events and
1762 * clean them afterwards from the wait queue.
1763 */
1764 ret = lttng_poll_wait(&events, 0);
1765 if (ret < 0) {
1766 goto error;
1767 }
1768 nb_fd = ret;
1769
1770 for (i = 0; i < nb_fd; i++) {
1771 /* Get faulty FD. */
1772 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
1773 int pollfd = LTTNG_POLL_GETFD(&events, i);
1774
1775 if (!revents) {
1776 /* No activity for this FD (poll implementation). */
1777 continue;
1778 }
1779
1780 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
1781 &wait_queue->head, head) {
1782 if (pollfd == wait_node->app->sock &&
1783 (revents & (LPOLLHUP | LPOLLERR))) {
1784 cds_list_del(&wait_node->head);
1785 wait_queue->count--;
1786 ust_app_destroy(wait_node->app);
1787 free(wait_node);
1788 /*
1789 * Silence warning of use-after-free in
1790 * cds_list_for_each_entry_safe which uses
1791 * __typeof__(*wait_node).
1792 */
1793 wait_node = NULL;
1794 break;
1795 } else {
1796 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
1797 goto error;
1798 }
1799 }
1800 }
1801
1802 if (nb_fd > 0) {
1803 DBG("Wait queue sanitized, %d node were cleaned up", nb_fd);
1804 }
1805
1806 end:
1807 lttng_poll_clean(&events);
1808 return;
1809
1810 error:
1811 lttng_poll_clean(&events);
1812 error_create:
1813 ERR("Unable to sanitize wait queue");
1814 return;
1815 }
1816
1817 /*
1818 * Dispatch request from the registration threads to the application
1819 * communication thread.
1820 */
1821 static void *thread_dispatch_ust_registration(void *data)
1822 {
1823 int ret, err = -1;
1824 struct cds_wfcq_node *node;
1825 struct ust_command *ust_cmd = NULL;
1826 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
1827 struct ust_reg_wait_queue wait_queue = {
1828 .count = 0,
1829 };
1830
1831 rcu_register_thread();
1832
1833 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH);
1834
1835 if (testpoint(sessiond_thread_app_reg_dispatch)) {
1836 goto error_testpoint;
1837 }
1838
1839 health_code_update();
1840
1841 CDS_INIT_LIST_HEAD(&wait_queue.head);
1842
1843 DBG("[thread] Dispatch UST command started");
1844
1845 for (;;) {
1846 health_code_update();
1847
1848 /* Atomically prepare the queue futex */
1849 futex_nto1_prepare(&ust_cmd_queue.futex);
1850
1851 if (CMM_LOAD_SHARED(dispatch_thread_exit)) {
1852 break;
1853 }
1854
1855 do {
1856 struct ust_app *app = NULL;
1857 ust_cmd = NULL;
1858
1859 /*
1860 * Make sure we don't have node(s) that have hung up before receiving
1861 * the notify socket. This cleans the list in order to avoid memory
1862 * leaks from notify sockets that are never seen.
1863 */
1864 sanitize_wait_queue(&wait_queue);
1865
1866 health_code_update();
1867 /* Dequeue command for registration */
1868 node = cds_wfcq_dequeue_blocking(&ust_cmd_queue.head, &ust_cmd_queue.tail);
1869 if (node == NULL) {
1870 DBG("Woken up but nothing in the UST command queue");
1871 /* Continue thread execution */
1872 break;
1873 }
1874
1875 ust_cmd = caa_container_of(node, struct ust_command, node);
1876
1877 DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
1878 " gid:%d sock:%d name:%s (version %d.%d)",
1879 ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid,
1880 ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
1881 ust_cmd->sock, ust_cmd->reg_msg.name,
1882 ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
1883
1884 if (ust_cmd->reg_msg.type == USTCTL_SOCKET_CMD) {
1885 wait_node = zmalloc(sizeof(*wait_node));
1886 if (!wait_node) {
1887 PERROR("zmalloc wait_node dispatch");
1888 ret = close(ust_cmd->sock);
1889 if (ret < 0) {
1890 PERROR("close ust sock dispatch %d", ust_cmd->sock);
1891 }
1892 lttng_fd_put(LTTNG_FD_APPS, 1);
1893 free(ust_cmd);
1894 goto error;
1895 }
1896 CDS_INIT_LIST_HEAD(&wait_node->head);
1897
1898 /* Create application object if socket is CMD. */
1899 wait_node->app = ust_app_create(&ust_cmd->reg_msg,
1900 ust_cmd->sock);
1901 if (!wait_node->app) {
1902 ret = close(ust_cmd->sock);
1903 if (ret < 0) {
1904 PERROR("close ust sock dispatch %d", ust_cmd->sock);
1905 }
1906 lttng_fd_put(LTTNG_FD_APPS, 1);
1907 free(wait_node);
1908 free(ust_cmd);
1909 continue;
1910 }
1911 /*
1912 * Add application to the wait queue so we can set the notify
1913 * socket before putting this object in the global ht.
1914 */
1915 cds_list_add(&wait_node->head, &wait_queue.head);
1916 wait_queue.count++;
1917
1918 free(ust_cmd);
1919 /*
1920 * We have to continue here since we don't have the notify
1921 * socket and the application MUST be added to the hash table
1922 * only at that moment.
1923 */
1924 continue;
1925 } else {
1926 /*
1927 * Look for the application in the local wait queue and set the
1928 * notify socket if found.
1929 */
1930 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
1931 &wait_queue.head, head) {
1932 health_code_update();
1933 if (wait_node->app->pid == ust_cmd->reg_msg.pid) {
1934 wait_node->app->notify_sock = ust_cmd->sock;
1935 cds_list_del(&wait_node->head);
1936 wait_queue.count--;
1937 app = wait_node->app;
1938 free(wait_node);
1939 DBG3("UST app notify socket %d is set", ust_cmd->sock);
1940 break;
1941 }
1942 }
1943
1944 /*
1945 * With no application at this stage, the received socket is
1946 * basically useless, so close it before we free the cmd data
1947 * structure for good.
1948 */
1949 if (!app) {
1950 ret = close(ust_cmd->sock);
1951 if (ret < 0) {
1952 PERROR("close ust sock dispatch %d", ust_cmd->sock);
1953 }
1954 lttng_fd_put(LTTNG_FD_APPS, 1);
1955 }
1956 free(ust_cmd);
1957 }
1958
1959 if (app) {
1960 /*
1961 * @session_lock_list
1962 *
1963 * Lock the global session list so from the register up to the
1964 * registration done message, no thread can see the application
1965 * and change its state.
1966 */
1967 session_lock_list();
1968 rcu_read_lock();
1969
1970 /*
1971 * Add application to the global hash table. This needs to be
1972 * done before the update to the UST registry can locate the
1973 * application.
1974 */
1975 ust_app_add(app);
1976
1977 /* Set app version. This call will print an error if needed. */
1978 (void) ust_app_version(app);
1979
1980 /* Send notify socket through the notify pipe. */
1981 ret = send_socket_to_thread(apps_cmd_notify_pipe[1],
1982 app->notify_sock);
1983 if (ret < 0) {
1984 rcu_read_unlock();
1985 session_unlock_list();
1986 /*
1987 * No notify thread, stop the UST tracing. However, this is
1988 * not an internal error of this thread, thus set the
1989 * health error code to a normal exit.
1990 */
1991 err = 0;
1992 goto error;
1993 }
1994
1995 /*
1996 * Update the newly registered application with the information
1997 * already enabled in the tracing registry.
1998 */
1999 update_ust_app(app->sock);
2000
2001 /*
2002 * Don't care about return value. Let the manage apps threads
2003 * handle app unregistration upon socket close.
2004 */
2005 (void) ust_app_register_done(app);
2006
2007 /*
2008 * Even if the application socket has been closed, send the app
2009 * to the thread; unregistration will take place there.
2011 */
2012 ret = send_socket_to_thread(apps_cmd_pipe[1], app->sock);
2013 if (ret < 0) {
2014 rcu_read_unlock();
2015 session_unlock_list();
2016 /*
2017 * No apps thread, stop the UST tracing. However, this is
2018 * not an internal error of this thread, thus set the
2019 * health error code to a normal exit.
2020 */
2021 err = 0;
2022 goto error;
2023 }
2024
2025 rcu_read_unlock();
2026 session_unlock_list();
2027 }
2028 } while (node != NULL);
2029
2030 health_poll_entry();
2031 /* Futex wait on queue. Blocking call on futex() */
2032 futex_nto1_wait(&ust_cmd_queue.futex);
2033 health_poll_exit();
2034 }
2035 /* Normal exit, no error */
2036 err = 0;
2037
2038 error:
2039 /* Clean up wait queue. */
2040 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
2041 &wait_queue.head, head) {
2042 cds_list_del(&wait_node->head);
2043 wait_queue.count--;
2044 free(wait_node);
2045 }
2046
2047 /* Empty command queue. */
2048 for (;;) {
2049 /* Dequeue command for registration */
2050 node = cds_wfcq_dequeue_blocking(&ust_cmd_queue.head, &ust_cmd_queue.tail);
2051 if (node == NULL) {
2052 break;
2053 }
2054 ust_cmd = caa_container_of(node, struct ust_command, node);
2055 ret = close(ust_cmd->sock);
2056 if (ret < 0) {
2057 PERROR("close ust sock exit dispatch %d", ust_cmd->sock);
2058 }
2059 lttng_fd_put(LTTNG_FD_APPS, 1);
2060 free(ust_cmd);
2061 }
2062
2063 error_testpoint:
2064 DBG("Dispatch thread dying");
2065 if (err) {
2066 health_error();
2067 ERR("Health error occurred in %s", __func__);
2068 }
2069 health_unregister(health_sessiond);
2070 rcu_unregister_thread();
2071 return NULL;
2072 }
2073
2074 /*
2075 * This thread manages application registration.
2076 */
2077 static void *thread_registration_apps(void *data)
2078 {
2079 int sock = -1, i, ret, pollfd, err = -1;
2080 uint32_t revents, nb_fd;
2081 struct lttng_poll_event events;
2082 /*
2083 * Allocated in this thread, enqueued to a global queue, then dequeued
2084 * and freed in the manage-apps thread.
2085 */
2086 struct ust_command *ust_cmd = NULL;
2087
2088 DBG("[thread] Manage application registration started");
2089
2090 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG);
2091
2092 if (testpoint(sessiond_thread_registration_apps)) {
2093 goto error_testpoint;
2094 }
2095
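/*
 * Move the application registration socket into the listening state;
 * connections are accepted one at a time in the poll loop below.
 */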
2096 ret = lttcomm_listen_unix_sock(apps_sock);
2097 if (ret < 0) {
2098 goto error_listen;
2099 }
2100
2101 /*
2102 * Pass 2 as size here for the thread quit pipe and apps socket. Nothing
2103 * more will be added to this poll set.
2104 */
2105 ret = sessiond_set_thread_pollset(&events, 2);
2106 if (ret < 0) {
2107 goto error_create_poll;
2108 }
2109
2110 /* Add the application registration socket */
2111 ret = lttng_poll_add(&events, apps_sock, LPOLLIN | LPOLLRDHUP);
2112 if (ret < 0) {
2113 goto error_poll_add;
2114 }
2115
2116 /* Notify all applications to register */
2117 ret = notify_ust_apps(1);
2118 if (ret < 0) {
2119 ERR("Failed to notify applications or create the wait shared memory.\n"
2120 "Execution continues but there might be problem for already\n"
2121 "running applications that wishes to register.");
2122 }
2123
2124 while (1) {
2125 DBG("Accepting application registration");
2126
2127 /* Infinite blocking call, waiting for transmission */
2128 restart:
2129 health_poll_entry();
2130 ret = lttng_poll_wait(&events, -1);
2131 health_poll_exit();
2132 if (ret < 0) {
2133 /*
2134 * Restart interrupted system call.
2135 */
2136 if (errno == EINTR) {
2137 goto restart;
2138 }
2139 goto error;
2140 }
2141
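/* lttng_poll_wait() returned the number of fds with activity. */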
2142 nb_fd = ret;
2143
2144 for (i = 0; i < nb_fd; i++) {
2145 health_code_update();
2146
2147 /* Fetch once the poll data */
2148 revents = LTTNG_POLL_GETEV(&events, i);
2149 pollfd = LTTNG_POLL_GETFD(&events, i);
2150
2151 if (!revents) {
2152 /* No activity for this FD (poll implementation). */
2153 continue;
2154 }
2155
2156 /* Thread quit pipe has been closed. Killing thread. */
2157 ret = sessiond_check_thread_quit_pipe(pollfd, revents);
2158 if (ret) {
2159 err = 0;
2160 goto exit;
2161 }
2162
2163 /* Event on the registration socket */
2164 if (pollfd == apps_sock) {
2165 if (revents & LPOLLIN) {
2166 sock = lttcomm_accept_unix_sock(apps_sock);
2167 if (sock < 0) {
2168 goto error;
2169 }
2170
2171 /*
2172 * Set socket timeout for both receiving and sending.
2173 * app_socket_timeout is in seconds, whereas
2174 * lttcomm_setsockopt_rcv_timeout and
2175 * lttcomm_setsockopt_snd_timeout expect msec as
2176 * parameter.
2177 */
2178 if (config.app_socket_timeout >= 0) {
2179 (void) lttcomm_setsockopt_rcv_timeout(sock,
2180 config.app_socket_timeout * 1000);
2181 (void) lttcomm_setsockopt_snd_timeout(sock,
2182 config.app_socket_timeout * 1000);
2183 }
2184
2185 /*
2186 * Set the CLOEXEC flag. Return code is useless because
2187 * either way, the show must go on.
2188 */
2189 (void) utils_set_fd_cloexec(sock);
2190
2191 /* Create UST registration command for enqueuing */
2192 ust_cmd = zmalloc(sizeof(struct ust_command));
2193 if (ust_cmd == NULL) {
2194 PERROR("ust command zmalloc");
2195 ret = close(sock);
2196 if (ret) {
2197 PERROR("close");
2198 }
2199 goto error;
2200 }
2201
2202 /*
2203 * Using message-based transmissions to ensure we don't
2204 * have to deal with partially received messages.
2205 */
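/*
 * Reserve one application fd slot before keeping the socket; the
 * lttng_fd_get()/lttng_fd_put() pair tracks the daemon-wide budget
 * of file descriptors dedicated to application sockets.
 */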
2206 ret = lttng_fd_get(LTTNG_FD_APPS, 1);
2207 if (ret < 0) {
2208 ERR("Exhausted file descriptors allowed for applications.");
2209 free(ust_cmd);
2210 ret = close(sock);
2211 if (ret) {
2212 PERROR("close");
2213 }
2214 sock = -1;
2215 continue;
2216 }
2217
2218 health_code_update();
2219 ret = ust_app_recv_registration(sock, &ust_cmd->reg_msg);
2220 if (ret < 0) {
2221 free(ust_cmd);
2222 /* Close socket of the application. */
2223 ret = close(sock);
2224 if (ret) {
2225 PERROR("close");
2226 }
2227 lttng_fd_put(LTTNG_FD_APPS, 1);
2228 sock = -1;
2229 continue;
2230 }
2231 health_code_update();
2232
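/*
 * Ownership of the socket is transferred to the command; reset the
 * local copy so the error paths below do not close it a second time.
 */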
2233 ust_cmd->sock = sock;
2234 sock = -1;
2235
2236 DBG("UST registration received with pid:%d ppid:%d uid:%d"
2237 " gid:%d sock:%d name:%s (version %d.%d)",
2238 ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid,
2239 ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
2240 ust_cmd->sock, ust_cmd->reg_msg.name,
2241 ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
2242
2243 /*
2244 * Lock-free enqueue of the registration request. The red pill
2245 * has been taken! This app will be part of the *system*.
2246 */
2247 cds_wfcq_enqueue(&ust_cmd_queue.head, &ust_cmd_queue.tail, &ust_cmd->node);
2248
2249 /*
2250 * Wake the registration queue futex. Implicit memory
2251 * barrier with the exchange in cds_wfcq_enqueue.
2252 */
2253 futex_nto1_wake(&ust_cmd_queue.futex);
2254 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
2255 ERR("Register apps socket poll error");
2256 goto error;
2257 } else {
2258 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
2259 goto error;
2260 }
2261 }
2262 }
2263 }
2264
2265 exit:
2266 error:
2267 /* Notify that the registration thread is gone */
2268 notify_ust_apps(0);
2269
2270 if (apps_sock >= 0) {
2271 ret = close(apps_sock);
2272 if (ret) {
2273 PERROR("close");
2274 }
2275 }
2276 if (sock >= 0) {
2277 ret = close(sock);
2278 if (ret) {
2279 PERROR("close");
2280 }
2281 lttng_fd_put(LTTNG_FD_APPS, 1);
2282 }
2283 unlink(config.apps_unix_sock_path.value);
2284
2285 error_poll_add:
2286 lttng_poll_clean(&events);
2287 error_listen:
2288 error_create_poll:
2289 error_testpoint:
2290 DBG("UST Registration thread cleanup complete");
2291 if (err) {
2292 health_error();
2293 ERR("Health error occurred in %s", __func__);
2294 }
2295 health_unregister(health_sessiond);
2296
2297 return NULL;
2298 }
2299
2300 /*
2301 * Start the thread_manage_consumer. This must be done after the
2302 * lttng-consumerd exec or it will fail.
2303 */
2304 static int spawn_consumer_thread(struct consumer_data *consumer_data)
2305 {
2306 int ret, clock_ret;
2307 struct timespec timeout;
2308
2309 /*
2310 * Make sure we set the readiness flag to 0 because we are NOT ready.
2311 * This access to consumer_thread_is_ready does not need to be
2312 * protected by consumer_data.cond_mutex (yet) since the consumer
2313 * management thread has not been started at this point.
2314 */
2315 consumer_data->consumer_thread_is_ready = 0;
2316
2317 /* Setup pthread condition */
2318 ret = pthread_condattr_init(&consumer_data->condattr);
2319 if (ret) {
2320 errno = ret;
2321 PERROR("pthread_condattr_init consumer data");
2322 goto error;
2323 }
2324
2325 /*
2326 * Set the monotonic clock in order to make sure we DO NOT jump in time
2327 * between the clock_gettime() call and the timedwait call. See bug #324
2328 * for more details on how we noticed it.
2329 */
2330 ret = pthread_condattr_setclock(&consumer_data->condattr, CLOCK_MONOTONIC);
2331 if (ret) {
2332 errno = ret;
2333 PERROR("pthread_condattr_setclock consumer data");
2334 goto error;
2335 }
2336
2337 ret = pthread_cond_init(&consumer_data->cond, &consumer_data->condattr);
2338 if (ret) {
2339 errno = ret;
2340 PERROR("pthread_cond_init consumer data");
2341 goto error;
2342 }
2343
2344 ret = pthread_create(&consumer_data->thread, default_pthread_attr(),
2345 thread_manage_consumer, consumer_data);
2346 if (ret) {
2347 errno = ret;
2348 PERROR("pthread_create consumer");
2349 ret = -1;
2350 goto error;
2351 }
2352
2353 /* We are about to wait on a pthread condition */
2354 pthread_mutex_lock(&consumer_data->cond_mutex);
2355
2356 /* Get time for the pthread_cond_timedwait() absolute timeout */
2357 clock_ret = lttng_clock_gettime(CLOCK_MONOTONIC, &timeout);
2358 /*
2359 * Set the timeout for the condition timed wait even if the clock gettime
2360 * call fails, since we might loop on that call and we want to avoid
2361 * incrementing the timeout too many times.
2362 */
2363 timeout.tv_sec += DEFAULT_SEM_WAIT_TIMEOUT;
2364
2365 /*
2366 * The following loop COULD be skipped in some conditions, so ret is
2367 * set to 0 here to make sure at least one round of the loop is done.
2369 */
2370 ret = 0;
2371
2372 /*
2373 * Loop until the condition is reached or until a timeout occurs. Note
2374 * that the pthread_cond_timedwait(3P) man page specifies that EINTR can
2375 * NOT be returned, but pthread_cond(3), from the glibc documentation,
2376 * says that it is possible. This loop does not take any chances and
2377 * works with both of them.
2378 */
2379 while (!consumer_data->consumer_thread_is_ready && ret != ETIMEDOUT) {
2380 if (clock_ret < 0) {
2381 PERROR("clock_gettime spawn consumer");
2382 /* Infinite wait for the consumerd thread to be ready */
2383 ret = pthread_cond_wait(&consumer_data->cond,
2384 &consumer_data->cond_mutex);
2385 } else {
2386 ret = pthread_cond_timedwait(&consumer_data->cond,
2387 &consumer_data->cond_mutex, &timeout);
2388 }
2389 }
2390
2391 /* Release the pthread condition */
2392 pthread_mutex_unlock(&consumer_data->cond_mutex);
2393
2394 if (ret != 0) {
2395 errno = ret;
2396 if (ret == ETIMEDOUT) {
2397 int pth_ret;
2398
2399 /*
2400 * The call timed out, so kill the consumer management thread
2401 * and return an error.
2402 */
2403 ERR("Condition timed out. The consumer thread was never ready."
2404 " Killing it");
2405 pth_ret = pthread_cancel(consumer_data->thread);
2406 if (pth_ret < 0) {
2407 PERROR("pthread_cancel consumer thread");
2408 }
2409 } else {
2410 PERROR("pthread_cond_wait failed consumer thread");
2411 }
2412 /* Caller is expecting a negative value on failure. */
2413 ret = -1;
2414 goto error;
2415 }
2416
2417 pthread_mutex_lock(&consumer_data->pid_mutex);
2418 if (consumer_data->pid == 0) {
2419 ERR("Consumerd did not start");
2420 pthread_mutex_unlock(&consumer_data->pid_mutex);
2421 goto error;
2422 }
2423 pthread_mutex_unlock(&consumer_data->pid_mutex);
2424
2425 return 0;
2426
2427 error:
2428 return ret;
2429 }
2430
2431 /*
2432 * Join consumer thread
2433 */
2434 static int join_consumer_thread(struct consumer_data *consumer_data)
2435 {
2436 void *status;
2437
2438 /* Consumer pid must be a real one. */
2439 if (consumer_data->pid > 0) {
2440 int ret;
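/*
 * Ask consumerd to terminate; the managing thread is expected to
 * exit once it observes the daemon's death, letting the join below
 * complete.
 */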
2441 ret = kill(consumer_data->pid, SIGTERM);
2442 if (ret) {
2443 PERROR("Error killing consumer daemon");
2444 return ret;
2445 }
2446 return pthread_join(consumer_data->thread, &status);
2447 } else {
2448 return 0;
2449 }
2450 }
2451
2452 /*
2453 * Fork and exec a consumer daemon (consumerd).
2454 *
2455 * Return the child pid on success, else a negative value.
2456 */
2457 static pid_t spawn_consumerd(struct consumer_data *consumer_data)
2458 {
2459 int ret;
2460 pid_t pid;
2461 const char *consumer_to_use;
2462 const char *verbosity;
2463 struct stat st;
2464
2465 DBG("Spawning consumerd");
2466
2467 pid = fork();
2468 if (pid == 0) {
2469 /*
2470 * Exec consumerd.
2471 */
2472 if (config.verbose_consumer) {
2473 verbosity = "--verbose";
2474 } else if (lttng_opt_quiet) {
2475 verbosity = "--quiet";
2476 } else {
2477 verbosity = "";
2478 }
2479
2480 switch (consumer_data->type) {
2481 case LTTNG_CONSUMER_KERNEL:
2482 /*
2483 * Find out which consumerd to execute. We will first try the
2484 * 64-bit path, then the sessiond's installation directory, and
2485 * finally fall back on the 32-bit one.
2486 */
2487 DBG3("Looking for a kernel consumer at these locations:");
2488 DBG3(" 1) %s", config.consumerd64_bin_path.value ? : "NULL");
2489 DBG3(" 2) %s/%s", INSTALL_BIN_PATH, DEFAULT_CONSUMERD_FILE);
2490 DBG3(" 3) %s", config.consumerd32_bin_path.value ? : "NULL");
2491 if (stat(config.consumerd64_bin_path.value, &st) == 0) {
2492 DBG3("Found location #1");
2493 consumer_to_use = config.consumerd64_bin_path.value;
2494 } else if (stat(INSTALL_BIN_PATH "/" DEFAULT_CONSUMERD_FILE, &st) == 0) {
2495 DBG3("Found location #2");
2496 consumer_to_use = INSTALL_BIN_PATH "/" DEFAULT_CONSUMERD_FILE;
2497 } else if (stat(config.consumerd32_bin_path.value, &st) == 0) {
2498 DBG3("Found location #3");
2499 consumer_to_use = config.consumerd32_bin_path.value;
2500 } else {
2501 DBG("Could not find any valid consumerd executable");
2502 ret = -EINVAL;
2503 goto error;
2504 }
2505 DBG("Using kernel consumer at: %s", consumer_to_use);
2506 (void) execl(consumer_to_use,
2507 "lttng-consumerd", verbosity, "-k",
2508 "--consumerd-cmd-sock", consumer_data->cmd_unix_sock_path,
2509 "--consumerd-err-sock", consumer_data->err_unix_sock_path,
2510 "--group", config.tracing_group_name.value,
2511 NULL);
2512 break;
2513 case LTTNG_CONSUMER64_UST:
2514 {
2515 if (config.consumerd64_lib_dir.value) {
2516 char *tmp;
2517 size_t tmplen;
2518 char *tmpnew;
2519
2520 tmp = lttng_secure_getenv("LD_LIBRARY_PATH");
2521 if (!tmp) {
2522 tmp = "";
2523 }
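/*
 * Sized for "<consumerd64_lib_dir>:<old LD_LIBRARY_PATH>" plus the
 * terminating NUL; zmalloc() zero-fills, so the first strcat()
 * appends to an empty string.
 */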
2524 tmplen = strlen(config.consumerd64_lib_dir.value) + 1 /* : */ + strlen(tmp);
2525 tmpnew = zmalloc(tmplen + 1 /* \0 */);
2526 if (!tmpnew) {
2527 ret = -ENOMEM;
2528 goto error;
2529 }
2530 strcat(tmpnew, config.consumerd64_lib_dir.value);
2531 if (tmp[0] != '\0') {
2532 strcat(tmpnew, ":");
2533 strcat(tmpnew, tmp);
2534 }
2535 ret = setenv("LD_LIBRARY_PATH", tmpnew, 1);
2536 free(tmpnew);
2537 if (ret) {
2538 ret = -errno;
2539 goto error;
2540 }
2541 }
2542 DBG("Using 64-bit UST consumer at: %s", config.consumerd64_bin_path.value);
2543 (void) execl(config.consumerd64_bin_path.value, "lttng-consumerd", verbosity, "-u",
2544 "--consumerd-cmd-sock", consumer_data->cmd_unix_sock_path,
2545 "--consumerd-err-sock", consumer_data->err_unix_sock_path,
2546 "--group", config.tracing_group_name.value,
2547 NULL);
2548 break;
2549 }
2550 case LTTNG_CONSUMER32_UST:
2551 {
2552 if (config.consumerd32_lib_dir.value) {
2553 char *tmp;
2554 size_t tmplen;
2555 char *tmpnew;
2556
2557 tmp = lttng_secure_getenv("LD_LIBRARY_PATH");
2558 if (!tmp) {
2559 tmp = "";
2560 }
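/* Same LD_LIBRARY_PATH sizing scheme as the 64-bit case above. */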
2561 tmplen = strlen(config.consumerd32_lib_dir.value) + 1 /* : */ + strlen(tmp);
2562 tmpnew = zmalloc(tmplen + 1 /* \0 */);
2563 if (!tmpnew) {
2564 ret = -ENOMEM;
2565 goto error;
2566 }
2567 strcat(tmpnew, config.consumerd32_lib_dir.value);
2568 if (tmp[0] != '\0') {
2569 strcat(tmpnew, ":");
2570 strcat(tmpnew, tmp);
2571 }
2572 ret = setenv("LD_LIBRARY_PATH", tmpnew, 1);
2573 free(tmpnew);
2574 if (ret) {
2575 ret = -errno;
2576 goto error;
2577 }
2578 }
2579 DBG("Using 32-bit UST consumer at: %s", config.consumerd32_bin_path.value);
2580 (void) execl(config.consumerd32_bin_path.value, "lttng-consumerd", verbosity, "-u",
2581 "--consumerd-cmd-sock", consumer_data->cmd_unix_sock_path,
2582 "--consumerd-err-sock", consumer_data->err_unix_sock_path,
2583 "--group", config.tracing_group_name.value,
2584 NULL);
2585 break;
2586 }
2587 default:
2588 ERR("unknown consumer type");
2589 errno = 0;
2590 }
2591 if (errno != 0) {
2592 PERROR("Consumer execl()");
2593 }
2594 /* Reaching this point, we got a failure on our execl(). */
2595 exit(EXIT_FAILURE);
2596 } else if (pid > 0) {
2597 ret = pid;
2598 } else {
2599 PERROR("start consumer fork");
2600 ret = -errno;
2601 }
2602 error:
2603 return ret;
2604 }
2605
2606 /*
2607 * Spawn the consumer daemon and the session daemon thread that manages it.
2608 */
2609 static int start_consumerd(struct consumer_data *consumer_data)
2610 {
2611 int ret;
2612
2613 /*
2614 * Set the listen() state on the socket since there is a possible race
2615 * between the exec() of the consumer daemon and this call if placed in the
2616 * consumer thread. See bug #366 for more details.
2617 */
2618 ret = lttcomm_listen_unix_sock(consumer_data->err_sock);
2619 if (ret < 0) {
2620 goto error;
2621 }
2622
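/* A non-zero pid means a consumerd of this type is already running. */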
2623 pthread_mutex_lock(&consumer_data->pid_mutex);
2624 if (consumer_data->pid != 0) {
2625 pthread_mutex_unlock(&consumer_data->pid_mutex);
2626 goto end;
2627 }
2628
2629 ret = spawn_consumerd(consumer_data);
2630 if (ret < 0) {
2631 ERR("Spawning consumerd failed");
2632 pthread_mutex_unlock(&consumer_data->pid_mutex);
2633 goto error;
2634 }
2635
2636 /* Setting up the consumer_data pid */
2637 consumer_data->pid = ret;
2638 DBG2("Consumer pid %d", consumer_data->pid);
2639 pthread_mutex_unlock(&consumer_data->pid_mutex);
2640
2641 DBG2("Spawning consumer control thread");
2642 ret = spawn_consumer_thread(consumer_data);
2643 if (ret < 0) {
2644 ERR("Fatal error spawning consumer control thread");
2645 goto error;
2646 }
2647
2648 end:
2649 return 0;
2650
2651 error:
2652 /* Cleanup already created sockets on error. */
2653 if (consumer_data->err_sock >= 0) {
2654 int err;
2655
2656 err = close(consumer_data->err_sock);
2657 if (err < 0) {
2658 PERROR("close consumer data error socket");
2659 }
2660 }
2661 return ret;
2662 }
2663
2664 /*
2665 * Setup necessary data for kernel tracer action.
2666 */
2667 static int init_kernel_tracer(void)
2668 {
2669 int ret;
2670
2671 /* Modprobe lttng kernel modules */
2672 ret = modprobe_lttng_control();
2673 if (ret < 0) {
2674 goto error;
2675 }
2676
2677 /* Open debugfs lttng */
2678 kernel_tracer_fd = open(module_proc_lttng, O_RDWR);
2679 if (kernel_tracer_fd < 0) {
2680 DBG("Failed to open %s", module_proc_lttng);
2681 goto error_open;
2682 }
2683
2684 /* Validate kernel version */
2685 ret = kernel_validate_version(kernel_tracer_fd, &kernel_tracer_version,
2686 &kernel_tracer_abi_version);
2687 if (ret < 0) {
2688 goto error_version;
2689 }
2690
2691 ret = modprobe_lttng_data();
2692 if (ret < 0) {
2693 goto error_modules;
2694 }
2695
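/*
 * Probe for the ring buffer "snapshot sample positions" ABI; a
 * return of 0 means the running tracer predates it.
 */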
2696 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
2697 kernel_tracer_fd);
2698 if (ret < 0) {
2699 goto error_modules;
2700 }
2701
2702 if (ret < 1) {
2703 WARN("Kernel tracer does not support buffer monitoring. "
2704 "The monitoring timer of channels in the kernel domain "
2705 "will be set to 0 (disabled).");
2706 }
2707
2708 DBG("Kernel tracer fd %d", kernel_tracer_fd);
2709 return 0;
2710
2711 error_version:
2712 modprobe_remove_lttng_control();
2713 ret = close(kernel_tracer_fd);
2714 if (ret) {
2715 PERROR("close");
2716 }
2717 kernel_tracer_fd = -1;
2718 return LTTNG_ERR_KERN_VERSION;
2719
2720 error_modules:
2721 ret = close(kernel_tracer_fd);
2722 if (ret) {
2723 PERROR("close");
2724 }
2725
2726 error_open:
2727 modprobe_remove_lttng_control();
2728
2729 error:
2730 WARN("No kernel tracer available");
2731 kernel_tracer_fd = -1;
2732 if (!is_root) {
2733 return LTTNG_ERR_NEED_ROOT_SESSIOND;
2734 } else {
2735 return LTTNG_ERR_KERN_NA;
2736 }
2737 }
2738
2739
2740 /*
2741 * Copy consumer output from the tracing session to the domain session. The
2742 * function also applies the right modifications, on a per-domain basis, to
2743 * the trace files' destination directory.
2744 *
2745 * Should *NOT* be called with RCU read-side lock held.
2746 */
2747 static int copy_session_consumer(int domain, struct ltt_session *session)
2748 {
2749 int ret;
2750 const char *dir_name;
2751 struct consumer_output *consumer;
2752
2753 assert(session);
2754 assert(session->consumer);
2755
2756 switch (domain) {
2757 case LTTNG_DOMAIN_KERNEL:
2758 DBG3("Copying tracing session consumer output in kernel session");
2759 /*
2760 * XXX: We should audit session creation and the "extra" work this
2761 * function does, in order to avoid a destroy, since this function is
2762 * only used during domain session creation (kernel and UST). Same for
2763 * the UST domain.
2764 */
2765 if (session->kernel_session->consumer) {
2766 consumer_output_put(session->kernel_session->consumer);
2767 }
2768 session->kernel_session->consumer =
2769 consumer_copy_output(session->consumer);
2770 /* Ease our life a bit for the next part */
2771 consumer = session->kernel_session->consumer;
2772 dir_name = DEFAULT_KERNEL_TRACE_DIR;
2773 break;
2774 case LTTNG_DOMAIN_JUL:
2775 case LTTNG_DOMAIN_LOG4J:
2776 case LTTNG_DOMAIN_PYTHON:
2777 case LTTNG_DOMAIN_UST:
2778 DBG3("Copying tracing session consumer output in UST session");
2779 if (session->ust_session->consumer) {
2780 consumer_output_put(session->ust_session->consumer);
2781 }
2782 session->ust_session->consumer =
2783 consumer_copy_output(session->consumer);
2784 /* Ease our life a bit for the next part */
2785 consumer = session->ust_session->consumer;
2786 dir_name = DEFAULT_UST_TRACE_DIR;
2787 break;
2788 default:
2789 ret = LTTNG_ERR_UNKNOWN_DOMAIN;
2790 goto error;
2791 }
2792
2793 /* Append correct directory to subdir */
2794 strncat(consumer->subdir, dir_name,
2795 sizeof(consumer->subdir) - strlen(consumer->subdir) - 1);
2796 DBG3("Copy session consumer subdir %s", consumer->subdir);
2797
2798 ret = LTTNG_OK;
2799
2800 error:
2801 return ret;
2802 }
2803
2804 /*
2805 * Create a UST session and add it to the session UST list.
2806 *
2807 * Should *NOT* be called with RCU read-side lock held.
2808 */
2809 static int create_ust_session(struct ltt_session *session,
2810 struct lttng_domain *domain)
2811 {
2812 int ret;
2813 struct ltt_ust_session *lus = NULL;
2814
2815 assert(session);
2816 assert(domain);
2817 assert(session->consumer);
2818
2819 switch (domain->type) {
2820 case LTTNG_DOMAIN_JUL:
2821 case LTTNG_DOMAIN_LOG4J:
2822 case LTTNG_DOMAIN_PYTHON:
2823 case LTTNG_DOMAIN_UST:
2824 break;
2825 default:
2826 ERR("Unknown UST domain on create session %d", domain->type);
2827 ret = LTTNG_ERR_UNKNOWN_DOMAIN;
2828 goto error;
2829 }
2830
2831 DBG("Creating UST session");
2832
2833 lus = trace_ust_create_session(session->id);
2834 if (lus == NULL) {
2835 ret = LTTNG_ERR_UST_SESS_FAIL;
2836 goto error;
2837 }
2838
2839 lus->uid = session->uid;
2840 lus->gid = session->gid;
2841 lus->output_traces = session->output_traces;
2842 lus->snapshot_mode = session->snapshot_mode;
2843 lus->live_timer_interval = session->live_timer;
2844 session->ust_session = lus;
2845 if (session->shm_path[0]) {
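/*
 * strncpy() does not guarantee NUL-termination when the source
 * fills the destination, hence the explicit terminators below.
 */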
2846 strncpy(lus->root_shm_path, session->shm_path,
2847 sizeof(lus->root_shm_path));
2848 lus->root_shm_path[sizeof(lus->root_shm_path) - 1] = '\0';
2849 strncpy(lus->shm_path, session->shm_path,
2850 sizeof(lus->shm_path));
2851 lus->shm_path[sizeof(lus->shm_path) - 1] = '\0';
2852 strncat(lus->shm_path, "/ust",
2853 sizeof(lus->shm_path) - strlen(lus->shm_path) - 1);
2854 }
2855 /* Copy session output to the newly created UST session */
2856 ret = copy_session_consumer(domain->type, session);
2857 if (ret != LTTNG_OK) {
2858 goto error;
2859 }
2860
2861 return LTTNG_OK;
2862
2863 error:
2864 free(lus);
2865 session->ust_session = NULL;
2866 return ret;
2867 }
2868
2869 /*
2870 * Create a kernel tracer session.
2871 */
2872 static int create_kernel_session(struct ltt_session *session)
2873 {
2874 int ret;
2875
2876 DBG("Creating kernel session");
2877
2878 ret = kernel_create_session(session, kernel_tracer_fd);
2879 if (ret < 0) {
2880 ret = LTTNG_ERR_KERN_SESS_FAIL;
2881 goto error;
2882 }
2883
2884 /* Code flow safety */
2885 assert(session->kernel_session);
2886
2887 /* Copy session output to the newly created Kernel session */
2888 ret = copy_session_consumer(LTTNG_DOMAIN_KERNEL, session);
2889 if (ret != LTTNG_OK) {
2890 goto error;
2891 }
2892
2893 session->kernel_session->uid = session->uid;
2894 session->kernel_session->gid = session->gid;
2895 session->kernel_session->output_traces = session->output_traces;
2896 session->kernel_session->snapshot_mode = session->snapshot_mode;
2897
2898 return LTTNG_OK;
2899
2900 error:
2901 trace_kernel_destroy_session(session->kernel_session);
2902 session->kernel_session = NULL;
2903 return ret;
2904 }
2905
2906 /*
2907 * Count the number of sessions accessible by the given uid/gid.
2908 */
2909 static unsigned int lttng_sessions_count(uid_t uid, gid_t gid)
2910 {
2911 unsigned int i = 0;
2912 struct ltt_session *session;
2913
2914 DBG("Counting number of available session for UID %d GID %d",
2915 uid, gid);
2916 cds_list_for_each_entry(session, &session_list_ptr->head, list) {
2917 /*
2918 * Only list the sessions the user can control.
2919 */
2920 if (!session_access_ok(session, uid, gid)) {
2921 continue;
2922 }
2923 i++;
2924 }
2925 return i;
2926 }
2927
2928 /*
2929 * Check if the current kernel tracer supports the session rotation feature.
2930 * Return 1 if it does, 0 otherwise.
2931 */
2932 static int check_rotate_compatible(void)
2933 {
2934 int ret = 1;
2935
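/*
 * Rotation requires a 2.x kernel tracer at version 2.11 or higher;
 * tracers outside the 2.x series are rejected by this check.
 */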
2936 if (kernel_tracer_version.major != 2 || kernel_tracer_version.minor < 11) {
2937 DBG("Kernel tracer version is not compatible with the rotation feature");
2938 ret = 0;
2939 }
2940
2941 return ret;
2942 }
2943
2944 /*
2945 * Process the command requested by the lttng client within the command
2946 * context structure. This function makes sure that the return structure (llm)
2947 * is set and ready for transmission before returning.
2948 *
2949 * Return any error encountered or 0 for success.
2950 *
2951 * "sock" is only used for special-case var. len data.
2952 *
2953 * Should *NOT* be called with RCU read-side lock held.
2954 */
2955 static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
2956 int *sock_error)
2957 {
2958 int ret = LTTNG_OK;
2959 int need_tracing_session = 1;
2960 int need_domain;
2961
2962 DBG("Processing client command %d", cmd_ctx->lsm->cmd_type);
2963
2964 assert(!rcu_read_ongoing());
2965
2966 *sock_error = 0;
2967
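/*
 * Commands in the following list operate on the session as a whole
 * (or on no session at all) and therefore never require a tracing
 * domain.
 */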
2968 switch (cmd_ctx->lsm->cmd_type) {
2969 case LTTNG_CREATE_SESSION:
2970 case LTTNG_CREATE_SESSION_SNAPSHOT:
2971 case LTTNG_CREATE_SESSION_LIVE:
2972 case LTTNG_DESTROY_SESSION:
2973 case LTTNG_LIST_SESSIONS:
2974 case LTTNG_LIST_DOMAINS:
2975 case LTTNG_START_TRACE:
2976 case LTTNG_STOP_TRACE:
2977 case LTTNG_DATA_PENDING:
2978 case LTTNG_SNAPSHOT_ADD_OUTPUT:
2979 case LTTNG_SNAPSHOT_DEL_OUTPUT:
2980 case LTTNG_SNAPSHOT_LIST_OUTPUT:
2981 case LTTNG_SNAPSHOT_RECORD:
2982 case LTTNG_SAVE_SESSION:
2983 case LTTNG_SET_SESSION_SHM_PATH:
2984 case LTTNG_REGENERATE_METADATA:
2985 case LTTNG_REGENERATE_STATEDUMP:
2986 case LTTNG_REGISTER_TRIGGER:
2987 case LTTNG_UNREGISTER_TRIGGER:
2988 case LTTNG_ROTATE_SESSION:
2989 case LTTNG_ROTATION_GET_INFO:
2990 case LTTNG_SESSION_GET_CURRENT_OUTPUT:
2991 case LTTNG_ROTATION_SET_SCHEDULE:
2992 case LTTNG_ROTATION_SCHEDULE_GET_TIMER_PERIOD:
2993 case LTTNG_ROTATION_SCHEDULE_GET_SIZE:
2994 need_domain = 0;
2995 break;
2996 default:
2997 need_domain = 1;
2998 }
2999
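/*
 * Kernel-domain commands cannot be served when the kernel tracer
 * is disabled by configuration (no-kernel).
 */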
3000 if (config.no_kernel && need_domain
3001 && cmd_ctx->lsm->domain.type == LTTNG_DOMAIN_KERNEL) {
3002 if (!is_root) {
3003 ret = LTTNG_ERR_NEED_ROOT_SESSIOND;
3004 } else {
3005 ret = LTTNG_ERR_KERN_NA;
3006 }
3007 goto error;
3008 }
3009
3010 /* Deny register consumer if we already have a spawned consumer. */
3011 if (cmd_ctx->lsm->cmd_type == LTTNG_REGISTER_CONSUMER) {
3012 pthread_mutex_lock(&kconsumer_data.pid_mutex);
3013 if (kconsumer_data.pid > 0) {
3014 ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
3015 pthread_mutex_unlock(&kconsumer_data.pid_mutex);
3016 goto error;
3017 }
3018 pthread_mutex_unlock(&kconsumer_data.pid_mutex);
3019 }
3020
3021 /*
3022 * Check for commands that don't need to allocate a returned payload. We do
3023 * this here so we don't have to make the call for no payload at each
3024 * command.
3025 */
3026 switch(cmd_ctx->lsm->cmd_type) {
3027 case LTTNG_LIST_SESSIONS:
3028 case LTTNG_LIST_TRACEPOINTS:
3029 case LTTNG_LIST_TRACEPOINT_FIELDS:
3030 case LTTNG_LIST_DOMAINS:
3031 case LTTNG_LIST_CHANNELS:
3032 case LTTNG_LIST_EVENTS:
3033 case LTTNG_LIST_SYSCALLS:
3034 case LTTNG_LIST_TRACKER_PIDS:
3035 case LTTNG_DATA_PENDING:
3036 case LTTNG_ROTATE_SESSION:
3037 case LTTNG_ROTATION_GET_INFO:
3038 case LTTNG_ROTATION_SCHEDULE_GET_TIMER_PERIOD:
3039 case LTTNG_ROTATION_SCHEDULE_GET_SIZE:
3040 break;
3041 default:
3042 /* Setup lttng message with no payload */
3043 ret = setup_lttng_msg_no_cmd_header(cmd_ctx, NULL, 0);
3044 if (ret < 0) {
3045 /* This label does not try to unlock the session */
3046 goto init_setup_error;
3047 }
3048 }
3049
3050 /* Commands that DO NOT need a session. */
3051 switch (cmd_ctx->lsm->cmd_type) {
3052 case LTTNG_CREATE_SESSION:
3053 case LTTNG_CREATE_SESSION_SNAPSHOT:
3054 case LTTNG_CREATE_SESSION_LIVE:
3055 case LTTNG_LIST_SESSIONS:
3056 case LTTNG_LIST_TRACEPOINTS:
3057 case LTTNG_LIST_SYSCALLS:
3058 case LTTNG_LIST_TRACEPOINT_FIELDS:
3059 case LTTNG_SAVE_SESSION:
3060 case LTTNG_REGISTER_TRIGGER:
3061 case LTTNG_UNREGISTER_TRIGGER:
3062 need_tracing_session = 0;
3063 break;
3064 default:
3065 DBG("Getting session %s by name", cmd_ctx->lsm->session.name);
3066 /*
3067 * We keep the session list lock across _all_ commands
3068 * for now, because the per-session lock does not
3069 * handle teardown properly.
3070 */
3071 session_lock_list();
3072 cmd_ctx->session = session_find_by_name(cmd_ctx->lsm->session.name);
3073 if (cmd_ctx->session == NULL) {
3074 ret = LTTNG_ERR_SESS_NOT_FOUND;
3075 goto error;
3076 } else {
3077 /* Acquire lock for the session */
3078 session_lock(cmd_ctx->session);
3079 }
3080 break;
3081 }
3082
3083 /*
3084 * Commands that need a valid session but should NOT create one if none
3085 * exists. Instead of creating one and destroying it when the command is
3086 * handled, perform the check right here so we save a useless round
3087 * trip through the command's code path.
3088 */
3089 switch (cmd_ctx->lsm->cmd_type) {
3090 case LTTNG_DISABLE_CHANNEL:
3091 case LTTNG_DISABLE_EVENT:
3092 switch (cmd_ctx->lsm->domain.type) {
3093 case LTTNG_DOMAIN_KERNEL:
3094 if (!cmd_ctx->session->kernel_session) {
3095 ret = LTTNG_ERR_NO_CHANNEL;
3096 goto error;
3097 }
3098 break;
3099 case LTTNG_DOMAIN_JUL:
3100 case LTTNG_DOMAIN_LOG4J:
3101 case LTTNG_DOMAIN_PYTHON:
3102 case LTTNG_DOMAIN_UST:
3103 if (!cmd_ctx->session->ust_session) {
3104 ret = LTTNG_ERR_NO_CHANNEL;
3105 goto error;
3106 }
3107 break;
3108 default:
3109 ret = LTTNG_ERR_UNKNOWN_DOMAIN;
3110 goto error;
3111 }
3112 default:
3113 break;
3114 }
3115
3116 if (!need_domain) {
3117 goto skip_domain;
3118 }
3119
3120 /*
3121 * Check domain type for specific "pre-action".
3122 */
3123 switch (cmd_ctx->lsm->domain.type) {
3124 case LTTNG_DOMAIN_KERNEL:
3125 if (!is_root) {
3126 ret = LTTNG_ERR_NEED_ROOT_SESSIOND;
3127 goto error;
3128 }
3129
3130 /* Kernel tracer check */
3131 if (kernel_tracer_fd == -1) {
3132 /* Basically, load kernel tracer modules */
3133 ret = init_kernel_tracer();
3134 if (ret != 0) {
3135 goto error;
3136 }
3137 }
3138
3139 /* Consumer is in an ERROR state. Report back to client */
3140 if (uatomic_read(&kernel_consumerd_state) == CONSUMER_ERROR) {
3141 ret = LTTNG_ERR_NO_KERNCONSUMERD;
3142 goto error;
3143 }
3144
3145 /* Need a session for kernel command */
3146 if (need_tracing_session) {
3147 if (cmd_ctx->session->kernel_session == NULL) {
3148 ret = create_kernel_session(cmd_ctx->session);
3149 if (ret < 0) {
3150 ret = LTTNG_ERR_KERN_SESS_FAIL;
3151 goto error;
3152 }
3153 }
3154
3155 /* Start the kernel consumer daemon */
3156 pthread_mutex_lock(&kconsumer_data.pid_mutex);
3157 if (kconsumer_data.pid == 0 &&
3158 cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) {
3159 pthread_mutex_unlock(&kconsumer_data.pid_mutex);
3160 ret = start_consumerd(&kconsumer_data);
3161 if (ret < 0) {
3162 ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
3163 goto error;
3164 }
3165 uatomic_set(&kernel_consumerd_state, CONSUMER_STARTED);
3166 } else {
3167 pthread_mutex_unlock(&kconsumer_data.pid_mutex);
3168 }
3169
3170 /*
3171 * The consumer was just spawned so we need to add the socket to
3172 * the consumer output of the session, if it exists.
3173 */
3174 ret = consumer_create_socket(&kconsumer_data,
3175 cmd_ctx->session->kernel_session->consumer);
3176 if (ret < 0) {
3177 goto error;
3178 }
3179 }
3180
3181 break;
3182 case LTTNG_DOMAIN_JUL:
3183 case LTTNG_DOMAIN_LOG4J:
3184 case LTTNG_DOMAIN_PYTHON:
3185 case LTTNG_DOMAIN_UST:
3186 {
3187 if (!ust_app_supported()) {
3188 ret = LTTNG_ERR_NO_UST;
3189 goto error;
3190 }
3191 /* Consumer is in an ERROR state. Report back to client */
3192 if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) {
3193 ret = LTTNG_ERR_NO_USTCONSUMERD;
3194 goto error;
3195 }
3196
3197 if (need_tracing_session) {
3198 /* Create UST session if none exist. */
3199 if (cmd_ctx->session->ust_session == NULL) {
3200 ret = create_ust_session(cmd_ctx->session,
3201 &cmd_ctx->lsm->domain);
3202 if (ret != LTTNG_OK) {
3203 goto error;
3204 }
3205 }
3206
3207 /* Start the UST consumer daemons */
3208 /* 64-bit */
3209 pthread_mutex_lock(&ustconsumer64_data.pid_mutex);
3210 if (config.consumerd64_bin_path.value &&
3211 ustconsumer64_data.pid == 0 &&
3212 cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) {
3213 pthread_mutex_unlock(&ustconsumer64_data.pid_mutex);
3214 ret = start_consumerd(&ustconsumer64_data);
3215 if (ret < 0) {
3216 ret = LTTNG_ERR_UST_CONSUMER64_FAIL;
3217 uatomic_set(&ust_consumerd64_fd, -EINVAL);
3218 goto error;
3219 }
3220
3221 uatomic_set(&ust_consumerd64_fd, ustconsumer64_data.cmd_sock);
3222 uatomic_set(&ust_consumerd_state, CONSUMER_STARTED);
3223 } else {
3224 pthread_mutex_unlock(&ustconsumer64_data.pid_mutex);
3225 }
3226
3227