Use compiler-agnostic defines to silence warning
[lttng-tools.git] / src / bin / lttng-sessiond / dispatch.cpp
CommitLineData
5d1b0219 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5d1b0219 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
5d1b0219 7 *
5d1b0219
JG
8 */
9
c9e313bc 10#include "dispatch.hpp"
c9e313bc
SM
11#include "fd-limit.hpp"
12#include "health-sessiond.hpp"
13#include "lttng-sessiond.hpp"
28ab034a 14#include "testpoint.hpp"
c9e313bc 15#include "thread.hpp"
28ab034a
JG
16#include "ust-app.hpp"
17
18#include <common/futex.hpp>
19#include <common/macros.hpp>
56047f5a 20#include <common/urcu.hpp>
28ab034a
JG
21
22#include <stddef.h>
23#include <stdlib.h>
24#include <urcu.h>
5d1b0219 25
f1494934 26namespace {
5d1b0219
JG
27struct thread_notifiers {
28 struct ust_cmd_queue *ust_cmd_queue;
29 int apps_cmd_pipe_write_fd;
30 int apps_cmd_notify_pipe_write_fd;
31 int dispatch_thread_exit;
32};
f1494934 33} /* namespace */
5d1b0219
JG
34
35/*
36 * For each tracing session, update newly registered apps. The session list
37 * lock MUST be acquired before calling this.
38 */
39static void update_ust_app(int app_sock)
40{
5d1b0219 41 const struct ltt_session_list *session_list = session_get_list();
70670472 42 struct ust_app *app;
5d1b0219
JG
43
44 /* Consumer is in an ERROR state. Stop any application update. */
412d7227 45 if (uatomic_read(&the_ust_consumerd_state) == CONSUMER_ERROR) {
5d1b0219
JG
46 /* Stop the update process since the consumer is dead. */
47 return;
48 }
49
07c4863f 50 const lttng::urcu::read_lock_guard read_lock;
a0377dfe 51 LTTNG_ASSERT(app_sock >= 0);
70670472 52 app = ust_app_find_by_sock(app_sock);
cd9adb8b 53 if (app == nullptr) {
70670472
JR
54 /*
55 * Application can be unregistered before so
56 * this is possible hence simply stopping the
57 * update.
58 */
28ab034a 59 DBG3("UST app update failed to find app sock %d", app_sock);
56047f5a 60 return;
70670472
JR
61 }
62
63 /* Update all event notifiers for the app. */
64 ust_app_global_update_event_notifier_rules(app);
65
5d1b0219 66 /* For all tracing session(s) */
64a87eb6
JG
67 for (auto *sess : lttng::urcu::list_iteration_adapter<ltt_session, &ltt_session::list>(
68 session_list->head)) {
5d1b0219
JG
69 if (!session_get(sess)) {
70 continue;
71 }
72 session_lock(sess);
28ab034a 73 if (!sess->active || !sess->ust_session || !sess->ust_session->active) {
5d1b0219
JG
74 goto unlock_session;
75 }
76
5d1b0219 77 ust_app_global_update(sess->ust_session, app);
5d1b0219
JG
78 unlock_session:
79 session_unlock(sess);
80 session_put(sess);
81 }
82}
83
84/*
85 * Sanitize the wait queue of the dispatch registration thread meaning removing
86 * invalid nodes from it. This is to avoid memory leaks for the case the UST
87 * notify socket is never received.
88 */
89static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
90{
91 int ret, nb_fd = 0, i;
92 unsigned int fd_added = 0;
93 struct lttng_poll_event events;
5d1b0219 94
a0377dfe 95 LTTNG_ASSERT(wait_queue);
5d1b0219
JG
96
97 lttng_poll_init(&events);
98
99 /* Just skip everything for an empty queue. */
100 if (!wait_queue->count) {
101 goto end;
102 }
103
104 ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC);
105 if (ret < 0) {
106 goto error_create;
107 }
108
64a87eb6
JG
109 for (auto *wait_node :
110 lttng::urcu::list_iteration_adapter<ust_reg_wait_node, &ust_reg_wait_node::head>(
111 wait_queue->head)) {
a0377dfe 112 LTTNG_ASSERT(wait_node->app);
1524f98c 113 ret = lttng_poll_add(&events, wait_node->app->sock, LPOLLIN);
5d1b0219
JG
114 if (ret < 0) {
115 goto error;
116 }
117
118 fd_added = 1;
119 }
120
121 if (!fd_added) {
122 goto end;
123 }
124
125 /*
126 * Poll but don't block so we can quickly identify the faulty events and
127 * clean them afterwards from the wait queue.
128 */
129 ret = lttng_poll_wait(&events, 0);
130 if (ret < 0) {
131 goto error;
132 }
133 nb_fd = ret;
134
135 for (i = 0; i < nb_fd; i++) {
136 /* Get faulty FD. */
07c4863f
JG
137 const uint32_t revents = LTTNG_POLL_GETEV(&events, i);
138 const int pollfd = LTTNG_POLL_GETFD(&events, i);
5d1b0219 139
64a87eb6
JG
140 for (auto *wait_node :
141 lttng::urcu::list_iteration_adapter<ust_reg_wait_node,
142 &ust_reg_wait_node::head>(
143 wait_queue->head)) {
28ab034a 144 if (pollfd == wait_node->app->sock && (revents & (LPOLLHUP | LPOLLERR))) {
5d1b0219
JG
145 cds_list_del(&wait_node->head);
146 wait_queue->count--;
a7db814e 147 ust_app_put(wait_node->app);
5d1b0219 148 free(wait_node);
a7db814e 149
cd9adb8b 150 wait_node = nullptr;
5d1b0219
JG
151 break;
152 } else {
153 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
154 goto error;
155 }
156 }
157 }
158
159 if (nb_fd > 0) {
160 DBG("Wait queue sanitized, %d node were cleaned up", nb_fd);
161 }
162
163end:
164 lttng_poll_clean(&events);
165 return;
166
167error:
168 lttng_poll_clean(&events);
169error_create:
170 ERR("Unable to sanitize wait queue");
171 return;
172}
173
174/*
175 * Send a socket to a thread This is called from the dispatch UST registration
176 * thread once all sockets are set for the application.
177 *
178 * The sock value can be invalid, we don't really care, the thread will handle
179 * it and make the necessary cleanup if so.
180 *
181 * On success, return 0 else a negative value being the errno message of the
182 * write().
183 */
184static int send_socket_to_thread(int fd, int sock)
185{
186 ssize_t ret;
187
188 /*
189 * It's possible that the FD is set as invalid with -1 concurrently just
190 * before calling this function being a shutdown state of the thread.
191 */
192 if (fd < 0) {
193 ret = -EBADF;
194 goto error;
195 }
196
197 ret = lttng_write(fd, &sock, sizeof(sock));
198 if (ret < sizeof(sock)) {
199 PERROR("write apps pipe %d", fd);
200 if (ret < 0) {
201 ret = -errno;
202 }
203 goto error;
204 }
205
206 /* All good. Don't send back the write positive ret value. */
207 ret = 0;
208error:
209 return (int) ret;
210}
211
212static void cleanup_ust_dispatch_thread(void *data)
213{
214 free(data);
215}
216
217/*
218 * Dispatch request from the registration threads to the application
219 * communication thread.
220 */
221static void *thread_dispatch_ust_registration(void *data)
222{
223 int ret, err = -1;
224 struct cds_wfcq_node *node;
cd9adb8b 225 struct ust_command *ust_cmd = nullptr;
64a87eb6 226 struct ust_reg_wait_node *wait_node = nullptr;
5d1b0219
JG
227 struct ust_reg_wait_queue wait_queue = {
228 .count = 0,
1c9a0b0e 229 .head = {},
5d1b0219 230 };
7966af57 231 struct thread_notifiers *notifiers = (thread_notifiers *) data;
5d1b0219
JG
232
233 rcu_register_thread();
234
28ab034a 235 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH);
5d1b0219
JG
236
237 if (testpoint(sessiond_thread_app_reg_dispatch)) {
238 goto error_testpoint;
239 }
240
241 health_code_update();
242
243 CDS_INIT_LIST_HEAD(&wait_queue.head);
244
245 DBG("[thread] Dispatch UST command started");
246
247 for (;;) {
248 health_code_update();
249
250 /* Atomically prepare the queue futex */
251 futex_nto1_prepare(&notifiers->ust_cmd_queue->futex);
252
253 if (CMM_LOAD_SHARED(notifiers->dispatch_thread_exit)) {
254 break;
255 }
256
257 do {
cd9adb8b
JG
258 struct ust_app *app = nullptr;
259 ust_cmd = nullptr;
5d1b0219
JG
260
261 /*
262 * Make sure we don't have node(s) that have hung up before receiving
263 * the notify socket. This is to clean the list in order to avoid
264 * memory leaks from notify socket that are never seen.
265 */
266 sanitize_wait_queue(&wait_queue);
267
268 health_code_update();
269 /* Dequeue command for registration */
28ab034a
JG
270 node = cds_wfcq_dequeue_blocking(&notifiers->ust_cmd_queue->head,
271 &notifiers->ust_cmd_queue->tail);
cd9adb8b 272 if (node == nullptr) {
5d1b0219
JG
273 DBG("Woken up but nothing in the UST command queue");
274 /* Continue thread execution */
275 break;
276 }
277
0114db0e 278 ust_cmd = lttng::utils::container_of(node, &ust_command::node);
5d1b0219
JG
279
280 DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
28ab034a
JG
281 " gid:%d sock:%d name:%s (version %d.%d)",
282 ust_cmd->reg_msg.pid,
283 ust_cmd->reg_msg.ppid,
284 ust_cmd->reg_msg.uid,
285 ust_cmd->reg_msg.gid,
286 ust_cmd->sock,
287 ust_cmd->reg_msg.name,
288 ust_cmd->reg_msg.major,
289 ust_cmd->reg_msg.minor);
5d1b0219 290
b623cb6a 291 if (ust_cmd->reg_msg.type == LTTNG_UST_CTL_SOCKET_CMD) {
64803277 292 wait_node = zmalloc<ust_reg_wait_node>();
5d1b0219
JG
293 if (!wait_node) {
294 PERROR("zmalloc wait_node dispatch");
295 ret = close(ust_cmd->sock);
296 if (ret < 0) {
297 PERROR("close ust sock dispatch %d", ust_cmd->sock);
298 }
299 lttng_fd_put(LTTNG_FD_APPS, 1);
300 free(ust_cmd);
cd9adb8b 301 ust_cmd = nullptr;
5d1b0219
JG
302 goto error;
303 }
304 CDS_INIT_LIST_HEAD(&wait_node->head);
305
306 /* Create application object if socket is CMD. */
28ab034a 307 wait_node->app = ust_app_create(&ust_cmd->reg_msg, ust_cmd->sock);
5d1b0219
JG
308 if (!wait_node->app) {
309 ret = close(ust_cmd->sock);
310 if (ret < 0) {
311 PERROR("close ust sock dispatch %d", ust_cmd->sock);
312 }
313 lttng_fd_put(LTTNG_FD_APPS, 1);
314 free(wait_node);
cd9adb8b 315 wait_node = nullptr;
5d1b0219 316 free(ust_cmd);
cd9adb8b 317 ust_cmd = nullptr;
5d1b0219
JG
318 continue;
319 }
320 /*
321 * Add application to the wait queue so we can set the notify
322 * socket before putting this object in the global ht.
323 */
324 cds_list_add(&wait_node->head, &wait_queue.head);
325 wait_queue.count++;
326
327 free(ust_cmd);
cd9adb8b 328 ust_cmd = nullptr;
5d1b0219
JG
329 /*
330 * We have to continue here since we don't have the notify
331 * socket and the application MUST be added to the hash table
332 * only at that moment.
333 */
334 continue;
335 } else {
336 /*
337 * Look for the application in the local wait queue and set the
338 * notify socket if found.
339 */
64a87eb6
JG
340 for (auto *wait_node_in_queue :
341 lttng::urcu::list_iteration_adapter<ust_reg_wait_node,
342 &ust_reg_wait_node::head>(
343 wait_queue.head)) {
5d1b0219 344 health_code_update();
64a87eb6
JG
345 if (wait_node_in_queue->app->pid == ust_cmd->reg_msg.pid) {
346 wait_node_in_queue->app->notify_sock =
347 ust_cmd->sock;
348 cds_list_del(&wait_node_in_queue->head);
5d1b0219 349 wait_queue.count--;
64a87eb6
JG
350 app = wait_node_in_queue->app;
351 free(wait_node_in_queue);
352
28ab034a
JG
353 DBG3("UST app notify socket %d is set",
354 ust_cmd->sock);
5d1b0219
JG
355 break;
356 }
357 }
358
359 /*
360 * With no application at this stage the received socket is
361 * basically useless so close it before we free the cmd data
362 * structure for good.
363 */
364 if (!app) {
365 ret = close(ust_cmd->sock);
366 if (ret < 0) {
367 PERROR("close ust sock dispatch %d", ust_cmd->sock);
368 }
369 lttng_fd_put(LTTNG_FD_APPS, 1);
370 }
371 free(ust_cmd);
cd9adb8b 372 ust_cmd = nullptr;
5d1b0219
JG
373 }
374
375 if (app) {
376 /*
377 * @session_lock_list
378 *
379 * Lock the global session list so from the register up to the
380 * registration done message, no thread can see the application
381 * and change its state.
382 */
d9a970b7 383 const auto list_lock = lttng::sessiond::lock_session_list();
07c4863f 384 const lttng::urcu::read_lock_guard read_lock;
5d1b0219
JG
385
386 /*
387 * Add application to the global hash table. This needs to be
388 * done before the update to the UST registry can locate the
389 * application.
390 */
391 ust_app_add(app);
392
393 /* Set app version. This call will print an error if needed. */
394 (void) ust_app_version(app);
395
da873412
JR
396 (void) ust_app_setup_event_notifier_group(app);
397
5d1b0219
JG
398 /* Send notify socket through the notify pipe. */
399 ret = send_socket_to_thread(
28ab034a 400 notifiers->apps_cmd_notify_pipe_write_fd, app->notify_sock);
5d1b0219 401 if (ret < 0) {
5d1b0219
JG
402 /*
403 * No notify thread, stop the UST tracing. However, this is
404 * not an internal error of the this thread thus setting
405 * the health error code to a normal exit.
406 */
407 err = 0;
408 goto error;
409 }
410
411 /*
412 * Update newly registered application with the tracing
413 * registry info already enabled information.
414 */
415 update_ust_app(app->sock);
416
417 /*
418 * Don't care about return value. Let the manage apps threads
419 * handle app unregistration upon socket close.
420 */
421 (void) ust_app_register_done(app);
422
423 /*
424 * Even if the application socket has been closed, send the app
425 * to the thread and unregistration will take place at that
426 * place.
427 */
28ab034a
JG
428 ret = send_socket_to_thread(notifiers->apps_cmd_pipe_write_fd,
429 app->sock);
5d1b0219 430 if (ret < 0) {
5d1b0219
JG
431 /*
432 * No apps. thread, stop the UST tracing. However, this is
433 * not an internal error of the this thread thus setting
434 * the health error code to a normal exit.
435 */
436 err = 0;
437 goto error;
438 }
5d1b0219 439 }
cd9adb8b 440 } while (node != nullptr);
5d1b0219
JG
441
442 health_poll_entry();
443 /* Futex wait on queue. Blocking call on futex() */
444 futex_nto1_wait(&notifiers->ust_cmd_queue->futex);
445 health_poll_exit();
446 }
447 /* Normal exit, no error */
448 err = 0;
449
450error:
451 /* Clean up wait queue. */
64a87eb6
JG
452 for (auto *wait_node_in_queue :
453 lttng::urcu::list_iteration_adapter<ust_reg_wait_node, &ust_reg_wait_node::head>(
454 wait_queue.head)) {
455 cds_list_del(&wait_node_in_queue->head);
5d1b0219 456 wait_queue.count--;
64a87eb6 457 free(wait_node_in_queue);
5d1b0219
JG
458 }
459
460 /* Empty command queue. */
461 for (;;) {
462 /* Dequeue command for registration */
28ab034a
JG
463 node = cds_wfcq_dequeue_blocking(&notifiers->ust_cmd_queue->head,
464 &notifiers->ust_cmd_queue->tail);
cd9adb8b 465 if (node == nullptr) {
5d1b0219
JG
466 break;
467 }
0114db0e 468 ust_cmd = lttng::utils::container_of(node, &ust_command::node);
5d1b0219
JG
469 ret = close(ust_cmd->sock);
470 if (ret < 0) {
471 PERROR("close ust sock exit dispatch %d", ust_cmd->sock);
472 }
473 lttng_fd_put(LTTNG_FD_APPS, 1);
474 free(ust_cmd);
475 }
476
477error_testpoint:
478 DBG("Dispatch thread dying");
479 if (err) {
480 health_error();
481 ERR("Health error occurred in %s", __func__);
482 }
412d7227 483 health_unregister(the_health_sessiond);
5d1b0219 484 rcu_unregister_thread();
cd9adb8b 485 return nullptr;
5d1b0219
JG
486}
487
488static bool shutdown_ust_dispatch_thread(void *data)
489{
7966af57 490 struct thread_notifiers *notifiers = (thread_notifiers *) data;
5d1b0219
JG
491
492 CMM_STORE_SHARED(notifiers->dispatch_thread_exit, 1);
493 futex_nto1_wake(&notifiers->ust_cmd_queue->futex);
494 return true;
495}
496
497bool launch_ust_dispatch_thread(struct ust_cmd_queue *cmd_queue,
28ab034a
JG
498 int apps_cmd_pipe_write_fd,
499 int apps_cmd_notify_pipe_write_fd)
5d1b0219
JG
500{
501 struct lttng_thread *thread;
502 struct thread_notifiers *notifiers;
503
64803277 504 notifiers = zmalloc<thread_notifiers>();
5d1b0219
JG
505 if (!notifiers) {
506 goto error;
507 }
508 notifiers->ust_cmd_queue = cmd_queue;
509 notifiers->apps_cmd_pipe_write_fd = apps_cmd_pipe_write_fd;
510 notifiers->apps_cmd_notify_pipe_write_fd = apps_cmd_notify_pipe_write_fd;
511
512 thread = lttng_thread_create("UST registration dispatch",
28ab034a
JG
513 thread_dispatch_ust_registration,
514 shutdown_ust_dispatch_thread,
515 cleanup_ust_dispatch_thread,
516 notifiers);
5d1b0219
JG
517 if (!thread) {
518 goto error;
519 }
520 lttng_thread_put(thread);
521 return true;
522error:
523 free(notifiers);
524 return false;
525}
This page took 0.088454 seconds and 4 git commands to generate.