Run clang-format on the whole tree
[lttng-tools.git] / src / bin / lttng-sessiond / manage-consumer.cpp
CommitLineData
4ec029ed 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4ec029ed 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
4ec029ed 7 *
4ec029ed
JG
8 */
9
#include "health-sessiond.hpp"
#include "manage-consumer.hpp"
#include "testpoint.hpp"
#include "thread.hpp"
#include "ust-consumer.hpp"
#include "utils.hpp"

#include <common/pipe.hpp>
#include <common/utils.hpp>

#include <errno.h>
#include <signal.h>
4ec029ed 21
namespace {
/*
 * State shared between the launcher of the consumer management thread and
 * the thread itself.
 */
struct thread_notifiers {
	/* Read end is polled by the thread; write end is used to ask it to quit. */
	struct lttng_pipe *quit_pipe;
	/* Consumer daemon whose sockets the management thread handles. */
	struct consumer_data *consumer_data;
	/* Posted by the thread once initialization_result has been set. */
	sem_t ready;
	/* 0 when the thread initialized successfully, -1 otherwise. */
	int initialization_result;
};
} /* anonymous namespace */
4ec029ed
JG
30
31static void mark_thread_as_ready(struct thread_notifiers *notifiers)
32{
33 DBG("Marking consumer management thread as ready");
34 notifiers->initialization_result = 0;
35 sem_post(&notifiers->ready);
36}
37
28ab034a 38static void mark_thread_intialization_as_failed(struct thread_notifiers *notifiers)
4ec029ed 39{
52c50f8f 40 ERR("Consumer management thread entering error state");
4ec029ed
JG
41 notifiers->initialization_result = -1;
42 sem_post(&notifiers->ready);
43}
44
45static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
46{
47 DBG("Waiting for consumer management thread to be ready");
48 sem_wait(&notifiers->ready);
49 DBG("Consumer management thread is ready");
50}
51
52/*
53 * This thread manage the consumer error sent back to the session daemon.
54 */
0e0b3d3a 55static void *thread_consumer_management(void *data)
4ec029ed 56{
8a00688e
MJ
57 int sock = -1, i, ret, err = -1, should_quit = 0;
58 uint32_t nb_fd;
4ec029ed
JG
59 enum lttcomm_return_code code;
60 struct lttng_poll_event events;
7966af57 61 struct thread_notifiers *notifiers = (thread_notifiers *) data;
4ec029ed 62 struct consumer_data *consumer_data = notifiers->consumer_data;
8a00688e 63 const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
4ec029ed
JG
64 struct consumer_socket *cmd_socket_wrapper = NULL;
65
66 DBG("[thread] Manage consumer started");
67
68 rcu_register_thread();
69 rcu_thread_online();
70
412d7227 71 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);
4ec029ed
JG
72
73 health_code_update();
74
75 /*
76 * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
77 * metadata_sock. Nothing more will be added to this poll set.
78 */
79 ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
80 if (ret < 0) {
81 mark_thread_intialization_as_failed(notifiers);
82 goto error_poll;
83 }
84
1524f98c 85 ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN);
4ec029ed
JG
86 if (ret < 0) {
87 mark_thread_intialization_as_failed(notifiers);
88 goto error;
89 }
90
91 /*
92 * The error socket here is already in a listening state which was done
93 * just before spawning this thread to avoid a race between the consumer
94 * daemon exec trying to connect and the listen() call.
95 */
96 ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
97 if (ret < 0) {
98 mark_thread_intialization_as_failed(notifiers);
99 goto error;
100 }
101
102 health_code_update();
103
104 /* Infinite blocking call, waiting for transmission */
105 health_poll_entry();
106
107 if (testpoint(sessiond_thread_manage_consumer)) {
108 mark_thread_intialization_as_failed(notifiers);
109 goto error;
110 }
111
112 ret = lttng_poll_wait(&events, -1);
113 health_poll_exit();
114 if (ret < 0) {
115 mark_thread_intialization_as_failed(notifiers);
116 goto error;
117 }
118
119 nb_fd = ret;
120
121 for (i = 0; i < nb_fd; i++) {
122 /* Fetch once the poll data */
8a00688e
MJ
123 const auto revents = LTTNG_POLL_GETEV(&events, i);
124 const auto pollfd = LTTNG_POLL_GETFD(&events, i);
4ec029ed
JG
125
126 health_code_update();
127
8a00688e
MJ
128 /* Activity on thread quit pipe, exiting. */
129 if (pollfd == thread_quit_pipe_fd) {
130 DBG("Activity on thread quit pipe");
4ec029ed
JG
131 err = 0;
132 mark_thread_intialization_as_failed(notifiers);
133 goto exit;
134 } else if (pollfd == consumer_data->err_sock) {
135 /* Event on the registration socket */
136 if (revents & LPOLLIN) {
137 continue;
138 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
139 ERR("consumer err socket poll error");
140 mark_thread_intialization_as_failed(notifiers);
141 goto error;
142 } else {
143 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
144 mark_thread_intialization_as_failed(notifiers);
145 goto error;
146 }
147 }
148 }
149
150 sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
151 if (sock < 0) {
152 mark_thread_intialization_as_failed(notifiers);
153 goto error;
154 }
155
156 /*
157 * Set the CLOEXEC flag. Return code is useless because either way, the
158 * show must go on.
159 */
160 (void) utils_set_fd_cloexec(sock);
161
162 health_code_update();
163
164 DBG2("Receiving code from consumer err_sock");
165
166 /* Getting status code from kconsumerd */
28ab034a 167 ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code));
4ec029ed
JG
168 if (ret <= 0) {
169 mark_thread_intialization_as_failed(notifiers);
170 goto error;
171 }
172
173 health_code_update();
174 if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
175 ERR("consumer error when waiting for SOCK_READY : %s",
28ab034a 176 lttcomm_get_readable_code((lttcomm_return_code) -code));
4ec029ed
JG
177 mark_thread_intialization_as_failed(notifiers);
178 goto error;
179 }
180
181 /* Connect both command and metadata sockets. */
28ab034a
JG
182 consumer_data->cmd_sock = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
183 consumer_data->metadata_fd = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
4ec029ed
JG
184 if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
185 PERROR("consumer connect cmd socket");
186 mark_thread_intialization_as_failed(notifiers);
187 goto error;
188 }
189
190 consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
191
192 /* Create metadata socket lock. */
64803277 193 consumer_data->metadata_sock.lock = zmalloc<pthread_mutex_t>();
4ec029ed
JG
194 if (consumer_data->metadata_sock.lock == NULL) {
195 PERROR("zmalloc pthread mutex");
196 mark_thread_intialization_as_failed(notifiers);
197 goto error;
198 }
199 pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
200
201 DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock);
28ab034a 202 DBG("Consumer metadata socket ready (fd: %d)", consumer_data->metadata_fd);
4ec029ed
JG
203
204 /*
205 * Remove the consumerd error sock since we've established a connection.
206 */
207 ret = lttng_poll_del(&events, consumer_data->err_sock);
208 if (ret < 0) {
209 mark_thread_intialization_as_failed(notifiers);
210 goto error;
211 }
212
213 /* Add new accepted error socket. */
214 ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
215 if (ret < 0) {
216 mark_thread_intialization_as_failed(notifiers);
217 goto error;
218 }
219
220 /* Add metadata socket that is successfully connected. */
28ab034a 221 ret = lttng_poll_add(&events, consumer_data->metadata_fd, LPOLLIN | LPOLLRDHUP);
4ec029ed
JG
222 if (ret < 0) {
223 mark_thread_intialization_as_failed(notifiers);
224 goto error;
225 }
226
227 health_code_update();
228
229 /*
09ede842
JG
230 * Transfer the write-end of the channel monitoring pipe to the consumer
231 * by issuing a SET_CHANNEL_MONITOR_PIPE command.
4ec029ed
JG
232 */
233 cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
234 if (!cmd_socket_wrapper) {
235 mark_thread_intialization_as_failed(notifiers);
236 goto error;
237 }
238 cmd_socket_wrapper->lock = &consumer_data->lock;
239
09ede842 240 pthread_mutex_lock(cmd_socket_wrapper->lock);
412d7227 241 ret = consumer_init(cmd_socket_wrapper, the_sessiond_uuid);
09ede842
JG
242 if (ret) {
243 ERR("Failed to send sessiond uuid to consumer daemon");
244 mark_thread_intialization_as_failed(notifiers);
245 pthread_mutex_unlock(cmd_socket_wrapper->lock);
246 goto error;
247 }
248 pthread_mutex_unlock(cmd_socket_wrapper->lock);
249
4ec029ed 250 ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
28ab034a 251 consumer_data->channel_monitor_pipe);
4ec029ed
JG
252 if (ret) {
253 mark_thread_intialization_as_failed(notifiers);
254 goto error;
255 }
256
257 /* Discard the socket wrapper as it is no longer needed. */
258 consumer_destroy_socket(cmd_socket_wrapper);
259 cmd_socket_wrapper = NULL;
260
261 /* The thread is completely initialized, signal that it is ready. */
262 mark_thread_as_ready(notifiers);
263
264 /* Infinite blocking call, waiting for transmission */
265 while (1) {
266 health_code_update();
267
268 /* Exit the thread because the thread quit pipe has been triggered. */
269 if (should_quit) {
270 /* Not a health error. */
271 err = 0;
272 goto exit;
273 }
274
275 health_poll_entry();
276 ret = lttng_poll_wait(&events, -1);
277 health_poll_exit();
278 if (ret < 0) {
279 goto error;
280 }
281
282 nb_fd = ret;
283
284 for (i = 0; i < nb_fd; i++) {
285 /* Fetch once the poll data */
8a00688e
MJ
286 const auto revents = LTTNG_POLL_GETEV(&events, i);
287 const auto pollfd = LTTNG_POLL_GETFD(&events, i);
4ec029ed
JG
288
289 health_code_update();
290
4ec029ed
JG
291 /*
292 * Thread quit pipe has been triggered, flag that we should stop
293 * but continue the current loop to handle potential data from
294 * consumer.
295 */
8a00688e 296 if (pollfd == thread_quit_pipe_fd) {
4ec029ed
JG
297 should_quit = 1;
298 } else if (pollfd == sock) {
299 /* Event on the consumerd socket */
28ab034a
JG
300 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) &&
301 !(revents & LPOLLIN)) {
4ec029ed
JG
302 ERR("consumer err socket second poll error");
303 goto error;
304 }
305 health_code_update();
306 /* Wait for any kconsumerd error */
28ab034a
JG
307 ret = lttcomm_recv_unix_sock(
308 sock, &code, sizeof(enum lttcomm_return_code));
4ec029ed
JG
309 if (ret <= 0) {
310 ERR("consumer closed the command socket");
311 goto error;
312 }
313
314 ERR("consumer return code : %s",
28ab034a 315 lttcomm_get_readable_code((lttcomm_return_code) -code));
4ec029ed
JG
316
317 goto exit;
318 } else if (pollfd == consumer_data->metadata_fd) {
28ab034a
JG
319 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) &&
320 !(revents & LPOLLIN)) {
4ec029ed
JG
321 ERR("consumer err metadata socket second poll error");
322 goto error;
323 }
324 /* UST metadata requests */
28ab034a 325 ret = ust_consumer_metadata_request(&consumer_data->metadata_sock);
4ec029ed
JG
326 if (ret < 0) {
327 ERR("Handling metadata request");
328 goto error;
329 }
330 }
331 /* No need for an else branch all FDs are tested prior. */
332 }
333 health_code_update();
334 }
335
336exit:
337error:
338 /*
339 * We lock here because we are about to close the sockets and some other
340 * thread might be using them so get exclusive access which will abort all
341 * other consumer command by other threads.
342 */
343 pthread_mutex_lock(&consumer_data->lock);
344
345 /* Immediately set the consumerd state to stopped */
346 if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
412d7227 347 uatomic_set(&the_kernel_consumerd_state, CONSUMER_ERROR);
4ec029ed 348 } else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
28ab034a 349 consumer_data->type == LTTNG_CONSUMER32_UST) {
412d7227 350 uatomic_set(&the_ust_consumerd_state, CONSUMER_ERROR);
4ec029ed
JG
351 } else {
352 /* Code flow error... */
a0377dfe 353 abort();
4ec029ed
JG
354 }
355
356 if (consumer_data->err_sock >= 0) {
357 ret = close(consumer_data->err_sock);
358 if (ret) {
359 PERROR("close");
360 }
361 consumer_data->err_sock = -1;
362 }
363 if (consumer_data->cmd_sock >= 0) {
364 ret = close(consumer_data->cmd_sock);
365 if (ret) {
366 PERROR("close");
367 }
368 consumer_data->cmd_sock = -1;
369 }
28ab034a 370 if (consumer_data->metadata_sock.fd_ptr && *consumer_data->metadata_sock.fd_ptr >= 0) {
4ec029ed
JG
371 ret = close(*consumer_data->metadata_sock.fd_ptr);
372 if (ret) {
373 PERROR("close");
374 }
375 }
376 if (sock >= 0) {
377 ret = close(sock);
378 if (ret) {
379 PERROR("close");
380 }
381 }
382
383 unlink(consumer_data->err_unix_sock_path);
384 unlink(consumer_data->cmd_unix_sock_path);
385 pthread_mutex_unlock(&consumer_data->lock);
386
387 /* Cleanup metadata socket mutex. */
388 if (consumer_data->metadata_sock.lock) {
389 pthread_mutex_destroy(consumer_data->metadata_sock.lock);
390 free(consumer_data->metadata_sock.lock);
391 }
392 lttng_poll_clean(&events);
393
394 if (cmd_socket_wrapper) {
395 consumer_destroy_socket(cmd_socket_wrapper);
396 }
397error_poll:
398 if (err) {
399 health_error();
400 ERR("Health error occurred in %s", __func__);
401 }
412d7227 402 health_unregister(the_health_sessiond);
4ec029ed
JG
403 DBG("consumer thread cleanup completed");
404
405 rcu_thread_offline();
406 rcu_unregister_thread();
407
408 return NULL;
409}
410
411static bool shutdown_consumer_management_thread(void *data)
412{
7966af57 413 struct thread_notifiers *notifiers = (thread_notifiers *) data;
4ec029ed
JG
414 const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe);
415
416 return notify_thread_pipe(write_fd) == 1;
417}
418
419static void cleanup_consumer_management_thread(void *data)
420{
7966af57 421 struct thread_notifiers *notifiers = (thread_notifiers *) data;
4ec029ed
JG
422
423 lttng_pipe_destroy(notifiers->quit_pipe);
424 free(notifiers);
425}
426
427bool launch_consumer_management_thread(struct consumer_data *consumer_data)
428{
429 struct lttng_pipe *quit_pipe;
430 struct thread_notifiers *notifiers = NULL;
431 struct lttng_thread *thread;
432
64803277 433 notifiers = zmalloc<thread_notifiers>();
4ec029ed 434 if (!notifiers) {
21fa020e
JG
435 goto error_alloc;
436 }
437
438 quit_pipe = lttng_pipe_open(FD_CLOEXEC);
439 if (!quit_pipe) {
4ec029ed
JG
440 goto error;
441 }
442 notifiers->quit_pipe = quit_pipe;
443 notifiers->consumer_data = consumer_data;
444 sem_init(&notifiers->ready, 0, 0);
445
446 thread = lttng_thread_create("Consumer management",
28ab034a
JG
447 thread_consumer_management,
448 shutdown_consumer_management_thread,
449 cleanup_consumer_management_thread,
450 notifiers);
4ec029ed
JG
451 if (!thread) {
452 goto error;
453 }
454 wait_until_thread_is_ready(notifiers);
455 lttng_thread_put(thread);
456 if (notifiers->initialization_result) {
bd3739b0 457 return false;
4ec029ed
JG
458 }
459 return true;
460error:
461 cleanup_consumer_management_thread(notifiers);
21fa020e 462error_alloc:
4ec029ed
JG
463 return false;
464}
This page took 0.073583 seconds and 4 git commands to generate.