Clean code base from redundant verification
[lttng-tools.git] / src / bin / lttng-sessiond / manage-consumer.c
CommitLineData
4ec029ed
JG
1/*
2 * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2013 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
#include <errno.h>
#include <signal.h>

#include <common/pipe.h>
#include <common/utils.h>

#include "manage-consumer.h"
#include "testpoint.h"
#include "health-sessiond.h"
#include "utils.h"
#include "thread.h"
#include "ust-consumer.h"
31
/*
 * State shared between the launcher and the consumer management thread.
 */
struct thread_notifiers {
	/* Pipe used to ask the thread to quit (read end polled by the thread). */
	struct lttng_pipe *quit_pipe;
	/* Consumer daemon description managed by the thread. */
	struct consumer_data *consumer_data;
	/* Posted by the thread once its initialization succeeded or failed. */
	sem_t ready;
	/* 0 when initialization succeeded, -1 when it failed. */
	int initialization_result;
};
38
39static void mark_thread_as_ready(struct thread_notifiers *notifiers)
40{
41 DBG("Marking consumer management thread as ready");
42 notifiers->initialization_result = 0;
43 sem_post(&notifiers->ready);
44}
45
46static void mark_thread_intialization_as_failed(
47 struct thread_notifiers *notifiers)
48{
49 ERR("Consumer management thread entering error state");
50 notifiers->initialization_result = -1;
51 sem_post(&notifiers->ready);
52}
53
54static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
55{
56 DBG("Waiting for consumer management thread to be ready");
57 sem_wait(&notifiers->ready);
58 DBG("Consumer management thread is ready");
59}
60
61/*
62 * This thread manage the consumer error sent back to the session daemon.
63 */
64void *thread_consumer_management(void *data)
65{
66 int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
67 uint32_t revents, nb_fd;
68 enum lttcomm_return_code code;
69 struct lttng_poll_event events;
70 struct thread_notifiers *notifiers = data;
71 struct consumer_data *consumer_data = notifiers->consumer_data;
72 const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
73 struct consumer_socket *cmd_socket_wrapper = NULL;
74
75 DBG("[thread] Manage consumer started");
76
77 rcu_register_thread();
78 rcu_thread_online();
79
80 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);
81
82 health_code_update();
83
84 /*
85 * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
86 * metadata_sock. Nothing more will be added to this poll set.
87 */
88 ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
89 if (ret < 0) {
90 mark_thread_intialization_as_failed(notifiers);
91 goto error_poll;
92 }
93
94 ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR);
95 if (ret < 0) {
96 mark_thread_intialization_as_failed(notifiers);
97 goto error;
98 }
99
100 /*
101 * The error socket here is already in a listening state which was done
102 * just before spawning this thread to avoid a race between the consumer
103 * daemon exec trying to connect and the listen() call.
104 */
105 ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
106 if (ret < 0) {
107 mark_thread_intialization_as_failed(notifiers);
108 goto error;
109 }
110
111 health_code_update();
112
113 /* Infinite blocking call, waiting for transmission */
114 health_poll_entry();
115
116 if (testpoint(sessiond_thread_manage_consumer)) {
117 mark_thread_intialization_as_failed(notifiers);
118 goto error;
119 }
120
121 ret = lttng_poll_wait(&events, -1);
122 health_poll_exit();
123 if (ret < 0) {
124 mark_thread_intialization_as_failed(notifiers);
125 goto error;
126 }
127
128 nb_fd = ret;
129
130 for (i = 0; i < nb_fd; i++) {
131 /* Fetch once the poll data */
132 revents = LTTNG_POLL_GETEV(&events, i);
133 pollfd = LTTNG_POLL_GETFD(&events, i);
134
135 health_code_update();
136
4ec029ed
JG
137 /* Thread quit pipe has been closed. Killing thread. */
138 if (pollfd == quit_pipe_read_fd) {
139 err = 0;
140 mark_thread_intialization_as_failed(notifiers);
141 goto exit;
142 } else if (pollfd == consumer_data->err_sock) {
143 /* Event on the registration socket */
144 if (revents & LPOLLIN) {
145 continue;
146 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
147 ERR("consumer err socket poll error");
148 mark_thread_intialization_as_failed(notifiers);
149 goto error;
150 } else {
151 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
152 mark_thread_intialization_as_failed(notifiers);
153 goto error;
154 }
155 }
156 }
157
158 sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
159 if (sock < 0) {
160 mark_thread_intialization_as_failed(notifiers);
161 goto error;
162 }
163
164 /*
165 * Set the CLOEXEC flag. Return code is useless because either way, the
166 * show must go on.
167 */
168 (void) utils_set_fd_cloexec(sock);
169
170 health_code_update();
171
172 DBG2("Receiving code from consumer err_sock");
173
174 /* Getting status code from kconsumerd */
175 ret = lttcomm_recv_unix_sock(sock, &code,
176 sizeof(enum lttcomm_return_code));
177 if (ret <= 0) {
178 mark_thread_intialization_as_failed(notifiers);
179 goto error;
180 }
181
182 health_code_update();
183 if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
184 ERR("consumer error when waiting for SOCK_READY : %s",
185 lttcomm_get_readable_code(-code));
186 mark_thread_intialization_as_failed(notifiers);
187 goto error;
188 }
189
190 /* Connect both command and metadata sockets. */
191 consumer_data->cmd_sock =
192 lttcomm_connect_unix_sock(
193 consumer_data->cmd_unix_sock_path);
194 consumer_data->metadata_fd =
195 lttcomm_connect_unix_sock(
196 consumer_data->cmd_unix_sock_path);
197 if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
198 PERROR("consumer connect cmd socket");
199 mark_thread_intialization_as_failed(notifiers);
200 goto error;
201 }
202
203 consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
204
205 /* Create metadata socket lock. */
206 consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
207 if (consumer_data->metadata_sock.lock == NULL) {
208 PERROR("zmalloc pthread mutex");
209 mark_thread_intialization_as_failed(notifiers);
210 goto error;
211 }
212 pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
213
214 DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock);
215 DBG("Consumer metadata socket ready (fd: %d)",
216 consumer_data->metadata_fd);
217
218 /*
219 * Remove the consumerd error sock since we've established a connection.
220 */
221 ret = lttng_poll_del(&events, consumer_data->err_sock);
222 if (ret < 0) {
223 mark_thread_intialization_as_failed(notifiers);
224 goto error;
225 }
226
227 /* Add new accepted error socket. */
228 ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
229 if (ret < 0) {
230 mark_thread_intialization_as_failed(notifiers);
231 goto error;
232 }
233
234 /* Add metadata socket that is successfully connected. */
235 ret = lttng_poll_add(&events, consumer_data->metadata_fd,
236 LPOLLIN | LPOLLRDHUP);
237 if (ret < 0) {
238 mark_thread_intialization_as_failed(notifiers);
239 goto error;
240 }
241
242 health_code_update();
243
244 /*
245 * Transfer the write-end of the channel monitoring and rotate pipe
246 * to the consumer by issuing a SET_CHANNEL_MONITOR_PIPE command.
247 */
248 cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
249 if (!cmd_socket_wrapper) {
250 mark_thread_intialization_as_failed(notifiers);
251 goto error;
252 }
253 cmd_socket_wrapper->lock = &consumer_data->lock;
254
255 ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
256 consumer_data->channel_monitor_pipe);
257 if (ret) {
258 mark_thread_intialization_as_failed(notifiers);
259 goto error;
260 }
261
262 /* Discard the socket wrapper as it is no longer needed. */
263 consumer_destroy_socket(cmd_socket_wrapper);
264 cmd_socket_wrapper = NULL;
265
266 /* The thread is completely initialized, signal that it is ready. */
267 mark_thread_as_ready(notifiers);
268
269 /* Infinite blocking call, waiting for transmission */
270 while (1) {
271 health_code_update();
272
273 /* Exit the thread because the thread quit pipe has been triggered. */
274 if (should_quit) {
275 /* Not a health error. */
276 err = 0;
277 goto exit;
278 }
279
280 health_poll_entry();
281 ret = lttng_poll_wait(&events, -1);
282 health_poll_exit();
283 if (ret < 0) {
284 goto error;
285 }
286
287 nb_fd = ret;
288
289 for (i = 0; i < nb_fd; i++) {
290 /* Fetch once the poll data */
291 revents = LTTNG_POLL_GETEV(&events, i);
292 pollfd = LTTNG_POLL_GETFD(&events, i);
293
294 health_code_update();
295
4ec029ed
JG
296 /*
297 * Thread quit pipe has been triggered, flag that we should stop
298 * but continue the current loop to handle potential data from
299 * consumer.
300 */
301 if (pollfd == quit_pipe_read_fd) {
302 should_quit = 1;
303 } else if (pollfd == sock) {
304 /* Event on the consumerd socket */
305 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
306 && !(revents & LPOLLIN)) {
307 ERR("consumer err socket second poll error");
308 goto error;
309 }
310 health_code_update();
311 /* Wait for any kconsumerd error */
312 ret = lttcomm_recv_unix_sock(sock, &code,
313 sizeof(enum lttcomm_return_code));
314 if (ret <= 0) {
315 ERR("consumer closed the command socket");
316 goto error;
317 }
318
319 ERR("consumer return code : %s",
320 lttcomm_get_readable_code(-code));
321
322 goto exit;
323 } else if (pollfd == consumer_data->metadata_fd) {
324 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
325 && !(revents & LPOLLIN)) {
326 ERR("consumer err metadata socket second poll error");
327 goto error;
328 }
329 /* UST metadata requests */
330 ret = ust_consumer_metadata_request(
331 &consumer_data->metadata_sock);
332 if (ret < 0) {
333 ERR("Handling metadata request");
334 goto error;
335 }
336 }
337 /* No need for an else branch all FDs are tested prior. */
338 }
339 health_code_update();
340 }
341
342exit:
343error:
344 /*
345 * We lock here because we are about to close the sockets and some other
346 * thread might be using them so get exclusive access which will abort all
347 * other consumer command by other threads.
348 */
349 pthread_mutex_lock(&consumer_data->lock);
350
351 /* Immediately set the consumerd state to stopped */
352 if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
353 uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR);
354 } else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
355 consumer_data->type == LTTNG_CONSUMER32_UST) {
356 uatomic_set(&ust_consumerd_state, CONSUMER_ERROR);
357 } else {
358 /* Code flow error... */
359 assert(0);
360 }
361
362 if (consumer_data->err_sock >= 0) {
363 ret = close(consumer_data->err_sock);
364 if (ret) {
365 PERROR("close");
366 }
367 consumer_data->err_sock = -1;
368 }
369 if (consumer_data->cmd_sock >= 0) {
370 ret = close(consumer_data->cmd_sock);
371 if (ret) {
372 PERROR("close");
373 }
374 consumer_data->cmd_sock = -1;
375 }
376 if (consumer_data->metadata_sock.fd_ptr &&
377 *consumer_data->metadata_sock.fd_ptr >= 0) {
378 ret = close(*consumer_data->metadata_sock.fd_ptr);
379 if (ret) {
380 PERROR("close");
381 }
382 }
383 if (sock >= 0) {
384 ret = close(sock);
385 if (ret) {
386 PERROR("close");
387 }
388 }
389
390 unlink(consumer_data->err_unix_sock_path);
391 unlink(consumer_data->cmd_unix_sock_path);
392 pthread_mutex_unlock(&consumer_data->lock);
393
394 /* Cleanup metadata socket mutex. */
395 if (consumer_data->metadata_sock.lock) {
396 pthread_mutex_destroy(consumer_data->metadata_sock.lock);
397 free(consumer_data->metadata_sock.lock);
398 }
399 lttng_poll_clean(&events);
400
401 if (cmd_socket_wrapper) {
402 consumer_destroy_socket(cmd_socket_wrapper);
403 }
404error_poll:
405 if (err) {
406 health_error();
407 ERR("Health error occurred in %s", __func__);
408 }
409 health_unregister(health_sessiond);
410 DBG("consumer thread cleanup completed");
411
412 rcu_thread_offline();
413 rcu_unregister_thread();
414
415 return NULL;
416}
417
418static bool shutdown_consumer_management_thread(void *data)
419{
420 struct thread_notifiers *notifiers = data;
421 const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe);
422
423 return notify_thread_pipe(write_fd) == 1;
424}
425
426static void cleanup_consumer_management_thread(void *data)
427{
428 struct thread_notifiers *notifiers = data;
429
430 lttng_pipe_destroy(notifiers->quit_pipe);
431 free(notifiers);
432}
433
434bool launch_consumer_management_thread(struct consumer_data *consumer_data)
435{
436 struct lttng_pipe *quit_pipe;
437 struct thread_notifiers *notifiers = NULL;
438 struct lttng_thread *thread;
439
4ec029ed
JG
440 notifiers = zmalloc(sizeof(*notifiers));
441 if (!notifiers) {
21fa020e
JG
442 goto error_alloc;
443 }
444
445 quit_pipe = lttng_pipe_open(FD_CLOEXEC);
446 if (!quit_pipe) {
4ec029ed
JG
447 goto error;
448 }
449 notifiers->quit_pipe = quit_pipe;
450 notifiers->consumer_data = consumer_data;
451 sem_init(&notifiers->ready, 0, 0);
452
453 thread = lttng_thread_create("Consumer management",
454 thread_consumer_management,
455 shutdown_consumer_management_thread,
456 cleanup_consumer_management_thread,
457 notifiers);
458 if (!thread) {
459 goto error;
460 }
461 wait_until_thread_is_ready(notifiers);
462 lttng_thread_put(thread);
463 if (notifiers->initialization_result) {
bd3739b0 464 return false;
4ec029ed
JG
465 }
466 return true;
467error:
468 cleanup_consumer_management_thread(notifiers);
21fa020e 469error_alloc:
4ec029ed
JG
470 return false;
471}
This page took 0.039524 seconds and 4 git commands to generate.