Clean-up: run format-cpp on the tree
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.cpp
CommitLineData
db66e574 1/*
ab5be9fa
MJ
2 * Copyright (C) 2017 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
db66e574 4 *
ab5be9fa 5 * SPDX-License-Identifier: GPL-2.0-only
db66e574 6 *
db66e574
JD
7 */
8
9#define _LGPL_SOURCE
28ab034a
JG
10#include "cmd.hpp"
11#include "health-sessiond.hpp"
12#include "lttng-sessiond.hpp"
13#include "notification-thread-commands.hpp"
28ab034a
JG
14#include "rotation-thread.hpp"
15#include "session.hpp"
16#include "thread.hpp"
17#include "timer.hpp"
18#include "utils.hpp"
19
20#include <common/align.hpp>
c9e313bc
SM
21#include <common/config/session-config.hpp>
22#include <common/defaults.hpp>
28ab034a 23#include <common/error.hpp>
0038180d
JG
24#include <common/eventfd.hpp>
25#include <common/exception.hpp>
26#include <common/file-descriptor.hpp>
27#include <common/format.hpp>
c9e313bc 28#include <common/futex.hpp>
c9e313bc 29#include <common/hashtable/utils.hpp>
c9e313bc 30#include <common/kernel-ctl/kernel-ctl.hpp>
0038180d
JG
31#include <common/locked-reference.hpp>
32#include <common/make-unique-wrapper.hpp>
33#include <common/pthread-lock.hpp>
34#include <common/scope-exit.hpp>
28ab034a 35#include <common/time.hpp>
56047f5a 36#include <common/urcu.hpp>
28ab034a
JG
37#include <common/utils.hpp>
38
0038180d 39#include <lttng/action/action-internal.hpp>
c9e313bc 40#include <lttng/condition/condition-internal.hpp>
28ab034a
JG
41#include <lttng/location-internal.hpp>
42#include <lttng/notification/channel-internal.hpp>
c08136a3 43#include <lttng/notification/notification-internal.hpp>
28ab034a
JG
44#include <lttng/rotate-internal.hpp>
45#include <lttng/trigger/trigger.h>
db66e574 46
28ab034a 47#include <inttypes.h>
0038180d 48#include <memory>
28ab034a 49#include <signal.h>
dc65dda3 50#include <sys/eventfd.h>
28ab034a
JG
51#include <sys/stat.h>
52#include <time.h>
db66e574
JD
53#include <urcu.h>
54#include <urcu/list.h>
db66e574 55
0038180d 56namespace ls = lttng::sessiond;
db66e574 57
92816cc3
JG
58/*
59 * The timer thread enqueues jobs and wakes up the rotation thread.
60 * When the rotation thread wakes up, it empties the queue.
61 */
0038180d 62struct ls::rotation_thread_timer_queue {
92816cc3
JG
63 struct lttng_pipe *event_pipe;
64 struct cds_list_head list;
65 pthread_mutex_t lock;
66};
67
f1494934
JG
68namespace {
69struct rotation_thread_job {
0038180d
JG
70 using uptr = std::unique_ptr<
71 rotation_thread_job,
72 lttng::details::create_unique_class<rotation_thread_job, lttng::free>>;
73
74 enum ls::rotation_thread_job_type type;
f1494934
JG
75 struct ltt_session *session;
76 /* List member in struct rotation_thread_timer_queue. */
77 struct cds_list_head head;
78};
f1494934 79
0038180d 80const char *get_job_type_str(enum ls::rotation_thread_job_type job_type)
db66e574 81{
92816cc3 82 switch (job_type) {
0038180d 83 case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
92816cc3 84 return "CHECK_PENDING_ROTATION";
0038180d 85 case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
92816cc3
JG
86 return "SCHEDULED_ROTATION";
87 default:
88 abort();
89 }
db66e574
JD
90}
91
92816cc3
JG
92/*
93 * Called with the rotation_thread_timer_queue lock held.
94 * Return true if the same timer job already exists in the queue, false if not.
95 */
0038180d
JG
96bool timer_job_exists(const ls::rotation_thread_timer_queue *queue,
97 ls::rotation_thread_job_type job_type,
98 ltt_session *session)
92816cc3
JG
99{
100 bool exists = false;
101 struct rotation_thread_job *job;
102
28ab034a 103 cds_list_for_each_entry (job, &queue->list, head) {
c7031a2c 104 if (job->session == session && job->type == job_type) {
92816cc3
JG
105 exists = true;
106 goto end;
db66e574 107 }
db66e574 108 }
92816cc3
JG
109end:
110 return exists;
111}
112
0038180d 113void check_session_rotation_pending_on_consumers(ltt_session& session, bool& _rotation_completed)
92816cc3 114{
db582e11 115 int ret = 0;
92816cc3
JG
116 struct consumer_socket *socket;
117 struct cds_lfht_iter iter;
d2956687
JG
118 enum consumer_trace_chunk_exists_status exists_status;
119 uint64_t relayd_id;
120 bool chunk_exists_on_peer = false;
121 enum lttng_trace_chunk_status chunk_status;
56047f5a 122 lttng::urcu::read_lock_guard read_lock;
d2956687 123
0038180d 124 LTTNG_ASSERT(session.chunk_being_archived);
92816cc3
JG
125
126 /*
127 * Check for a local pending rotation on all consumers (32-bit
128 * user space, 64-bit user space, and kernel).
129 */
0038180d 130 if (!session.ust_session) {
92816cc3
JG
131 goto skip_ust;
132 }
56047f5a 133
28ab034a 134 cds_lfht_for_each_entry (
0038180d
JG
135 session.ust_session->consumer->socks->ht, &iter, socket, node.node) {
136 relayd_id = session.ust_session->consumer->type == CONSUMER_DST_LOCAL ?
28ab034a 137 -1ULL :
0038180d 138 session.ust_session->consumer->net_seq_index;
d2956687 139
0038180d 140 lttng::pthread::lock_guard socket_lock(*socket->lock);
d2956687 141 ret = consumer_trace_chunk_exists(socket,
28ab034a 142 relayd_id,
0038180d
JG
143 session.id,
144 session.chunk_being_archived,
28ab034a 145 &exists_status);
d2956687 146 if (ret) {
83ed9e90 147 ERR("Error occurred while checking rotation status on consumer daemon");
92816cc3 148 goto end;
db66e574 149 }
d2956687 150
16100d7a 151 if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
d2956687
JG
152 chunk_exists_on_peer = true;
153 goto end;
16100d7a 154 }
16100d7a 155 }
db66e574 156
92816cc3 157skip_ust:
0038180d 158 if (!session.kernel_session) {
92816cc3 159 goto skip_kernel;
db66e574 160 }
0038180d 161
28ab034a 162 cds_lfht_for_each_entry (
0038180d
JG
163 session.kernel_session->consumer->socks->ht, &iter, socket, node.node) {
164 lttng::pthread::lock_guard socket_lock(*socket->lock);
165
166 relayd_id = session.kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
28ab034a 167 -1ULL :
0038180d 168 session.kernel_session->consumer->net_seq_index;
d2956687
JG
169
170 ret = consumer_trace_chunk_exists(socket,
28ab034a 171 relayd_id,
0038180d
JG
172 session.id,
173 session.chunk_being_archived,
28ab034a 174 &exists_status);
d2956687 175 if (ret) {
83ed9e90 176 ERR("Error occurred while checking rotation status on consumer daemon");
92816cc3
JG
177 goto end;
178 }
d2956687 179
16100d7a 180 if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
d2956687
JG
181 chunk_exists_on_peer = true;
182 goto end;
16100d7a 183 }
92816cc3
JG
184 }
185skip_kernel:
186end:
db66e574 187
d2956687
JG
188 if (!chunk_exists_on_peer) {
189 uint64_t chunk_being_archived_id;
190
0038180d 191 chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived,
28ab034a 192 &chunk_being_archived_id);
a0377dfe 193 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
28ab034a
JG
194 DBG("Rotation of trace archive %" PRIu64
195 " of session \"%s\" is complete on all consumers",
196 chunk_being_archived_id,
0038180d 197 session.name);
db66e574 198 }
0038180d
JG
199
200 _rotation_completed = !chunk_exists_on_peer;
92816cc3 201 if (ret) {
28ab034a 202 ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR);
2961f09e 203 if (ret) {
0038180d 204 ERR("Failed to reset rotation state of session \"%s\"", session.name);
2961f09e 205 }
db66e574 206 }
db66e574
JD
207}
208
d88744a4 209/*
92816cc3 210 * Check if the last rotation was completed, called with session lock held.
d2956687
JG
211 * Should only return non-zero in the event of a fatal error. Doing so will
212 * shutdown the thread.
d88744a4 213 */
0038180d
JG
214int check_session_rotation_pending(ltt_session& session,
215 notification_thread_handle& notification_thread_handle)
d88744a4
JD
216{
217 int ret;
92816cc3 218 struct lttng_trace_archive_location *location;
d2956687
JG
219 enum lttng_trace_chunk_status chunk_status;
220 bool rotation_completed = false;
221 const char *archived_chunk_name;
222 uint64_t chunk_being_archived_id;
223
0038180d 224 if (!session.chunk_being_archived) {
dc1d5967
FD
225 ret = 0;
226 goto end;
227 }
228
28ab034a 229 chunk_status =
0038180d 230 lttng_trace_chunk_get_id(session.chunk_being_archived, &chunk_being_archived_id);
a0377dfe 231 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d88744a4 232
bd0514a5 233 DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
0038180d 234 session.name,
28ab034a 235 chunk_being_archived_id);
d2956687 236
faf1bdcf
JG
237 /*
238 * The rotation-pending check timer of a session is launched in
239 * one-shot mode. If the rotation is incomplete, the rotation
240 * thread will re-enable the pending-check timer.
241 *
242 * The timer thread can't stop the timer itself since it is involved
243 * in the check for the timer's quiescence.
244 */
245 ret = timer_session_rotation_pending_check_stop(session);
246 if (ret) {
6ae1bf46 247 goto check_ongoing_rotation;
faf1bdcf
JG
248 }
249
0038180d
JG
250 check_session_rotation_pending_on_consumers(session, rotation_completed);
251 if (!rotation_completed || session.rotation_state == LTTNG_ROTATION_STATE_ERROR) {
6ae1bf46 252 goto check_ongoing_rotation;
92816cc3
JG
253 }
254
92816cc3
JG
255 /*
256 * Now we can clear the "ONGOING" state in the session. New
257 * rotations can start now.
258 */
28ab034a 259 chunk_status = lttng_trace_chunk_get_name(
0038180d 260 session.chunk_being_archived, &archived_chunk_name, nullptr);
a0377dfe 261 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
0038180d
JG
262 free(session.last_archived_chunk_name);
263 session.last_archived_chunk_name = strdup(archived_chunk_name);
264 if (!session.last_archived_chunk_name) {
d2956687
JG
265 PERROR("Failed to duplicate archived chunk name");
266 }
0038180d 267
d2956687 268 session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);
92816cc3 269
0038180d
JG
270 if (!session.quiet_rotation) {
271 location = session_get_trace_archive_location(&session);
7fdbed1c 272 ret = notification_thread_command_session_rotation_completed(
0038180d
JG
273 &notification_thread_handle,
274 session.id,
275 session.last_archived_chunk_id.value,
28ab034a 276 location);
d3740619 277 lttng_trace_archive_location_put(location);
7fdbed1c 278 if (ret != LTTNG_OK) {
bd0514a5 279 ERR("Failed to notify notification thread of completed rotation for session %s",
0038180d 280 session.name);
7fdbed1c 281 }
92816cc3
JG
282 }
283
284 ret = 0;
6ae1bf46 285check_ongoing_rotation:
0038180d
JG
286 if (session.rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
287 chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived,
28ab034a 288 &chunk_being_archived_id);
a0377dfe 289 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d2956687 290
bd0514a5 291 DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s",
28ab034a 292 chunk_being_archived_id,
0038180d
JG
293 session.name);
294 ret = timer_session_rotation_pending_check_start(&session,
28ab034a 295 DEFAULT_ROTATE_PENDING_TIMER);
92816cc3 296 if (ret) {
d2956687 297 ERR("Failed to re-enable rotation pending timer");
92816cc3
JG
298 ret = -1;
299 goto end;
300 }
301 }
302
6ae1bf46 303end:
d88744a4
JD
304 return ret;
305}
306
ed1e52a3 307/* Call with the session and session_list locks held. */
0038180d 308int launch_session_rotation(ltt_session& session)
259c2674
JD
309{
310 int ret;
92816cc3 311 struct lttng_rotate_session_return rotation_return;
259c2674 312
0038180d 313 DBG("Launching scheduled time-based rotation on session \"%s\"", session.name);
259c2674 314
0038180d
JG
315 ASSERT_SESSION_LIST_LOCKED();
316 ASSERT_LOCKED(session.lock);
317
318 ret = cmd_rotate_session(&session,
319 &rotation_return,
320 false,
321 LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
92816cc3 322 if (ret == LTTNG_OK) {
bd0514a5 323 DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
0038180d 324 session.name);
92816cc3
JG
325 } else {
326 /* Don't consider errors as fatal. */
bd0514a5 327 DBG("Scheduled time-based rotation aborted for session %s: %s",
0038180d 328 session.name,
28ab034a 329 lttng_strerror(ret));
259c2674 330 }
0038180d 331
92816cc3
JG
332 return 0;
333}
259c2674 334
0038180d
JG
335int run_job(const rotation_thread_job& job,
336 ltt_session& session,
337 notification_thread_handle& notification_thread_handle)
92816cc3
JG
338{
339 int ret;
259c2674 340
0038180d
JG
341 switch (job.type) {
342 case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
16100d7a 343 ret = launch_session_rotation(session);
92816cc3 344 break;
0038180d 345 case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
28ab034a 346 ret = check_session_rotation_pending(session, notification_thread_handle);
92816cc3
JG
347 break;
348 default:
349 abort();
259c2674 350 }
0038180d 351
259c2674
JD
352 return ret;
353}
354
0038180d 355bool shutdown_rotation_thread(void *thread_data)
d88744a4 356{
0038180d 357 auto *handle = reinterpret_cast<const ls::rotation_thread *>(thread_data);
d88744a4 358
0038180d
JG
359 return handle->shutdown();
360}
361} /* namespace */
d88744a4 362
0038180d
JG
363ls::rotation_thread_timer_queue *ls::rotation_thread_timer_queue_create()
364{
365 auto queue = zmalloc<ls::rotation_thread_timer_queue>();
366 if (!queue) {
367 PERROR("Failed to allocate timer rotate queue");
368 goto end;
369 }
370
371 queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
372 CDS_INIT_LIST_HEAD(&queue->list);
373 pthread_mutex_init(&queue->lock, nullptr);
374end:
375 return queue;
376}
377
378void ls::rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue)
379{
380 if (!queue) {
381 return;
382 }
383
384 lttng_pipe_destroy(queue->event_pipe);
385
386 {
387 lttng::pthread::lock_guard queue_lock(queue->lock);
388
389 LTTNG_ASSERT(cds_list_empty(&queue->list));
390 }
391
392 pthread_mutex_destroy(&queue->lock);
393 free(queue);
394}
395
28f23191
JG
396ls::rotation_thread::rotation_thread(rotation_thread_timer_queue& rotation_timer_queue,
397 notification_thread_handle& notification_thread_handle) :
0038180d
JG
398 _rotation_timer_queue{ rotation_timer_queue },
399 _notification_thread_handle{ notification_thread_handle }
400{
401 _quit_pipe.reset([]() {
402 auto raw_pipe = lttng_pipe_open(FD_CLOEXEC);
403 if (!raw_pipe) {
404 LTTNG_THROW_POSIX("Failed to rotation thread's quit pipe", errno);
d88744a4 405 }
d88744a4 406
0038180d
JG
407 return raw_pipe;
408 }());
409
410 _notification_channel.reset([]() {
411 auto channel = lttng_notification_channel_create(
412 lttng_session_daemon_notification_endpoint);
413 if (!channel) {
414 LTTNG_THROW_ERROR(
415 "Failed to create notification channel of rotation thread");
416 }
417
418 return channel;
419 }());
420
421 lttng_poll_init(&_events);
422
423 /*
424 * Create pollset with size 4:
425 * - rotation thread quit pipe,
426 * - rotation thread timer queue pipe,
427 * - notification channel sock,
428 * - subscribtion change event fd
429 */
430 if (lttng_poll_create(&_events, 4, LTTNG_CLOEXEC) < 0) {
431 LTTNG_THROW_ERROR("Failed to create poll object for rotation thread");
432 }
433
434 if (lttng_poll_add(&_events, lttng_pipe_get_readfd(_quit_pipe.get()), LPOLLIN) < 0) {
435 LTTNG_THROW_ERROR("Failed to add quit pipe read fd to poll set");
436 }
437
438 if (lttng_poll_add(&_events,
439 lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe),
440 LPOLLIN) < 0) {
441 LTTNG_THROW_ERROR("Failed to add rotation timer queue event pipe fd to poll set");
442 }
443
444 if (lttng_poll_add(&_events,
445 _notification_channel_subscribtion_change_eventfd.fd(),
446 LPOLLIN) < 0) {
447 LTTNG_THROW_ERROR(
448 "Failed to add rotation thread notification channel subscription change eventfd to poll set");
449 }
450
451 if (lttng_poll_add(&_events, _notification_channel->socket, LPOLLIN) < 0) {
452 LTTNG_THROW_ERROR("Failed to add notification channel socket fd to pollset");
453 }
454}
455
456ls::rotation_thread::~rotation_thread()
457{
458 lttng_poll_clean(&_events);
459}
460
461void ls::rotation_thread_enqueue_job(ls::rotation_thread_timer_queue *queue,
28f23191
JG
462 ls::rotation_thread_job_type job_type,
463 ltt_session *session)
0038180d
JG
464{
465 const char dummy = '!';
466 struct rotation_thread_job *job = nullptr;
467 const char *job_type_str = get_job_type_str(job_type);
468 lttng::pthread::lock_guard queue_lock(queue->lock);
469
470 if (timer_job_exists(queue, job_type, session)) {
471 /*
472 * This timer job is already pending, we don't need to add
473 * it.
474 */
475 return;
476 }
477
478 job = zmalloc<rotation_thread_job>();
479 if (!job) {
480 PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
481 job_type_str,
482 session->name);
483 return;
484 }
485
486 /* No reason for this to fail as the caller must hold a reference. */
487 (void) session_get(session);
488
489 job->session = session;
490 job->type = job_type;
491 cds_list_add_tail(&job->head, &queue->list);
492
493 const int write_ret =
494 lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy));
495 if (write_ret < 0) {
496 /*
497 * We do not want to block in the timer handler, the job has
498 * been enqueued in the list, the wakeup pipe is probably full,
499 * the job will be processed when the rotation_thread catches
500 * up.
501 */
502 DIAGNOSTIC_PUSH
503 DIAGNOSTIC_IGNORE_LOGICAL_OP
504 if (errno == EAGAIN || errno == EWOULDBLOCK) {
505 DIAGNOSTIC_POP
d88744a4 506 /*
0038180d
JG
507 * Not an error, but would be surprising and indicate
508 * that the rotation thread can't keep up with the
509 * current load.
d88744a4 510 */
0038180d
JG
511 DBG("Wake-up pipe of rotation thread job queue is full");
512 return;
d88744a4
JD
513 }
514
0038180d
JG
515 PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
516 job_type_str,
517 session->name);
518 return;
d88744a4 519 }
0038180d 520}
d88744a4 521
0038180d
JG
522void ls::rotation_thread::_handle_job_queue()
523{
524 for (;;) {
525 rotation_thread_job::uptr job;
526
527 {
528 /* Take the queue lock only to pop an element from the list. */
529 lttng::pthread::lock_guard rotation_timer_queue_lock(
530 _rotation_timer_queue.lock);
531 if (cds_list_empty(&_rotation_timer_queue.list)) {
532 break;
533 }
d88744a4 534
0038180d
JG
535 job.reset(cds_list_first_entry(
536 &_rotation_timer_queue.list, typeof(rotation_thread_job), head));
537 cds_list_del(&job->head);
538 }
539
540 session_lock_list();
28f23191
JG
541 const auto unlock_list =
542 lttng::make_scope_exit([]() noexcept { session_unlock_list(); });
0038180d
JG
543
544 /* locked_ptr will unlock the session and release the ref held by the job. */
545 session_lock(job->session);
546 auto session = ltt_session::locked_ptr(job->session);
547
548 if (run_job(*job, *session, _notification_thread_handle)) {
549 return;
550 }
551 }
d88744a4
JD
552}
553
28f23191 554void ls::rotation_thread::_handle_notification(const lttng_notification& notification)
90936dcf
JD
555{
556 int ret = 0;
cd9adb8b 557 const char *condition_session_name = nullptr;
90936dcf
JD
558 enum lttng_condition_status condition_status;
559 enum lttng_evaluation_status evaluation_status;
560 uint64_t consumed;
0038180d
JG
561 auto *condition = lttng_notification_get_const_condition(&notification);
562 auto *evaluation = lttng_notification_get_const_evaluation(&notification);
563 const auto condition_type = lttng_condition_get_type(condition);
90936dcf
JD
564
565 if (condition_type != LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE) {
0038180d 566 LTTNG_THROW_ERROR("Unexpected condition type");
90936dcf
JD
567 }
568
0038180d 569 /* Fetch info to test. */
90936dcf 570 condition_status = lttng_condition_session_consumed_size_get_session_name(
28ab034a 571 condition, &condition_session_name);
90936dcf 572 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
0038180d 573 LTTNG_THROW_ERROR("Session name could not be fetched from notification");
90936dcf 574 }
0038180d 575
28ab034a
JG
576 evaluation_status =
577 lttng_evaluation_session_consumed_size_get_consumed_size(evaluation, &consumed);
90936dcf 578 if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) {
0038180d 579 LTTNG_THROW_ERROR("Failed to get consumed size from evaluation");
90936dcf
JD
580 }
581
0038180d
JG
582 DBG_FMT("Handling session consumed size condition: session_name=`{}`, consumed_size={}",
583 condition_session_name,
584 consumed);
585
90936dcf 586 session_lock_list();
0038180d
JG
587 const auto unlock_list = lttng::make_scope_exit([]() noexcept { session_unlock_list(); });
588
589 ltt_session::locked_ptr session{ [&condition_session_name]() {
590 auto raw_session_ptr = session_find_by_name(condition_session_name);
591
592 if (raw_session_ptr) {
593 session_lock(raw_session_ptr);
594 }
595
596 return raw_session_ptr;
597 }() };
90936dcf 598 if (!session) {
0038180d
JG
599 DBG_FMT("Failed to find session while handling notification: notification_type={}, session name=`{}`",
600 lttng_condition_type_str(condition_type),
601 condition_session_name);
eb2827a4
JG
602 /*
603 * Not a fatal error: a session can be destroyed before we get
604 * the chance to handle the notification.
605 */
0038180d 606 return;
90936dcf 607 }
90936dcf 608
c08136a3 609 if (!lttng_trigger_is_equal(session->rotate_trigger,
0038180d
JG
610 lttng_notification_get_const_trigger(&notification))) {
611 DBG("Notification does not originate from the internal size-based scheduled rotation trigger, skipping");
612 return;
c08136a3
JG
613 }
614
0038180d 615 unsubscribe_session_consumed_size_rotation(*session);
90936dcf 616
2545db87 617 ret = cmd_rotate_session(
0038180d 618 session.get(), nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
2545db87
JG
619 switch (ret) {
620 case LTTNG_OK:
621 break;
622 case -LTTNG_ERR_ROTATION_PENDING:
90936dcf 623 DBG("Rotate already pending, subscribe to the next threshold value");
2545db87
JG
624 break;
625 case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
626 DBG("Rotation already happened since last stop, subscribe to the next threshold value");
627 break;
628 case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
629 DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
630 break;
631 default:
0038180d
JG
632 LTTNG_THROW_CTL("Failed to rotate on consumed size notification",
633 static_cast<lttng_error_code>(-ret));
90936dcf 634 }
90936dcf 635
0038180d 636 subscribe_session_consumed_size_rotation(*session, consumed + session->rotate_size);
90936dcf
JD
637}
638
0038180d 639void ls::rotation_thread::_handle_notification_channel_activity()
90936dcf 640{
dc65dda3 641 bool notification_pending = true;
90936dcf 642
dc65dda3
JG
643 /*
644 * A notification channel may have multiple notifications queued-up internally in
645 * its buffers. This is because a notification channel multiplexes command replies
646 * and notifications. The current protocol specifies that multiple notifications can be
647 * received before the reply to a command.
648 *
649 * In such cases, the notification channel client implementation internally queues them and
650 * provides them on the next calls to lttng_notification_channel_get_next_notification().
651 * This is correct with respect to the public API, which is intended to be used in "blocking
652 * mode".
653 *
654 * However, this internal user relies on poll/epoll to wake-up when data is available
655 * on the notification channel's socket. As such, it can't assume that a wake-up means only
656 * one notification is available for consumption since many of them may have been queued in
657 * the channel's internal buffers.
658 */
659 while (notification_pending) {
0038180d
JG
660 const auto pending_status = lttng_notification_channel_has_pending_notification(
661 _notification_channel.get(), &notification_pending);
662 if (pending_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
663 LTTNG_THROW_ERROR("Error occurred while checking for pending notification");
dc65dda3 664 }
d73ee93f 665
dc65dda3 666 if (!notification_pending) {
0038180d 667 return;
dc65dda3 668 }
d73ee93f 669
dc65dda3 670 /* Receive the next notification. */
0038180d
JG
671 lttng_notification::uptr notification;
672 enum lttng_notification_channel_status next_notification_status;
673
674 {
675 struct lttng_notification *raw_notification_ptr;
676
677 next_notification_status = lttng_notification_channel_get_next_notification(
678 _notification_channel.get(), &raw_notification_ptr);
679 notification.reset(raw_notification_ptr);
680 }
681
682 switch (next_notification_status) {
dc65dda3
JG
683 case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
684 break;
685 case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
686 WARN("Dropped notification detected on notification channel used by the rotation management thread.");
0038180d 687 return;
dc65dda3 688 case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
0038180d 689 LTTNG_THROW_ERROR("Notification channel was closed");
dc65dda3
JG
690 default:
691 /* Unhandled conditions / errors. */
0038180d 692 LTTNG_THROW_ERROR("Unknown notification channel status");
dc65dda3 693 }
90936dcf 694
0038180d 695 _handle_notification(*notification);
90936dcf 696 }
90936dcf
JD
697}
698
0038180d 699void ls::rotation_thread::_thread_function() noexcept
db66e574 700{
bd0514a5 701 DBG("Started rotation thread");
0038180d
JG
702
703 try {
704 _run();
705 } catch (const std::exception& e) {
706 ERR_FMT("Fatal rotation thread error: {}", e.what());
707 }
708
709 DBG("Thread exit");
710}
711
712void ls::rotation_thread::_run()
713{
f620cc28 714 rcu_register_thread();
0038180d
JG
715 const auto unregister_rcu_thread =
716 lttng::make_scope_exit([]() noexcept { rcu_unregister_thread(); });
717
f620cc28 718 rcu_thread_online();
0038180d
JG
719 const auto offline_rcu_thread =
720 lttng::make_scope_exit([]() noexcept { rcu_thread_offline(); });
721
412d7227 722 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
f620cc28 723 health_code_update();
0038180d
JG
724 const auto unregister_health =
725 lttng::make_scope_exit([]() noexcept { health_unregister(the_health_sessiond); });
db66e574 726
0038180d 727 const auto queue_pipe_fd = lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe);
db66e574 728
db66e574 729 while (true) {
db66e574 730 health_poll_entry();
bd0514a5 731 DBG("Entering poll wait");
0038180d
JG
732 auto poll_wait_ret = lttng_poll_wait(&_events, -1);
733 DBG_FMT("Poll wait returned: ret={}", poll_wait_ret);
db66e574 734 health_poll_exit();
0038180d 735 if (poll_wait_ret < 0) {
db66e574
JD
736 /*
737 * Restart interrupted system call.
738 */
739 if (errno == EINTR) {
740 continue;
741 }
0038180d
JG
742
743 LTTNG_THROW_POSIX("Error encountered during lttng_poll_wait", errno);
db66e574
JD
744 }
745
0038180d
JG
746 const auto fd_count = poll_wait_ret;
747 for (int i = 0; i < fd_count; i++) {
748 const auto fd = LTTNG_POLL_GETFD(&_events, i);
749 const auto revents = LTTNG_POLL_GETEV(&_events, i);
db66e574 750
0038180d 751 DBG_FMT("Handling descriptor activity: fd={}, events={:b}", fd, revents);
db66e574 752
92816cc3 753 if (revents & LPOLLERR) {
0038180d
JG
754 LTTNG_THROW_ERROR(
755 fmt::format("Polling returned an error on fd: fd={}", fd));
92816cc3
JG
756 }
757
0038180d
JG
758 if (fd == _notification_channel->socket ||
759 fd == _notification_channel_subscribtion_change_eventfd.fd()) {
760 try {
761 _handle_notification_channel_activity();
762 } catch (const lttng::ctl::error& e) {
763 /*
764 * The only non-fatal error (rotation failed), others
765 * are caught at the top-level.
766 */
767 DBG_FMT("Control error occurred while handling activity on notification channel socket: {}",
768 e.what());
769 continue;
85e17b27 770 }
dc65dda3 771
0038180d 772 if (fd == _notification_channel_subscribtion_change_eventfd.fd()) {
28f23191
JG
773 _notification_channel_subscribtion_change_eventfd
774 .decrement();
dc65dda3 775 }
85e17b27
JG
776 } else {
777 /* Job queue or quit pipe activity. */
85e17b27
JG
778
779 /*
780 * The job queue is serviced if there is
781 * activity on the quit pipe to ensure it is
782 * flushed and all references held in the queue
783 * are released.
784 */
0038180d 785 _handle_job_queue();
64d9b072
JG
786 if (fd == queue_pipe_fd) {
787 char buf;
788
0038180d
JG
789 if (lttng_read(fd, &buf, 1) != 1) {
790 LTTNG_THROW_POSIX(
791 fmt::format(
792 "Failed to read from wakeup pipe: fd={}",
793 fd),
794 errno);
64d9b072
JG
795 }
796 } else {
bd0514a5 797 DBG("Quit pipe activity");
0038180d 798 return;
90936dcf 799 }
db66e574
JD
800 }
801 }
802 }
db66e574 803}
64d9b072 804
0038180d 805bool ls::rotation_thread::shutdown() const noexcept
64d9b072 806{
0038180d 807 const int write_fd = lttng_pipe_get_writefd(_quit_pipe.get());
64d9b072
JG
808
809 return notify_thread_pipe(write_fd) == 1;
810}
811
0038180d 812void ls::rotation_thread::launch_thread()
64d9b072 813{
0038180d
JG
814 auto thread = lttng_thread_create(
815 "Rotation",
816 [](void *ptr) {
817 auto handle = reinterpret_cast<rotation_thread *>(ptr);
818
819 handle->_thread_function();
820 return static_cast<void *>(nullptr);
821 },
822 shutdown_rotation_thread,
823 nullptr,
824 this);
64d9b072 825 if (!thread) {
0038180d 826 LTTNG_THROW_ERROR("Failed to launch rotation thread");
64d9b072 827 }
0038180d 828
64d9b072 829 lttng_thread_put(thread);
0038180d
JG
830}
831
832void ls::rotation_thread::subscribe_session_consumed_size_rotation(ltt_session& session,
28f23191 833 std::uint64_t size)
0038180d
JG
834{
835 const struct lttng_credentials session_creds = {
836 .uid = LTTNG_OPTIONAL_INIT_VALUE(session.uid),
837 .gid = LTTNG_OPTIONAL_INIT_VALUE(session.gid),
838 };
839
840 ASSERT_LOCKED(session.lock);
841
842 auto rotate_condition = lttng::make_unique_wrapper<lttng_condition, lttng_condition_put>(
843 lttng_condition_session_consumed_size_create());
844 if (!rotate_condition) {
845 LTTNG_THROW_POSIX("Failed to create session consumed size condition object", errno);
846 }
847
848 auto condition_status =
849 lttng_condition_session_consumed_size_set_threshold(rotate_condition.get(), size);
850 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
851 LTTNG_THROW_ERROR(fmt::format(
852 "Could not set session consumed size condition threshold: size={}", size));
853 }
854
28f23191
JG
855 condition_status = lttng_condition_session_consumed_size_set_session_name(
856 rotate_condition.get(), session.name);
0038180d
JG
857 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
858 LTTNG_THROW_ERROR(fmt::format(
859 "Could not set session consumed size condition session name: name=`{}`",
860 session.name));
861 }
862
863 auto notify_action = lttng::make_unique_wrapper<lttng_action, lttng_action_put>(
864 lttng_action_notify_create());
865 if (!notify_action) {
866 LTTNG_THROW_POSIX("Could not create notify action", errno);
867 }
868
869 LTTNG_ASSERT(!session.rotate_trigger);
870 /* trigger acquires its own reference to condition and action on success. */
871 auto trigger = lttng::make_unique_wrapper<lttng_trigger, lttng_trigger_put>(
872 lttng_trigger_create(rotate_condition.get(), notify_action.get()));
28f23191 873 if (!trigger) {
0038180d
JG
874 LTTNG_THROW_POSIX("Could not create size-based rotation trigger", errno);
875 }
876
877 /* Ensure this trigger is not visible to external users. */
878 lttng_trigger_set_hidden(trigger.get());
879 lttng_trigger_set_credentials(trigger.get(), &session_creds);
880
28f23191
JG
881 auto nc_status = lttng_notification_channel_subscribe(_notification_channel.get(),
882 rotate_condition.get());
0038180d
JG
883 if (nc_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
884 LTTNG_THROW_ERROR("Could not subscribe to session consumed size notification");
885 }
886
887 /*
888 * Ensure any notification queued during the subscription are consumed by queueing an
889 * event.
890 */
891 _notification_channel_subscribtion_change_eventfd.increment();
892
893 const auto register_ret = notification_thread_command_register_trigger(
894 &_notification_thread_handle, trigger.get(), true);
895 if (register_ret != LTTNG_OK) {
896 LTTNG_THROW_CTL(
897 fmt::format(
898 "Failed to register trigger for automatic size-based rotation: session_name{}, size={}",
899 session.name,
900 size),
901 register_ret);
902 }
903
904 /* Ownership transferred to the session. */
905 session.rotate_trigger = trigger.release();
906}
907
908void ls::rotation_thread::unsubscribe_session_consumed_size_rotation(ltt_session& session)
909{
910 LTTNG_ASSERT(session.rotate_trigger);
911
912 const auto remove_session_trigger = lttng::make_scope_exit([&session]() noexcept {
913 lttng_trigger_put(session.rotate_trigger);
914 session.rotate_trigger = nullptr;
915 });
916
917 const auto unsubscribe_status = lttng_notification_channel_unsubscribe(
918 _notification_channel.get(),
919 lttng_trigger_get_const_condition(session.rotate_trigger));
920 if (unsubscribe_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
921 LTTNG_THROW_ERROR(fmt::format(
922 "Failed to unsubscribe from consumed size condition used to control automatic size-based rotations: session_name=`{}` return_code={}",
923 session.name,
924 static_cast<int>(unsubscribe_status)));
925 }
926
927 /*
928 * Ensure any notification queued during the un-subscription are consumed by queueing an
929 * event.
930 */
931 _notification_channel_subscribtion_change_eventfd.increment();
932
933 const auto unregister_status = notification_thread_command_unregister_trigger(
934 &_notification_thread_handle, session.rotate_trigger);
935 if (unregister_status != LTTNG_OK) {
936 LTTNG_THROW_CTL(
937 fmt::format(
938 "Failed to unregister trigger for automatic size-based rotation: session_name{}",
939 session.name),
940 unregister_status);
941 }
64d9b072 942}
This page took 0.108226 seconds and 4 git commands to generate.