Commit | Line | Data |
---|---|---|
dd5875e7 JG |
1 | /* |
2 | * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca> | |
3 | * Mathieu Desnoyers <mathieu.desnoyers@efficios.com> | |
4 | * 2013 - Jérémie Galarneau <jeremie.galarneau@efficios.com> | |
5 | * | |
6 | * This program is free software; you can redistribute it and/or modify | |
7 | * it under the terms of the GNU General Public License, version 2 only, | |
8 | * as published by the Free Software Foundation. | |
9 | * | |
10 | * This program is distributed in the hope that it will be useful, | |
11 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
13 | * GNU General Public License for more details. | |
14 | * | |
15 | * You should have received a copy of the GNU General Public License along | |
16 | * with this program; if not, write to the Free Software Foundation, Inc., | |
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
18 | */ | |
19 | ||
20 | #include <stddef.h> | |
21 | #include <stdlib.h> | |
22 | #include <urcu.h> | |
23 | #include <common/futex.h> | |
24 | #include <common/macros.h> | |
25 | ||
26 | #include "dispatch.h" | |
27 | #include "ust-app.h" | |
28 | #include "testpoint.h" | |
29 | #include "fd-limit.h" | |
30 | #include "health-sessiond.h" | |
31 | #include "lttng-sessiond.h" | |
32 | #include "thread.h" | |
33 | ||
/*
 * State shared between the UST registration dispatch thread and its
 * launch/shutdown/cleanup callbacks.
 */
struct thread_notifiers {
	/* Queue of registration commands consumed by the dispatch thread. */
	struct ust_cmd_queue *ust_cmd_queue;
	/* Set to 1 (CMM_STORE_SHARED) to ask the dispatch thread to exit. */
	int dispatch_thread_exit;
	/* Pipe write end through which application command sockets are sent. */
	int apps_cmd_pipe_write_fd;
	/* Pipe write end through which application notify sockets are sent. */
	int apps_cmd_notify_pipe_write_fd;
};
40 | ||
41 | /* | |
42 | * For each tracing session, update newly registered apps. The session list | |
43 | * lock MUST be acquired before calling this. | |
44 | */ | |
45 | static void update_ust_app(int app_sock) | |
46 | { | |
47 | struct ltt_session *sess, *stmp; | |
48 | const struct ltt_session_list *session_list = session_get_list(); | |
49 | ||
50 | /* Consumer is in an ERROR state. Stop any application update. */ | |
51 | if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) { | |
52 | /* Stop the update process since the consumer is dead. */ | |
53 | return; | |
54 | } | |
55 | ||
56 | /* For all tracing session(s) */ | |
57 | cds_list_for_each_entry_safe(sess, stmp, &session_list->head, list) { | |
58 | struct ust_app *app; | |
59 | ||
60 | if (!session_get(sess)) { | |
61 | continue; | |
62 | } | |
63 | session_lock(sess); | |
f559728d JR |
64 | if (!sess->active || !sess->ust_session || |
65 | !sess->ust_session->active) { | |
dd5875e7 JG |
66 | goto unlock_session; |
67 | } | |
68 | ||
69 | rcu_read_lock(); | |
70 | assert(app_sock >= 0); | |
71 | app = ust_app_find_by_sock(app_sock); | |
72 | if (app == NULL) { | |
73 | /* | |
74 | * Application can be unregistered before so | |
75 | * this is possible hence simply stopping the | |
76 | * update. | |
77 | */ | |
78 | DBG3("UST app update failed to find app sock %d", | |
79 | app_sock); | |
80 | goto unlock_rcu; | |
81 | } | |
82 | ust_app_global_update(sess->ust_session, app); | |
83 | unlock_rcu: | |
84 | rcu_read_unlock(); | |
85 | unlock_session: | |
86 | session_unlock(sess); | |
87 | session_put(sess); | |
88 | } | |
89 | } | |
90 | ||
91 | /* | |
92 | * Sanitize the wait queue of the dispatch registration thread meaning removing | |
93 | * invalid nodes from it. This is to avoid memory leaks for the case the UST | |
94 | * notify socket is never received. | |
95 | */ | |
96 | static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue) | |
97 | { | |
98 | int ret, nb_fd = 0, i; | |
99 | unsigned int fd_added = 0; | |
100 | struct lttng_poll_event events; | |
101 | struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node; | |
102 | ||
103 | assert(wait_queue); | |
104 | ||
105 | lttng_poll_init(&events); | |
106 | ||
107 | /* Just skip everything for an empty queue. */ | |
108 | if (!wait_queue->count) { | |
109 | goto end; | |
110 | } | |
111 | ||
112 | ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC); | |
113 | if (ret < 0) { | |
114 | goto error_create; | |
115 | } | |
116 | ||
117 | cds_list_for_each_entry_safe(wait_node, tmp_wait_node, | |
118 | &wait_queue->head, head) { | |
119 | assert(wait_node->app); | |
120 | ret = lttng_poll_add(&events, wait_node->app->sock, | |
121 | LPOLLHUP | LPOLLERR); | |
122 | if (ret < 0) { | |
123 | goto error; | |
124 | } | |
125 | ||
126 | fd_added = 1; | |
127 | } | |
128 | ||
129 | if (!fd_added) { | |
130 | goto end; | |
131 | } | |
132 | ||
133 | /* | |
134 | * Poll but don't block so we can quickly identify the faulty events and | |
135 | * clean them afterwards from the wait queue. | |
136 | */ | |
137 | ret = lttng_poll_wait(&events, 0); | |
138 | if (ret < 0) { | |
139 | goto error; | |
140 | } | |
141 | nb_fd = ret; | |
142 | ||
143 | for (i = 0; i < nb_fd; i++) { | |
144 | /* Get faulty FD. */ | |
145 | uint32_t revents = LTTNG_POLL_GETEV(&events, i); | |
146 | int pollfd = LTTNG_POLL_GETFD(&events, i); | |
147 | ||
dd5875e7 JG |
148 | cds_list_for_each_entry_safe(wait_node, tmp_wait_node, |
149 | &wait_queue->head, head) { | |
150 | if (pollfd == wait_node->app->sock && | |
151 | (revents & (LPOLLHUP | LPOLLERR))) { | |
152 | cds_list_del(&wait_node->head); | |
153 | wait_queue->count--; | |
154 | ust_app_destroy(wait_node->app); | |
155 | free(wait_node); | |
156 | /* | |
157 | * Silence warning of use-after-free in | |
158 | * cds_list_for_each_entry_safe which uses | |
159 | * __typeof__(*wait_node). | |
160 | */ | |
161 | wait_node = NULL; | |
162 | break; | |
163 | } else { | |
164 | ERR("Unexpected poll events %u for sock %d", revents, pollfd); | |
165 | goto error; | |
166 | } | |
167 | } | |
168 | } | |
169 | ||
170 | if (nb_fd > 0) { | |
171 | DBG("Wait queue sanitized, %d node were cleaned up", nb_fd); | |
172 | } | |
173 | ||
174 | end: | |
175 | lttng_poll_clean(&events); | |
176 | return; | |
177 | ||
178 | error: | |
179 | lttng_poll_clean(&events); | |
180 | error_create: | |
181 | ERR("Unable to sanitize wait queue"); | |
182 | return; | |
183 | } | |
184 | ||
/*
 * Send a socket to a thread. This is called from the dispatch UST registration
 * thread once all sockets are set for the application.
 *
 * The sock value can be invalid, we don't really care, the thread will handle
 * it and make the necessary cleanup if so.
 *
 * Return 0 on success. On failure, return a negative value:
 *   -EBADF if fd is invalid,
 *   the negated errno of the failed write(),
 *   -EIO if fewer than sizeof(sock) bytes were written (or errno was unset).
 */
static int send_socket_to_thread(int fd, int sock)
{
	ssize_t ret;

	/*
	 * It's possible that the FD is set as invalid with -1 concurrently just
	 * before calling this function being a shutdown state of the thread.
	 */
	if (fd < 0) {
		ret = -EBADF;
		goto error;
	}

	ret = lttng_write(fd, &sock, sizeof(sock));
	/*
	 * Test for a negative value first: comparing the signed ret directly
	 * against sizeof() converts it to size_t, turning -1 into SIZE_MAX and
	 * making a failed write look like a success.
	 */
	if (ret < 0 || (size_t) ret != sizeof(sock)) {
		/* Keep write()'s errno in case PERROR() modifies it. */
		const int saved_errno = errno;

		PERROR("write apps pipe %d", fd);
		ret = (ret < 0 && saved_errno) ? -saved_errno : -EIO;
		goto error;
	}

	/* All good. Don't send back the write positive ret value. */
	ret = 0;
error:
	return (int) ret;
}
222 | ||
/*
 * Cleanup callback of the dispatch thread: release the thread_notifiers
 * object handed to the thread. A NULL pointer is accepted (free(NULL) is a
 * no-op).
 */
static void cleanup_ust_dispatch_thread(void *data)
{
	struct thread_notifiers *notifiers = data;

	free(notifiers);
}
227 | ||
228 | /* | |
229 | * Dispatch request from the registration threads to the application | |
230 | * communication thread. | |
231 | */ | |
232 | static void *thread_dispatch_ust_registration(void *data) | |
233 | { | |
234 | int ret, err = -1; | |
235 | struct cds_wfcq_node *node; | |
236 | struct ust_command *ust_cmd = NULL; | |
237 | struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node; | |
238 | struct ust_reg_wait_queue wait_queue = { | |
239 | .count = 0, | |
240 | }; | |
241 | struct thread_notifiers *notifiers = data; | |
242 | ||
243 | rcu_register_thread(); | |
244 | ||
245 | health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH); | |
246 | ||
247 | if (testpoint(sessiond_thread_app_reg_dispatch)) { | |
248 | goto error_testpoint; | |
249 | } | |
250 | ||
251 | health_code_update(); | |
252 | ||
253 | CDS_INIT_LIST_HEAD(&wait_queue.head); | |
254 | ||
255 | DBG("[thread] Dispatch UST command started"); | |
256 | ||
257 | for (;;) { | |
258 | health_code_update(); | |
259 | ||
260 | /* Atomically prepare the queue futex */ | |
261 | futex_nto1_prepare(¬ifiers->ust_cmd_queue->futex); | |
262 | ||
263 | if (CMM_LOAD_SHARED(notifiers->dispatch_thread_exit)) { | |
264 | break; | |
265 | } | |
266 | ||
267 | do { | |
268 | struct ust_app *app = NULL; | |
269 | ust_cmd = NULL; | |
270 | ||
271 | /* | |
272 | * Make sure we don't have node(s) that have hung up before receiving | |
273 | * the notify socket. This is to clean the list in order to avoid | |
274 | * memory leaks from notify socket that are never seen. | |
275 | */ | |
276 | sanitize_wait_queue(&wait_queue); | |
277 | ||
278 | health_code_update(); | |
279 | /* Dequeue command for registration */ | |
280 | node = cds_wfcq_dequeue_blocking( | |
281 | ¬ifiers->ust_cmd_queue->head, | |
282 | ¬ifiers->ust_cmd_queue->tail); | |
283 | if (node == NULL) { | |
284 | DBG("Woken up but nothing in the UST command queue"); | |
285 | /* Continue thread execution */ | |
286 | break; | |
287 | } | |
288 | ||
289 | ust_cmd = caa_container_of(node, struct ust_command, node); | |
290 | ||
291 | DBG("Dispatching UST registration pid:%d ppid:%d uid:%d" | |
292 | " gid:%d sock:%d name:%s (version %d.%d)", | |
293 | ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid, | |
294 | ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid, | |
295 | ust_cmd->sock, ust_cmd->reg_msg.name, | |
296 | ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor); | |
297 | ||
298 | if (ust_cmd->reg_msg.type == USTCTL_SOCKET_CMD) { | |
299 | wait_node = zmalloc(sizeof(*wait_node)); | |
300 | if (!wait_node) { | |
301 | PERROR("zmalloc wait_node dispatch"); | |
302 | ret = close(ust_cmd->sock); | |
303 | if (ret < 0) { | |
304 | PERROR("close ust sock dispatch %d", ust_cmd->sock); | |
305 | } | |
306 | lttng_fd_put(LTTNG_FD_APPS, 1); | |
307 | free(ust_cmd); | |
3577d7d8 | 308 | ust_cmd = NULL; |
dd5875e7 JG |
309 | goto error; |
310 | } | |
311 | CDS_INIT_LIST_HEAD(&wait_node->head); | |
312 | ||
313 | /* Create application object if socket is CMD. */ | |
314 | wait_node->app = ust_app_create(&ust_cmd->reg_msg, | |
315 | ust_cmd->sock); | |
316 | if (!wait_node->app) { | |
317 | ret = close(ust_cmd->sock); | |
318 | if (ret < 0) { | |
319 | PERROR("close ust sock dispatch %d", ust_cmd->sock); | |
320 | } | |
321 | lttng_fd_put(LTTNG_FD_APPS, 1); | |
322 | free(wait_node); | |
3577d7d8 | 323 | wait_node = NULL; |
dd5875e7 | 324 | free(ust_cmd); |
3577d7d8 | 325 | ust_cmd = NULL; |
dd5875e7 JG |
326 | continue; |
327 | } | |
328 | /* | |
329 | * Add application to the wait queue so we can set the notify | |
330 | * socket before putting this object in the global ht. | |
331 | */ | |
332 | cds_list_add(&wait_node->head, &wait_queue.head); | |
333 | wait_queue.count++; | |
334 | ||
335 | free(ust_cmd); | |
3577d7d8 | 336 | ust_cmd = NULL; |
dd5875e7 JG |
337 | /* |
338 | * We have to continue here since we don't have the notify | |
339 | * socket and the application MUST be added to the hash table | |
340 | * only at that moment. | |
341 | */ | |
342 | continue; | |
343 | } else { | |
344 | /* | |
345 | * Look for the application in the local wait queue and set the | |
346 | * notify socket if found. | |
347 | */ | |
348 | cds_list_for_each_entry_safe(wait_node, tmp_wait_node, | |
349 | &wait_queue.head, head) { | |
350 | health_code_update(); | |
351 | if (wait_node->app->pid == ust_cmd->reg_msg.pid) { | |
352 | wait_node->app->notify_sock = ust_cmd->sock; | |
353 | cds_list_del(&wait_node->head); | |
354 | wait_queue.count--; | |
355 | app = wait_node->app; | |
356 | free(wait_node); | |
3577d7d8 | 357 | wait_node = NULL; |
dd5875e7 JG |
358 | DBG3("UST app notify socket %d is set", ust_cmd->sock); |
359 | break; | |
360 | } | |
361 | } | |
362 | ||
363 | /* | |
364 | * With no application at this stage the received socket is | |
365 | * basically useless so close it before we free the cmd data | |
366 | * structure for good. | |
367 | */ | |
368 | if (!app) { | |
369 | ret = close(ust_cmd->sock); | |
370 | if (ret < 0) { | |
371 | PERROR("close ust sock dispatch %d", ust_cmd->sock); | |
372 | } | |
373 | lttng_fd_put(LTTNG_FD_APPS, 1); | |
374 | } | |
375 | free(ust_cmd); | |
3577d7d8 | 376 | ust_cmd = NULL; |
dd5875e7 JG |
377 | } |
378 | ||
379 | if (app) { | |
380 | /* | |
381 | * @session_lock_list | |
382 | * | |
383 | * Lock the global session list so from the register up to the | |
384 | * registration done message, no thread can see the application | |
385 | * and change its state. | |
386 | */ | |
387 | session_lock_list(); | |
388 | rcu_read_lock(); | |
389 | ||
390 | /* | |
391 | * Add application to the global hash table. This needs to be | |
392 | * done before the update to the UST registry can locate the | |
393 | * application. | |
394 | */ | |
395 | ust_app_add(app); | |
396 | ||
397 | /* Set app version. This call will print an error if needed. */ | |
398 | (void) ust_app_version(app); | |
399 | ||
400 | /* Send notify socket through the notify pipe. */ | |
401 | ret = send_socket_to_thread( | |
402 | notifiers->apps_cmd_notify_pipe_write_fd, | |
403 | app->notify_sock); | |
404 | if (ret < 0) { | |
405 | rcu_read_unlock(); | |
406 | session_unlock_list(); | |
407 | /* | |
408 | * No notify thread, stop the UST tracing. However, this is | |
409 | * not an internal error of the this thread thus setting | |
410 | * the health error code to a normal exit. | |
411 | */ | |
412 | err = 0; | |
413 | goto error; | |
414 | } | |
415 | ||
416 | /* | |
417 | * Update newly registered application with the tracing | |
418 | * registry info already enabled information. | |
419 | */ | |
420 | update_ust_app(app->sock); | |
421 | ||
422 | /* | |
423 | * Don't care about return value. Let the manage apps threads | |
424 | * handle app unregistration upon socket close. | |
425 | */ | |
426 | (void) ust_app_register_done(app); | |
427 | ||
428 | /* | |
429 | * Even if the application socket has been closed, send the app | |
430 | * to the thread and unregistration will take place at that | |
431 | * place. | |
432 | */ | |
433 | ret = send_socket_to_thread( | |
434 | notifiers->apps_cmd_pipe_write_fd, | |
435 | app->sock); | |
436 | if (ret < 0) { | |
437 | rcu_read_unlock(); | |
438 | session_unlock_list(); | |
439 | /* | |
440 | * No apps. thread, stop the UST tracing. However, this is | |
441 | * not an internal error of the this thread thus setting | |
442 | * the health error code to a normal exit. | |
443 | */ | |
444 | err = 0; | |
445 | goto error; | |
446 | } | |
447 | ||
448 | rcu_read_unlock(); | |
449 | session_unlock_list(); | |
450 | } | |
451 | } while (node != NULL); | |
452 | ||
453 | health_poll_entry(); | |
454 | /* Futex wait on queue. Blocking call on futex() */ | |
455 | futex_nto1_wait(¬ifiers->ust_cmd_queue->futex); | |
456 | health_poll_exit(); | |
457 | } | |
458 | /* Normal exit, no error */ | |
459 | err = 0; | |
460 | ||
461 | error: | |
462 | /* Clean up wait queue. */ | |
463 | cds_list_for_each_entry_safe(wait_node, tmp_wait_node, | |
464 | &wait_queue.head, head) { | |
465 | cds_list_del(&wait_node->head); | |
466 | wait_queue.count--; | |
467 | free(wait_node); | |
468 | } | |
469 | ||
470 | /* Empty command queue. */ | |
471 | for (;;) { | |
472 | /* Dequeue command for registration */ | |
473 | node = cds_wfcq_dequeue_blocking( | |
474 | ¬ifiers->ust_cmd_queue->head, | |
475 | ¬ifiers->ust_cmd_queue->tail); | |
476 | if (node == NULL) { | |
477 | break; | |
478 | } | |
479 | ust_cmd = caa_container_of(node, struct ust_command, node); | |
480 | ret = close(ust_cmd->sock); | |
481 | if (ret < 0) { | |
482 | PERROR("close ust sock exit dispatch %d", ust_cmd->sock); | |
483 | } | |
484 | lttng_fd_put(LTTNG_FD_APPS, 1); | |
485 | free(ust_cmd); | |
486 | } | |
487 | ||
488 | error_testpoint: | |
489 | DBG("Dispatch thread dying"); | |
490 | if (err) { | |
491 | health_error(); | |
492 | ERR("Health error occurred in %s", __func__); | |
493 | } | |
494 | health_unregister(health_sessiond); | |
495 | rcu_unregister_thread(); | |
496 | return NULL; | |
497 | } | |
498 | ||
499 | static bool shutdown_ust_dispatch_thread(void *data) | |
500 | { | |
501 | struct thread_notifiers *notifiers = data; | |
502 | ||
503 | CMM_STORE_SHARED(notifiers->dispatch_thread_exit, 1); | |
504 | futex_nto1_wake(¬ifiers->ust_cmd_queue->futex); | |
505 | return true; | |
506 | } | |
507 | ||
508 | bool launch_ust_dispatch_thread(struct ust_cmd_queue *cmd_queue, | |
509 | int apps_cmd_pipe_write_fd, | |
510 | int apps_cmd_notify_pipe_write_fd) | |
511 | { | |
512 | struct lttng_thread *thread; | |
513 | struct thread_notifiers *notifiers; | |
514 | ||
515 | notifiers = zmalloc(sizeof(*notifiers)); | |
516 | if (!notifiers) { | |
517 | goto error; | |
518 | } | |
519 | notifiers->ust_cmd_queue = cmd_queue; | |
520 | notifiers->apps_cmd_pipe_write_fd = apps_cmd_pipe_write_fd; | |
521 | notifiers->apps_cmd_notify_pipe_write_fd = apps_cmd_notify_pipe_write_fd; | |
522 | ||
523 | thread = lttng_thread_create("UST registration dispatch", | |
524 | thread_dispatch_ust_registration, | |
525 | shutdown_ust_dispatch_thread, | |
526 | cleanup_ust_dispatch_thread, | |
527 | notifiers); | |
528 | if (!thread) { | |
529 | goto error; | |
530 | } | |
531 | lttng_thread_put(thread); | |
532 | return true; | |
533 | error: | |
534 | free(notifiers); | |
535 | return false; | |
536 | } |