Fix: consumerd: type confusion in lttng_consumer_send_error
src/bin/lttng-sessiond/manage-consumer.c (lttng-tools)
/*
 * Copyright (C) 2011 EfficiOS Inc.
 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 *
 */

#include <signal.h>

#include <common/pipe.h>
#include <common/utils.h>

#include "manage-consumer.h"
#include "testpoint.h"
#include "health-sessiond.h"
#include "utils.h"
#include "thread.h"
#include "ust-consumer.h"
21
/*
 * State shared between launch_consumer_management_thread() and the
 * consumer management thread it spawns.
 */
struct thread_notifiers {
	/* Quit pipe; the thread exits when the read end becomes readable. */
	struct lttng_pipe *quit_pipe;
	/* Consumer daemon instance this thread manages. */
	struct consumer_data *consumer_data;
	/* Posted once initialization_result has been set; launcher waits on it. */
	sem_t ready;
	/* 0 on successful initialization, -1 on failure; valid once 'ready' posts. */
	int initialization_result;
};
28
29static void mark_thread_as_ready(struct thread_notifiers *notifiers)
30{
31 DBG("Marking consumer management thread as ready");
32 notifiers->initialization_result = 0;
33 sem_post(&notifiers->ready);
34}
35
36static void mark_thread_intialization_as_failed(
37 struct thread_notifiers *notifiers)
38{
52c50f8f 39 ERR("Consumer management thread entering error state");
4ec029ed
JG
40 notifiers->initialization_result = -1;
41 sem_post(&notifiers->ready);
42}
43
44static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
45{
46 DBG("Waiting for consumer management thread to be ready");
47 sem_wait(&notifiers->ready);
48 DBG("Consumer management thread is ready");
49}
50
/*
 * This thread manages the consumer error socket: it waits for the consumer
 * daemon to register on the listening error socket, establishes the command
 * and metadata connections, then services consumerd error reports and UST
 * metadata requests until the quit pipe is triggered.
 */
static void *thread_consumer_management(void *data)
{
	/* sock: connected consumerd error socket; -1 until accept() succeeds. */
	int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
	uint32_t revents, nb_fd;
	enum lttcomm_return_code code;
	struct lttng_poll_event events;
	struct thread_notifiers *notifiers = data;
	struct consumer_data *consumer_data = notifiers->consumer_data;
	const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
	struct consumer_socket *cmd_socket_wrapper = NULL;

	DBG("[thread] Manage consumer started");

	rcu_register_thread();
	rcu_thread_online();

	health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);

	health_code_update();

	/*
	 * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
	 * metadata_sock. Nothing more will be added to this poll set.
	 */
	ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error_poll;
	}

	ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/*
	 * The error socket here is already in a listening state which was done
	 * just before spawning this thread to avoid a race between the consumer
	 * daemon exec trying to connect and the listen() call.
	 */
	ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	health_code_update();

	/* Infinite blocking call, waiting for transmission */
	health_poll_entry();

	if (testpoint(sessiond_thread_manage_consumer)) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	ret = lttng_poll_wait(&events, -1);
	health_poll_exit();
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	nb_fd = ret;

	/*
	 * First wakeup: only the quit pipe and the listening error socket are
	 * in the set. LPOLLIN on the error socket means a consumerd is
	 * connecting; fall through to the accept() below.
	 */
	for (i = 0; i < nb_fd; i++) {
		/* Fetch once the poll data */
		revents = LTTNG_POLL_GETEV(&events, i);
		pollfd = LTTNG_POLL_GETFD(&events, i);

		health_code_update();

		/* Thread quit pipe has been closed. Killing thread. */
		if (pollfd == quit_pipe_read_fd) {
			err = 0;
			mark_thread_intialization_as_failed(notifiers);
			goto exit;
		} else if (pollfd == consumer_data->err_sock) {
			/* Event on the registration socket */
			if (revents & LPOLLIN) {
				continue;
			} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
				ERR("consumer err socket poll error");
				mark_thread_intialization_as_failed(notifiers);
				goto error;
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				mark_thread_intialization_as_failed(notifiers);
				goto error;
			}
		}
	}

	sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
	if (sock < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/*
	 * Set the CLOEXEC flag. Return code is useless because either way, the
	 * show must go on.
	 */
	(void) utils_set_fd_cloexec(sock);

	health_code_update();

	DBG2("Receiving code from consumer err_sock");

	/* Getting status code from consumerd */
	{
		/*
		 * Receive into a fixed-width int32_t rather than directly into
		 * the enum: the wire format is 32-bit while the enum's width is
		 * implementation-defined (avoids a type confusion on the read).
		 */
		int32_t comm_code = 0;

		ret = lttcomm_recv_unix_sock(sock, &comm_code, sizeof(comm_code));
		code = (typeof(code)) comm_code;
	}
	if (ret <= 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	health_code_update();
	if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
		ERR("consumer error when waiting for SOCK_READY : %s",
				lttcomm_get_readable_code(-code));
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/* Connect both command and metadata sockets. */
	consumer_data->cmd_sock =
			lttcomm_connect_unix_sock(
				consumer_data->cmd_unix_sock_path);
	consumer_data->metadata_fd =
			lttcomm_connect_unix_sock(
				consumer_data->cmd_unix_sock_path);
	if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
		PERROR("consumer connect cmd socket");
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;

	/* Create metadata socket lock. */
	consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
	if (consumer_data->metadata_sock.lock == NULL) {
		PERROR("zmalloc pthread mutex");
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}
	pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);

	DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock);
	DBG("Consumer metadata socket ready (fd: %d)",
			consumer_data->metadata_fd);

	/*
	 * Remove the consumerd error sock since we've established a connection.
	 */
	ret = lttng_poll_del(&events, consumer_data->err_sock);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/* Add new accepted error socket. */
	ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/* Add metadata socket that is successfully connected. */
	ret = lttng_poll_add(&events, consumer_data->metadata_fd,
			LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	health_code_update();

	/*
	 * Transfer the write-end of the channel monitoring pipe to the consumer
	 * by issuing a SET_CHANNEL_MONITOR_PIPE command.
	 */
	cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
	if (!cmd_socket_wrapper) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}
	cmd_socket_wrapper->lock = &consumer_data->lock;

	/* consumer_init() sends the sessiond uuid over the command socket. */
	pthread_mutex_lock(cmd_socket_wrapper->lock);
	ret = consumer_init(cmd_socket_wrapper, the_sessiond_uuid);
	if (ret) {
		ERR("Failed to send sessiond uuid to consumer daemon");
		mark_thread_intialization_as_failed(notifiers);
		pthread_mutex_unlock(cmd_socket_wrapper->lock);
		goto error;
	}
	pthread_mutex_unlock(cmd_socket_wrapper->lock);

	ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
			consumer_data->channel_monitor_pipe);
	if (ret) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/* Discard the socket wrapper as it is no longer needed. */
	consumer_destroy_socket(cmd_socket_wrapper);
	cmd_socket_wrapper = NULL;

	/* The thread is completely initialized, signal that it is ready. */
	mark_thread_as_ready(notifiers);

	/* Infinite blocking call, waiting for transmission */
	while (1) {
		health_code_update();

		/* Exit the thread because the thread quit pipe has been triggered. */
		if (should_quit) {
			/* Not a health error. */
			err = 0;
			goto exit;
		}

		health_poll_entry();
		ret = lttng_poll_wait(&events, -1);
		health_poll_exit();
		if (ret < 0) {
			goto error;
		}

		nb_fd = ret;

		for (i = 0; i < nb_fd; i++) {
			/* Fetch once the poll data */
			revents = LTTNG_POLL_GETEV(&events, i);
			pollfd = LTTNG_POLL_GETFD(&events, i);

			health_code_update();

			/*
			 * Thread quit pipe has been triggered, flag that we should stop
			 * but continue the current loop to handle potential data from
			 * consumer.
			 */
			if (pollfd == quit_pipe_read_fd) {
				should_quit = 1;
			} else if (pollfd == sock) {
				/* Event on the consumerd socket */
				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
						&& !(revents & LPOLLIN)) {
					ERR("consumer err socket second poll error");
					goto error;
				}
				health_code_update();
				/* Wait for any consumerd error */
				{
					/* Same fixed-width receive as above; see earlier note. */
					int32_t comm_code = 0;

					ret = lttcomm_recv_unix_sock(
							sock, &comm_code, sizeof(comm_code));
					code = (typeof(code)) comm_code;
				}
				if (ret <= 0) {
					ERR("consumer closed the command socket");
					goto error;
				}

				ERR("consumer return code : %s",
						lttcomm_get_readable_code(-code));

				goto exit;
			} else if (pollfd == consumer_data->metadata_fd) {
				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
						&& !(revents & LPOLLIN)) {
					ERR("consumer err metadata socket second poll error");
					goto error;
				}
				/* UST metadata requests */
				ret = ust_consumer_metadata_request(
						&consumer_data->metadata_sock);
				if (ret < 0) {
					ERR("Handling metadata request");
					goto error;
				}
			}
			/* No need for an else branch all FDs are tested prior. */
		}
		health_code_update();
	}

exit:
error:
	/*
	 * We lock here because we are about to close the sockets and some other
	 * thread might be using them so get exclusive access which will abort all
	 * other consumer command by other threads.
	 */
	pthread_mutex_lock(&consumer_data->lock);

	/* Immediately set the consumerd state to stopped */
	if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
		uatomic_set(&the_kernel_consumerd_state, CONSUMER_ERROR);
	} else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
			consumer_data->type == LTTNG_CONSUMER32_UST) {
		uatomic_set(&the_ust_consumerd_state, CONSUMER_ERROR);
	} else {
		/* Code flow error... */
		assert(0);
	}

	if (consumer_data->err_sock >= 0) {
		ret = close(consumer_data->err_sock);
		if (ret) {
			PERROR("close");
		}
		consumer_data->err_sock = -1;
	}
	if (consumer_data->cmd_sock >= 0) {
		ret = close(consumer_data->cmd_sock);
		if (ret) {
			PERROR("close");
		}
		consumer_data->cmd_sock = -1;
	}
	if (consumer_data->metadata_sock.fd_ptr &&
	    *consumer_data->metadata_sock.fd_ptr >= 0) {
		ret = close(*consumer_data->metadata_sock.fd_ptr);
		if (ret) {
			PERROR("close");
		}
	}
	if (sock >= 0) {
		ret = close(sock);
		if (ret) {
			PERROR("close");
		}
	}

	unlink(consumer_data->err_unix_sock_path);
	unlink(consumer_data->cmd_unix_sock_path);
	pthread_mutex_unlock(&consumer_data->lock);

	/* Cleanup metadata socket mutex. */
	if (consumer_data->metadata_sock.lock) {
		pthread_mutex_destroy(consumer_data->metadata_sock.lock);
		free(consumer_data->metadata_sock.lock);
	}
	lttng_poll_clean(&events);

	if (cmd_socket_wrapper) {
		consumer_destroy_socket(cmd_socket_wrapper);
	}
error_poll:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(the_health_sessiond);
	DBG("consumer thread cleanup completed");

	rcu_thread_offline();
	rcu_unregister_thread();

	return NULL;
}
426
427static bool shutdown_consumer_management_thread(void *data)
428{
429 struct thread_notifiers *notifiers = data;
430 const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe);
431
432 return notify_thread_pipe(write_fd) == 1;
433}
434
435static void cleanup_consumer_management_thread(void *data)
436{
437 struct thread_notifiers *notifiers = data;
438
439 lttng_pipe_destroy(notifiers->quit_pipe);
440 free(notifiers);
441}
442
443bool launch_consumer_management_thread(struct consumer_data *consumer_data)
444{
445 struct lttng_pipe *quit_pipe;
446 struct thread_notifiers *notifiers = NULL;
447 struct lttng_thread *thread;
448
4ec029ed
JG
449 notifiers = zmalloc(sizeof(*notifiers));
450 if (!notifiers) {
21fa020e
JG
451 goto error_alloc;
452 }
453
454 quit_pipe = lttng_pipe_open(FD_CLOEXEC);
455 if (!quit_pipe) {
4ec029ed
JG
456 goto error;
457 }
458 notifiers->quit_pipe = quit_pipe;
459 notifiers->consumer_data = consumer_data;
460 sem_init(&notifiers->ready, 0, 0);
461
462 thread = lttng_thread_create("Consumer management",
463 thread_consumer_management,
464 shutdown_consumer_management_thread,
465 cleanup_consumer_management_thread,
466 notifiers);
467 if (!thread) {
468 goto error;
469 }
470 wait_until_thread_is_ready(notifiers);
471 lttng_thread_put(thread);
472 if (notifiers->initialization_result) {
bd3739b0 473 return false;
4ec029ed
JG
474 }
475 return true;
476error:
477 cleanup_consumer_management_thread(notifiers);
21fa020e 478error_alloc:
4ec029ed
JG
479 return false;
480}
This page took 0.061351 seconds and 4 git commands to generate.