lttng-tools: src/bin/lttng-sessiond/dispatch.cpp
/*
 * Copyright (C) 2011 EfficiOS Inc.
 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 *
 */

#include "dispatch.hpp"
#include "fd-limit.hpp"
#include "health-sessiond.hpp"
#include "lttng-sessiond.hpp"
#include "testpoint.hpp"
#include "thread.hpp"
#include "ust-app.hpp"

#include <common/futex.hpp>
#include <common/macros.hpp>
#include <common/urcu.hpp>

#include <stddef.h>
#include <stdlib.h>
#include <urcu.h>

namespace {
struct thread_notifiers {
	/* Queue of UST application registration commands to dispatch. */
	struct ust_cmd_queue *ust_cmd_queue;
	/* Write end of the pipe used to hand application sockets to the apps. thread. */
	int apps_cmd_pipe_write_fd;
	/* Write end of the pipe used to hand notify sockets to the notify thread. */
	int apps_cmd_notify_pipe_write_fd;
	/* Set by shutdown_ust_dispatch_thread() to ask the dispatch thread to exit. */
	int dispatch_thread_exit;
};
} /* namespace */
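
/*
 * A thread_notifiers instance is allocated by launch_ust_dispatch_thread(),
 * passed to the dispatch thread as its data pointer, and freed by
 * cleanup_ust_dispatch_thread(), the cleanup callback registered with
 * lttng_thread_create().
 */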

/*
 * For each tracing session, update newly registered apps. The session list
 * lock MUST be acquired before calling this.
 */
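/*
 * Expected usage (sketch), mirroring the call site in
 * thread_dispatch_ust_registration() below:
 *
 *	session_lock_list();
 *	update_ust_app(app->sock);
 *	session_unlock_list();
 */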
static void update_ust_app(int app_sock)
{
	struct ltt_session *sess, *stmp;
	const struct ltt_session_list *session_list = session_get_list();
	struct ust_app *app;

	/* Consumer is in an ERROR state. Stop any application update. */
	if (uatomic_read(&the_ust_consumerd_state) == CONSUMER_ERROR) {
		/* Stop the update process since the consumer is dead. */
		return;
	}

	lttng::urcu::read_lock_guard read_lock;
	LTTNG_ASSERT(app_sock >= 0);
	app = ust_app_find_by_sock(app_sock);
	if (app == nullptr) {
		/*
		 * The application may already have unregistered, so this is
		 * expected; simply stop the update.
		 */
		DBG3("UST app update failed to find app sock %d", app_sock);
		return;
	}

	/* Update all event notifiers for the app. */
	ust_app_global_update_event_notifier_rules(app);

	/* For all tracing session(s) */
	cds_list_for_each_entry_safe (sess, stmp, &session_list->head, list) {
		if (!session_get(sess)) {
			continue;
		}
		session_lock(sess);
		if (!sess->active || !sess->ust_session || !sess->ust_session->active) {
			goto unlock_session;
		}

		ust_app_global_update(sess->ust_session, app);
	unlock_session:
		session_unlock(sess);
		session_put(sess);
	}
}

/*
 * Sanitize the wait queue of the dispatch registration thread, i.e. remove
 * invalid nodes from it. This avoids leaking memory when the UST notify
 * socket of an application is never received.
 */
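/*
 * Concretely, the wait queue holds applications whose command socket has been
 * received but whose notify socket is still pending. A zero-timeout poll on
 * each queued command socket detects peers that hung up in the meantime
 * (LPOLLHUP/LPOLLERR); their nodes are removed and the application reference
 * is dropped.
 */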
static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
{
	int ret, nb_fd = 0, i;
	unsigned int fd_added = 0;
	struct lttng_poll_event events;
	struct ust_reg_wait_node *wait_node = nullptr, *tmp_wait_node;

	LTTNG_ASSERT(wait_queue);

	lttng_poll_init(&events);

	/* Just skip everything for an empty queue. */
	if (!wait_queue->count) {
		goto end;
	}

	ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC);
	if (ret < 0) {
		goto error_create;
	}

	cds_list_for_each_entry_safe (wait_node, tmp_wait_node, &wait_queue->head, head) {
		LTTNG_ASSERT(wait_node->app);
		ret = lttng_poll_add(&events, wait_node->app->sock, LPOLLIN);
		if (ret < 0) {
			goto error;
		}

		fd_added = 1;
	}

	if (!fd_added) {
		goto end;
	}

	/*
	 * Poll without blocking so we can quickly identify the faulty
	 * sockets and then remove them from the wait queue.
	 */
	ret = lttng_poll_wait(&events, 0);
	if (ret < 0) {
		goto error;
	}
	nb_fd = ret;

	for (i = 0; i < nb_fd; i++) {
		/* Get faulty FD. */
		uint32_t revents = LTTNG_POLL_GETEV(&events, i);
		int pollfd = LTTNG_POLL_GETFD(&events, i);

		cds_list_for_each_entry_safe (wait_node, tmp_wait_node, &wait_queue->head, head) {
			if (pollfd != wait_node->app->sock) {
				/* Not the node this event refers to; keep looking. */
				continue;
			}

			if (!(revents & (LPOLLHUP | LPOLLERR))) {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				goto error;
			}

			cds_list_del(&wait_node->head);
			wait_queue->count--;
			ust_app_put(wait_node->app);
			free(wait_node);

			/*
			 * Silence warning of use-after-free in
			 * cds_list_for_each_entry_safe which uses
			 * __typeof__(*wait_node).
			 */
			wait_node = nullptr;
			break;
		}
	}

	if (nb_fd > 0) {
		DBG("Wait queue sanitized, %d node(s) were cleaned up", nb_fd);
	}

end:
	lttng_poll_clean(&events);
	return;

error:
	lttng_poll_clean(&events);
error_create:
	ERR("Unable to sanitize wait queue");
	return;
}

/*
 * Send a socket to a thread. This is called from the dispatch UST
 * registration thread once all sockets are set for the application.
 *
 * The sock value can be invalid; we don't really care, the receiving thread
 * will handle it and make the necessary cleanup if so.
 *
 * On success, return 0, else a negative value corresponding to the errno of
 * the failed write().
 */
static int send_socket_to_thread(int fd, int sock)
{
	ssize_t ret;

	/*
	 * It's possible that the FD was concurrently invalidated (set to -1)
	 * just before calling this function while the thread is shutting down.
	 */
	if (fd < 0) {
		ret = -EBADF;
		goto error;
	}

	ret = lttng_write(fd, &sock, sizeof(sock));
	if (ret < static_cast<ssize_t>(sizeof(sock))) {
		PERROR("write apps pipe %d", fd);
		if (ret < 0) {
			ret = -errno;
		}
		goto error;
	}

	/* All good. Don't send back the write positive ret value. */
	ret = 0;
error:
	return (int) ret;
}

static void cleanup_ust_dispatch_thread(void *data)
{
	free(data);
}

/*
 * Dispatch request from the registration threads to the application
 * communication thread.
 */
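/*
 * An application registers with two sockets: a command socket
 * (LTTNG_UST_CTL_SOCKET_CMD) and a notify socket. When the command socket is
 * received, a ust_app object is created and parked on a local wait queue.
 * When the matching notify socket arrives (paired by PID), the application is
 * added to the global hash table and both sockets are handed off, through the
 * command and notify pipes, to the threads that manage them.
 */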
static void *thread_dispatch_ust_registration(void *data)
{
	int ret, err = -1;
	struct cds_wfcq_node *node;
	struct ust_command *ust_cmd = nullptr;
	struct ust_reg_wait_node *wait_node = nullptr, *tmp_wait_node;
	struct ust_reg_wait_queue wait_queue = {
		.count = 0,
		.head = {},
	};
	struct thread_notifiers *notifiers = (thread_notifiers *) data;

	rcu_register_thread();

	health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH);

	if (testpoint(sessiond_thread_app_reg_dispatch)) {
		goto error_testpoint;
	}

	health_code_update();

	CDS_INIT_LIST_HEAD(&wait_queue.head);

	DBG("[thread] Dispatch UST command started");

	for (;;) {
		health_code_update();

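		/*
		 * The queue futex is "prepared" before the command queue is
		 * drained and is only waited on once the queue is empty (see
		 * the bottom of the loop). The intent of the futex_nto1_*
		 * scheme is, presumably, that a wake-up issued by a producer
		 * while commands are being drained is not lost: the later
		 * futex_nto1_wait() then returns immediately.
		 */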
		/* Atomically prepare the queue futex */
		futex_nto1_prepare(&notifiers->ust_cmd_queue->futex);

		if (CMM_LOAD_SHARED(notifiers->dispatch_thread_exit)) {
			break;
		}

		do {
			struct ust_app *app = nullptr;
			ust_cmd = nullptr;

			/*
			 * Make sure we don't have node(s) that have hung up
			 * before receiving the notify socket. This cleans the
			 * wait queue in order to avoid memory leaks from
			 * notify sockets that are never received.
			 */
			sanitize_wait_queue(&wait_queue);

			health_code_update();
			/* Dequeue command for registration */
			node = cds_wfcq_dequeue_blocking(&notifiers->ust_cmd_queue->head,
							 &notifiers->ust_cmd_queue->tail);
			if (node == nullptr) {
				DBG("Woken up but nothing in the UST command queue");
				/* Continue thread execution */
				break;
			}

			ust_cmd = lttng::utils::container_of(node, &ust_command::node);

			DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
			    " gid:%d sock:%d name:%s (version %d.%d)",
			    ust_cmd->reg_msg.pid,
			    ust_cmd->reg_msg.ppid,
			    ust_cmd->reg_msg.uid,
			    ust_cmd->reg_msg.gid,
			    ust_cmd->sock,
			    ust_cmd->reg_msg.name,
			    ust_cmd->reg_msg.major,
			    ust_cmd->reg_msg.minor);

			if (ust_cmd->reg_msg.type == LTTNG_UST_CTL_SOCKET_CMD) {
				wait_node = zmalloc<ust_reg_wait_node>();
				if (!wait_node) {
					PERROR("zmalloc wait_node dispatch");
					ret = close(ust_cmd->sock);
					if (ret < 0) {
						PERROR("close ust sock dispatch %d", ust_cmd->sock);
					}
					lttng_fd_put(LTTNG_FD_APPS, 1);
					free(ust_cmd);
					ust_cmd = nullptr;
					goto error;
				}
				CDS_INIT_LIST_HEAD(&wait_node->head);

				/* Create application object if socket is CMD. */
				wait_node->app = ust_app_create(&ust_cmd->reg_msg, ust_cmd->sock);
				if (!wait_node->app) {
					ret = close(ust_cmd->sock);
					if (ret < 0) {
						PERROR("close ust sock dispatch %d", ust_cmd->sock);
					}
					lttng_fd_put(LTTNG_FD_APPS, 1);
					free(wait_node);
					wait_node = nullptr;
					free(ust_cmd);
					ust_cmd = nullptr;
					continue;
				}
				/*
				 * Add application to the wait queue so we can set the notify
				 * socket before putting this object in the global ht.
				 */
				cds_list_add(&wait_node->head, &wait_queue.head);
				wait_queue.count++;

				free(ust_cmd);
				ust_cmd = nullptr;
				/*
				 * We have to continue here since we don't have the notify
				 * socket and the application MUST be added to the hash table
				 * only at that moment.
				 */
				continue;
			} else {
				/*
				 * Look for the application in the local wait queue and set
				 * the notify socket if found.
				 */
				cds_list_for_each_entry_safe (
					wait_node, tmp_wait_node, &wait_queue.head, head) {
					health_code_update();
					if (wait_node->app->pid == ust_cmd->reg_msg.pid) {
						wait_node->app->notify_sock = ust_cmd->sock;
						cds_list_del(&wait_node->head);
						wait_queue.count--;
						app = wait_node->app;
						free(wait_node);
						wait_node = nullptr;
						DBG3("UST app notify socket %d is set",
						     ust_cmd->sock);
						break;
					}
				}

				/*
				 * If no application was found at this stage, the received
				 * socket is basically useless, so close it before freeing
				 * the command data structure for good.
				 */
				if (!app) {
					ret = close(ust_cmd->sock);
					if (ret < 0) {
						PERROR("close ust sock dispatch %d", ust_cmd->sock);
					}
					lttng_fd_put(LTTNG_FD_APPS, 1);
				}
				free(ust_cmd);
				ust_cmd = nullptr;
			}

			if (app) {
				/*
				 * @session_lock_list
				 *
				 * Lock the global session list so that, from the registration
				 * up to the registration done message, no thread can see the
				 * application and change its state.
				 */
				session_lock_list();
				lttng::urcu::read_lock_guard read_lock;

				/*
				 * Add application to the global hash table. This needs to be
				 * done before the update to the UST registry can locate the
				 * application.
				 */
				ust_app_add(app);

				/* Set app version. This call will print an error if needed. */
				(void) ust_app_version(app);

				(void) ust_app_setup_event_notifier_group(app);

				/* Send notify socket through the notify pipe. */
				ret = send_socket_to_thread(
					notifiers->apps_cmd_notify_pipe_write_fd, app->notify_sock);
				if (ret < 0) {
					session_unlock_list();
					/*
					 * No notify thread, stop the UST tracing. However, this
					 * is not an internal error of this thread, thus set the
					 * health error code to a normal exit.
					 */
					err = 0;
					goto error;
				}

				/*
				 * Update the newly registered application with the tracing
				 * registry information that is already enabled.
				 */
				update_ust_app(app->sock);

				/*
				 * Don't care about return value. Let the manage apps thread
				 * handle app unregistration upon socket close.
				 */
				(void) ust_app_register_done(app);

				/*
				 * Even if the application socket has been closed, send the
				 * app to the thread; unregistration will take place there.
				 */
				ret = send_socket_to_thread(notifiers->apps_cmd_pipe_write_fd,
							    app->sock);
				if (ret < 0) {
					session_unlock_list();
					/*
					 * No apps. thread, stop the UST tracing. However, this
					 * is not an internal error of this thread, thus set the
					 * health error code to a normal exit.
					 */
					err = 0;
					goto error;
				}

				session_unlock_list();
			}
		} while (node != nullptr);

		health_poll_entry();
		/* Futex wait on queue. Blocking call on futex() */
		futex_nto1_wait(&notifiers->ust_cmd_queue->futex);
		health_poll_exit();
	}
	/* Normal exit, no error */
	err = 0;

error:
	/* Clean up wait queue. */
	cds_list_for_each_entry_safe (wait_node, tmp_wait_node, &wait_queue.head, head) {
		cds_list_del(&wait_node->head);
		wait_queue.count--;
		free(wait_node);
	}

	/* Empty command queue. */
	for (;;) {
		/* Dequeue command for registration */
		node = cds_wfcq_dequeue_blocking(&notifiers->ust_cmd_queue->head,
						 &notifiers->ust_cmd_queue->tail);
		if (node == nullptr) {
			break;
		}
		ust_cmd = lttng::utils::container_of(node, &ust_command::node);
		ret = close(ust_cmd->sock);
		if (ret < 0) {
			PERROR("close ust sock exit dispatch %d", ust_cmd->sock);
		}
		lttng_fd_put(LTTNG_FD_APPS, 1);
		free(ust_cmd);
	}

error_testpoint:
	DBG("Dispatch thread dying");
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(the_health_sessiond);
	rcu_unregister_thread();
	return nullptr;
}

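/*
 * Ask the dispatch thread to stop: set the exit flag, which is read with
 * CMM_LOAD_SHARED() at the top of the dispatch loop, then wake the command
 * queue futex so the thread does not remain blocked in futex_nto1_wait().
 */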
static bool shutdown_ust_dispatch_thread(void *data)
{
	struct thread_notifiers *notifiers = (thread_notifiers *) data;

	CMM_STORE_SHARED(notifiers->dispatch_thread_exit, 1);
	futex_nto1_wake(&notifiers->ust_cmd_queue->futex);
	return true;
}

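/*
 * Launch the UST registration dispatch thread.
 *
 * The thread takes ownership of a heap-allocated thread_notifiers structure
 * referring to the command queue and both pipe write ends; it is released by
 * the cleanup callback. Returns true on success, false otherwise.
 */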
bool launch_ust_dispatch_thread(struct ust_cmd_queue *cmd_queue,
				int apps_cmd_pipe_write_fd,
				int apps_cmd_notify_pipe_write_fd)
{
	struct lttng_thread *thread;
	struct thread_notifiers *notifiers;

	notifiers = zmalloc<thread_notifiers>();
	if (!notifiers) {
		goto error;
	}
	notifiers->ust_cmd_queue = cmd_queue;
	notifiers->apps_cmd_pipe_write_fd = apps_cmd_pipe_write_fd;
	notifiers->apps_cmd_notify_pipe_write_fd = apps_cmd_notify_pipe_write_fd;

	thread = lttng_thread_create("UST registration dispatch",
				     thread_dispatch_ust_registration,
				     shutdown_ust_dispatch_thread,
				     cleanup_ust_dispatch_thread,
				     notifiers);
	if (!thread) {
		goto error;
	}
	lttng_thread_put(thread);
	return true;
error:
	free(notifiers);
	return false;
}