Fix: syscall event rule: emission sites not compared in is_equal
[lttng-tools.git] / src / bin / lttng-sessiond / manage-consumer.cpp
CommitLineData
4ec029ed 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4ec029ed 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
4ec029ed 7 *
4ec029ed
JG
8 */
9
28ab034a 10#include "health-sessiond.hpp"
c9e313bc
SM
11#include "manage-consumer.hpp"
12#include "testpoint.hpp"
c9e313bc
SM
13#include "thread.hpp"
14#include "ust-consumer.hpp"
28ab034a
JG
15#include "utils.hpp"
16
17#include <common/pipe.hpp>
18#include <common/utils.hpp>
19
671e39d7 20#include <fcntl.h>
28ab034a 21#include <signal.h>
4ec029ed 22
namespace {
/*
 * State shared between the consumer management thread and the thread that
 * launches it. Carries the quit pipe and consumer parameters into the thread,
 * and carries the initialization outcome back to the launcher.
 */
struct thread_notifiers {
	/* Read end is polled by the thread; write end signals it to quit. */
	struct lttng_pipe *quit_pipe;
	/* Parameters/state of the consumer daemon managed by this thread. */
	struct consumer_data *consumer_data;
	/*
	 * Posted exactly once, after initialization_result is set, to wake
	 * the launcher blocked in wait_until_thread_is_ready().
	 */
	sem_t ready;
	/* 0 on successful thread initialization, -1 on failure. */
	int initialization_result;
};
} /* namespace */
4ec029ed
JG
31
/*
 * Flag the consumer management thread as successfully initialized and wake
 * the launcher blocked in wait_until_thread_is_ready().
 *
 * The result must be stored before sem_post() so the waiter observes it.
 */
static void mark_thread_as_ready(struct thread_notifiers *notifiers)
{
	DBG("Marking consumer management thread as ready");
	notifiers->initialization_result = 0;
	sem_post(&notifiers->ready);
}
38
/*
 * Flag the consumer management thread initialization as failed and wake the
 * launcher blocked in wait_until_thread_is_ready().
 *
 * The result must be stored before sem_post() so the waiter observes it.
 * NOTE(review): "intialization" in the name is a typo ("initialization");
 * fixing it requires updating every call site in this file.
 */
static void mark_thread_intialization_as_failed(struct thread_notifiers *notifiers)
{
	ERR("Consumer management thread entering error state");
	notifiers->initialization_result = -1;
	sem_post(&notifiers->ready);
}
45
/*
 * Block until the consumer management thread has posted its initialization
 * result (success or failure) on the ready semaphore.
 */
static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
{
	DBG("Waiting for consumer management thread to be ready");
	sem_wait(&notifiers->ready);
	DBG("Consumer management thread is ready");
}
52
/*
 * This thread manage the consumer error sent back to the session daemon.
 *
 * Initialization phase (any failure posts -1 to the launcher via
 * mark_thread_intialization_as_failed()):
 *   1) Poll the thread quit pipe and the consumerd error socket until the
 *      consumer daemon connects.
 *   2) Accept the error-socket connection and wait for the consumer's
 *      COMMAND_SOCK_READY status code.
 *   3) Connect the command and metadata sockets, send the sessiond UUID and
 *      the channel monitoring pipe to the consumer.
 * Steady state: poll the quit pipe, the accepted error socket (consumer
 * errors) and the metadata fd (UST metadata requests) until told to quit or
 * an error occurs. Cleanup tears down all sockets and marks the consumerd
 * state as CONSUMER_ERROR.
 */
static void *thread_consumer_management(void *data)
{
	int sock = -1, i, ret, err = -1, should_quit = 0;
	uint32_t nb_fd;
	enum lttcomm_return_code code;
	struct lttng_poll_event events;
	struct thread_notifiers *notifiers = (thread_notifiers *) data;
	struct consumer_data *consumer_data = notifiers->consumer_data;
	const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
	struct consumer_socket *cmd_socket_wrapper = nullptr;

	DBG("[thread] Manage consumer started");

	rcu_register_thread();
	rcu_thread_online();

	health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);

	health_code_update();

	/*
	 * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
	 * metadata_sock. Nothing more will be added to this poll set.
	 */
	ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error_poll;
	}

	ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/*
	 * The error socket here is already in a listening state which was done
	 * just before spawning this thread to avoid a race between the consumer
	 * daemon exec trying to connect and the listen() call.
	 */
	ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	health_code_update();

	/* Infinite blocking call, waiting for transmission */
	health_poll_entry();

	if (testpoint(sessiond_thread_manage_consumer)) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	ret = lttng_poll_wait(&events, -1);
	health_poll_exit();
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	nb_fd = ret;

	/* NOTE(review): `i` is int vs `nb_fd` uint32_t — signed/unsigned compare. */
	for (i = 0; i < nb_fd; i++) {
		/* Fetch once the poll data */
		const auto revents = LTTNG_POLL_GETEV(&events, i);
		const auto pollfd = LTTNG_POLL_GETFD(&events, i);

		health_code_update();

		/* Activity on thread quit pipe, exiting. */
		if (pollfd == thread_quit_pipe_fd) {
			DBG("Activity on thread quit pipe");
			err = 0;
			/*
			 * Quitting before init completed: still unblock the
			 * launcher, reporting failure.
			 */
			mark_thread_intialization_as_failed(notifiers);
			goto exit;
		} else if (pollfd == consumer_data->err_sock) {
			/* Event on the registration socket */
			if (revents & LPOLLIN) {
				/* Pending connection; accepted below. */
				continue;
			} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
				ERR("consumer err socket poll error");
				mark_thread_intialization_as_failed(notifiers);
				goto error;
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				mark_thread_intialization_as_failed(notifiers);
				goto error;
			}
		}
	}

	sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
	if (sock < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/*
	 * Set the CLOEXEC flag. Return code is useless because either way, the
	 * show must go on.
	 */
	(void) utils_set_fd_cloexec(sock);

	health_code_update();

	DBG2("Receiving code from consumer err_sock");

	/* Getting status code from consumerd */
	{
		std::int32_t comm_code = 0;

		ret = lttcomm_recv_unix_sock(sock, &comm_code, sizeof(comm_code));
		code = static_cast<decltype(code)>(comm_code);
	}
	if (ret <= 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	health_code_update();
	if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
		ERR("consumer error when waiting for SOCK_READY : %s",
		    lttcomm_get_readable_code((lttcomm_return_code) -code));
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/*
	 * Connect both command and metadata sockets.
	 * NOTE(review): both connect to cmd_unix_sock_path — presumably the
	 * consumer serves both roles on the same listening socket; confirm
	 * against the consumerd side before changing.
	 */
	consumer_data->cmd_sock = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
	consumer_data->metadata_fd = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
	if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
		PERROR("consumer connect cmd socket");
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;

	/* Create metadata socket lock. */
	consumer_data->metadata_sock.lock = zmalloc<pthread_mutex_t>();
	if (consumer_data->metadata_sock.lock == nullptr) {
		PERROR("zmalloc pthread mutex");
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}
	pthread_mutex_init(consumer_data->metadata_sock.lock, nullptr);

	DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock);
	DBG("Consumer metadata socket ready (fd: %d)", consumer_data->metadata_fd);

	/*
	 * Remove the consumerd error sock since we've established a connection.
	 */
	ret = lttng_poll_del(&events, consumer_data->err_sock);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/* Add new accepted error socket. */
	ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/* Add metadata socket that is successfully connected. */
	ret = lttng_poll_add(&events, consumer_data->metadata_fd, LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	health_code_update();

	/*
	 * Transfer the write-end of the channel monitoring pipe to the consumer
	 * by issuing a SET_CHANNEL_MONITOR_PIPE command.
	 */
	cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
	if (!cmd_socket_wrapper) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}
	cmd_socket_wrapper->lock = &consumer_data->lock;

	pthread_mutex_lock(cmd_socket_wrapper->lock);
	ret = consumer_init(cmd_socket_wrapper, the_sessiond_uuid);
	if (ret) {
		ERR("Failed to send sessiond uuid to consumer daemon");
		mark_thread_intialization_as_failed(notifiers);
		pthread_mutex_unlock(cmd_socket_wrapper->lock);
		goto error;
	}
	pthread_mutex_unlock(cmd_socket_wrapper->lock);

	ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
						 consumer_data->channel_monitor_pipe);
	if (ret) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/* Discard the socket wrapper as it is no longer needed. */
	consumer_destroy_socket(cmd_socket_wrapper);
	cmd_socket_wrapper = nullptr;

	/* The thread is completely initialized, signal that it is ready. */
	mark_thread_as_ready(notifiers);

	/* Infinite blocking call, waiting for transmission */
	while (true) {
		health_code_update();

		/* Exit the thread because the thread quit pipe has been triggered. */
		if (should_quit) {
			/* Not a health error. */
			err = 0;
			goto exit;
		}

		health_poll_entry();
		ret = lttng_poll_wait(&events, -1);
		health_poll_exit();
		if (ret < 0) {
			goto error;
		}

		nb_fd = ret;

		for (i = 0; i < nb_fd; i++) {
			/* Fetch once the poll data */
			const auto revents = LTTNG_POLL_GETEV(&events, i);
			const auto pollfd = LTTNG_POLL_GETFD(&events, i);

			health_code_update();

			/*
			 * Thread quit pipe has been triggered, flag that we should stop
			 * but continue the current loop to handle potential data from
			 * consumer.
			 */
			if (pollfd == thread_quit_pipe_fd) {
				should_quit = 1;
			} else if (pollfd == sock) {
				/* Event on the consumerd socket */
				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) &&
				    !(revents & LPOLLIN)) {
					ERR("consumer err socket second poll error");
					goto error;
				}
				health_code_update();
				/* Wait for any consumerd error */
				{
					std::int32_t comm_code = 0;

					ret = lttcomm_recv_unix_sock(
						sock, &comm_code, sizeof(comm_code));
					code = static_cast<decltype(code)>(comm_code);
				}
				if (ret <= 0) {
					ERR("consumer closed the command socket");
					goto error;
				}

				ERR("consumer return code : %s",
				    lttcomm_get_readable_code((lttcomm_return_code) -code));

				goto exit;
			} else if (pollfd == consumer_data->metadata_fd) {
				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) &&
				    !(revents & LPOLLIN)) {
					ERR("consumer err metadata socket second poll error");
					goto error;
				}
				/* UST metadata requests */
				ret = ust_consumer_metadata_request(&consumer_data->metadata_sock);
				if (ret < 0) {
					ERR("Handling metadata request");
					goto error;
				}
			}
			/* No need for an else branch all FDs are tested prior. */
		}
		health_code_update();
	}

exit:
error:
	/*
	 * We lock here because we are about to close the sockets and some other
	 * thread might be using them so get exclusive access which will abort all
	 * other consumer command by other threads.
	 */
	pthread_mutex_lock(&consumer_data->lock);

	/* Immediately set the consumerd state to stopped */
	if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
		uatomic_set(&the_kernel_consumerd_state, CONSUMER_ERROR);
	} else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
		   consumer_data->type == LTTNG_CONSUMER32_UST) {
		uatomic_set(&the_ust_consumerd_state, CONSUMER_ERROR);
	} else {
		/* Code flow error... */
		abort();
	}

	if (consumer_data->err_sock >= 0) {
		ret = close(consumer_data->err_sock);
		if (ret) {
			PERROR("close");
		}
		consumer_data->err_sock = -1;
	}
	if (consumer_data->cmd_sock >= 0) {
		ret = close(consumer_data->cmd_sock);
		if (ret) {
			PERROR("close");
		}
		consumer_data->cmd_sock = -1;
	}
	if (consumer_data->metadata_sock.fd_ptr && *consumer_data->metadata_sock.fd_ptr >= 0) {
		ret = close(*consumer_data->metadata_sock.fd_ptr);
		if (ret) {
			PERROR("close");
		}
	}
	if (sock >= 0) {
		ret = close(sock);
		if (ret) {
			PERROR("close");
		}
	}

	unlink(consumer_data->err_unix_sock_path);
	unlink(consumer_data->cmd_unix_sock_path);
	pthread_mutex_unlock(&consumer_data->lock);

	/* Cleanup metadata socket mutex. */
	if (consumer_data->metadata_sock.lock) {
		pthread_mutex_destroy(consumer_data->metadata_sock.lock);
		free(consumer_data->metadata_sock.lock);
	}
	lttng_poll_clean(&events);

	/* Only non-null if initialization failed before the wrapper hand-off. */
	if (cmd_socket_wrapper) {
		consumer_destroy_socket(cmd_socket_wrapper);
	}
error_poll:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(the_health_sessiond);
	DBG("consumer thread cleanup completed");

	rcu_thread_offline();
	rcu_unregister_thread();

	return nullptr;
}
421
422static bool shutdown_consumer_management_thread(void *data)
423{
7966af57 424 struct thread_notifiers *notifiers = (thread_notifiers *) data;
4ec029ed
JG
425 const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe);
426
427 return notify_thread_pipe(write_fd) == 1;
428}
429
430static void cleanup_consumer_management_thread(void *data)
431{
7966af57 432 struct thread_notifiers *notifiers = (thread_notifiers *) data;
4ec029ed
JG
433
434 lttng_pipe_destroy(notifiers->quit_pipe);
435 free(notifiers);
436}
437
/*
 * Launch the consumer management thread and block until it reports its
 * initialization result.
 *
 * Returns true when the thread initialized successfully, false on allocation,
 * pipe, thread-creation, or thread-initialization failure. Ownership of the
 * notifiers structure is transferred to the thread's cleanup callback once
 * lttng_thread_create() succeeds.
 */
bool launch_consumer_management_thread(struct consumer_data *consumer_data)
{
	struct lttng_pipe *quit_pipe;
	struct thread_notifiers *notifiers = nullptr;
	struct lttng_thread *thread;

	notifiers = zmalloc<thread_notifiers>();
	if (!notifiers) {
		goto error_alloc;
	}

	quit_pipe = lttng_pipe_open(FD_CLOEXEC);
	if (!quit_pipe) {
		goto error;
	}
	notifiers->quit_pipe = quit_pipe;
	notifiers->consumer_data = consumer_data;
	sem_init(&notifiers->ready, 0, 0);

	thread = lttng_thread_create("Consumer management",
				     thread_consumer_management,
				     shutdown_consumer_management_thread,
				     cleanup_consumer_management_thread,
				     notifiers);
	if (!thread) {
		goto error;
	}
	wait_until_thread_is_ready(notifiers);
	lttng_thread_put(thread);
	/*
	 * NOTE(review): notifiers is read after lttng_thread_put(); this is
	 * safe only as long as the thread (whose cleanup callback frees
	 * notifiers) cannot have fully terminated here — confirm against the
	 * lttng_thread lifetime rules.
	 */
	return notifiers->initialization_result == 0;
error:
	/* Destroys the quit pipe (if opened) and frees notifiers. */
	cleanup_consumer_management_thread(notifiers);
error_alloc:
	return false;
}
This page took 0.083057 seconds and 5 git commands to generate.