/*
 * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
 *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *               2013 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License, version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include <assert.h>
#include <signal.h>
#include <semaphore.h>

#include <common/pipe.h>
#include <common/utils.h>

#include "manage-consumer.h"
#include "testpoint.h"
#include "health-sessiond.h"
#include "utils.h"
#include "thread.h"
#include "ust-consumer.h"

struct thread_notifiers {
    struct lttng_pipe *quit_pipe;
    struct consumer_data *consumer_data;
    sem_t ready;
    int initialization_result;
};
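
/*
 * The launcher thread blocks on 'ready' until this thread posts it from
 * mark_thread_as_ready() or mark_thread_initialization_as_failed();
 * 'initialization_result' then tells the launcher whether start-up succeeded.
 */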

static void mark_thread_as_ready(struct thread_notifiers *notifiers)
{
    DBG("Marking consumer management thread as ready");
    notifiers->initialization_result = 0;
    sem_post(&notifiers->ready);
}

static void mark_thread_initialization_as_failed(
        struct thread_notifiers *notifiers)
{
    ERR("Consumer management thread entering error state");
    notifiers->initialization_result = -1;
    sem_post(&notifiers->ready);
}

static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
{
    DBG("Waiting for consumer management thread to be ready");
    sem_wait(&notifiers->ready);
    DBG("Consumer management thread is ready");
}

/*
 * This thread manages the consumer errors sent back to the session daemon.
 */
void *thread_consumer_management(void *data)
{
    int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
    uint32_t revents, nb_fd;
    enum lttcomm_return_code code;
    struct lttng_poll_event events;
    struct thread_notifiers *notifiers = data;
    struct consumer_data *consumer_data = notifiers->consumer_data;
    const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
    struct consumer_socket *cmd_socket_wrapper = NULL;

    DBG("[thread] Manage consumer started");

    rcu_register_thread();
    rcu_thread_online();

    health_register(health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);

    health_code_update();
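
    /*
     * This thread reports its liveness through the sessiond health
     * subsystem; the periodic health_code_update() calls below let the
     * health check thread observe that it is still making progress.
     */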

    /*
     * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
     * metadata_sock. Nothing more will be added to this poll set.
     */
    ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
    if (ret < 0) {
        mark_thread_initialization_as_failed(notifiers);
        goto error_poll;
    }

    ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR);
    if (ret < 0) {
        mark_thread_initialization_as_failed(notifiers);
        goto error;
    }

    /*
     * The error socket here is already in a listening state which was done
     * just before spawning this thread to avoid a race between the consumer
     * daemon exec trying to connect and the listen() call.
     */
    ret = lttng_poll_add(&events, consumer_data->err_sock,
            LPOLLIN | LPOLLRDHUP);
    if (ret < 0) {
        mark_thread_initialization_as_failed(notifiers);
        goto error;
    }

    health_code_update();

    /* Infinite blocking call, waiting for transmission */
    health_poll_entry();

    if (testpoint(sessiond_thread_manage_consumer)) {
        mark_thread_initialization_as_failed(notifiers);
        goto error;
    }

    ret = lttng_poll_wait(&events, -1);
    health_poll_exit();
    if (ret < 0) {
        mark_thread_initialization_as_failed(notifiers);
        goto error;
    }

    nb_fd = ret;
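
    /*
     * This first pass through the poll set has a single purpose: catch the
     * consumer daemon's initial connection on the error socket (or an early
     * quit request) before the command and metadata sockets exist.
     */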

    for (i = 0; i < nb_fd; i++) {
        /* Fetch once the poll data */
        revents = LTTNG_POLL_GETEV(&events, i);
        pollfd = LTTNG_POLL_GETFD(&events, i);

        health_code_update();

        /* Thread quit pipe has been closed. Killing thread. */
        if (pollfd == quit_pipe_read_fd) {
            err = 0;
            mark_thread_initialization_as_failed(notifiers);
            goto exit;
        } else if (pollfd == consumer_data->err_sock) {
            /* Event on the registration socket */
            if (revents & LPOLLIN) {
                continue;
            } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
                ERR("consumer err socket poll error");
                mark_thread_initialization_as_failed(notifiers);
                goto error;
            } else {
                ERR("Unexpected poll events %u for sock %d", revents, pollfd);
                mark_thread_initialization_as_failed(notifiers);
                goto error;
            }
        }
    }

    sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
    if (sock < 0) {
        mark_thread_initialization_as_failed(notifiers);
        goto error;
    }

    /*
     * Set the CLOEXEC flag. Return code is useless because either way, the
     * show must go on.
     */
    (void) utils_set_fd_cloexec(sock);

    health_code_update();

    DBG2("Receiving code from consumer err_sock");

    /* Getting status code from kconsumerd */
    ret = lttcomm_recv_unix_sock(sock, &code,
            sizeof(enum lttcomm_return_code));
    if (ret <= 0) {
        mark_thread_initialization_as_failed(notifiers);
        goto error;
    }

    health_code_update();
    if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
        ERR("consumer error when waiting for SOCK_READY : %s",
                lttcomm_get_readable_code(-code));
        mark_thread_initialization_as_failed(notifiers);
        goto error;
    }
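
    /*
     * COMMAND_SOCK_READY is the consumer daemon's signal that its command
     * socket is listening; only once it has been received is it safe to
     * connect to cmd_unix_sock_path below.
     */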

    /* Connect both command and metadata sockets. */
    consumer_data->cmd_sock =
            lttcomm_connect_unix_sock(
                    consumer_data->cmd_unix_sock_path);
    consumer_data->metadata_fd =
            lttcomm_connect_unix_sock(
                    consumer_data->cmd_unix_sock_path);
    if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
        PERROR("consumer connect cmd socket");
        mark_thread_initialization_as_failed(notifiers);
        goto error;
    }

    consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
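
    /*
     * Note that both connections above target the same unix socket path:
     * the consumer daemon accepts a second connection on its command
     * socket and uses that second connection as the metadata channel.
     */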

    /* Create metadata socket lock. */
    consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
    if (consumer_data->metadata_sock.lock == NULL) {
        PERROR("zmalloc pthread mutex");
        mark_thread_initialization_as_failed(notifiers);
        goto error;
    }
    pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);

    DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock);
    DBG("Consumer metadata socket ready (fd: %d)",
            consumer_data->metadata_fd);
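
    /*
     * The metadata socket's mutex is heap-allocated because other threads
     * issuing metadata commands share it; it is destroyed and freed in the
     * teardown path at the end of this thread.
     */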

    /*
     * Remove the consumerd error sock since we've established a connection.
     */
    ret = lttng_poll_del(&events, consumer_data->err_sock);
    if (ret < 0) {
        mark_thread_initialization_as_failed(notifiers);
        goto error;
    }

    /* Add new accepted error socket. */
    ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
    if (ret < 0) {
        mark_thread_initialization_as_failed(notifiers);
        goto error;
    }

    /* Add metadata socket that is successfully connected. */
    ret = lttng_poll_add(&events, consumer_data->metadata_fd,
            LPOLLIN | LPOLLRDHUP);
    if (ret < 0) {
        mark_thread_initialization_as_failed(notifiers);
        goto error;
    }

    health_code_update();

    /*
     * Transfer the write-end of the channel monitoring pipe to the consumer
     * by issuing a SET_CHANNEL_MONITOR_PIPE command.
     */
    cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
    if (!cmd_socket_wrapper) {
        mark_thread_initialization_as_failed(notifiers);
        goto error;
    }
    cmd_socket_wrapper->lock = &consumer_data->lock;

    pthread_mutex_lock(cmd_socket_wrapper->lock);
    ret = consumer_init(cmd_socket_wrapper, sessiond_uuid);
    if (ret) {
        ERR("Failed to send sessiond uuid to consumer daemon");
        mark_thread_initialization_as_failed(notifiers);
        pthread_mutex_unlock(cmd_socket_wrapper->lock);
        goto error;
    }
    pthread_mutex_unlock(cmd_socket_wrapper->lock);

    ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
            consumer_data->channel_monitor_pipe);
    if (ret) {
        mark_thread_initialization_as_failed(notifiers);
        goto error;
    }

    /* Discard the socket wrapper as it is no longer needed. */
    consumer_destroy_socket(cmd_socket_wrapper);
    cmd_socket_wrapper = NULL;

    /* The thread is completely initialized, signal that it is ready. */
    mark_thread_as_ready(notifiers);
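
    /*
     * From this point on the launcher has been released; failures are no
     * longer reported through 'initialization_result' but through the
     * health subsystem and the consumerd state flags set in the error path
     * below.
     */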

    /* Infinite blocking call, waiting for transmission */
    while (1) {
        health_code_update();

        /* Exit the thread because the thread quit pipe has been triggered. */
        if (should_quit) {
            /* Not a health error. */
            err = 0;
            goto exit;
        }

        health_poll_entry();
        ret = lttng_poll_wait(&events, -1);
        health_poll_exit();
        if (ret < 0) {
            goto error;
        }

        nb_fd = ret;

        for (i = 0; i < nb_fd; i++) {
            /* Fetch once the poll data */
            revents = LTTNG_POLL_GETEV(&events, i);
            pollfd = LTTNG_POLL_GETFD(&events, i);

            health_code_update();

            /*
             * Thread quit pipe has been triggered, flag that we should stop
             * but continue the current loop to handle potential data from
             * the consumer sockets.
             */
            if (pollfd == quit_pipe_read_fd) {
                should_quit = 1;
            } else if (pollfd == sock) {
                /* Event on the consumerd socket */
                if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
                        && !(revents & LPOLLIN)) {
                    ERR("consumer err socket second poll error");
                    goto error;
                }
                health_code_update();

                /* Wait for any kconsumerd error */
                ret = lttcomm_recv_unix_sock(sock, &code,
                        sizeof(enum lttcomm_return_code));
                if (ret <= 0) {
                    ERR("consumer closed the command socket");
                    goto error;
                }

                ERR("consumer return code : %s",
                        lttcomm_get_readable_code(-code));

                goto exit;
            } else if (pollfd == consumer_data->metadata_fd) {
                if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
                        && !(revents & LPOLLIN)) {
                    ERR("consumer err metadata socket second poll error");
                    goto error;
                }
                /* UST metadata requests */
                ret = ust_consumer_metadata_request(
                        &consumer_data->metadata_sock);
                if (ret < 0) {
                    ERR("Handling metadata request");
                    goto error;
                }
            }
            /* No need for an else branch all FDs are tested prior. */
        }

        health_code_update();
    }
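
    /*
     * Every exit path (quit pipe, consumerd error report, metadata request
     * failure) converges on the teardown sequence below.
     */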

exit:
error:
    /*
     * We lock here because we are about to close the sockets and some other
     * thread might be using them, so we get exclusive access, which will
     * abort all other consumer commands issued by other threads.
     */
    pthread_mutex_lock(&consumer_data->lock);

    /* Immediately set the consumerd state to stopped */
    if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
        uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR);
    } else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
            consumer_data->type == LTTNG_CONSUMER32_UST) {
        uatomic_set(&ust_consumerd_state, CONSUMER_ERROR);
    } else {
        /* Code flow error... */
        assert(0);
    }

    if (consumer_data->err_sock >= 0) {
        ret = close(consumer_data->err_sock);
        if (ret) {
            PERROR("close");
        }
        consumer_data->err_sock = -1;
    }
    if (consumer_data->cmd_sock >= 0) {
        ret = close(consumer_data->cmd_sock);
        if (ret) {
            PERROR("close");
        }
        consumer_data->cmd_sock = -1;
    }
    if (consumer_data->metadata_sock.fd_ptr &&
            *consumer_data->metadata_sock.fd_ptr >= 0) {
        ret = close(*consumer_data->metadata_sock.fd_ptr);
        if (ret) {
            PERROR("close");
        }
    }
    if (sock >= 0) {
        ret = close(sock);
        if (ret) {
            PERROR("close");
        }
    }

    unlink(consumer_data->err_unix_sock_path);
    unlink(consumer_data->cmd_unix_sock_path);
    pthread_mutex_unlock(&consumer_data->lock);

    /* Cleanup metadata socket mutex. */
    if (consumer_data->metadata_sock.lock) {
        pthread_mutex_destroy(consumer_data->metadata_sock.lock);
        free(consumer_data->metadata_sock.lock);
    }
    lttng_poll_clean(&events);

    if (cmd_socket_wrapper) {
        consumer_destroy_socket(cmd_socket_wrapper);
    }
error_poll:
    if (err) {
        health_error();
        ERR("Health error occurred in %s", __func__);
    }
    health_unregister(health_sessiond);
    DBG("consumer thread cleanup completed");
    rcu_thread_offline();
    rcu_unregister_thread();
    return NULL;
}

static bool shutdown_consumer_management_thread(void *data)
{
    struct thread_notifiers *notifiers = data;
    const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe);

    return notify_thread_pipe(write_fd) == 1;
}
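
/*
 * notify_thread_pipe() returns the number of bytes written to the quit
 * pipe; a return value of 1 therefore means the stop request was
 * successfully delivered to thread_consumer_management().
 */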

static void cleanup_consumer_management_thread(void *data)
{
    struct thread_notifiers *notifiers = data;

    lttng_pipe_destroy(notifiers->quit_pipe);
    free(notifiers);
}
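
/*
 * This routine serves both as the lttng_thread cleanup callback and as the
 * error path of launch_consumer_management_thread() below.
 */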

bool launch_consumer_management_thread(struct consumer_data *consumer_data)
{
    struct lttng_pipe *quit_pipe;
    struct thread_notifiers *notifiers = NULL;
    struct lttng_thread *thread;

    notifiers = zmalloc(sizeof(*notifiers));
    if (!notifiers) {
        goto error_alloc;
    }

    quit_pipe = lttng_pipe_open(FD_CLOEXEC);
    if (!quit_pipe) {
        goto error;
    }
    notifiers->quit_pipe = quit_pipe;
    notifiers->consumer_data = consumer_data;
    sem_init(&notifiers->ready, 0, 0);

    thread = lttng_thread_create("Consumer management",
            thread_consumer_management,
            shutdown_consumer_management_thread,
            cleanup_consumer_management_thread,
            notifiers);
    if (!thread) {
        goto error;
    }
    wait_until_thread_is_ready(notifiers);
    lttng_thread_put(thread);
    if (notifiers->initialization_result) {
        return false;
    }
    return true;
error:
    cleanup_consumer_management_thread(notifiers);
error_alloc:
    return false;
}