clang-tidy: add a subset of cppcoreguidelines and other style checks
[lttng-tools.git] / src / bin / lttng-sessiond / dispatch.cpp
CommitLineData
/*
 * Copyright (C) 2011 EfficiOS Inc.
 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 *
 */
9
#include "dispatch.hpp"
#include "fd-limit.hpp"
#include "health-sessiond.hpp"
#include "lttng-sessiond.hpp"
#include "testpoint.hpp"
#include "thread.hpp"
#include "ust-app.hpp"

#include <common/futex.hpp>
#include <common/macros.hpp>

#include <errno.h>
#include <stddef.h>
#include <stdlib.h>
#include <urcu.h>
5d1b0219 24
namespace {
/*
 * State shared between the launcher, the dispatch thread itself and its
 * shutdown/cleanup callbacks. One instance is allocated per launched thread
 * and released by the thread's cleanup callback.
 */
struct thread_notifiers {
	/* Queue from which the dispatch thread dequeues registration commands. */
	struct ust_cmd_queue *ust_cmd_queue;
	/* Write end of the pipe on which application command sockets are sent. */
	int apps_cmd_pipe_write_fd;
	/* Write end of the pipe on which application notify sockets are sent. */
	int apps_cmd_notify_pipe_write_fd;
	/* Set to 1 by the shutdown callback; polled by the dispatch loop. */
	int dispatch_thread_exit;
};
} /* namespace */
5d1b0219
JG
33
34/*
35 * For each tracing session, update newly registered apps. The session list
36 * lock MUST be acquired before calling this.
37 */
38static void update_ust_app(int app_sock)
39{
40 struct ltt_session *sess, *stmp;
41 const struct ltt_session_list *session_list = session_get_list();
70670472 42 struct ust_app *app;
5d1b0219
JG
43
44 /* Consumer is in an ERROR state. Stop any application update. */
412d7227 45 if (uatomic_read(&the_ust_consumerd_state) == CONSUMER_ERROR) {
5d1b0219
JG
46 /* Stop the update process since the consumer is dead. */
47 return;
48 }
49
70670472 50 rcu_read_lock();
a0377dfe 51 LTTNG_ASSERT(app_sock >= 0);
70670472 52 app = ust_app_find_by_sock(app_sock);
cd9adb8b 53 if (app == nullptr) {
70670472
JR
54 /*
55 * Application can be unregistered before so
56 * this is possible hence simply stopping the
57 * update.
58 */
28ab034a 59 DBG3("UST app update failed to find app sock %d", app_sock);
70670472
JR
60 goto unlock_rcu;
61 }
62
63 /* Update all event notifiers for the app. */
64 ust_app_global_update_event_notifier_rules(app);
65
5d1b0219 66 /* For all tracing session(s) */
28ab034a 67 cds_list_for_each_entry_safe (sess, stmp, &session_list->head, list) {
5d1b0219
JG
68 if (!session_get(sess)) {
69 continue;
70 }
71 session_lock(sess);
28ab034a 72 if (!sess->active || !sess->ust_session || !sess->ust_session->active) {
5d1b0219
JG
73 goto unlock_session;
74 }
75
5d1b0219 76 ust_app_global_update(sess->ust_session, app);
5d1b0219
JG
77 unlock_session:
78 session_unlock(sess);
79 session_put(sess);
80 }
70670472
JR
81
82unlock_rcu:
83 rcu_read_unlock();
5d1b0219
JG
84}
85
86/*
87 * Sanitize the wait queue of the dispatch registration thread meaning removing
88 * invalid nodes from it. This is to avoid memory leaks for the case the UST
89 * notify socket is never received.
90 */
91static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
92{
93 int ret, nb_fd = 0, i;
94 unsigned int fd_added = 0;
95 struct lttng_poll_event events;
cd9adb8b 96 struct ust_reg_wait_node *wait_node = nullptr, *tmp_wait_node;
5d1b0219 97
a0377dfe 98 LTTNG_ASSERT(wait_queue);
5d1b0219
JG
99
100 lttng_poll_init(&events);
101
102 /* Just skip everything for an empty queue. */
103 if (!wait_queue->count) {
104 goto end;
105 }
106
107 ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC);
108 if (ret < 0) {
109 goto error_create;
110 }
111
28ab034a 112 cds_list_for_each_entry_safe (wait_node, tmp_wait_node, &wait_queue->head, head) {
a0377dfe 113 LTTNG_ASSERT(wait_node->app);
1524f98c 114 ret = lttng_poll_add(&events, wait_node->app->sock, LPOLLIN);
5d1b0219
JG
115 if (ret < 0) {
116 goto error;
117 }
118
119 fd_added = 1;
120 }
121
122 if (!fd_added) {
123 goto end;
124 }
125
126 /*
127 * Poll but don't block so we can quickly identify the faulty events and
128 * clean them afterwards from the wait queue.
129 */
130 ret = lttng_poll_wait(&events, 0);
131 if (ret < 0) {
132 goto error;
133 }
134 nb_fd = ret;
135
136 for (i = 0; i < nb_fd; i++) {
137 /* Get faulty FD. */
138 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
139 int pollfd = LTTNG_POLL_GETFD(&events, i);
140
28ab034a
JG
141 cds_list_for_each_entry_safe (wait_node, tmp_wait_node, &wait_queue->head, head) {
142 if (pollfd == wait_node->app->sock && (revents & (LPOLLHUP | LPOLLERR))) {
5d1b0219
JG
143 cds_list_del(&wait_node->head);
144 wait_queue->count--;
145 ust_app_destroy(wait_node->app);
146 free(wait_node);
147 /*
148 * Silence warning of use-after-free in
149 * cds_list_for_each_entry_safe which uses
150 * __typeof__(*wait_node).
151 */
cd9adb8b 152 wait_node = nullptr;
5d1b0219
JG
153 break;
154 } else {
155 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
156 goto error;
157 }
158 }
159 }
160
161 if (nb_fd > 0) {
162 DBG("Wait queue sanitized, %d node were cleaned up", nb_fd);
163 }
164
165end:
166 lttng_poll_clean(&events);
167 return;
168
169error:
170 lttng_poll_clean(&events);
171error_create:
172 ERR("Unable to sanitize wait queue");
173 return;
174}
175
176/*
177 * Send a socket to a thread This is called from the dispatch UST registration
178 * thread once all sockets are set for the application.
179 *
180 * The sock value can be invalid, we don't really care, the thread will handle
181 * it and make the necessary cleanup if so.
182 *
183 * On success, return 0 else a negative value being the errno message of the
184 * write().
185 */
186static int send_socket_to_thread(int fd, int sock)
187{
188 ssize_t ret;
189
190 /*
191 * It's possible that the FD is set as invalid with -1 concurrently just
192 * before calling this function being a shutdown state of the thread.
193 */
194 if (fd < 0) {
195 ret = -EBADF;
196 goto error;
197 }
198
199 ret = lttng_write(fd, &sock, sizeof(sock));
200 if (ret < sizeof(sock)) {
201 PERROR("write apps pipe %d", fd);
202 if (ret < 0) {
203 ret = -errno;
204 }
205 goto error;
206 }
207
208 /* All good. Don't send back the write positive ret value. */
209 ret = 0;
210error:
211 return (int) ret;
212}
213
/*
 * Cleanup callback of the dispatch thread: releases the notifiers structure
 * that was handed to the thread as its `data` argument.
 */
static void cleanup_ust_dispatch_thread(void *data)
{
	/* free() accepts a null pointer, no guard needed. */
	free(data);
}
218
219/*
220 * Dispatch request from the registration threads to the application
221 * communication thread.
222 */
223static void *thread_dispatch_ust_registration(void *data)
224{
225 int ret, err = -1;
226 struct cds_wfcq_node *node;
cd9adb8b
JG
227 struct ust_command *ust_cmd = nullptr;
228 struct ust_reg_wait_node *wait_node = nullptr, *tmp_wait_node;
5d1b0219
JG
229 struct ust_reg_wait_queue wait_queue = {
230 .count = 0,
1c9a0b0e 231 .head = {},
5d1b0219 232 };
7966af57 233 struct thread_notifiers *notifiers = (thread_notifiers *) data;
5d1b0219
JG
234
235 rcu_register_thread();
236
28ab034a 237 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH);
5d1b0219
JG
238
239 if (testpoint(sessiond_thread_app_reg_dispatch)) {
240 goto error_testpoint;
241 }
242
243 health_code_update();
244
245 CDS_INIT_LIST_HEAD(&wait_queue.head);
246
247 DBG("[thread] Dispatch UST command started");
248
249 for (;;) {
250 health_code_update();
251
252 /* Atomically prepare the queue futex */
253 futex_nto1_prepare(&notifiers->ust_cmd_queue->futex);
254
255 if (CMM_LOAD_SHARED(notifiers->dispatch_thread_exit)) {
256 break;
257 }
258
259 do {
cd9adb8b
JG
260 struct ust_app *app = nullptr;
261 ust_cmd = nullptr;
5d1b0219
JG
262
263 /*
264 * Make sure we don't have node(s) that have hung up before receiving
265 * the notify socket. This is to clean the list in order to avoid
266 * memory leaks from notify socket that are never seen.
267 */
268 sanitize_wait_queue(&wait_queue);
269
270 health_code_update();
271 /* Dequeue command for registration */
28ab034a
JG
272 node = cds_wfcq_dequeue_blocking(&notifiers->ust_cmd_queue->head,
273 &notifiers->ust_cmd_queue->tail);
cd9adb8b 274 if (node == nullptr) {
5d1b0219
JG
275 DBG("Woken up but nothing in the UST command queue");
276 /* Continue thread execution */
277 break;
278 }
279
0114db0e 280 ust_cmd = lttng::utils::container_of(node, &ust_command::node);
5d1b0219
JG
281
282 DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
28ab034a
JG
283 " gid:%d sock:%d name:%s (version %d.%d)",
284 ust_cmd->reg_msg.pid,
285 ust_cmd->reg_msg.ppid,
286 ust_cmd->reg_msg.uid,
287 ust_cmd->reg_msg.gid,
288 ust_cmd->sock,
289 ust_cmd->reg_msg.name,
290 ust_cmd->reg_msg.major,
291 ust_cmd->reg_msg.minor);
5d1b0219 292
b623cb6a 293 if (ust_cmd->reg_msg.type == LTTNG_UST_CTL_SOCKET_CMD) {
64803277 294 wait_node = zmalloc<ust_reg_wait_node>();
5d1b0219
JG
295 if (!wait_node) {
296 PERROR("zmalloc wait_node dispatch");
297 ret = close(ust_cmd->sock);
298 if (ret < 0) {
299 PERROR("close ust sock dispatch %d", ust_cmd->sock);
300 }
301 lttng_fd_put(LTTNG_FD_APPS, 1);
302 free(ust_cmd);
cd9adb8b 303 ust_cmd = nullptr;
5d1b0219
JG
304 goto error;
305 }
306 CDS_INIT_LIST_HEAD(&wait_node->head);
307
308 /* Create application object if socket is CMD. */
28ab034a 309 wait_node->app = ust_app_create(&ust_cmd->reg_msg, ust_cmd->sock);
5d1b0219
JG
310 if (!wait_node->app) {
311 ret = close(ust_cmd->sock);
312 if (ret < 0) {
313 PERROR("close ust sock dispatch %d", ust_cmd->sock);
314 }
315 lttng_fd_put(LTTNG_FD_APPS, 1);
316 free(wait_node);
cd9adb8b 317 wait_node = nullptr;
5d1b0219 318 free(ust_cmd);
cd9adb8b 319 ust_cmd = nullptr;
5d1b0219
JG
320 continue;
321 }
322 /*
323 * Add application to the wait queue so we can set the notify
324 * socket before putting this object in the global ht.
325 */
326 cds_list_add(&wait_node->head, &wait_queue.head);
327 wait_queue.count++;
328
329 free(ust_cmd);
cd9adb8b 330 ust_cmd = nullptr;
5d1b0219
JG
331 /*
332 * We have to continue here since we don't have the notify
333 * socket and the application MUST be added to the hash table
334 * only at that moment.
335 */
336 continue;
337 } else {
338 /*
339 * Look for the application in the local wait queue and set the
340 * notify socket if found.
341 */
28ab034a
JG
342 cds_list_for_each_entry_safe (
343 wait_node, tmp_wait_node, &wait_queue.head, head) {
5d1b0219
JG
344 health_code_update();
345 if (wait_node->app->pid == ust_cmd->reg_msg.pid) {
346 wait_node->app->notify_sock = ust_cmd->sock;
347 cds_list_del(&wait_node->head);
348 wait_queue.count--;
349 app = wait_node->app;
350 free(wait_node);
cd9adb8b 351 wait_node = nullptr;
28ab034a
JG
352 DBG3("UST app notify socket %d is set",
353 ust_cmd->sock);
5d1b0219
JG
354 break;
355 }
356 }
357
358 /*
359 * With no application at this stage the received socket is
360 * basically useless so close it before we free the cmd data
361 * structure for good.
362 */
363 if (!app) {
364 ret = close(ust_cmd->sock);
365 if (ret < 0) {
366 PERROR("close ust sock dispatch %d", ust_cmd->sock);
367 }
368 lttng_fd_put(LTTNG_FD_APPS, 1);
369 }
370 free(ust_cmd);
cd9adb8b 371 ust_cmd = nullptr;
5d1b0219
JG
372 }
373
374 if (app) {
375 /*
376 * @session_lock_list
377 *
378 * Lock the global session list so from the register up to the
379 * registration done message, no thread can see the application
380 * and change its state.
381 */
382 session_lock_list();
383 rcu_read_lock();
384
385 /*
386 * Add application to the global hash table. This needs to be
387 * done before the update to the UST registry can locate the
388 * application.
389 */
390 ust_app_add(app);
391
392 /* Set app version. This call will print an error if needed. */
393 (void) ust_app_version(app);
394
da873412
JR
395 (void) ust_app_setup_event_notifier_group(app);
396
5d1b0219
JG
397 /* Send notify socket through the notify pipe. */
398 ret = send_socket_to_thread(
28ab034a 399 notifiers->apps_cmd_notify_pipe_write_fd, app->notify_sock);
5d1b0219
JG
400 if (ret < 0) {
401 rcu_read_unlock();
402 session_unlock_list();
403 /*
404 * No notify thread, stop the UST tracing. However, this is
405 * not an internal error of the this thread thus setting
406 * the health error code to a normal exit.
407 */
408 err = 0;
409 goto error;
410 }
411
412 /*
413 * Update newly registered application with the tracing
414 * registry info already enabled information.
415 */
416 update_ust_app(app->sock);
417
418 /*
419 * Don't care about return value. Let the manage apps threads
420 * handle app unregistration upon socket close.
421 */
422 (void) ust_app_register_done(app);
423
424 /*
425 * Even if the application socket has been closed, send the app
426 * to the thread and unregistration will take place at that
427 * place.
428 */
28ab034a
JG
429 ret = send_socket_to_thread(notifiers->apps_cmd_pipe_write_fd,
430 app->sock);
5d1b0219
JG
431 if (ret < 0) {
432 rcu_read_unlock();
433 session_unlock_list();
434 /*
435 * No apps. thread, stop the UST tracing. However, this is
436 * not an internal error of the this thread thus setting
437 * the health error code to a normal exit.
438 */
439 err = 0;
440 goto error;
441 }
442
443 rcu_read_unlock();
444 session_unlock_list();
445 }
cd9adb8b 446 } while (node != nullptr);
5d1b0219
JG
447
448 health_poll_entry();
449 /* Futex wait on queue. Blocking call on futex() */
450 futex_nto1_wait(&notifiers->ust_cmd_queue->futex);
451 health_poll_exit();
452 }
453 /* Normal exit, no error */
454 err = 0;
455
456error:
457 /* Clean up wait queue. */
28ab034a 458 cds_list_for_each_entry_safe (wait_node, tmp_wait_node, &wait_queue.head, head) {
5d1b0219
JG
459 cds_list_del(&wait_node->head);
460 wait_queue.count--;
461 free(wait_node);
462 }
463
464 /* Empty command queue. */
465 for (;;) {
466 /* Dequeue command for registration */
28ab034a
JG
467 node = cds_wfcq_dequeue_blocking(&notifiers->ust_cmd_queue->head,
468 &notifiers->ust_cmd_queue->tail);
cd9adb8b 469 if (node == nullptr) {
5d1b0219
JG
470 break;
471 }
0114db0e 472 ust_cmd = lttng::utils::container_of(node, &ust_command::node);
5d1b0219
JG
473 ret = close(ust_cmd->sock);
474 if (ret < 0) {
475 PERROR("close ust sock exit dispatch %d", ust_cmd->sock);
476 }
477 lttng_fd_put(LTTNG_FD_APPS, 1);
478 free(ust_cmd);
479 }
480
481error_testpoint:
482 DBG("Dispatch thread dying");
483 if (err) {
484 health_error();
485 ERR("Health error occurred in %s", __func__);
486 }
412d7227 487 health_unregister(the_health_sessiond);
5d1b0219 488 rcu_unregister_thread();
cd9adb8b 489 return nullptr;
5d1b0219
JG
490}
491
492static bool shutdown_ust_dispatch_thread(void *data)
493{
7966af57 494 struct thread_notifiers *notifiers = (thread_notifiers *) data;
5d1b0219
JG
495
496 CMM_STORE_SHARED(notifiers->dispatch_thread_exit, 1);
497 futex_nto1_wake(&notifiers->ust_cmd_queue->futex);
498 return true;
499}
500
501bool launch_ust_dispatch_thread(struct ust_cmd_queue *cmd_queue,
28ab034a
JG
502 int apps_cmd_pipe_write_fd,
503 int apps_cmd_notify_pipe_write_fd)
5d1b0219
JG
504{
505 struct lttng_thread *thread;
506 struct thread_notifiers *notifiers;
507
64803277 508 notifiers = zmalloc<thread_notifiers>();
5d1b0219
JG
509 if (!notifiers) {
510 goto error;
511 }
512 notifiers->ust_cmd_queue = cmd_queue;
513 notifiers->apps_cmd_pipe_write_fd = apps_cmd_pipe_write_fd;
514 notifiers->apps_cmd_notify_pipe_write_fd = apps_cmd_notify_pipe_write_fd;
515
516 thread = lttng_thread_create("UST registration dispatch",
28ab034a
JG
517 thread_dispatch_ust_registration,
518 shutdown_ust_dispatch_thread,
519 cleanup_ust_dispatch_thread,
520 notifiers);
5d1b0219
JG
521 if (!thread) {
522 goto error;
523 }
524 lttng_thread_put(thread);
525 return true;
526error:
527 free(notifiers);
528 return false;
529}
This page took 0.076369 seconds and 4 git commands to generate.