Cleanup: remove ignored flags from poll events bitmasks
[lttng-tools.git] / src / bin / lttng-sessiond / dispatch.cpp
1 /*
2 * Copyright (C) 2011 EfficiOS Inc.
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10 #include <stddef.h>
11 #include <stdlib.h>
12 #include <urcu.h>
13 #include <common/futex.hpp>
14 #include <common/macros.hpp>
15
16 #include "dispatch.hpp"
17 #include "ust-app.hpp"
18 #include "testpoint.hpp"
19 #include "fd-limit.hpp"
20 #include "health-sessiond.hpp"
21 #include "lttng-sessiond.hpp"
22 #include "thread.hpp"
23
namespace {
/*
 * State shared between launch_ust_dispatch_thread(), the UST registration
 * dispatch thread and its shutdown/cleanup callbacks.
 */
struct thread_notifiers {
	/* Queue filled with registration commands to dispatch. */
	struct ust_cmd_queue *ust_cmd_queue;
	/* Pipe write end used to hand application command sockets over. */
	int apps_cmd_pipe_write_fd;
	/* Pipe write end used to hand application notify sockets over. */
	int apps_cmd_notify_pipe_write_fd;
	/* Set (through CMM_STORE_SHARED) to ask the dispatch thread to exit. */
	int dispatch_thread_exit;
};
} /* namespace */
32
33 /*
34 * For each tracing session, update newly registered apps. The session list
35 * lock MUST be acquired before calling this.
36 */
37 static void update_ust_app(int app_sock)
38 {
39 struct ltt_session *sess, *stmp;
40 const struct ltt_session_list *session_list = session_get_list();
41 struct ust_app *app;
42
43 /* Consumer is in an ERROR state. Stop any application update. */
44 if (uatomic_read(&the_ust_consumerd_state) == CONSUMER_ERROR) {
45 /* Stop the update process since the consumer is dead. */
46 return;
47 }
48
49 rcu_read_lock();
50 LTTNG_ASSERT(app_sock >= 0);
51 app = ust_app_find_by_sock(app_sock);
52 if (app == NULL) {
53 /*
54 * Application can be unregistered before so
55 * this is possible hence simply stopping the
56 * update.
57 */
58 DBG3("UST app update failed to find app sock %d",
59 app_sock);
60 goto unlock_rcu;
61 }
62
63 /* Update all event notifiers for the app. */
64 ust_app_global_update_event_notifier_rules(app);
65
66 /* For all tracing session(s) */
67 cds_list_for_each_entry_safe(sess, stmp, &session_list->head, list) {
68 if (!session_get(sess)) {
69 continue;
70 }
71 session_lock(sess);
72 if (!sess->active || !sess->ust_session ||
73 !sess->ust_session->active) {
74 goto unlock_session;
75 }
76
77 ust_app_global_update(sess->ust_session, app);
78 unlock_session:
79 session_unlock(sess);
80 session_put(sess);
81 }
82
83 unlock_rcu:
84 rcu_read_unlock();
85 }
86
87 /*
88 * Sanitize the wait queue of the dispatch registration thread meaning removing
89 * invalid nodes from it. This is to avoid memory leaks for the case the UST
90 * notify socket is never received.
91 */
92 static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
93 {
94 int ret, nb_fd = 0, i;
95 unsigned int fd_added = 0;
96 struct lttng_poll_event events;
97 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
98
99 LTTNG_ASSERT(wait_queue);
100
101 lttng_poll_init(&events);
102
103 /* Just skip everything for an empty queue. */
104 if (!wait_queue->count) {
105 goto end;
106 }
107
108 ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC);
109 if (ret < 0) {
110 goto error_create;
111 }
112
113 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
114 &wait_queue->head, head) {
115 LTTNG_ASSERT(wait_node->app);
116 ret = lttng_poll_add(&events, wait_node->app->sock, LPOLLIN);
117 if (ret < 0) {
118 goto error;
119 }
120
121 fd_added = 1;
122 }
123
124 if (!fd_added) {
125 goto end;
126 }
127
128 /*
129 * Poll but don't block so we can quickly identify the faulty events and
130 * clean them afterwards from the wait queue.
131 */
132 ret = lttng_poll_wait(&events, 0);
133 if (ret < 0) {
134 goto error;
135 }
136 nb_fd = ret;
137
138 for (i = 0; i < nb_fd; i++) {
139 /* Get faulty FD. */
140 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
141 int pollfd = LTTNG_POLL_GETFD(&events, i);
142
143 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
144 &wait_queue->head, head) {
145 if (pollfd == wait_node->app->sock &&
146 (revents & (LPOLLHUP | LPOLLERR))) {
147 cds_list_del(&wait_node->head);
148 wait_queue->count--;
149 ust_app_destroy(wait_node->app);
150 free(wait_node);
151 /*
152 * Silence warning of use-after-free in
153 * cds_list_for_each_entry_safe which uses
154 * __typeof__(*wait_node).
155 */
156 wait_node = NULL;
157 break;
158 } else {
159 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
160 goto error;
161 }
162 }
163 }
164
165 if (nb_fd > 0) {
166 DBG("Wait queue sanitized, %d node were cleaned up", nb_fd);
167 }
168
169 end:
170 lttng_poll_clean(&events);
171 return;
172
173 error:
174 lttng_poll_clean(&events);
175 error_create:
176 ERR("Unable to sanitize wait queue");
177 return;
178 }
179
180 /*
181 * Send a socket to a thread This is called from the dispatch UST registration
182 * thread once all sockets are set for the application.
183 *
184 * The sock value can be invalid, we don't really care, the thread will handle
185 * it and make the necessary cleanup if so.
186 *
187 * On success, return 0 else a negative value being the errno message of the
188 * write().
189 */
190 static int send_socket_to_thread(int fd, int sock)
191 {
192 ssize_t ret;
193
194 /*
195 * It's possible that the FD is set as invalid with -1 concurrently just
196 * before calling this function being a shutdown state of the thread.
197 */
198 if (fd < 0) {
199 ret = -EBADF;
200 goto error;
201 }
202
203 ret = lttng_write(fd, &sock, sizeof(sock));
204 if (ret < sizeof(sock)) {
205 PERROR("write apps pipe %d", fd);
206 if (ret < 0) {
207 ret = -errno;
208 }
209 goto error;
210 }
211
212 /* All good. Don't send back the write positive ret value. */
213 ret = 0;
214 error:
215 return (int) ret;
216 }
217
/*
 * Cleanup callback of the dispatch thread: releases the thread_notifiers that
 * launch_ust_dispatch_thread() passed as the thread's data.
 */
static void cleanup_ust_dispatch_thread(void *data)
{
	void *const notifiers = data;

	free(notifiers);
}
222
223 /*
224 * Dispatch request from the registration threads to the application
225 * communication thread.
226 */
227 static void *thread_dispatch_ust_registration(void *data)
228 {
229 int ret, err = -1;
230 struct cds_wfcq_node *node;
231 struct ust_command *ust_cmd = NULL;
232 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
233 struct ust_reg_wait_queue wait_queue = {
234 .count = 0,
235 .head = {},
236 };
237 struct thread_notifiers *notifiers = (thread_notifiers *) data;
238
239 rcu_register_thread();
240
241 health_register(the_health_sessiond,
242 HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH);
243
244 if (testpoint(sessiond_thread_app_reg_dispatch)) {
245 goto error_testpoint;
246 }
247
248 health_code_update();
249
250 CDS_INIT_LIST_HEAD(&wait_queue.head);
251
252 DBG("[thread] Dispatch UST command started");
253
254 for (;;) {
255 health_code_update();
256
257 /* Atomically prepare the queue futex */
258 futex_nto1_prepare(&notifiers->ust_cmd_queue->futex);
259
260 if (CMM_LOAD_SHARED(notifiers->dispatch_thread_exit)) {
261 break;
262 }
263
264 do {
265 struct ust_app *app = NULL;
266 ust_cmd = NULL;
267
268 /*
269 * Make sure we don't have node(s) that have hung up before receiving
270 * the notify socket. This is to clean the list in order to avoid
271 * memory leaks from notify socket that are never seen.
272 */
273 sanitize_wait_queue(&wait_queue);
274
275 health_code_update();
276 /* Dequeue command for registration */
277 node = cds_wfcq_dequeue_blocking(
278 &notifiers->ust_cmd_queue->head,
279 &notifiers->ust_cmd_queue->tail);
280 if (node == NULL) {
281 DBG("Woken up but nothing in the UST command queue");
282 /* Continue thread execution */
283 break;
284 }
285
286 ust_cmd = lttng::utils::container_of(node, &ust_command::node);
287
288 DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
289 " gid:%d sock:%d name:%s (version %d.%d)",
290 ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid,
291 ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
292 ust_cmd->sock, ust_cmd->reg_msg.name,
293 ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
294
295 if (ust_cmd->reg_msg.type == LTTNG_UST_CTL_SOCKET_CMD) {
296 wait_node = zmalloc<ust_reg_wait_node>();
297 if (!wait_node) {
298 PERROR("zmalloc wait_node dispatch");
299 ret = close(ust_cmd->sock);
300 if (ret < 0) {
301 PERROR("close ust sock dispatch %d", ust_cmd->sock);
302 }
303 lttng_fd_put(LTTNG_FD_APPS, 1);
304 free(ust_cmd);
305 ust_cmd = NULL;
306 goto error;
307 }
308 CDS_INIT_LIST_HEAD(&wait_node->head);
309
310 /* Create application object if socket is CMD. */
311 wait_node->app = ust_app_create(&ust_cmd->reg_msg,
312 ust_cmd->sock);
313 if (!wait_node->app) {
314 ret = close(ust_cmd->sock);
315 if (ret < 0) {
316 PERROR("close ust sock dispatch %d", ust_cmd->sock);
317 }
318 lttng_fd_put(LTTNG_FD_APPS, 1);
319 free(wait_node);
320 wait_node = NULL;
321 free(ust_cmd);
322 ust_cmd = NULL;
323 continue;
324 }
325 /*
326 * Add application to the wait queue so we can set the notify
327 * socket before putting this object in the global ht.
328 */
329 cds_list_add(&wait_node->head, &wait_queue.head);
330 wait_queue.count++;
331
332 free(ust_cmd);
333 ust_cmd = NULL;
334 /*
335 * We have to continue here since we don't have the notify
336 * socket and the application MUST be added to the hash table
337 * only at that moment.
338 */
339 continue;
340 } else {
341 /*
342 * Look for the application in the local wait queue and set the
343 * notify socket if found.
344 */
345 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
346 &wait_queue.head, head) {
347 health_code_update();
348 if (wait_node->app->pid == ust_cmd->reg_msg.pid) {
349 wait_node->app->notify_sock = ust_cmd->sock;
350 cds_list_del(&wait_node->head);
351 wait_queue.count--;
352 app = wait_node->app;
353 free(wait_node);
354 wait_node = NULL;
355 DBG3("UST app notify socket %d is set", ust_cmd->sock);
356 break;
357 }
358 }
359
360 /*
361 * With no application at this stage the received socket is
362 * basically useless so close it before we free the cmd data
363 * structure for good.
364 */
365 if (!app) {
366 ret = close(ust_cmd->sock);
367 if (ret < 0) {
368 PERROR("close ust sock dispatch %d", ust_cmd->sock);
369 }
370 lttng_fd_put(LTTNG_FD_APPS, 1);
371 }
372 free(ust_cmd);
373 ust_cmd = NULL;
374 }
375
376 if (app) {
377 /*
378 * @session_lock_list
379 *
380 * Lock the global session list so from the register up to the
381 * registration done message, no thread can see the application
382 * and change its state.
383 */
384 session_lock_list();
385 rcu_read_lock();
386
387 /*
388 * Add application to the global hash table. This needs to be
389 * done before the update to the UST registry can locate the
390 * application.
391 */
392 ust_app_add(app);
393
394 /* Set app version. This call will print an error if needed. */
395 (void) ust_app_version(app);
396
397 (void) ust_app_setup_event_notifier_group(app);
398
399 /* Send notify socket through the notify pipe. */
400 ret = send_socket_to_thread(
401 notifiers->apps_cmd_notify_pipe_write_fd,
402 app->notify_sock);
403 if (ret < 0) {
404 rcu_read_unlock();
405 session_unlock_list();
406 /*
407 * No notify thread, stop the UST tracing. However, this is
408 * not an internal error of the this thread thus setting
409 * the health error code to a normal exit.
410 */
411 err = 0;
412 goto error;
413 }
414
415 /*
416 * Update newly registered application with the tracing
417 * registry info already enabled information.
418 */
419 update_ust_app(app->sock);
420
421 /*
422 * Don't care about return value. Let the manage apps threads
423 * handle app unregistration upon socket close.
424 */
425 (void) ust_app_register_done(app);
426
427 /*
428 * Even if the application socket has been closed, send the app
429 * to the thread and unregistration will take place at that
430 * place.
431 */
432 ret = send_socket_to_thread(
433 notifiers->apps_cmd_pipe_write_fd,
434 app->sock);
435 if (ret < 0) {
436 rcu_read_unlock();
437 session_unlock_list();
438 /*
439 * No apps. thread, stop the UST tracing. However, this is
440 * not an internal error of the this thread thus setting
441 * the health error code to a normal exit.
442 */
443 err = 0;
444 goto error;
445 }
446
447 rcu_read_unlock();
448 session_unlock_list();
449 }
450 } while (node != NULL);
451
452 health_poll_entry();
453 /* Futex wait on queue. Blocking call on futex() */
454 futex_nto1_wait(&notifiers->ust_cmd_queue->futex);
455 health_poll_exit();
456 }
457 /* Normal exit, no error */
458 err = 0;
459
460 error:
461 /* Clean up wait queue. */
462 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
463 &wait_queue.head, head) {
464 cds_list_del(&wait_node->head);
465 wait_queue.count--;
466 free(wait_node);
467 }
468
469 /* Empty command queue. */
470 for (;;) {
471 /* Dequeue command for registration */
472 node = cds_wfcq_dequeue_blocking(
473 &notifiers->ust_cmd_queue->head,
474 &notifiers->ust_cmd_queue->tail);
475 if (node == NULL) {
476 break;
477 }
478 ust_cmd = lttng::utils::container_of(node, &ust_command::node);
479 ret = close(ust_cmd->sock);
480 if (ret < 0) {
481 PERROR("close ust sock exit dispatch %d", ust_cmd->sock);
482 }
483 lttng_fd_put(LTTNG_FD_APPS, 1);
484 free(ust_cmd);
485 }
486
487 error_testpoint:
488 DBG("Dispatch thread dying");
489 if (err) {
490 health_error();
491 ERR("Health error occurred in %s", __func__);
492 }
493 health_unregister(the_health_sessiond);
494 rcu_unregister_thread();
495 return NULL;
496 }
497
498 static bool shutdown_ust_dispatch_thread(void *data)
499 {
500 struct thread_notifiers *notifiers = (thread_notifiers *) data;
501
502 CMM_STORE_SHARED(notifiers->dispatch_thread_exit, 1);
503 futex_nto1_wake(&notifiers->ust_cmd_queue->futex);
504 return true;
505 }
506
507 bool launch_ust_dispatch_thread(struct ust_cmd_queue *cmd_queue,
508 int apps_cmd_pipe_write_fd,
509 int apps_cmd_notify_pipe_write_fd)
510 {
511 struct lttng_thread *thread;
512 struct thread_notifiers *notifiers;
513
514 notifiers = zmalloc<thread_notifiers>();
515 if (!notifiers) {
516 goto error;
517 }
518 notifiers->ust_cmd_queue = cmd_queue;
519 notifiers->apps_cmd_pipe_write_fd = apps_cmd_pipe_write_fd;
520 notifiers->apps_cmd_notify_pipe_write_fd = apps_cmd_notify_pipe_write_fd;
521
522 thread = lttng_thread_create("UST registration dispatch",
523 thread_dispatch_ust_registration,
524 shutdown_ust_dispatch_thread,
525 cleanup_ust_dispatch_thread,
526 notifiers);
527 if (!thread) {
528 goto error;
529 }
530 lttng_thread_put(thread);
531 return true;
532 error:
533 free(notifiers);
534 return false;
535 }
This page took 0.040605 seconds and 5 git commands to generate.