52d253d47d3578923d5fbf69c1ce52b438ee5330
[lttng-tools.git] / src / bin / lttng-sessiond / dispatch.c
1 /*
2 * Copyright (C) 2011 David Goulet <david.goulet@polymtl.ca>
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10 #include <stddef.h>
11 #include <stdlib.h>
12 #include <urcu.h>
13 #include <common/futex.h>
14 #include <common/macros.h>
15
16 #include "dispatch.h"
17 #include "ust-app.h"
18 #include "testpoint.h"
19 #include "fd-limit.h"
20 #include "health-sessiond.h"
21 #include "lttng-sessiond.h"
22 #include "thread.h"
23
/*
 * State handed to the dispatch thread, its shutdown callback and its cleanup
 * callback (each receives it as its `data` argument).
 */
struct thread_notifiers {
	/* Registration commands queued for this thread to dispatch. */
	struct ust_cmd_queue *ust_cmd_queue;
	/* Write end of the pipe used to pass application command sockets on. */
	int apps_cmd_pipe_write_fd;
	/* Write end of the pipe used to pass application notify sockets on. */
	int apps_cmd_notify_pipe_write_fd;
	/* Set to 1 by shutdown_ust_dispatch_thread() to request thread exit. */
	int dispatch_thread_exit;
};
30
31 /*
32 * For each tracing session, update newly registered apps. The session list
33 * lock MUST be acquired before calling this.
34 */
35 static void update_ust_app(int app_sock)
36 {
37 struct ltt_session *sess, *stmp;
38 const struct ltt_session_list *session_list = session_get_list();
39
40 /* Consumer is in an ERROR state. Stop any application update. */
41 if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) {
42 /* Stop the update process since the consumer is dead. */
43 return;
44 }
45
46 /* For all tracing session(s) */
47 cds_list_for_each_entry_safe(sess, stmp, &session_list->head, list) {
48 struct ust_app *app;
49
50 if (!session_get(sess)) {
51 continue;
52 }
53 session_lock(sess);
54 if (!sess->active || !sess->ust_session) {
55 goto unlock_session;
56 }
57
58 rcu_read_lock();
59 assert(app_sock >= 0);
60 app = ust_app_find_by_sock(app_sock);
61 if (app == NULL) {
62 /*
63 * Application can be unregistered before so
64 * this is possible hence simply stopping the
65 * update.
66 */
67 DBG3("UST app update failed to find app sock %d",
68 app_sock);
69 goto unlock_rcu;
70 }
71 ust_app_global_update(sess->ust_session, app);
72 unlock_rcu:
73 rcu_read_unlock();
74 unlock_session:
75 session_unlock(sess);
76 session_put(sess);
77 }
78 }
79
80 /*
81 * Sanitize the wait queue of the dispatch registration thread meaning removing
82 * invalid nodes from it. This is to avoid memory leaks for the case the UST
83 * notify socket is never received.
84 */
85 static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
86 {
87 int ret, nb_fd = 0, i;
88 unsigned int fd_added = 0;
89 struct lttng_poll_event events;
90 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
91
92 assert(wait_queue);
93
94 lttng_poll_init(&events);
95
96 /* Just skip everything for an empty queue. */
97 if (!wait_queue->count) {
98 goto end;
99 }
100
101 ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC);
102 if (ret < 0) {
103 goto error_create;
104 }
105
106 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
107 &wait_queue->head, head) {
108 assert(wait_node->app);
109 ret = lttng_poll_add(&events, wait_node->app->sock,
110 LPOLLHUP | LPOLLERR);
111 if (ret < 0) {
112 goto error;
113 }
114
115 fd_added = 1;
116 }
117
118 if (!fd_added) {
119 goto end;
120 }
121
122 /*
123 * Poll but don't block so we can quickly identify the faulty events and
124 * clean them afterwards from the wait queue.
125 */
126 ret = lttng_poll_wait(&events, 0);
127 if (ret < 0) {
128 goto error;
129 }
130 nb_fd = ret;
131
132 for (i = 0; i < nb_fd; i++) {
133 /* Get faulty FD. */
134 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
135 int pollfd = LTTNG_POLL_GETFD(&events, i);
136
137 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
138 &wait_queue->head, head) {
139 if (pollfd == wait_node->app->sock &&
140 (revents & (LPOLLHUP | LPOLLERR))) {
141 cds_list_del(&wait_node->head);
142 wait_queue->count--;
143 ust_app_destroy(wait_node->app);
144 free(wait_node);
145 /*
146 * Silence warning of use-after-free in
147 * cds_list_for_each_entry_safe which uses
148 * __typeof__(*wait_node).
149 */
150 wait_node = NULL;
151 break;
152 } else {
153 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
154 goto error;
155 }
156 }
157 }
158
159 if (nb_fd > 0) {
160 DBG("Wait queue sanitized, %d node were cleaned up", nb_fd);
161 }
162
163 end:
164 lttng_poll_clean(&events);
165 return;
166
167 error:
168 lttng_poll_clean(&events);
169 error_create:
170 ERR("Unable to sanitize wait queue");
171 return;
172 }
173
/*
 * Send a socket to a thread. This is called from the dispatch UST registration
 * thread once all sockets are set for the application.
 *
 * The sock value can be invalid, we don't really care, the thread will handle
 * it and make the necessary cleanup if so.
 *
 * Return 0 on success. On failure, return a negative errno value:
 *   -EBADF if the pipe fd is invalid (thread shutting down),
 *   -errno of the failed write(), or
 *   -EIO if only part of the socket value could be written.
 */
static int send_socket_to_thread(int fd, int sock)
{
	ssize_t ret;

	/*
	 * It's possible that the FD is set as invalid with -1 concurrently just
	 * before calling this function being a shutdown state of the thread.
	 */
	if (fd < 0) {
		ret = -EBADF;
		goto error;
	}

	ret = lttng_write(fd, &sock, sizeof(sock));
	if (ret < 0) {
		/* Save errno before PERROR() gets a chance to clobber it. */
		const int write_errno = errno;

		PERROR("write apps pipe %d", fd);
		ret = write_errno ? -write_errno : -EIO;
		goto error;
	} else if ((size_t) ret != sizeof(sock)) {
		/*
		 * A short write must not be reported as success: callers only
		 * check for a negative return value.
		 */
		ERR("Short write on apps pipe %d: %zd of %zu bytes",
				fd, ret, sizeof(sock));
		ret = -EIO;
		goto error;
	}

	/* All good. Don't send back the write positive ret value. */
	ret = 0;
error:
	return (int) ret;
}
211
/*
 * Cleanup callback of the dispatch thread: release the notifier state that
 * launch_ust_dispatch_thread() allocated for it.
 */
static void cleanup_ust_dispatch_thread(void *data)
{
	struct thread_notifiers *notifiers = data;

	free(notifiers);
}
216
217 /*
218 * Dispatch request from the registration threads to the application
219 * communication thread.
220 */
221 static void *thread_dispatch_ust_registration(void *data)
222 {
223 int ret, err = -1;
224 struct cds_wfcq_node *node;
225 struct ust_command *ust_cmd = NULL;
226 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
227 struct ust_reg_wait_queue wait_queue = {
228 .count = 0,
229 };
230 struct thread_notifiers *notifiers = data;
231
232 rcu_register_thread();
233
234 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH);
235
236 if (testpoint(sessiond_thread_app_reg_dispatch)) {
237 goto error_testpoint;
238 }
239
240 health_code_update();
241
242 CDS_INIT_LIST_HEAD(&wait_queue.head);
243
244 DBG("[thread] Dispatch UST command started");
245
246 for (;;) {
247 health_code_update();
248
249 /* Atomically prepare the queue futex */
250 futex_nto1_prepare(&notifiers->ust_cmd_queue->futex);
251
252 if (CMM_LOAD_SHARED(notifiers->dispatch_thread_exit)) {
253 break;
254 }
255
256 do {
257 struct ust_app *app = NULL;
258 ust_cmd = NULL;
259
260 /*
261 * Make sure we don't have node(s) that have hung up before receiving
262 * the notify socket. This is to clean the list in order to avoid
263 * memory leaks from notify socket that are never seen.
264 */
265 sanitize_wait_queue(&wait_queue);
266
267 health_code_update();
268 /* Dequeue command for registration */
269 node = cds_wfcq_dequeue_blocking(
270 &notifiers->ust_cmd_queue->head,
271 &notifiers->ust_cmd_queue->tail);
272 if (node == NULL) {
273 DBG("Woken up but nothing in the UST command queue");
274 /* Continue thread execution */
275 break;
276 }
277
278 ust_cmd = caa_container_of(node, struct ust_command, node);
279
280 DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
281 " gid:%d sock:%d name:%s (version %d.%d)",
282 ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid,
283 ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
284 ust_cmd->sock, ust_cmd->reg_msg.name,
285 ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
286
287 if (ust_cmd->reg_msg.type == USTCTL_SOCKET_CMD) {
288 wait_node = zmalloc(sizeof(*wait_node));
289 if (!wait_node) {
290 PERROR("zmalloc wait_node dispatch");
291 ret = close(ust_cmd->sock);
292 if (ret < 0) {
293 PERROR("close ust sock dispatch %d", ust_cmd->sock);
294 }
295 lttng_fd_put(LTTNG_FD_APPS, 1);
296 free(ust_cmd);
297 ust_cmd = NULL;
298 goto error;
299 }
300 CDS_INIT_LIST_HEAD(&wait_node->head);
301
302 /* Create application object if socket is CMD. */
303 wait_node->app = ust_app_create(&ust_cmd->reg_msg,
304 ust_cmd->sock);
305 if (!wait_node->app) {
306 ret = close(ust_cmd->sock);
307 if (ret < 0) {
308 PERROR("close ust sock dispatch %d", ust_cmd->sock);
309 }
310 lttng_fd_put(LTTNG_FD_APPS, 1);
311 free(wait_node);
312 wait_node = NULL;
313 free(ust_cmd);
314 ust_cmd = NULL;
315 continue;
316 }
317 /*
318 * Add application to the wait queue so we can set the notify
319 * socket before putting this object in the global ht.
320 */
321 cds_list_add(&wait_node->head, &wait_queue.head);
322 wait_queue.count++;
323
324 free(ust_cmd);
325 ust_cmd = NULL;
326 /*
327 * We have to continue here since we don't have the notify
328 * socket and the application MUST be added to the hash table
329 * only at that moment.
330 */
331 continue;
332 } else {
333 /*
334 * Look for the application in the local wait queue and set the
335 * notify socket if found.
336 */
337 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
338 &wait_queue.head, head) {
339 health_code_update();
340 if (wait_node->app->pid == ust_cmd->reg_msg.pid) {
341 wait_node->app->notify_sock = ust_cmd->sock;
342 cds_list_del(&wait_node->head);
343 wait_queue.count--;
344 app = wait_node->app;
345 free(wait_node);
346 wait_node = NULL;
347 DBG3("UST app notify socket %d is set", ust_cmd->sock);
348 break;
349 }
350 }
351
352 /*
353 * With no application at this stage the received socket is
354 * basically useless so close it before we free the cmd data
355 * structure for good.
356 */
357 if (!app) {
358 ret = close(ust_cmd->sock);
359 if (ret < 0) {
360 PERROR("close ust sock dispatch %d", ust_cmd->sock);
361 }
362 lttng_fd_put(LTTNG_FD_APPS, 1);
363 }
364 free(ust_cmd);
365 ust_cmd = NULL;
366 }
367
368 if (app) {
369 /*
370 * @session_lock_list
371 *
372 * Lock the global session list so from the register up to the
373 * registration done message, no thread can see the application
374 * and change its state.
375 */
376 session_lock_list();
377 rcu_read_lock();
378
379 /*
380 * Add application to the global hash table. This needs to be
381 * done before the update to the UST registry can locate the
382 * application.
383 */
384 ust_app_add(app);
385
386 /* Set app version. This call will print an error if needed. */
387 (void) ust_app_version(app);
388
389 (void) ust_app_setup_event_notifier_group(app);
390
391 /* Send notify socket through the notify pipe. */
392 ret = send_socket_to_thread(
393 notifiers->apps_cmd_notify_pipe_write_fd,
394 app->notify_sock);
395 if (ret < 0) {
396 rcu_read_unlock();
397 session_unlock_list();
398 /*
399 * No notify thread, stop the UST tracing. However, this is
400 * not an internal error of the this thread thus setting
401 * the health error code to a normal exit.
402 */
403 err = 0;
404 goto error;
405 }
406
407 /*
408 * Update newly registered application with the tracing
409 * registry info already enabled information.
410 */
411 update_ust_app(app->sock);
412
413 /*
414 * Don't care about return value. Let the manage apps threads
415 * handle app unregistration upon socket close.
416 */
417 (void) ust_app_register_done(app);
418
419 /*
420 * Even if the application socket has been closed, send the app
421 * to the thread and unregistration will take place at that
422 * place.
423 */
424 ret = send_socket_to_thread(
425 notifiers->apps_cmd_pipe_write_fd,
426 app->sock);
427 if (ret < 0) {
428 rcu_read_unlock();
429 session_unlock_list();
430 /*
431 * No apps. thread, stop the UST tracing. However, this is
432 * not an internal error of the this thread thus setting
433 * the health error code to a normal exit.
434 */
435 err = 0;
436 goto error;
437 }
438
439 rcu_read_unlock();
440 session_unlock_list();
441 }
442 } while (node != NULL);
443
444 health_poll_entry();
445 /* Futex wait on queue. Blocking call on futex() */
446 futex_nto1_wait(&notifiers->ust_cmd_queue->futex);
447 health_poll_exit();
448 }
449 /* Normal exit, no error */
450 err = 0;
451
452 error:
453 /* Clean up wait queue. */
454 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
455 &wait_queue.head, head) {
456 cds_list_del(&wait_node->head);
457 wait_queue.count--;
458 free(wait_node);
459 }
460
461 /* Empty command queue. */
462 for (;;) {
463 /* Dequeue command for registration */
464 node = cds_wfcq_dequeue_blocking(
465 &notifiers->ust_cmd_queue->head,
466 &notifiers->ust_cmd_queue->tail);
467 if (node == NULL) {
468 break;
469 }
470 ust_cmd = caa_container_of(node, struct ust_command, node);
471 ret = close(ust_cmd->sock);
472 if (ret < 0) {
473 PERROR("close ust sock exit dispatch %d", ust_cmd->sock);
474 }
475 lttng_fd_put(LTTNG_FD_APPS, 1);
476 free(ust_cmd);
477 }
478
479 error_testpoint:
480 DBG("Dispatch thread dying");
481 if (err) {
482 health_error();
483 ERR("Health error occurred in %s", __func__);
484 }
485 health_unregister(health_sessiond);
486 rcu_unregister_thread();
487 return NULL;
488 }
489
490 static bool shutdown_ust_dispatch_thread(void *data)
491 {
492 struct thread_notifiers *notifiers = data;
493
494 CMM_STORE_SHARED(notifiers->dispatch_thread_exit, 1);
495 futex_nto1_wake(&notifiers->ust_cmd_queue->futex);
496 return true;
497 }
498
499 bool launch_ust_dispatch_thread(struct ust_cmd_queue *cmd_queue,
500 int apps_cmd_pipe_write_fd,
501 int apps_cmd_notify_pipe_write_fd)
502 {
503 struct lttng_thread *thread;
504 struct thread_notifiers *notifiers;
505
506 notifiers = zmalloc(sizeof(*notifiers));
507 if (!notifiers) {
508 goto error;
509 }
510 notifiers->ust_cmd_queue = cmd_queue;
511 notifiers->apps_cmd_pipe_write_fd = apps_cmd_pipe_write_fd;
512 notifiers->apps_cmd_notify_pipe_write_fd = apps_cmd_notify_pipe_write_fd;
513
514 thread = lttng_thread_create("UST registration dispatch",
515 thread_dispatch_ust_registration,
516 shutdown_ust_dispatch_thread,
517 cleanup_ust_dispatch_thread,
518 notifiers);
519 if (!thread) {
520 goto error;
521 }
522 lttng_thread_put(thread);
523 return true;
524 error:
525 free(notifiers);
526 return false;
527 }
This page took 0.040081 seconds and 4 git commands to generate.