Docs: sessiond: document the role of an application's two sockets
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.cpp
1 /*
2 * Copyright (C) 2017 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 *
5 * SPDX-License-Identifier: GPL-2.0-only
6 *
7 */
8
9 #define _LGPL_SOURCE
10 #include "cmd.hpp"
11 #include "health-sessiond.hpp"
12 #include "lttng-sessiond.hpp"
13 #include "notification-thread-commands.hpp"
14 #include "rotate.hpp"
15 #include "rotation-thread.hpp"
16 #include "session.hpp"
17 #include "thread.hpp"
18 #include "timer.hpp"
19 #include "utils.hpp"
20
21 #include <common/align.hpp>
22 #include <common/config/session-config.hpp>
23 #include <common/defaults.hpp>
24 #include <common/error.hpp>
25 #include <common/futex.hpp>
26 #include <common/hashtable/utils.hpp>
27 #include <common/kernel-ctl/kernel-ctl.hpp>
28 #include <common/time.hpp>
29 #include <common/urcu.hpp>
30 #include <common/utils.hpp>
31
32 #include <lttng/condition/condition-internal.hpp>
33 #include <lttng/location-internal.hpp>
34 #include <lttng/notification/channel-internal.hpp>
35 #include <lttng/notification/notification-internal.hpp>
36 #include <lttng/rotate-internal.hpp>
37 #include <lttng/trigger/trigger.h>
38
39 #include <inttypes.h>
40 #include <signal.h>
41 #include <sys/stat.h>
42 #include <time.h>
43 #include <urcu.h>
44 #include <urcu/list.h>
45
46 struct lttng_notification_channel *rotate_notification_channel = nullptr;
47
48 struct rotation_thread {
49 struct lttng_poll_event events;
50 };
51
52 /*
53 * The timer thread enqueues jobs and wakes up the rotation thread.
54 * When the rotation thread wakes up, it empties the queue.
55 */
56 struct rotation_thread_timer_queue {
57 struct lttng_pipe *event_pipe;
58 struct cds_list_head list;
59 pthread_mutex_t lock;
60 };
61
62 struct rotation_thread_handle {
63 struct rotation_thread_timer_queue *rotation_timer_queue;
64 /* Access to the notification thread cmd_queue */
65 struct notification_thread_handle *notification_thread_handle;
66 /* Thread-specific quit pipe. */
67 struct lttng_pipe *quit_pipe;
68 };
69
70 namespace {
71 struct rotation_thread_job {
72 enum rotation_thread_job_type type;
73 struct ltt_session *session;
74 /* List member in struct rotation_thread_timer_queue. */
75 struct cds_list_head head;
76 };
77 } /* namespace */
78
79 static const char *get_job_type_str(enum rotation_thread_job_type job_type)
80 {
81 switch (job_type) {
82 case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
83 return "CHECK_PENDING_ROTATION";
84 case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
85 return "SCHEDULED_ROTATION";
86 default:
87 abort();
88 }
89 }
90
91 struct rotation_thread_timer_queue *rotation_thread_timer_queue_create()
92 {
93 struct rotation_thread_timer_queue *queue = nullptr;
94
95 queue = zmalloc<rotation_thread_timer_queue>();
96 if (!queue) {
97 PERROR("Failed to allocate timer rotate queue");
98 goto end;
99 }
100
101 queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
102 CDS_INIT_LIST_HEAD(&queue->list);
103 pthread_mutex_init(&queue->lock, nullptr);
104 end:
105 return queue;
106 }
107
108 void rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue)
109 {
110 if (!queue) {
111 return;
112 }
113
114 lttng_pipe_destroy(queue->event_pipe);
115
116 pthread_mutex_lock(&queue->lock);
117 LTTNG_ASSERT(cds_list_empty(&queue->list));
118 pthread_mutex_unlock(&queue->lock);
119 pthread_mutex_destroy(&queue->lock);
120 free(queue);
121 }
122
123 /*
124 * Destroy the thread data previously created by the init function.
125 */
126 void rotation_thread_handle_destroy(struct rotation_thread_handle *handle)
127 {
128 lttng_pipe_destroy(handle->quit_pipe);
129 free(handle);
130 }
131
132 struct rotation_thread_handle *
133 rotation_thread_handle_create(struct rotation_thread_timer_queue *rotation_timer_queue,
134 struct notification_thread_handle *notification_thread_handle)
135 {
136 struct rotation_thread_handle *handle;
137
138 handle = zmalloc<rotation_thread_handle>();
139 if (!handle) {
140 goto end;
141 }
142
143 handle->rotation_timer_queue = rotation_timer_queue;
144 handle->notification_thread_handle = notification_thread_handle;
145 handle->quit_pipe = lttng_pipe_open(FD_CLOEXEC);
146 if (!handle->quit_pipe) {
147 goto error;
148 }
149
150 end:
151 return handle;
152 error:
153 rotation_thread_handle_destroy(handle);
154 return nullptr;
155 }
156
157 /*
158 * Called with the rotation_thread_timer_queue lock held.
159 * Return true if the same timer job already exists in the queue, false if not.
160 */
161 static bool timer_job_exists(const struct rotation_thread_timer_queue *queue,
162 enum rotation_thread_job_type job_type,
163 struct ltt_session *session)
164 {
165 bool exists = false;
166 struct rotation_thread_job *job;
167
168 cds_list_for_each_entry (job, &queue->list, head) {
169 if (job->session == session && job->type == job_type) {
170 exists = true;
171 goto end;
172 }
173 }
174 end:
175 return exists;
176 }
177
178 void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
179 enum rotation_thread_job_type job_type,
180 struct ltt_session *session)
181 {
182 int ret;
183 const char dummy = '!';
184 struct rotation_thread_job *job = nullptr;
185 const char *job_type_str = get_job_type_str(job_type);
186
187 pthread_mutex_lock(&queue->lock);
188 if (timer_job_exists(queue, job_type, session)) {
189 /*
190 * This timer job is already pending, we don't need to add
191 * it.
192 */
193 goto end;
194 }
195
196 job = zmalloc<rotation_thread_job>();
197 if (!job) {
198 PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
199 job_type_str,
200 session->name);
201 goto end;
202 }
203 /* No reason for this to fail as the caller must hold a reference. */
204 (void) session_get(session);
205
206 job->session = session;
207 job->type = job_type;
208 cds_list_add_tail(&job->head, &queue->list);
209
210 ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy));
211 if (ret < 0) {
212 /*
213 * We do not want to block in the timer handler, the job has
214 * been enqueued in the list, the wakeup pipe is probably full,
215 * the job will be processed when the rotation_thread catches
216 * up.
217 */
218 DIAGNOSTIC_PUSH
219 DIAGNOSTIC_IGNORE_LOGICAL_OP
220 if (errno == EAGAIN || errno == EWOULDBLOCK) {
221 DIAGNOSTIC_POP
222 /*
223 * Not an error, but would be surprising and indicate
224 * that the rotation thread can't keep up with the
225 * current load.
226 */
227 DBG("Wake-up pipe of rotation thread job queue is full");
228 goto end;
229 }
230 PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
231 job_type_str,
232 session->name);
233 goto end;
234 }
235
236 end:
237 pthread_mutex_unlock(&queue->lock);
238 }
239
240 static int init_poll_set(struct lttng_poll_event *poll_set, struct rotation_thread_handle *handle)
241 {
242 int ret;
243
244 /*
245 * Create pollset with size 3:
246 * - rotation thread quit pipe,
247 * - rotation thread timer queue pipe,
248 * - notification channel sock,
249 */
250 ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
251 if (ret < 0) {
252 goto error;
253 }
254
255 ret = lttng_poll_add(poll_set, lttng_pipe_get_readfd(handle->quit_pipe), LPOLLIN);
256 if (ret < 0) {
257 ERR("Failed to add quit pipe read fd to poll set");
258 goto error;
259 }
260
261 ret = lttng_poll_add(
262 poll_set, lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe), LPOLLIN);
263 if (ret < 0) {
264 ERR("Failed to add rotate_pending fd to poll set");
265 goto error;
266 }
267
268 return ret;
269 error:
270 lttng_poll_clean(poll_set);
271 return ret;
272 }
273
274 static void fini_thread_state(struct rotation_thread *state)
275 {
276 lttng_poll_clean(&state->events);
277 if (rotate_notification_channel) {
278 lttng_notification_channel_destroy(rotate_notification_channel);
279 }
280 }
281
282 static int init_thread_state(struct rotation_thread_handle *handle, struct rotation_thread *state)
283 {
284 int ret;
285
286 memset(state, 0, sizeof(*state));
287 lttng_poll_init(&state->events);
288
289 ret = init_poll_set(&state->events, handle);
290 if (ret) {
291 ERR("Failed to initialize rotation thread poll set");
292 goto end;
293 }
294
295 rotate_notification_channel =
296 lttng_notification_channel_create(lttng_session_daemon_notification_endpoint);
297 if (!rotate_notification_channel) {
298 ERR("Could not create notification channel");
299 ret = -1;
300 goto end;
301 }
302 ret = lttng_poll_add(&state->events, rotate_notification_channel->socket, LPOLLIN);
303 if (ret < 0) {
304 ERR("Failed to add notification fd to pollset");
305 goto end;
306 }
307
308 end:
309 return ret;
310 }
311
312 static void check_session_rotation_pending_on_consumers(struct ltt_session *session,
313 bool *_rotation_completed)
314 {
315 int ret = 0;
316 struct consumer_socket *socket;
317 struct cds_lfht_iter iter;
318 enum consumer_trace_chunk_exists_status exists_status;
319 uint64_t relayd_id;
320 bool chunk_exists_on_peer = false;
321 enum lttng_trace_chunk_status chunk_status;
322 lttng::urcu::read_lock_guard read_lock;
323
324 LTTNG_ASSERT(session->chunk_being_archived);
325
326 /*
327 * Check for a local pending rotation on all consumers (32-bit
328 * user space, 64-bit user space, and kernel).
329 */
330 if (!session->ust_session) {
331 goto skip_ust;
332 }
333
334 cds_lfht_for_each_entry (
335 session->ust_session->consumer->socks->ht, &iter, socket, node.node) {
336 relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ?
337 -1ULL :
338 session->ust_session->consumer->net_seq_index;
339
340 pthread_mutex_lock(socket->lock);
341 ret = consumer_trace_chunk_exists(socket,
342 relayd_id,
343 session->id,
344 session->chunk_being_archived,
345 &exists_status);
346 if (ret) {
347 pthread_mutex_unlock(socket->lock);
348 ERR("Error occurred while checking rotation status on consumer daemon");
349 goto end;
350 }
351
352 if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
353 pthread_mutex_unlock(socket->lock);
354 chunk_exists_on_peer = true;
355 goto end;
356 }
357 pthread_mutex_unlock(socket->lock);
358 }
359
360 skip_ust:
361 if (!session->kernel_session) {
362 goto skip_kernel;
363 }
364 cds_lfht_for_each_entry (
365 session->kernel_session->consumer->socks->ht, &iter, socket, node.node) {
366 pthread_mutex_lock(socket->lock);
367 relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
368 -1ULL :
369 session->kernel_session->consumer->net_seq_index;
370
371 ret = consumer_trace_chunk_exists(socket,
372 relayd_id,
373 session->id,
374 session->chunk_being_archived,
375 &exists_status);
376 if (ret) {
377 pthread_mutex_unlock(socket->lock);
378 ERR("Error occurred while checking rotation status on consumer daemon");
379 goto end;
380 }
381
382 if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
383 pthread_mutex_unlock(socket->lock);
384 chunk_exists_on_peer = true;
385 goto end;
386 }
387 pthread_mutex_unlock(socket->lock);
388 }
389 skip_kernel:
390 end:
391
392 if (!chunk_exists_on_peer) {
393 uint64_t chunk_being_archived_id;
394
395 chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
396 &chunk_being_archived_id);
397 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
398 DBG("Rotation of trace archive %" PRIu64
399 " of session \"%s\" is complete on all consumers",
400 chunk_being_archived_id,
401 session->name);
402 }
403 *_rotation_completed = !chunk_exists_on_peer;
404 if (ret) {
405 ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR);
406 if (ret) {
407 ERR("Failed to reset rotation state of session \"%s\"", session->name);
408 }
409 }
410 }
411
412 /*
413 * Check if the last rotation was completed, called with session lock held.
414 * Should only return non-zero in the event of a fatal error. Doing so will
415 * shutdown the thread.
416 */
417 static int
418 check_session_rotation_pending(struct ltt_session *session,
419 struct notification_thread_handle *notification_thread_handle)
420 {
421 int ret;
422 struct lttng_trace_archive_location *location;
423 enum lttng_trace_chunk_status chunk_status;
424 bool rotation_completed = false;
425 const char *archived_chunk_name;
426 uint64_t chunk_being_archived_id;
427
428 if (!session->chunk_being_archived) {
429 ret = 0;
430 goto end;
431 }
432
433 chunk_status =
434 lttng_trace_chunk_get_id(session->chunk_being_archived, &chunk_being_archived_id);
435 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
436
437 DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
438 session->name,
439 chunk_being_archived_id);
440
441 /*
442 * The rotation-pending check timer of a session is launched in
443 * one-shot mode. If the rotation is incomplete, the rotation
444 * thread will re-enable the pending-check timer.
445 *
446 * The timer thread can't stop the timer itself since it is involved
447 * in the check for the timer's quiescence.
448 */
449 ret = timer_session_rotation_pending_check_stop(session);
450 if (ret) {
451 goto check_ongoing_rotation;
452 }
453
454 check_session_rotation_pending_on_consumers(session, &rotation_completed);
455 if (!rotation_completed || session->rotation_state == LTTNG_ROTATION_STATE_ERROR) {
456 goto check_ongoing_rotation;
457 }
458
459 /*
460 * Now we can clear the "ONGOING" state in the session. New
461 * rotations can start now.
462 */
463 chunk_status = lttng_trace_chunk_get_name(
464 session->chunk_being_archived, &archived_chunk_name, nullptr);
465 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
466 free(session->last_archived_chunk_name);
467 session->last_archived_chunk_name = strdup(archived_chunk_name);
468 if (!session->last_archived_chunk_name) {
469 PERROR("Failed to duplicate archived chunk name");
470 }
471 session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);
472
473 if (!session->quiet_rotation) {
474 location = session_get_trace_archive_location(session);
475 ret = notification_thread_command_session_rotation_completed(
476 notification_thread_handle,
477 session->id,
478 session->last_archived_chunk_id.value,
479 location);
480 lttng_trace_archive_location_put(location);
481 if (ret != LTTNG_OK) {
482 ERR("Failed to notify notification thread of completed rotation for session %s",
483 session->name);
484 }
485 }
486
487 ret = 0;
488 check_ongoing_rotation:
489 if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
490 chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
491 &chunk_being_archived_id);
492 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
493
494 DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s",
495 chunk_being_archived_id,
496 session->name);
497 ret = timer_session_rotation_pending_check_start(session,
498 DEFAULT_ROTATE_PENDING_TIMER);
499 if (ret) {
500 ERR("Failed to re-enable rotation pending timer");
501 ret = -1;
502 goto end;
503 }
504 }
505
506 end:
507 return ret;
508 }
509
510 /* Call with the session and session_list locks held. */
511 static int launch_session_rotation(struct ltt_session *session)
512 {
513 int ret;
514 struct lttng_rotate_session_return rotation_return;
515
516 DBG("Launching scheduled time-based rotation on session \"%s\"", session->name);
517
518 ret = cmd_rotate_session(
519 session, &rotation_return, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
520 if (ret == LTTNG_OK) {
521 DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
522 session->name);
523 } else {
524 /* Don't consider errors as fatal. */
525 DBG("Scheduled time-based rotation aborted for session %s: %s",
526 session->name,
527 lttng_strerror(ret));
528 }
529 return 0;
530 }
531
532 static int run_job(struct rotation_thread_job *job,
533 struct ltt_session *session,
534 struct notification_thread_handle *notification_thread_handle)
535 {
536 int ret;
537
538 switch (job->type) {
539 case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
540 ret = launch_session_rotation(session);
541 break;
542 case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
543 ret = check_session_rotation_pending(session, notification_thread_handle);
544 break;
545 default:
546 abort();
547 }
548 return ret;
549 }
550
551 static int handle_job_queue(struct rotation_thread_handle *handle,
552 struct rotation_thread *state __attribute__((unused)),
553 struct rotation_thread_timer_queue *queue)
554 {
555 int ret = 0;
556
557 for (;;) {
558 struct ltt_session *session;
559 struct rotation_thread_job *job;
560
561 /* Take the queue lock only to pop an element from the list. */
562 pthread_mutex_lock(&queue->lock);
563 if (cds_list_empty(&queue->list)) {
564 pthread_mutex_unlock(&queue->lock);
565 break;
566 }
567 job = cds_list_first_entry(&queue->list, typeof(*job), head);
568 cds_list_del(&job->head);
569 pthread_mutex_unlock(&queue->lock);
570
571 session_lock_list();
572 session = job->session;
573 if (!session) {
574 DBG("Session \"%s\" not found", session->name != NULL ? session->name : "");
575 /*
576 * This is a non-fatal error, and we cannot report it to
577 * the user (timer), so just print the error and
578 * continue the processing.
579 *
580 * While the timer thread will purge pending signals for
581 * a session on the session's destruction, it is
582 * possible for a job targeting that session to have
583 * already been queued before it was destroyed.
584 */
585 free(job);
586 session_put(session);
587 session_unlock_list();
588 continue;
589 }
590
591 session_lock(session);
592 ret = run_job(job, session, handle->notification_thread_handle);
593 session_unlock(session);
594 /* Release reference held by the job. */
595 session_put(session);
596 session_unlock_list();
597 free(job);
598 if (ret) {
599 goto end;
600 }
601 }
602
603 ret = 0;
604
605 end:
606 return ret;
607 }
608
609 static int handle_condition(const struct lttng_notification *notification,
610 struct notification_thread_handle *notification_thread_handle)
611 {
612 int ret = 0;
613 const char *condition_session_name = nullptr;
614 enum lttng_condition_type condition_type;
615 enum lttng_condition_status condition_status;
616 enum lttng_evaluation_status evaluation_status;
617 uint64_t consumed;
618 struct ltt_session *session;
619 const struct lttng_condition *condition =
620 lttng_notification_get_const_condition(notification);
621 const struct lttng_evaluation *evaluation =
622 lttng_notification_get_const_evaluation(notification);
623
624 condition_type = lttng_condition_get_type(condition);
625
626 if (condition_type != LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE) {
627 ret = -1;
628 ERR("Condition type and session usage type are not the same");
629 goto end;
630 }
631
632 /* Fetch info to test */
633 condition_status = lttng_condition_session_consumed_size_get_session_name(
634 condition, &condition_session_name);
635 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
636 ERR("Session name could not be fetched");
637 ret = -1;
638 goto end;
639 }
640 evaluation_status =
641 lttng_evaluation_session_consumed_size_get_consumed_size(evaluation, &consumed);
642 if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) {
643 ERR("Failed to get evaluation");
644 ret = -1;
645 goto end;
646 }
647
648 session_lock_list();
649 session = session_find_by_name(condition_session_name);
650 if (!session) {
651 DBG("Failed to find session while handling notification: notification type = %s, session name = `%s`",
652 lttng_condition_type_str(condition_type),
653 condition_session_name);
654 /*
655 * Not a fatal error: a session can be destroyed before we get
656 * the chance to handle the notification.
657 */
658 ret = 0;
659 session_unlock_list();
660 goto end;
661 }
662 session_lock(session);
663
664 if (!lttng_trigger_is_equal(session->rotate_trigger,
665 lttng_notification_get_const_trigger(notification))) {
666 /* Notification does not originate from our rotation trigger. */
667 ret = 0;
668 goto end_unlock;
669 }
670
671 ret = unsubscribe_session_consumed_size_rotation(session, notification_thread_handle);
672 if (ret) {
673 goto end_unlock;
674 }
675
676 ret = cmd_rotate_session(
677 session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
678 switch (ret) {
679 case LTTNG_OK:
680 break;
681 case -LTTNG_ERR_ROTATION_PENDING:
682 DBG("Rotate already pending, subscribe to the next threshold value");
683 break;
684 case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
685 DBG("Rotation already happened since last stop, subscribe to the next threshold value");
686 break;
687 case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
688 DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
689 break;
690 default:
691 ERR("Failed to rotate on size notification with error: %s", lttng_strerror(ret));
692 ret = -1;
693 goto end_unlock;
694 }
695
696 ret = subscribe_session_consumed_size_rotation(
697 session, consumed + session->rotate_size, notification_thread_handle);
698 if (ret) {
699 ERR("Failed to subscribe to session consumed size condition");
700 goto end_unlock;
701 }
702 ret = 0;
703
704 end_unlock:
705 session_unlock(session);
706 session_put(session);
707 session_unlock_list();
708 end:
709 return ret;
710 }
711
712 static int handle_notification_channel(int fd __attribute__((unused)),
713 struct rotation_thread_handle *handle,
714 struct rotation_thread *state __attribute__((unused)))
715 {
716 int ret;
717 bool notification_pending;
718 struct lttng_notification *notification = nullptr;
719 enum lttng_notification_channel_status status;
720
721 status = lttng_notification_channel_has_pending_notification(rotate_notification_channel,
722 &notification_pending);
723 if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
724 ERR("Error occurred while checking for pending notification");
725 ret = -1;
726 goto end;
727 }
728
729 if (!notification_pending) {
730 ret = 0;
731 goto end;
732 }
733
734 /* Receive the next notification. */
735 status = lttng_notification_channel_get_next_notification(rotate_notification_channel,
736 &notification);
737
738 switch (status) {
739 case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
740 break;
741 case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
742 /* Not an error, we will wait for the next one */
743 ret = 0;
744 goto end;
745 ;
746 case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
747 ERR("Notification channel was closed");
748 ret = -1;
749 goto end;
750 default:
751 /* Unhandled conditions / errors. */
752 ERR("Unknown notification channel status");
753 ret = -1;
754 goto end;
755 }
756
757 ret = handle_condition(notification, handle->notification_thread_handle);
758
759 end:
760 lttng_notification_destroy(notification);
761 return ret;
762 }
763
764 static void *thread_rotation(void *data)
765 {
766 int ret;
767 struct rotation_thread_handle *handle = (rotation_thread_handle *) data;
768 struct rotation_thread thread;
769 int queue_pipe_fd;
770
771 DBG("Started rotation thread");
772 rcu_register_thread();
773 rcu_thread_online();
774 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
775 health_code_update();
776
777 if (!handle) {
778 ERR("Invalid thread context provided");
779 goto end;
780 }
781
782 queue_pipe_fd = lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe);
783
784 ret = init_thread_state(handle, &thread);
785 if (ret) {
786 goto error;
787 }
788
789 while (true) {
790 int fd_count, i;
791
792 health_poll_entry();
793 DBG("Entering poll wait");
794 ret = lttng_poll_wait(&thread.events, -1);
795 DBG("Poll wait returned (%i)", ret);
796 health_poll_exit();
797 if (ret < 0) {
798 /*
799 * Restart interrupted system call.
800 */
801 if (errno == EINTR) {
802 continue;
803 }
804 ERR("Error encountered during lttng_poll_wait (%i)", ret);
805 goto error;
806 }
807
808 fd_count = ret;
809 for (i = 0; i < fd_count; i++) {
810 int fd = LTTNG_POLL_GETFD(&thread.events, i);
811 uint32_t revents = LTTNG_POLL_GETEV(&thread.events, i);
812
813 DBG("Handling fd (%i) activity (%u)", fd, revents);
814
815 if (revents & LPOLLERR) {
816 ERR("Polling returned an error on fd %i", fd);
817 goto error;
818 }
819
820 if (fd == rotate_notification_channel->socket) {
821 ret = handle_notification_channel(fd, handle, &thread);
822 if (ret) {
823 ERR("Error occurred while handling activity on notification channel socket");
824 goto error;
825 }
826 } else {
827 /* Job queue or quit pipe activity. */
828
829 /*
830 * The job queue is serviced if there is
831 * activity on the quit pipe to ensure it is
832 * flushed and all references held in the queue
833 * are released.
834 */
835 ret = handle_job_queue(
836 handle, &thread, handle->rotation_timer_queue);
837 if (ret) {
838 ERR("Failed to handle rotation timer pipe event");
839 goto error;
840 }
841
842 if (fd == queue_pipe_fd) {
843 char buf;
844
845 ret = lttng_read(fd, &buf, 1);
846 if (ret != 1) {
847 ERR("Failed to read from wakeup pipe (fd = %i)",
848 fd);
849 goto error;
850 }
851 } else {
852 DBG("Quit pipe activity");
853 goto exit;
854 }
855 }
856 }
857 }
858 exit:
859 error:
860 DBG("Thread exit");
861 fini_thread_state(&thread);
862 end:
863 health_unregister(the_health_sessiond);
864 rcu_thread_offline();
865 rcu_unregister_thread();
866 return nullptr;
867 }
868
869 static bool shutdown_rotation_thread(void *thread_data)
870 {
871 struct rotation_thread_handle *handle = (rotation_thread_handle *) thread_data;
872 const int write_fd = lttng_pipe_get_writefd(handle->quit_pipe);
873
874 return notify_thread_pipe(write_fd) == 1;
875 }
876
877 bool launch_rotation_thread(struct rotation_thread_handle *handle)
878 {
879 struct lttng_thread *thread;
880
881 thread = lttng_thread_create(
882 "Rotation", thread_rotation, shutdown_rotation_thread, nullptr, handle);
883 if (!thread) {
884 goto error;
885 }
886 lttng_thread_put(thread);
887 return true;
888 error:
889 return false;
890 }
This page took 0.048554 seconds and 4 git commands to generate.