Remove fcntl wrapper
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.cpp
CommitLineData
db66e574 1/*
ab5be9fa
MJ
2 * Copyright (C) 2017 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
db66e574 4 *
ab5be9fa 5 * SPDX-License-Identifier: GPL-2.0-only
db66e574 6 *
db66e574
JD
7 */
8
9#define _LGPL_SOURCE
28ab034a
JG
10#include "cmd.hpp"
11#include "health-sessiond.hpp"
12#include "lttng-sessiond.hpp"
13#include "notification-thread-commands.hpp"
28ab034a
JG
14#include "rotation-thread.hpp"
15#include "session.hpp"
16#include "thread.hpp"
17#include "timer.hpp"
18#include "utils.hpp"
19
20#include <common/align.hpp>
c9e313bc
SM
21#include <common/config/session-config.hpp>
22#include <common/defaults.hpp>
28ab034a 23#include <common/error.hpp>
0038180d
JG
24#include <common/eventfd.hpp>
25#include <common/exception.hpp>
26#include <common/file-descriptor.hpp>
27#include <common/format.hpp>
c9e313bc 28#include <common/futex.hpp>
c9e313bc 29#include <common/hashtable/utils.hpp>
c9e313bc 30#include <common/kernel-ctl/kernel-ctl.hpp>
0038180d
JG
31#include <common/locked-reference.hpp>
32#include <common/make-unique-wrapper.hpp>
33#include <common/pthread-lock.hpp>
34#include <common/scope-exit.hpp>
28ab034a 35#include <common/time.hpp>
56047f5a 36#include <common/urcu.hpp>
28ab034a
JG
37#include <common/utils.hpp>
38
0038180d 39#include <lttng/action/action-internal.hpp>
c9e313bc 40#include <lttng/condition/condition-internal.hpp>
28ab034a
JG
41#include <lttng/location-internal.hpp>
42#include <lttng/notification/channel-internal.hpp>
c08136a3 43#include <lttng/notification/notification-internal.hpp>
28ab034a
JG
44#include <lttng/rotate-internal.hpp>
45#include <lttng/trigger/trigger.h>
db66e574 46
671e39d7 47#include <fcntl.h>
28ab034a 48#include <inttypes.h>
0038180d 49#include <memory>
28ab034a 50#include <signal.h>
dc65dda3 51#include <sys/eventfd.h>
28ab034a
JG
52#include <sys/stat.h>
53#include <time.h>
db66e574
JD
54#include <urcu.h>
55#include <urcu/list.h>
db66e574 56
0038180d 57namespace ls = lttng::sessiond;
db66e574 58
92816cc3
JG
59/*
60 * The timer thread enqueues jobs and wakes up the rotation thread.
61 * When the rotation thread wakes up, it empties the queue.
62 */
0038180d 63struct ls::rotation_thread_timer_queue {
92816cc3
JG
64 struct lttng_pipe *event_pipe;
65 struct cds_list_head list;
66 pthread_mutex_t lock;
67};
68
f1494934
JG
69namespace {
70struct rotation_thread_job {
0038180d
JG
71 using uptr = std::unique_ptr<
72 rotation_thread_job,
73 lttng::details::create_unique_class<rotation_thread_job, lttng::free>>;
74
75 enum ls::rotation_thread_job_type type;
f1494934
JG
76 struct ltt_session *session;
77 /* List member in struct rotation_thread_timer_queue. */
78 struct cds_list_head head;
79};
f1494934 80
0038180d 81const char *get_job_type_str(enum ls::rotation_thread_job_type job_type)
db66e574 82{
92816cc3 83 switch (job_type) {
0038180d 84 case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
92816cc3 85 return "CHECK_PENDING_ROTATION";
0038180d 86 case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
92816cc3
JG
87 return "SCHEDULED_ROTATION";
88 default:
89 abort();
90 }
db66e574
JD
91}
92
92816cc3
JG
93/*
94 * Called with the rotation_thread_timer_queue lock held.
95 * Return true if the same timer job already exists in the queue, false if not.
96 */
0038180d
JG
97bool timer_job_exists(const ls::rotation_thread_timer_queue *queue,
98 ls::rotation_thread_job_type job_type,
99 ltt_session *session)
92816cc3
JG
100{
101 bool exists = false;
102 struct rotation_thread_job *job;
103
28ab034a 104 cds_list_for_each_entry (job, &queue->list, head) {
c7031a2c 105 if (job->session == session && job->type == job_type) {
92816cc3
JG
106 exists = true;
107 goto end;
db66e574 108 }
db66e574 109 }
92816cc3
JG
110end:
111 return exists;
112}
113
0038180d 114void check_session_rotation_pending_on_consumers(ltt_session& session, bool& _rotation_completed)
92816cc3 115{
db582e11 116 int ret = 0;
92816cc3
JG
117 struct consumer_socket *socket;
118 struct cds_lfht_iter iter;
d2956687
JG
119 enum consumer_trace_chunk_exists_status exists_status;
120 uint64_t relayd_id;
121 bool chunk_exists_on_peer = false;
122 enum lttng_trace_chunk_status chunk_status;
56047f5a 123 lttng::urcu::read_lock_guard read_lock;
d2956687 124
0038180d 125 LTTNG_ASSERT(session.chunk_being_archived);
92816cc3
JG
126
127 /*
128 * Check for a local pending rotation on all consumers (32-bit
129 * user space, 64-bit user space, and kernel).
130 */
0038180d 131 if (!session.ust_session) {
92816cc3
JG
132 goto skip_ust;
133 }
56047f5a 134
28ab034a 135 cds_lfht_for_each_entry (
0038180d
JG
136 session.ust_session->consumer->socks->ht, &iter, socket, node.node) {
137 relayd_id = session.ust_session->consumer->type == CONSUMER_DST_LOCAL ?
28ab034a 138 -1ULL :
0038180d 139 session.ust_session->consumer->net_seq_index;
d2956687 140
0038180d 141 lttng::pthread::lock_guard socket_lock(*socket->lock);
d2956687 142 ret = consumer_trace_chunk_exists(socket,
28ab034a 143 relayd_id,
0038180d
JG
144 session.id,
145 session.chunk_being_archived,
28ab034a 146 &exists_status);
d2956687 147 if (ret) {
83ed9e90 148 ERR("Error occurred while checking rotation status on consumer daemon");
92816cc3 149 goto end;
db66e574 150 }
d2956687 151
16100d7a 152 if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
d2956687
JG
153 chunk_exists_on_peer = true;
154 goto end;
16100d7a 155 }
16100d7a 156 }
db66e574 157
92816cc3 158skip_ust:
0038180d 159 if (!session.kernel_session) {
92816cc3 160 goto skip_kernel;
db66e574 161 }
0038180d 162
28ab034a 163 cds_lfht_for_each_entry (
0038180d
JG
164 session.kernel_session->consumer->socks->ht, &iter, socket, node.node) {
165 lttng::pthread::lock_guard socket_lock(*socket->lock);
166
167 relayd_id = session.kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
28ab034a 168 -1ULL :
0038180d 169 session.kernel_session->consumer->net_seq_index;
d2956687
JG
170
171 ret = consumer_trace_chunk_exists(socket,
28ab034a 172 relayd_id,
0038180d
JG
173 session.id,
174 session.chunk_being_archived,
28ab034a 175 &exists_status);
d2956687 176 if (ret) {
83ed9e90 177 ERR("Error occurred while checking rotation status on consumer daemon");
92816cc3
JG
178 goto end;
179 }
d2956687 180
16100d7a 181 if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
d2956687
JG
182 chunk_exists_on_peer = true;
183 goto end;
16100d7a 184 }
92816cc3
JG
185 }
186skip_kernel:
187end:
db66e574 188
d2956687
JG
189 if (!chunk_exists_on_peer) {
190 uint64_t chunk_being_archived_id;
191
0038180d 192 chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived,
28ab034a 193 &chunk_being_archived_id);
a0377dfe 194 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
28ab034a
JG
195 DBG("Rotation of trace archive %" PRIu64
196 " of session \"%s\" is complete on all consumers",
197 chunk_being_archived_id,
0038180d 198 session.name);
db66e574 199 }
0038180d
JG
200
201 _rotation_completed = !chunk_exists_on_peer;
92816cc3 202 if (ret) {
28ab034a 203 ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR);
2961f09e 204 if (ret) {
0038180d 205 ERR("Failed to reset rotation state of session \"%s\"", session.name);
2961f09e 206 }
db66e574 207 }
db66e574
JD
208}
209
d88744a4 210/*
92816cc3 211 * Check if the last rotation was completed, called with session lock held.
d2956687
JG
212 * Should only return non-zero in the event of a fatal error. Doing so will
213 * shutdown the thread.
d88744a4 214 */
0038180d
JG
215int check_session_rotation_pending(ltt_session& session,
216 notification_thread_handle& notification_thread_handle)
d88744a4
JD
217{
218 int ret;
92816cc3 219 struct lttng_trace_archive_location *location;
d2956687
JG
220 enum lttng_trace_chunk_status chunk_status;
221 bool rotation_completed = false;
222 const char *archived_chunk_name;
223 uint64_t chunk_being_archived_id;
224
0038180d 225 if (!session.chunk_being_archived) {
dc1d5967
FD
226 ret = 0;
227 goto end;
228 }
229
28ab034a 230 chunk_status =
0038180d 231 lttng_trace_chunk_get_id(session.chunk_being_archived, &chunk_being_archived_id);
a0377dfe 232 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d88744a4 233
bd0514a5 234 DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
0038180d 235 session.name,
28ab034a 236 chunk_being_archived_id);
d2956687 237
faf1bdcf
JG
238 /*
239 * The rotation-pending check timer of a session is launched in
240 * one-shot mode. If the rotation is incomplete, the rotation
241 * thread will re-enable the pending-check timer.
242 *
243 * The timer thread can't stop the timer itself since it is involved
244 * in the check for the timer's quiescence.
245 */
246 ret = timer_session_rotation_pending_check_stop(session);
247 if (ret) {
6ae1bf46 248 goto check_ongoing_rotation;
faf1bdcf
JG
249 }
250
0038180d
JG
251 check_session_rotation_pending_on_consumers(session, rotation_completed);
252 if (!rotation_completed || session.rotation_state == LTTNG_ROTATION_STATE_ERROR) {
6ae1bf46 253 goto check_ongoing_rotation;
92816cc3
JG
254 }
255
92816cc3
JG
256 /*
257 * Now we can clear the "ONGOING" state in the session. New
258 * rotations can start now.
259 */
28ab034a 260 chunk_status = lttng_trace_chunk_get_name(
0038180d 261 session.chunk_being_archived, &archived_chunk_name, nullptr);
a0377dfe 262 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
0038180d
JG
263 free(session.last_archived_chunk_name);
264 session.last_archived_chunk_name = strdup(archived_chunk_name);
265 if (!session.last_archived_chunk_name) {
d2956687
JG
266 PERROR("Failed to duplicate archived chunk name");
267 }
0038180d 268
d2956687 269 session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);
92816cc3 270
0038180d
JG
271 if (!session.quiet_rotation) {
272 location = session_get_trace_archive_location(&session);
7fdbed1c 273 ret = notification_thread_command_session_rotation_completed(
0038180d
JG
274 &notification_thread_handle,
275 session.id,
276 session.last_archived_chunk_id.value,
28ab034a 277 location);
d3740619 278 lttng_trace_archive_location_put(location);
7fdbed1c 279 if (ret != LTTNG_OK) {
bd0514a5 280 ERR("Failed to notify notification thread of completed rotation for session %s",
0038180d 281 session.name);
7fdbed1c 282 }
92816cc3
JG
283 }
284
285 ret = 0;
6ae1bf46 286check_ongoing_rotation:
0038180d
JG
287 if (session.rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
288 chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived,
28ab034a 289 &chunk_being_archived_id);
a0377dfe 290 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d2956687 291
bd0514a5 292 DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s",
28ab034a 293 chunk_being_archived_id,
0038180d
JG
294 session.name);
295 ret = timer_session_rotation_pending_check_start(&session,
28ab034a 296 DEFAULT_ROTATE_PENDING_TIMER);
92816cc3 297 if (ret) {
d2956687 298 ERR("Failed to re-enable rotation pending timer");
92816cc3
JG
299 ret = -1;
300 goto end;
301 }
302 }
303
6ae1bf46 304end:
d88744a4
JD
305 return ret;
306}
307
ed1e52a3 308/* Call with the session and session_list locks held. */
0038180d 309int launch_session_rotation(ltt_session& session)
259c2674
JD
310{
311 int ret;
92816cc3 312 struct lttng_rotate_session_return rotation_return;
259c2674 313
0038180d 314 DBG("Launching scheduled time-based rotation on session \"%s\"", session.name);
259c2674 315
0038180d
JG
316 ASSERT_SESSION_LIST_LOCKED();
317 ASSERT_LOCKED(session.lock);
318
319 ret = cmd_rotate_session(&session,
320 &rotation_return,
321 false,
322 LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
92816cc3 323 if (ret == LTTNG_OK) {
bd0514a5 324 DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
0038180d 325 session.name);
92816cc3
JG
326 } else {
327 /* Don't consider errors as fatal. */
bd0514a5 328 DBG("Scheduled time-based rotation aborted for session %s: %s",
0038180d 329 session.name,
28ab034a 330 lttng_strerror(ret));
259c2674 331 }
0038180d 332
92816cc3
JG
333 return 0;
334}
259c2674 335
0038180d
JG
336int run_job(const rotation_thread_job& job,
337 ltt_session& session,
338 notification_thread_handle& notification_thread_handle)
92816cc3
JG
339{
340 int ret;
259c2674 341
0038180d
JG
342 switch (job.type) {
343 case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
16100d7a 344 ret = launch_session_rotation(session);
92816cc3 345 break;
0038180d 346 case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
28ab034a 347 ret = check_session_rotation_pending(session, notification_thread_handle);
92816cc3
JG
348 break;
349 default:
350 abort();
259c2674 351 }
0038180d 352
259c2674
JD
353 return ret;
354}
355
0038180d 356bool shutdown_rotation_thread(void *thread_data)
d88744a4 357{
0038180d 358 auto *handle = reinterpret_cast<const ls::rotation_thread *>(thread_data);
d88744a4 359
0038180d
JG
360 return handle->shutdown();
361}
362} /* namespace */
d88744a4 363
0038180d
JG
364ls::rotation_thread_timer_queue *ls::rotation_thread_timer_queue_create()
365{
366 auto queue = zmalloc<ls::rotation_thread_timer_queue>();
367 if (!queue) {
368 PERROR("Failed to allocate timer rotate queue");
369 goto end;
370 }
371
372 queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
373 CDS_INIT_LIST_HEAD(&queue->list);
374 pthread_mutex_init(&queue->lock, nullptr);
375end:
376 return queue;
377}
378
379void ls::rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue)
380{
381 if (!queue) {
382 return;
383 }
384
385 lttng_pipe_destroy(queue->event_pipe);
386
387 {
388 lttng::pthread::lock_guard queue_lock(queue->lock);
389
390 LTTNG_ASSERT(cds_list_empty(&queue->list));
391 }
392
393 pthread_mutex_destroy(&queue->lock);
394 free(queue);
395}
396
28f23191
JG
397ls::rotation_thread::rotation_thread(rotation_thread_timer_queue& rotation_timer_queue,
398 notification_thread_handle& notification_thread_handle) :
83885b70
MJ
399 _rotation_timer_queue(rotation_timer_queue),
400 _notification_thread_handle(notification_thread_handle)
0038180d
JG
401{
402 _quit_pipe.reset([]() {
403 auto raw_pipe = lttng_pipe_open(FD_CLOEXEC);
404 if (!raw_pipe) {
405 LTTNG_THROW_POSIX("Failed to rotation thread's quit pipe", errno);
d88744a4 406 }
d88744a4 407
0038180d
JG
408 return raw_pipe;
409 }());
410
411 _notification_channel.reset([]() {
412 auto channel = lttng_notification_channel_create(
413 lttng_session_daemon_notification_endpoint);
414 if (!channel) {
415 LTTNG_THROW_ERROR(
416 "Failed to create notification channel of rotation thread");
417 }
418
419 return channel;
420 }());
421
422 lttng_poll_init(&_events);
423
424 /*
425 * Create pollset with size 4:
426 * - rotation thread quit pipe,
427 * - rotation thread timer queue pipe,
428 * - notification channel sock,
429 * - subscribtion change event fd
430 */
431 if (lttng_poll_create(&_events, 4, LTTNG_CLOEXEC) < 0) {
432 LTTNG_THROW_ERROR("Failed to create poll object for rotation thread");
433 }
434
435 if (lttng_poll_add(&_events, lttng_pipe_get_readfd(_quit_pipe.get()), LPOLLIN) < 0) {
436 LTTNG_THROW_ERROR("Failed to add quit pipe read fd to poll set");
437 }
438
439 if (lttng_poll_add(&_events,
440 lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe),
441 LPOLLIN) < 0) {
442 LTTNG_THROW_ERROR("Failed to add rotation timer queue event pipe fd to poll set");
443 }
444
445 if (lttng_poll_add(&_events,
446 _notification_channel_subscribtion_change_eventfd.fd(),
447 LPOLLIN) < 0) {
448 LTTNG_THROW_ERROR(
449 "Failed to add rotation thread notification channel subscription change eventfd to poll set");
450 }
451
452 if (lttng_poll_add(&_events, _notification_channel->socket, LPOLLIN) < 0) {
453 LTTNG_THROW_ERROR("Failed to add notification channel socket fd to pollset");
454 }
455}
456
457ls::rotation_thread::~rotation_thread()
458{
459 lttng_poll_clean(&_events);
460}
461
462void ls::rotation_thread_enqueue_job(ls::rotation_thread_timer_queue *queue,
28f23191
JG
463 ls::rotation_thread_job_type job_type,
464 ltt_session *session)
0038180d
JG
465{
466 const char dummy = '!';
467 struct rotation_thread_job *job = nullptr;
468 const char *job_type_str = get_job_type_str(job_type);
469 lttng::pthread::lock_guard queue_lock(queue->lock);
470
471 if (timer_job_exists(queue, job_type, session)) {
472 /*
473 * This timer job is already pending, we don't need to add
474 * it.
475 */
476 return;
477 }
478
479 job = zmalloc<rotation_thread_job>();
480 if (!job) {
481 PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
482 job_type_str,
483 session->name);
484 return;
485 }
486
487 /* No reason for this to fail as the caller must hold a reference. */
488 (void) session_get(session);
489
490 job->session = session;
491 job->type = job_type;
492 cds_list_add_tail(&job->head, &queue->list);
493
494 const int write_ret =
495 lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy));
496 if (write_ret < 0) {
497 /*
498 * We do not want to block in the timer handler, the job has
499 * been enqueued in the list, the wakeup pipe is probably full,
500 * the job will be processed when the rotation_thread catches
501 * up.
502 */
503 DIAGNOSTIC_PUSH
504 DIAGNOSTIC_IGNORE_LOGICAL_OP
505 if (errno == EAGAIN || errno == EWOULDBLOCK) {
506 DIAGNOSTIC_POP
d88744a4 507 /*
0038180d
JG
508 * Not an error, but would be surprising and indicate
509 * that the rotation thread can't keep up with the
510 * current load.
d88744a4 511 */
0038180d
JG
512 DBG("Wake-up pipe of rotation thread job queue is full");
513 return;
d88744a4
JD
514 }
515
0038180d
JG
516 PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
517 job_type_str,
518 session->name);
519 return;
d88744a4 520 }
0038180d 521}
d88744a4 522
0038180d
JG
523void ls::rotation_thread::_handle_job_queue()
524{
525 for (;;) {
526 rotation_thread_job::uptr job;
527
528 {
529 /* Take the queue lock only to pop an element from the list. */
530 lttng::pthread::lock_guard rotation_timer_queue_lock(
531 _rotation_timer_queue.lock);
532 if (cds_list_empty(&_rotation_timer_queue.list)) {
533 break;
534 }
d88744a4 535
0038180d
JG
536 job.reset(cds_list_first_entry(
537 &_rotation_timer_queue.list, typeof(rotation_thread_job), head));
538 cds_list_del(&job->head);
539 }
540
541 session_lock_list();
28f23191
JG
542 const auto unlock_list =
543 lttng::make_scope_exit([]() noexcept { session_unlock_list(); });
0038180d
JG
544
545 /* locked_ptr will unlock the session and release the ref held by the job. */
546 session_lock(job->session);
547 auto session = ltt_session::locked_ptr(job->session);
548
549 if (run_job(*job, *session, _notification_thread_handle)) {
550 return;
551 }
552 }
d88744a4
JD
553}
554
28f23191 555void ls::rotation_thread::_handle_notification(const lttng_notification& notification)
90936dcf
JD
556{
557 int ret = 0;
cd9adb8b 558 const char *condition_session_name = nullptr;
90936dcf
JD
559 enum lttng_condition_status condition_status;
560 enum lttng_evaluation_status evaluation_status;
561 uint64_t consumed;
0038180d
JG
562 auto *condition = lttng_notification_get_const_condition(&notification);
563 auto *evaluation = lttng_notification_get_const_evaluation(&notification);
564 const auto condition_type = lttng_condition_get_type(condition);
90936dcf
JD
565
566 if (condition_type != LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE) {
0038180d 567 LTTNG_THROW_ERROR("Unexpected condition type");
90936dcf
JD
568 }
569
0038180d 570 /* Fetch info to test. */
90936dcf 571 condition_status = lttng_condition_session_consumed_size_get_session_name(
28ab034a 572 condition, &condition_session_name);
90936dcf 573 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
0038180d 574 LTTNG_THROW_ERROR("Session name could not be fetched from notification");
90936dcf 575 }
0038180d 576
28ab034a
JG
577 evaluation_status =
578 lttng_evaluation_session_consumed_size_get_consumed_size(evaluation, &consumed);
90936dcf 579 if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) {
0038180d 580 LTTNG_THROW_ERROR("Failed to get consumed size from evaluation");
90936dcf
JD
581 }
582
0038180d
JG
583 DBG_FMT("Handling session consumed size condition: session_name=`{}`, consumed_size={}",
584 condition_session_name,
585 consumed);
586
90936dcf 587 session_lock_list();
0038180d
JG
588 const auto unlock_list = lttng::make_scope_exit([]() noexcept { session_unlock_list(); });
589
590 ltt_session::locked_ptr session{ [&condition_session_name]() {
591 auto raw_session_ptr = session_find_by_name(condition_session_name);
592
593 if (raw_session_ptr) {
594 session_lock(raw_session_ptr);
595 }
596
597 return raw_session_ptr;
598 }() };
90936dcf 599 if (!session) {
0038180d
JG
600 DBG_FMT("Failed to find session while handling notification: notification_type={}, session name=`{}`",
601 lttng_condition_type_str(condition_type),
602 condition_session_name);
eb2827a4
JG
603 /*
604 * Not a fatal error: a session can be destroyed before we get
605 * the chance to handle the notification.
606 */
0038180d 607 return;
90936dcf 608 }
90936dcf 609
c08136a3 610 if (!lttng_trigger_is_equal(session->rotate_trigger,
0038180d
JG
611 lttng_notification_get_const_trigger(&notification))) {
612 DBG("Notification does not originate from the internal size-based scheduled rotation trigger, skipping");
613 return;
c08136a3
JG
614 }
615
0038180d 616 unsubscribe_session_consumed_size_rotation(*session);
90936dcf 617
2545db87 618 ret = cmd_rotate_session(
0038180d 619 session.get(), nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
2545db87
JG
620 switch (ret) {
621 case LTTNG_OK:
622 break;
623 case -LTTNG_ERR_ROTATION_PENDING:
90936dcf 624 DBG("Rotate already pending, subscribe to the next threshold value");
2545db87
JG
625 break;
626 case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
627 DBG("Rotation already happened since last stop, subscribe to the next threshold value");
628 break;
629 case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
630 DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
631 break;
632 default:
0038180d
JG
633 LTTNG_THROW_CTL("Failed to rotate on consumed size notification",
634 static_cast<lttng_error_code>(-ret));
90936dcf 635 }
90936dcf 636
0038180d 637 subscribe_session_consumed_size_rotation(*session, consumed + session->rotate_size);
90936dcf
JD
638}
639
0038180d 640void ls::rotation_thread::_handle_notification_channel_activity()
90936dcf 641{
dc65dda3 642 bool notification_pending = true;
90936dcf 643
dc65dda3
JG
644 /*
645 * A notification channel may have multiple notifications queued-up internally in
646 * its buffers. This is because a notification channel multiplexes command replies
647 * and notifications. The current protocol specifies that multiple notifications can be
648 * received before the reply to a command.
649 *
650 * In such cases, the notification channel client implementation internally queues them and
651 * provides them on the next calls to lttng_notification_channel_get_next_notification().
652 * This is correct with respect to the public API, which is intended to be used in "blocking
653 * mode".
654 *
655 * However, this internal user relies on poll/epoll to wake-up when data is available
656 * on the notification channel's socket. As such, it can't assume that a wake-up means only
657 * one notification is available for consumption since many of them may have been queued in
658 * the channel's internal buffers.
659 */
660 while (notification_pending) {
0038180d
JG
661 const auto pending_status = lttng_notification_channel_has_pending_notification(
662 _notification_channel.get(), &notification_pending);
663 if (pending_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
664 LTTNG_THROW_ERROR("Error occurred while checking for pending notification");
dc65dda3 665 }
d73ee93f 666
dc65dda3 667 if (!notification_pending) {
0038180d 668 return;
dc65dda3 669 }
d73ee93f 670
dc65dda3 671 /* Receive the next notification. */
0038180d
JG
672 lttng_notification::uptr notification;
673 enum lttng_notification_channel_status next_notification_status;
674
675 {
676 struct lttng_notification *raw_notification_ptr;
677
678 next_notification_status = lttng_notification_channel_get_next_notification(
679 _notification_channel.get(), &raw_notification_ptr);
680 notification.reset(raw_notification_ptr);
681 }
682
683 switch (next_notification_status) {
dc65dda3
JG
684 case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
685 break;
686 case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
687 WARN("Dropped notification detected on notification channel used by the rotation management thread.");
0038180d 688 return;
dc65dda3 689 case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
0038180d 690 LTTNG_THROW_ERROR("Notification channel was closed");
dc65dda3
JG
691 default:
692 /* Unhandled conditions / errors. */
0038180d 693 LTTNG_THROW_ERROR("Unknown notification channel status");
dc65dda3 694 }
90936dcf 695
0038180d 696 _handle_notification(*notification);
90936dcf 697 }
90936dcf
JD
698}
699
0038180d 700void ls::rotation_thread::_thread_function() noexcept
db66e574 701{
bd0514a5 702 DBG("Started rotation thread");
0038180d
JG
703
704 try {
705 _run();
706 } catch (const std::exception& e) {
707 ERR_FMT("Fatal rotation thread error: {}", e.what());
708 }
709
710 DBG("Thread exit");
711}
712
713void ls::rotation_thread::_run()
714{
f620cc28 715 rcu_register_thread();
0038180d
JG
716 const auto unregister_rcu_thread =
717 lttng::make_scope_exit([]() noexcept { rcu_unregister_thread(); });
718
f620cc28 719 rcu_thread_online();
0038180d
JG
720 const auto offline_rcu_thread =
721 lttng::make_scope_exit([]() noexcept { rcu_thread_offline(); });
722
412d7227 723 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
f620cc28 724 health_code_update();
0038180d
JG
725 const auto unregister_health =
726 lttng::make_scope_exit([]() noexcept { health_unregister(the_health_sessiond); });
db66e574 727
0038180d 728 const auto queue_pipe_fd = lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe);
db66e574 729
db66e574 730 while (true) {
db66e574 731 health_poll_entry();
bd0514a5 732 DBG("Entering poll wait");
0038180d
JG
733 auto poll_wait_ret = lttng_poll_wait(&_events, -1);
734 DBG_FMT("Poll wait returned: ret={}", poll_wait_ret);
db66e574 735 health_poll_exit();
0038180d 736 if (poll_wait_ret < 0) {
db66e574
JD
737 /*
738 * Restart interrupted system call.
739 */
740 if (errno == EINTR) {
741 continue;
742 }
0038180d
JG
743
744 LTTNG_THROW_POSIX("Error encountered during lttng_poll_wait", errno);
db66e574
JD
745 }
746
0038180d
JG
747 const auto fd_count = poll_wait_ret;
748 for (int i = 0; i < fd_count; i++) {
749 const auto fd = LTTNG_POLL_GETFD(&_events, i);
750 const auto revents = LTTNG_POLL_GETEV(&_events, i);
db66e574 751
0038180d 752 DBG_FMT("Handling descriptor activity: fd={}, events={:b}", fd, revents);
db66e574 753
92816cc3 754 if (revents & LPOLLERR) {
0038180d
JG
755 LTTNG_THROW_ERROR(
756 fmt::format("Polling returned an error on fd: fd={}", fd));
92816cc3
JG
757 }
758
0038180d
JG
759 if (fd == _notification_channel->socket ||
760 fd == _notification_channel_subscribtion_change_eventfd.fd()) {
761 try {
762 _handle_notification_channel_activity();
763 } catch (const lttng::ctl::error& e) {
764 /*
765 * The only non-fatal error (rotation failed), others
766 * are caught at the top-level.
767 */
768 DBG_FMT("Control error occurred while handling activity on notification channel socket: {}",
769 e.what());
770 continue;
85e17b27 771 }
dc65dda3 772
0038180d 773 if (fd == _notification_channel_subscribtion_change_eventfd.fd()) {
28f23191
JG
774 _notification_channel_subscribtion_change_eventfd
775 .decrement();
dc65dda3 776 }
85e17b27
JG
777 } else {
778 /* Job queue or quit pipe activity. */
85e17b27
JG
779
780 /*
781 * The job queue is serviced if there is
782 * activity on the quit pipe to ensure it is
783 * flushed and all references held in the queue
784 * are released.
785 */
0038180d 786 _handle_job_queue();
64d9b072
JG
787 if (fd == queue_pipe_fd) {
788 char buf;
789
0038180d
JG
790 if (lttng_read(fd, &buf, 1) != 1) {
791 LTTNG_THROW_POSIX(
792 fmt::format(
793 "Failed to read from wakeup pipe: fd={}",
794 fd),
795 errno);
64d9b072
JG
796 }
797 } else {
bd0514a5 798 DBG("Quit pipe activity");
0038180d 799 return;
90936dcf 800 }
db66e574
JD
801 }
802 }
803 }
db66e574 804}
64d9b072 805
0038180d 806bool ls::rotation_thread::shutdown() const noexcept
64d9b072 807{
0038180d 808 const int write_fd = lttng_pipe_get_writefd(_quit_pipe.get());
64d9b072
JG
809
810 return notify_thread_pipe(write_fd) == 1;
811}
812
0038180d 813void ls::rotation_thread::launch_thread()
64d9b072 814{
0038180d
JG
815 auto thread = lttng_thread_create(
816 "Rotation",
817 [](void *ptr) {
818 auto handle = reinterpret_cast<rotation_thread *>(ptr);
819
820 handle->_thread_function();
821 return static_cast<void *>(nullptr);
822 },
823 shutdown_rotation_thread,
824 nullptr,
825 this);
64d9b072 826 if (!thread) {
0038180d 827 LTTNG_THROW_ERROR("Failed to launch rotation thread");
64d9b072 828 }
0038180d 829
64d9b072 830 lttng_thread_put(thread);
0038180d
JG
831}
832
833void ls::rotation_thread::subscribe_session_consumed_size_rotation(ltt_session& session,
28f23191 834 std::uint64_t size)
0038180d
JG
835{
836 const struct lttng_credentials session_creds = {
837 .uid = LTTNG_OPTIONAL_INIT_VALUE(session.uid),
838 .gid = LTTNG_OPTIONAL_INIT_VALUE(session.gid),
839 };
840
841 ASSERT_LOCKED(session.lock);
842
843 auto rotate_condition = lttng::make_unique_wrapper<lttng_condition, lttng_condition_put>(
844 lttng_condition_session_consumed_size_create());
845 if (!rotate_condition) {
846 LTTNG_THROW_POSIX("Failed to create session consumed size condition object", errno);
847 }
848
849 auto condition_status =
850 lttng_condition_session_consumed_size_set_threshold(rotate_condition.get(), size);
851 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
852 LTTNG_THROW_ERROR(fmt::format(
853 "Could not set session consumed size condition threshold: size={}", size));
854 }
855
28f23191
JG
856 condition_status = lttng_condition_session_consumed_size_set_session_name(
857 rotate_condition.get(), session.name);
0038180d
JG
858 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
859 LTTNG_THROW_ERROR(fmt::format(
860 "Could not set session consumed size condition session name: name=`{}`",
861 session.name));
862 }
863
864 auto notify_action = lttng::make_unique_wrapper<lttng_action, lttng_action_put>(
865 lttng_action_notify_create());
866 if (!notify_action) {
867 LTTNG_THROW_POSIX("Could not create notify action", errno);
868 }
869
870 LTTNG_ASSERT(!session.rotate_trigger);
871 /* trigger acquires its own reference to condition and action on success. */
872 auto trigger = lttng::make_unique_wrapper<lttng_trigger, lttng_trigger_put>(
873 lttng_trigger_create(rotate_condition.get(), notify_action.get()));
28f23191 874 if (!trigger) {
0038180d
JG
875 LTTNG_THROW_POSIX("Could not create size-based rotation trigger", errno);
876 }
877
878 /* Ensure this trigger is not visible to external users. */
879 lttng_trigger_set_hidden(trigger.get());
880 lttng_trigger_set_credentials(trigger.get(), &session_creds);
881
28f23191
JG
882 auto nc_status = lttng_notification_channel_subscribe(_notification_channel.get(),
883 rotate_condition.get());
0038180d
JG
884 if (nc_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
885 LTTNG_THROW_ERROR("Could not subscribe to session consumed size notification");
886 }
887
888 /*
889 * Ensure any notification queued during the subscription are consumed by queueing an
890 * event.
891 */
892 _notification_channel_subscribtion_change_eventfd.increment();
893
894 const auto register_ret = notification_thread_command_register_trigger(
895 &_notification_thread_handle, trigger.get(), true);
896 if (register_ret != LTTNG_OK) {
897 LTTNG_THROW_CTL(
898 fmt::format(
899 "Failed to register trigger for automatic size-based rotation: session_name{}, size={}",
900 session.name,
901 size),
902 register_ret);
903 }
904
905 /* Ownership transferred to the session. */
906 session.rotate_trigger = trigger.release();
907}
908
909void ls::rotation_thread::unsubscribe_session_consumed_size_rotation(ltt_session& session)
910{
911 LTTNG_ASSERT(session.rotate_trigger);
912
913 const auto remove_session_trigger = lttng::make_scope_exit([&session]() noexcept {
914 lttng_trigger_put(session.rotate_trigger);
915 session.rotate_trigger = nullptr;
916 });
917
918 const auto unsubscribe_status = lttng_notification_channel_unsubscribe(
919 _notification_channel.get(),
920 lttng_trigger_get_const_condition(session.rotate_trigger));
921 if (unsubscribe_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
922 LTTNG_THROW_ERROR(fmt::format(
923 "Failed to unsubscribe from consumed size condition used to control automatic size-based rotations: session_name=`{}` return_code={}",
924 session.name,
925 static_cast<int>(unsubscribe_status)));
926 }
927
928 /*
929 * Ensure any notification queued during the un-subscription are consumed by queueing an
930 * event.
931 */
932 _notification_channel_subscribtion_change_eventfd.increment();
933
934 const auto unregister_status = notification_thread_command_unregister_trigger(
935 &_notification_thread_handle, session.rotate_trigger);
936 if (unregister_status != LTTNG_OK) {
937 LTTNG_THROW_CTL(
938 fmt::format(
939 "Failed to unregister trigger for automatic size-based rotation: session_name{}",
940 session.name),
941 unregister_status);
942 }
64d9b072 943}
This page took 0.110233 seconds and 4 git commands to generate.