Build fix: missing stdio.h include in signal-helper.hpp
[lttng-tools.git] / src / bin / lttng-sessiond / dispatch.cpp
CommitLineData
/*
 * Copyright (C) 2011 EfficiOS Inc.
 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 *
 */
9
10#include <stddef.h>
11#include <stdlib.h>
12#include <urcu.h>
c9e313bc
SM
13#include <common/futex.hpp>
14#include <common/macros.hpp>
5d1b0219 15
c9e313bc
SM
16#include "dispatch.hpp"
17#include "ust-app.hpp"
18#include "testpoint.hpp"
19#include "fd-limit.hpp"
20#include "health-sessiond.hpp"
21#include "lttng-sessiond.hpp"
22#include "thread.hpp"
5d1b0219 23
namespace {
/*
 * State handed to the UST registration dispatch thread at launch and shared
 * with its shutdown and cleanup callbacks.
 */
struct thread_notifiers {
	/* Queue of registration commands dequeued by the dispatch thread. */
	struct ust_cmd_queue *ust_cmd_queue;
	/* Write end on which a registered application's command socket is sent. */
	int apps_cmd_pipe_write_fd;
	/* Write end on which a registered application's notify socket is sent. */
	int apps_cmd_notify_pipe_write_fd;
	/* Set to 1 by the shutdown callback; read by the dispatch loop to exit. */
	int dispatch_thread_exit;
};
} /* namespace */
5d1b0219
JG
32
33/*
34 * For each tracing session, update newly registered apps. The session list
35 * lock MUST be acquired before calling this.
36 */
37static void update_ust_app(int app_sock)
38{
39 struct ltt_session *sess, *stmp;
40 const struct ltt_session_list *session_list = session_get_list();
70670472 41 struct ust_app *app;
5d1b0219
JG
42
43 /* Consumer is in an ERROR state. Stop any application update. */
412d7227 44 if (uatomic_read(&the_ust_consumerd_state) == CONSUMER_ERROR) {
5d1b0219
JG
45 /* Stop the update process since the consumer is dead. */
46 return;
47 }
48
70670472 49 rcu_read_lock();
a0377dfe 50 LTTNG_ASSERT(app_sock >= 0);
70670472
JR
51 app = ust_app_find_by_sock(app_sock);
52 if (app == NULL) {
53 /*
54 * Application can be unregistered before so
55 * this is possible hence simply stopping the
56 * update.
57 */
58 DBG3("UST app update failed to find app sock %d",
59 app_sock);
60 goto unlock_rcu;
61 }
62
63 /* Update all event notifiers for the app. */
64 ust_app_global_update_event_notifier_rules(app);
65
5d1b0219
JG
66 /* For all tracing session(s) */
67 cds_list_for_each_entry_safe(sess, stmp, &session_list->head, list) {
5d1b0219
JG
68 if (!session_get(sess)) {
69 continue;
70 }
71 session_lock(sess);
8f4456a8
JR
72 if (!sess->active || !sess->ust_session ||
73 !sess->ust_session->active) {
5d1b0219
JG
74 goto unlock_session;
75 }
76
5d1b0219 77 ust_app_global_update(sess->ust_session, app);
5d1b0219
JG
78 unlock_session:
79 session_unlock(sess);
80 session_put(sess);
81 }
70670472
JR
82
83unlock_rcu:
84 rcu_read_unlock();
5d1b0219
JG
85}
86
87/*
88 * Sanitize the wait queue of the dispatch registration thread meaning removing
89 * invalid nodes from it. This is to avoid memory leaks for the case the UST
90 * notify socket is never received.
91 */
92static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
93{
94 int ret, nb_fd = 0, i;
95 unsigned int fd_added = 0;
96 struct lttng_poll_event events;
97 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
98
a0377dfe 99 LTTNG_ASSERT(wait_queue);
5d1b0219
JG
100
101 lttng_poll_init(&events);
102
103 /* Just skip everything for an empty queue. */
104 if (!wait_queue->count) {
105 goto end;
106 }
107
108 ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC);
109 if (ret < 0) {
110 goto error_create;
111 }
112
113 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
114 &wait_queue->head, head) {
a0377dfe 115 LTTNG_ASSERT(wait_node->app);
1524f98c 116 ret = lttng_poll_add(&events, wait_node->app->sock, LPOLLIN);
5d1b0219
JG
117 if (ret < 0) {
118 goto error;
119 }
120
121 fd_added = 1;
122 }
123
124 if (!fd_added) {
125 goto end;
126 }
127
128 /*
129 * Poll but don't block so we can quickly identify the faulty events and
130 * clean them afterwards from the wait queue.
131 */
132 ret = lttng_poll_wait(&events, 0);
133 if (ret < 0) {
134 goto error;
135 }
136 nb_fd = ret;
137
138 for (i = 0; i < nb_fd; i++) {
139 /* Get faulty FD. */
140 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
141 int pollfd = LTTNG_POLL_GETFD(&events, i);
142
5d1b0219
JG
143 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
144 &wait_queue->head, head) {
145 if (pollfd == wait_node->app->sock &&
146 (revents & (LPOLLHUP | LPOLLERR))) {
147 cds_list_del(&wait_node->head);
148 wait_queue->count--;
149 ust_app_destroy(wait_node->app);
150 free(wait_node);
151 /*
152 * Silence warning of use-after-free in
153 * cds_list_for_each_entry_safe which uses
154 * __typeof__(*wait_node).
155 */
156 wait_node = NULL;
157 break;
158 } else {
159 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
160 goto error;
161 }
162 }
163 }
164
165 if (nb_fd > 0) {
166 DBG("Wait queue sanitized, %d node were cleaned up", nb_fd);
167 }
168
169end:
170 lttng_poll_clean(&events);
171 return;
172
173error:
174 lttng_poll_clean(&events);
175error_create:
176 ERR("Unable to sanitize wait queue");
177 return;
178}
179
180/*
181 * Send a socket to a thread This is called from the dispatch UST registration
182 * thread once all sockets are set for the application.
183 *
184 * The sock value can be invalid, we don't really care, the thread will handle
185 * it and make the necessary cleanup if so.
186 *
187 * On success, return 0 else a negative value being the errno message of the
188 * write().
189 */
190static int send_socket_to_thread(int fd, int sock)
191{
192 ssize_t ret;
193
194 /*
195 * It's possible that the FD is set as invalid with -1 concurrently just
196 * before calling this function being a shutdown state of the thread.
197 */
198 if (fd < 0) {
199 ret = -EBADF;
200 goto error;
201 }
202
203 ret = lttng_write(fd, &sock, sizeof(sock));
204 if (ret < sizeof(sock)) {
205 PERROR("write apps pipe %d", fd);
206 if (ret < 0) {
207 ret = -errno;
208 }
209 goto error;
210 }
211
212 /* All good. Don't send back the write positive ret value. */
213 ret = 0;
214error:
215 return (int) ret;
216}
217
/*
 * Cleanup callback of the dispatch thread: releases the memory passed as the
 * thread's data.
 */
static void cleanup_ust_dispatch_thread(void *data)
{
	void *const thread_data = data;

	free(thread_data);
}
222
223/*
224 * Dispatch request from the registration threads to the application
225 * communication thread.
226 */
227static void *thread_dispatch_ust_registration(void *data)
228{
229 int ret, err = -1;
230 struct cds_wfcq_node *node;
231 struct ust_command *ust_cmd = NULL;
232 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
233 struct ust_reg_wait_queue wait_queue = {
234 .count = 0,
1c9a0b0e 235 .head = {},
5d1b0219 236 };
7966af57 237 struct thread_notifiers *notifiers = (thread_notifiers *) data;
5d1b0219
JG
238
239 rcu_register_thread();
240
412d7227
SM
241 health_register(the_health_sessiond,
242 HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH);
5d1b0219
JG
243
244 if (testpoint(sessiond_thread_app_reg_dispatch)) {
245 goto error_testpoint;
246 }
247
248 health_code_update();
249
250 CDS_INIT_LIST_HEAD(&wait_queue.head);
251
252 DBG("[thread] Dispatch UST command started");
253
254 for (;;) {
255 health_code_update();
256
257 /* Atomically prepare the queue futex */
258 futex_nto1_prepare(&notifiers->ust_cmd_queue->futex);
259
260 if (CMM_LOAD_SHARED(notifiers->dispatch_thread_exit)) {
261 break;
262 }
263
264 do {
265 struct ust_app *app = NULL;
266 ust_cmd = NULL;
267
268 /*
269 * Make sure we don't have node(s) that have hung up before receiving
270 * the notify socket. This is to clean the list in order to avoid
271 * memory leaks from notify socket that are never seen.
272 */
273 sanitize_wait_queue(&wait_queue);
274
275 health_code_update();
276 /* Dequeue command for registration */
277 node = cds_wfcq_dequeue_blocking(
278 &notifiers->ust_cmd_queue->head,
279 &notifiers->ust_cmd_queue->tail);
280 if (node == NULL) {
281 DBG("Woken up but nothing in the UST command queue");
282 /* Continue thread execution */
283 break;
284 }
285
0114db0e 286 ust_cmd = lttng::utils::container_of(node, &ust_command::node);
5d1b0219
JG
287
288 DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
289 " gid:%d sock:%d name:%s (version %d.%d)",
290 ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid,
291 ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
292 ust_cmd->sock, ust_cmd->reg_msg.name,
293 ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
294
b623cb6a 295 if (ust_cmd->reg_msg.type == LTTNG_UST_CTL_SOCKET_CMD) {
64803277 296 wait_node = zmalloc<ust_reg_wait_node>();
5d1b0219
JG
297 if (!wait_node) {
298 PERROR("zmalloc wait_node dispatch");
299 ret = close(ust_cmd->sock);
300 if (ret < 0) {
301 PERROR("close ust sock dispatch %d", ust_cmd->sock);
302 }
303 lttng_fd_put(LTTNG_FD_APPS, 1);
304 free(ust_cmd);
58f835e1 305 ust_cmd = NULL;
5d1b0219
JG
306 goto error;
307 }
308 CDS_INIT_LIST_HEAD(&wait_node->head);
309
310 /* Create application object if socket is CMD. */
311 wait_node->app = ust_app_create(&ust_cmd->reg_msg,
312 ust_cmd->sock);
313 if (!wait_node->app) {
314 ret = close(ust_cmd->sock);
315 if (ret < 0) {
316 PERROR("close ust sock dispatch %d", ust_cmd->sock);
317 }
318 lttng_fd_put(LTTNG_FD_APPS, 1);
319 free(wait_node);
58f835e1 320 wait_node = NULL;
5d1b0219 321 free(ust_cmd);
58f835e1 322 ust_cmd = NULL;
5d1b0219
JG
323 continue;
324 }
325 /*
326 * Add application to the wait queue so we can set the notify
327 * socket before putting this object in the global ht.
328 */
329 cds_list_add(&wait_node->head, &wait_queue.head);
330 wait_queue.count++;
331
332 free(ust_cmd);
58f835e1 333 ust_cmd = NULL;
5d1b0219
JG
334 /*
335 * We have to continue here since we don't have the notify
336 * socket and the application MUST be added to the hash table
337 * only at that moment.
338 */
339 continue;
340 } else {
341 /*
342 * Look for the application in the local wait queue and set the
343 * notify socket if found.
344 */
345 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
346 &wait_queue.head, head) {
347 health_code_update();
348 if (wait_node->app->pid == ust_cmd->reg_msg.pid) {
349 wait_node->app->notify_sock = ust_cmd->sock;
350 cds_list_del(&wait_node->head);
351 wait_queue.count--;
352 app = wait_node->app;
353 free(wait_node);
58f835e1 354 wait_node = NULL;
5d1b0219
JG
355 DBG3("UST app notify socket %d is set", ust_cmd->sock);
356 break;
357 }
358 }
359
360 /*
361 * With no application at this stage the received socket is
362 * basically useless so close it before we free the cmd data
363 * structure for good.
364 */
365 if (!app) {
366 ret = close(ust_cmd->sock);
367 if (ret < 0) {
368 PERROR("close ust sock dispatch %d", ust_cmd->sock);
369 }
370 lttng_fd_put(LTTNG_FD_APPS, 1);
371 }
372 free(ust_cmd);
58f835e1 373 ust_cmd = NULL;
5d1b0219
JG
374 }
375
376 if (app) {
377 /*
378 * @session_lock_list
379 *
380 * Lock the global session list so from the register up to the
381 * registration done message, no thread can see the application
382 * and change its state.
383 */
384 session_lock_list();
385 rcu_read_lock();
386
387 /*
388 * Add application to the global hash table. This needs to be
389 * done before the update to the UST registry can locate the
390 * application.
391 */
392 ust_app_add(app);
393
394 /* Set app version. This call will print an error if needed. */
395 (void) ust_app_version(app);
396
da873412
JR
397 (void) ust_app_setup_event_notifier_group(app);
398
5d1b0219
JG
399 /* Send notify socket through the notify pipe. */
400 ret = send_socket_to_thread(
401 notifiers->apps_cmd_notify_pipe_write_fd,
402 app->notify_sock);
403 if (ret < 0) {
404 rcu_read_unlock();
405 session_unlock_list();
406 /*
407 * No notify thread, stop the UST tracing. However, this is
408 * not an internal error of the this thread thus setting
409 * the health error code to a normal exit.
410 */
411 err = 0;
412 goto error;
413 }
414
415 /*
416 * Update newly registered application with the tracing
417 * registry info already enabled information.
418 */
419 update_ust_app(app->sock);
420
421 /*
422 * Don't care about return value. Let the manage apps threads
423 * handle app unregistration upon socket close.
424 */
425 (void) ust_app_register_done(app);
426
427 /*
428 * Even if the application socket has been closed, send the app
429 * to the thread and unregistration will take place at that
430 * place.
431 */
432 ret = send_socket_to_thread(
433 notifiers->apps_cmd_pipe_write_fd,
434 app->sock);
435 if (ret < 0) {
436 rcu_read_unlock();
437 session_unlock_list();
438 /*
439 * No apps. thread, stop the UST tracing. However, this is
440 * not an internal error of the this thread thus setting
441 * the health error code to a normal exit.
442 */
443 err = 0;
444 goto error;
445 }
446
447 rcu_read_unlock();
448 session_unlock_list();
449 }
450 } while (node != NULL);
451
452 health_poll_entry();
453 /* Futex wait on queue. Blocking call on futex() */
454 futex_nto1_wait(&notifiers->ust_cmd_queue->futex);
455 health_poll_exit();
456 }
457 /* Normal exit, no error */
458 err = 0;
459
460error:
461 /* Clean up wait queue. */
462 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
463 &wait_queue.head, head) {
464 cds_list_del(&wait_node->head);
465 wait_queue.count--;
466 free(wait_node);
467 }
468
469 /* Empty command queue. */
470 for (;;) {
471 /* Dequeue command for registration */
472 node = cds_wfcq_dequeue_blocking(
473 &notifiers->ust_cmd_queue->head,
474 &notifiers->ust_cmd_queue->tail);
475 if (node == NULL) {
476 break;
477 }
0114db0e 478 ust_cmd = lttng::utils::container_of(node, &ust_command::node);
5d1b0219
JG
479 ret = close(ust_cmd->sock);
480 if (ret < 0) {
481 PERROR("close ust sock exit dispatch %d", ust_cmd->sock);
482 }
483 lttng_fd_put(LTTNG_FD_APPS, 1);
484 free(ust_cmd);
485 }
486
487error_testpoint:
488 DBG("Dispatch thread dying");
489 if (err) {
490 health_error();
491 ERR("Health error occurred in %s", __func__);
492 }
412d7227 493 health_unregister(the_health_sessiond);
5d1b0219
JG
494 rcu_unregister_thread();
495 return NULL;
496}
497
498static bool shutdown_ust_dispatch_thread(void *data)
499{
7966af57 500 struct thread_notifiers *notifiers = (thread_notifiers *) data;
5d1b0219
JG
501
502 CMM_STORE_SHARED(notifiers->dispatch_thread_exit, 1);
503 futex_nto1_wake(&notifiers->ust_cmd_queue->futex);
504 return true;
505}
506
507bool launch_ust_dispatch_thread(struct ust_cmd_queue *cmd_queue,
508 int apps_cmd_pipe_write_fd,
509 int apps_cmd_notify_pipe_write_fd)
510{
511 struct lttng_thread *thread;
512 struct thread_notifiers *notifiers;
513
64803277 514 notifiers = zmalloc<thread_notifiers>();
5d1b0219
JG
515 if (!notifiers) {
516 goto error;
517 }
518 notifiers->ust_cmd_queue = cmd_queue;
519 notifiers->apps_cmd_pipe_write_fd = apps_cmd_pipe_write_fd;
520 notifiers->apps_cmd_notify_pipe_write_fd = apps_cmd_notify_pipe_write_fd;
521
522 thread = lttng_thread_create("UST registration dispatch",
523 thread_dispatch_ust_registration,
524 shutdown_ust_dispatch_thread,
525 cleanup_ust_dispatch_thread,
526 notifiers);
527 if (!thread) {
528 goto error;
529 }
530 lttng_thread_put(thread);
531 return true;
532error:
533 free(notifiers);
534 return false;
535}
This page took 0.070037 seconds and 4 git commands to generate.