Fix: sessiond: double socket close on allocation failure
[lttng-tools.git] / src / bin / lttng-sessiond / dispatch.c
CommitLineData
5d1b0219
JG
1/*
2 * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2013 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20#include <stddef.h>
21#include <stdlib.h>
22#include <urcu.h>
23#include <common/futex.h>
24#include <common/macros.h>
25
26#include "dispatch.h"
27#include "ust-app.h"
28#include "testpoint.h"
29#include "fd-limit.h"
30#include "health-sessiond.h"
31#include "lttng-sessiond.h"
32#include "thread.h"
33
34struct thread_notifiers {
35 struct ust_cmd_queue *ust_cmd_queue;
36 int apps_cmd_pipe_write_fd;
37 int apps_cmd_notify_pipe_write_fd;
38 int dispatch_thread_exit;
39};
40
41/*
42 * For each tracing session, update newly registered apps. The session list
43 * lock MUST be acquired before calling this.
44 */
45static void update_ust_app(int app_sock)
46{
47 struct ltt_session *sess, *stmp;
48 const struct ltt_session_list *session_list = session_get_list();
49
50 /* Consumer is in an ERROR state. Stop any application update. */
51 if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) {
52 /* Stop the update process since the consumer is dead. */
53 return;
54 }
55
56 /* For all tracing session(s) */
57 cds_list_for_each_entry_safe(sess, stmp, &session_list->head, list) {
58 struct ust_app *app;
59
60 if (!session_get(sess)) {
61 continue;
62 }
63 session_lock(sess);
88e3c2f5 64 if (!sess->active || !sess->ust_session) {
5d1b0219
JG
65 goto unlock_session;
66 }
67
68 rcu_read_lock();
69 assert(app_sock >= 0);
70 app = ust_app_find_by_sock(app_sock);
71 if (app == NULL) {
72 /*
73 * Application can be unregistered before so
74 * this is possible hence simply stopping the
75 * update.
76 */
77 DBG3("UST app update failed to find app sock %d",
78 app_sock);
79 goto unlock_rcu;
80 }
81 ust_app_global_update(sess->ust_session, app);
82 unlock_rcu:
83 rcu_read_unlock();
84 unlock_session:
85 session_unlock(sess);
86 session_put(sess);
87 }
88}
89
90/*
91 * Sanitize the wait queue of the dispatch registration thread meaning removing
92 * invalid nodes from it. This is to avoid memory leaks for the case the UST
93 * notify socket is never received.
94 */
95static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
96{
97 int ret, nb_fd = 0, i;
98 unsigned int fd_added = 0;
99 struct lttng_poll_event events;
100 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
101
102 assert(wait_queue);
103
104 lttng_poll_init(&events);
105
106 /* Just skip everything for an empty queue. */
107 if (!wait_queue->count) {
108 goto end;
109 }
110
111 ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC);
112 if (ret < 0) {
113 goto error_create;
114 }
115
116 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
117 &wait_queue->head, head) {
118 assert(wait_node->app);
119 ret = lttng_poll_add(&events, wait_node->app->sock,
120 LPOLLHUP | LPOLLERR);
121 if (ret < 0) {
122 goto error;
123 }
124
125 fd_added = 1;
126 }
127
128 if (!fd_added) {
129 goto end;
130 }
131
132 /*
133 * Poll but don't block so we can quickly identify the faulty events and
134 * clean them afterwards from the wait queue.
135 */
136 ret = lttng_poll_wait(&events, 0);
137 if (ret < 0) {
138 goto error;
139 }
140 nb_fd = ret;
141
142 for (i = 0; i < nb_fd; i++) {
143 /* Get faulty FD. */
144 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
145 int pollfd = LTTNG_POLL_GETFD(&events, i);
146
5d1b0219
JG
147 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
148 &wait_queue->head, head) {
149 if (pollfd == wait_node->app->sock &&
150 (revents & (LPOLLHUP | LPOLLERR))) {
151 cds_list_del(&wait_node->head);
152 wait_queue->count--;
153 ust_app_destroy(wait_node->app);
154 free(wait_node);
155 /*
156 * Silence warning of use-after-free in
157 * cds_list_for_each_entry_safe which uses
158 * __typeof__(*wait_node).
159 */
160 wait_node = NULL;
161 break;
162 } else {
163 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
164 goto error;
165 }
166 }
167 }
168
169 if (nb_fd > 0) {
170 DBG("Wait queue sanitized, %d node were cleaned up", nb_fd);
171 }
172
173end:
174 lttng_poll_clean(&events);
175 return;
176
177error:
178 lttng_poll_clean(&events);
179error_create:
180 ERR("Unable to sanitize wait queue");
181 return;
182}
183
184/*
185 * Send a socket to a thread This is called from the dispatch UST registration
186 * thread once all sockets are set for the application.
187 *
188 * The sock value can be invalid, we don't really care, the thread will handle
189 * it and make the necessary cleanup if so.
190 *
191 * On success, return 0 else a negative value being the errno message of the
192 * write().
193 */
194static int send_socket_to_thread(int fd, int sock)
195{
196 ssize_t ret;
197
198 /*
199 * It's possible that the FD is set as invalid with -1 concurrently just
200 * before calling this function being a shutdown state of the thread.
201 */
202 if (fd < 0) {
203 ret = -EBADF;
204 goto error;
205 }
206
207 ret = lttng_write(fd, &sock, sizeof(sock));
208 if (ret < sizeof(sock)) {
209 PERROR("write apps pipe %d", fd);
210 if (ret < 0) {
211 ret = -errno;
212 }
213 goto error;
214 }
215
216 /* All good. Don't send back the write positive ret value. */
217 ret = 0;
218error:
219 return (int) ret;
220}
221
222static void cleanup_ust_dispatch_thread(void *data)
223{
224 free(data);
225}
226
227/*
228 * Dispatch request from the registration threads to the application
229 * communication thread.
230 */
231static void *thread_dispatch_ust_registration(void *data)
232{
233 int ret, err = -1;
234 struct cds_wfcq_node *node;
235 struct ust_command *ust_cmd = NULL;
236 struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node;
237 struct ust_reg_wait_queue wait_queue = {
238 .count = 0,
239 };
240 struct thread_notifiers *notifiers = data;
241
242 rcu_register_thread();
243
244 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH);
245
246 if (testpoint(sessiond_thread_app_reg_dispatch)) {
247 goto error_testpoint;
248 }
249
250 health_code_update();
251
252 CDS_INIT_LIST_HEAD(&wait_queue.head);
253
254 DBG("[thread] Dispatch UST command started");
255
256 for (;;) {
257 health_code_update();
258
259 /* Atomically prepare the queue futex */
260 futex_nto1_prepare(&notifiers->ust_cmd_queue->futex);
261
262 if (CMM_LOAD_SHARED(notifiers->dispatch_thread_exit)) {
263 break;
264 }
265
266 do {
267 struct ust_app *app = NULL;
268 ust_cmd = NULL;
269
270 /*
271 * Make sure we don't have node(s) that have hung up before receiving
272 * the notify socket. This is to clean the list in order to avoid
273 * memory leaks from notify socket that are never seen.
274 */
275 sanitize_wait_queue(&wait_queue);
276
277 health_code_update();
278 /* Dequeue command for registration */
279 node = cds_wfcq_dequeue_blocking(
280 &notifiers->ust_cmd_queue->head,
281 &notifiers->ust_cmd_queue->tail);
282 if (node == NULL) {
283 DBG("Woken up but nothing in the UST command queue");
284 /* Continue thread execution */
285 break;
286 }
287
288 ust_cmd = caa_container_of(node, struct ust_command, node);
289
290 DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
291 " gid:%d sock:%d name:%s (version %d.%d)",
292 ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid,
293 ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
294 ust_cmd->sock, ust_cmd->reg_msg.name,
295 ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
296
297 if (ust_cmd->reg_msg.type == USTCTL_SOCKET_CMD) {
298 wait_node = zmalloc(sizeof(*wait_node));
299 if (!wait_node) {
300 PERROR("zmalloc wait_node dispatch");
301 ret = close(ust_cmd->sock);
302 if (ret < 0) {
303 PERROR("close ust sock dispatch %d", ust_cmd->sock);
304 }
305 lttng_fd_put(LTTNG_FD_APPS, 1);
306 free(ust_cmd);
58f835e1 307 ust_cmd = NULL;
5d1b0219
JG
308 goto error;
309 }
310 CDS_INIT_LIST_HEAD(&wait_node->head);
311
312 /* Create application object if socket is CMD. */
313 wait_node->app = ust_app_create(&ust_cmd->reg_msg,
314 ust_cmd->sock);
315 if (!wait_node->app) {
316 ret = close(ust_cmd->sock);
317 if (ret < 0) {
318 PERROR("close ust sock dispatch %d", ust_cmd->sock);
319 }
320 lttng_fd_put(LTTNG_FD_APPS, 1);
321 free(wait_node);
58f835e1 322 wait_node = NULL;
5d1b0219 323 free(ust_cmd);
58f835e1 324 ust_cmd = NULL;
5d1b0219
JG
325 continue;
326 }
327 /*
328 * Add application to the wait queue so we can set the notify
329 * socket before putting this object in the global ht.
330 */
331 cds_list_add(&wait_node->head, &wait_queue.head);
332 wait_queue.count++;
333
334 free(ust_cmd);
58f835e1 335 ust_cmd = NULL;
5d1b0219
JG
336 /*
337 * We have to continue here since we don't have the notify
338 * socket and the application MUST be added to the hash table
339 * only at that moment.
340 */
341 continue;
342 } else {
343 /*
344 * Look for the application in the local wait queue and set the
345 * notify socket if found.
346 */
347 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
348 &wait_queue.head, head) {
349 health_code_update();
350 if (wait_node->app->pid == ust_cmd->reg_msg.pid) {
351 wait_node->app->notify_sock = ust_cmd->sock;
352 cds_list_del(&wait_node->head);
353 wait_queue.count--;
354 app = wait_node->app;
355 free(wait_node);
58f835e1 356 wait_node = NULL;
5d1b0219
JG
357 DBG3("UST app notify socket %d is set", ust_cmd->sock);
358 break;
359 }
360 }
361
362 /*
363 * With no application at this stage the received socket is
364 * basically useless so close it before we free the cmd data
365 * structure for good.
366 */
367 if (!app) {
368 ret = close(ust_cmd->sock);
369 if (ret < 0) {
370 PERROR("close ust sock dispatch %d", ust_cmd->sock);
371 }
372 lttng_fd_put(LTTNG_FD_APPS, 1);
373 }
374 free(ust_cmd);
58f835e1 375 ust_cmd = NULL;
5d1b0219
JG
376 }
377
378 if (app) {
379 /*
380 * @session_lock_list
381 *
382 * Lock the global session list so from the register up to the
383 * registration done message, no thread can see the application
384 * and change its state.
385 */
386 session_lock_list();
387 rcu_read_lock();
388
389 /*
390 * Add application to the global hash table. This needs to be
391 * done before the update to the UST registry can locate the
392 * application.
393 */
394 ust_app_add(app);
395
396 /* Set app version. This call will print an error if needed. */
397 (void) ust_app_version(app);
398
399 /* Send notify socket through the notify pipe. */
400 ret = send_socket_to_thread(
401 notifiers->apps_cmd_notify_pipe_write_fd,
402 app->notify_sock);
403 if (ret < 0) {
404 rcu_read_unlock();
405 session_unlock_list();
406 /*
407 * No notify thread, stop the UST tracing. However, this is
408 * not an internal error of the this thread thus setting
409 * the health error code to a normal exit.
410 */
411 err = 0;
412 goto error;
413 }
414
415 /*
416 * Update newly registered application with the tracing
417 * registry info already enabled information.
418 */
419 update_ust_app(app->sock);
420
421 /*
422 * Don't care about return value. Let the manage apps threads
423 * handle app unregistration upon socket close.
424 */
425 (void) ust_app_register_done(app);
426
427 /*
428 * Even if the application socket has been closed, send the app
429 * to the thread and unregistration will take place at that
430 * place.
431 */
432 ret = send_socket_to_thread(
433 notifiers->apps_cmd_pipe_write_fd,
434 app->sock);
435 if (ret < 0) {
436 rcu_read_unlock();
437 session_unlock_list();
438 /*
439 * No apps. thread, stop the UST tracing. However, this is
440 * not an internal error of the this thread thus setting
441 * the health error code to a normal exit.
442 */
443 err = 0;
444 goto error;
445 }
446
447 rcu_read_unlock();
448 session_unlock_list();
449 }
450 } while (node != NULL);
451
452 health_poll_entry();
453 /* Futex wait on queue. Blocking call on futex() */
454 futex_nto1_wait(&notifiers->ust_cmd_queue->futex);
455 health_poll_exit();
456 }
457 /* Normal exit, no error */
458 err = 0;
459
460error:
461 /* Clean up wait queue. */
462 cds_list_for_each_entry_safe(wait_node, tmp_wait_node,
463 &wait_queue.head, head) {
464 cds_list_del(&wait_node->head);
465 wait_queue.count--;
466 free(wait_node);
467 }
468
469 /* Empty command queue. */
470 for (;;) {
471 /* Dequeue command for registration */
472 node = cds_wfcq_dequeue_blocking(
473 &notifiers->ust_cmd_queue->head,
474 &notifiers->ust_cmd_queue->tail);
475 if (node == NULL) {
476 break;
477 }
478 ust_cmd = caa_container_of(node, struct ust_command, node);
479 ret = close(ust_cmd->sock);
480 if (ret < 0) {
481 PERROR("close ust sock exit dispatch %d", ust_cmd->sock);
482 }
483 lttng_fd_put(LTTNG_FD_APPS, 1);
484 free(ust_cmd);
485 }
486
487error_testpoint:
488 DBG("Dispatch thread dying");
489 if (err) {
490 health_error();
491 ERR("Health error occurred in %s", __func__);
492 }
493 health_unregister(health_sessiond);
494 rcu_unregister_thread();
495 return NULL;
496}
497
498static bool shutdown_ust_dispatch_thread(void *data)
499{
500 struct thread_notifiers *notifiers = data;
501
502 CMM_STORE_SHARED(notifiers->dispatch_thread_exit, 1);
503 futex_nto1_wake(&notifiers->ust_cmd_queue->futex);
504 return true;
505}
506
507bool launch_ust_dispatch_thread(struct ust_cmd_queue *cmd_queue,
508 int apps_cmd_pipe_write_fd,
509 int apps_cmd_notify_pipe_write_fd)
510{
511 struct lttng_thread *thread;
512 struct thread_notifiers *notifiers;
513
514 notifiers = zmalloc(sizeof(*notifiers));
515 if (!notifiers) {
516 goto error;
517 }
518 notifiers->ust_cmd_queue = cmd_queue;
519 notifiers->apps_cmd_pipe_write_fd = apps_cmd_pipe_write_fd;
520 notifiers->apps_cmd_notify_pipe_write_fd = apps_cmd_notify_pipe_write_fd;
521
522 thread = lttng_thread_create("UST registration dispatch",
523 thread_dispatch_ust_registration,
524 shutdown_ust_dispatch_thread,
525 cleanup_ust_dispatch_thread,
526 notifiers);
527 if (!thread) {
528 goto error;
529 }
530 lttng_thread_put(thread);
531 return true;
532error:
533 free(notifiers);
534 return false;
535}
This page took 0.04417 seconds and 4 git commands to generate.