Replace explicit rcu_read_lock/unlock with lttng::urcu::read_lock_guard
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.cpp
CommitLineData
db66e574 1/*
ab5be9fa
MJ
2 * Copyright (C) 2017 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
db66e574 4 *
ab5be9fa 5 * SPDX-License-Identifier: GPL-2.0-only
db66e574 6 *
db66e574
JD
7 */
8
9#define _LGPL_SOURCE
28ab034a
JG
10#include "cmd.hpp"
11#include "health-sessiond.hpp"
12#include "lttng-sessiond.hpp"
13#include "notification-thread-commands.hpp"
14#include "rotate.hpp"
15#include "rotation-thread.hpp"
16#include "session.hpp"
17#include "thread.hpp"
18#include "timer.hpp"
19#include "utils.hpp"
20
21#include <common/align.hpp>
c9e313bc
SM
22#include <common/config/session-config.hpp>
23#include <common/defaults.hpp>
28ab034a 24#include <common/error.hpp>
c9e313bc 25#include <common/futex.hpp>
c9e313bc 26#include <common/hashtable/utils.hpp>
c9e313bc 27#include <common/kernel-ctl/kernel-ctl.hpp>
28ab034a 28#include <common/time.hpp>
56047f5a 29#include <common/urcu.hpp>
28ab034a
JG
30#include <common/utils.hpp>
31
c9e313bc 32#include <lttng/condition/condition-internal.hpp>
28ab034a
JG
33#include <lttng/location-internal.hpp>
34#include <lttng/notification/channel-internal.hpp>
c08136a3 35#include <lttng/notification/notification-internal.hpp>
28ab034a
JG
36#include <lttng/rotate-internal.hpp>
37#include <lttng/trigger/trigger.h>
db66e574 38
28ab034a
JG
39#include <inttypes.h>
40#include <signal.h>
41#include <sys/stat.h>
42#include <time.h>
db66e574
JD
43#include <urcu.h>
44#include <urcu/list.h>
db66e574 45
cd9adb8b 46struct lttng_notification_channel *rotate_notification_channel = nullptr;
90936dcf 47
92816cc3 48struct rotation_thread {
db66e574
JD
49 struct lttng_poll_event events;
50};
51
92816cc3
JG
52/*
53 * The timer thread enqueues jobs and wakes up the rotation thread.
54 * When the rotation thread wakes up, it empties the queue.
55 */
56struct rotation_thread_timer_queue {
57 struct lttng_pipe *event_pipe;
58 struct cds_list_head list;
59 pthread_mutex_t lock;
60};
61
62struct rotation_thread_handle {
92816cc3
JG
63 struct rotation_thread_timer_queue *rotation_timer_queue;
64 /* Access to the notification thread cmd_queue */
65 struct notification_thread_handle *notification_thread_handle;
64d9b072
JG
66 /* Thread-specific quit pipe. */
67 struct lttng_pipe *quit_pipe;
92816cc3
JG
68};
69
f1494934
JG
70namespace {
71struct rotation_thread_job {
72 enum rotation_thread_job_type type;
73 struct ltt_session *session;
74 /* List member in struct rotation_thread_timer_queue. */
75 struct cds_list_head head;
76};
77} /* namespace */
78
28ab034a 79static const char *get_job_type_str(enum rotation_thread_job_type job_type)
db66e574 80{
92816cc3
JG
81 switch (job_type) {
82 case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
83 return "CHECK_PENDING_ROTATION";
84 case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
85 return "SCHEDULED_ROTATION";
86 default:
87 abort();
88 }
db66e574
JD
89}
90
cd9adb8b 91struct rotation_thread_timer_queue *rotation_thread_timer_queue_create()
db66e574 92{
cd9adb8b 93 struct rotation_thread_timer_queue *queue = nullptr;
db66e574 94
64803277 95 queue = zmalloc<rotation_thread_timer_queue>();
92816cc3
JG
96 if (!queue) {
97 PERROR("Failed to allocate timer rotate queue");
98 goto end;
99 }
db66e574 100
92816cc3
JG
101 queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
102 CDS_INIT_LIST_HEAD(&queue->list);
cd9adb8b 103 pthread_mutex_init(&queue->lock, nullptr);
92816cc3
JG
104end:
105 return queue;
db66e574
JD
106}
107
28ab034a 108void rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue)
db66e574 109{
92816cc3
JG
110 if (!queue) {
111 return;
db66e574
JD
112 }
113
92816cc3
JG
114 lttng_pipe_destroy(queue->event_pipe);
115
116 pthread_mutex_lock(&queue->lock);
a0377dfe 117 LTTNG_ASSERT(cds_list_empty(&queue->list));
92816cc3
JG
118 pthread_mutex_unlock(&queue->lock);
119 pthread_mutex_destroy(&queue->lock);
120 free(queue);
121}
db66e574 122
92816cc3
JG
123/*
124 * Destroy the thread data previously created by the init function.
125 */
28ab034a 126void rotation_thread_handle_destroy(struct rotation_thread_handle *handle)
92816cc3 127{
64d9b072 128 lttng_pipe_destroy(handle->quit_pipe);
db66e574
JD
129 free(handle);
130}
131
28ab034a
JG
132struct rotation_thread_handle *
133rotation_thread_handle_create(struct rotation_thread_timer_queue *rotation_timer_queue,
134 struct notification_thread_handle *notification_thread_handle)
db66e574
JD
135{
136 struct rotation_thread_handle *handle;
137
64803277 138 handle = zmalloc<rotation_thread_handle>();
db66e574
JD
139 if (!handle) {
140 goto end;
141 }
142
92816cc3
JG
143 handle->rotation_timer_queue = rotation_timer_queue;
144 handle->notification_thread_handle = notification_thread_handle;
64d9b072
JG
145 handle->quit_pipe = lttng_pipe_open(FD_CLOEXEC);
146 if (!handle->quit_pipe) {
147 goto error;
148 }
92816cc3
JG
149
150end:
151 return handle;
64d9b072
JG
152error:
153 rotation_thread_handle_destroy(handle);
cd9adb8b 154 return nullptr;
92816cc3
JG
155}
156
157/*
158 * Called with the rotation_thread_timer_queue lock held.
159 * Return true if the same timer job already exists in the queue, false if not.
160 */
28ab034a
JG
161static bool timer_job_exists(const struct rotation_thread_timer_queue *queue,
162 enum rotation_thread_job_type job_type,
163 struct ltt_session *session)
92816cc3
JG
164{
165 bool exists = false;
166 struct rotation_thread_job *job;
167
28ab034a 168 cds_list_for_each_entry (job, &queue->list, head) {
c7031a2c 169 if (job->session == session && job->type == job_type) {
92816cc3
JG
170 exists = true;
171 goto end;
db66e574 172 }
db66e574 173 }
92816cc3
JG
174end:
175 return exists;
176}
177
178void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
28ab034a
JG
179 enum rotation_thread_job_type job_type,
180 struct ltt_session *session)
92816cc3
JG
181{
182 int ret;
be2956e7 183 const char dummy = '!';
cd9adb8b 184 struct rotation_thread_job *job = nullptr;
92816cc3
JG
185 const char *job_type_str = get_job_type_str(job_type);
186
187 pthread_mutex_lock(&queue->lock);
c7031a2c 188 if (timer_job_exists(queue, job_type, session)) {
92816cc3
JG
189 /*
190 * This timer job is already pending, we don't need to add
191 * it.
192 */
193 goto end;
db66e574 194 }
92816cc3 195
64803277 196 job = zmalloc<rotation_thread_job>();
92816cc3 197 if (!job) {
c7031a2c 198 PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
28ab034a
JG
199 job_type_str,
200 session->name);
92816cc3
JG
201 goto end;
202 }
c7031a2c
JG
203 /* No reason for this to fail as the caller must hold a reference. */
204 (void) session_get(session);
205
206 job->session = session;
92816cc3 207 job->type = job_type;
92816cc3
JG
208 cds_list_add_tail(&job->head, &queue->list);
209
28ab034a 210 ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy));
92816cc3
JG
211 if (ret < 0) {
212 /*
213 * We do not want to block in the timer handler, the job has
214 * been enqueued in the list, the wakeup pipe is probably full,
215 * the job will be processed when the rotation_thread catches
216 * up.
217 */
942003e5
MJ
218 DIAGNOSTIC_PUSH
219 DIAGNOSTIC_IGNORE_LOGICAL_OP
92816cc3 220 if (errno == EAGAIN || errno == EWOULDBLOCK) {
28ab034a 221 DIAGNOSTIC_POP
92816cc3
JG
222 /*
223 * Not an error, but would be surprising and indicate
224 * that the rotation thread can't keep up with the
225 * current load.
226 */
227 DBG("Wake-up pipe of rotation thread job queue is full");
228 goto end;
db66e574 229 }
c7031a2c 230 PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
28ab034a
JG
231 job_type_str,
232 session->name);
92816cc3 233 goto end;
db66e574 234 }
db66e574
JD
235
236end:
92816cc3 237 pthread_mutex_unlock(&queue->lock);
db66e574
JD
238}
239
28ab034a 240static int init_poll_set(struct lttng_poll_event *poll_set, struct rotation_thread_handle *handle)
db66e574
JD
241{
242 int ret;
243
244 /*
64d9b072
JG
245 * Create pollset with size 3:
246 * - rotation thread quit pipe,
92816cc3 247 * - rotation thread timer queue pipe,
64d9b072 248 * - notification channel sock,
db66e574 249 */
64d9b072
JG
250 ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
251 if (ret < 0) {
db66e574
JD
252 goto error;
253 }
64d9b072 254
28ab034a 255 ret = lttng_poll_add(poll_set, lttng_pipe_get_readfd(handle->quit_pipe), LPOLLIN);
64d9b072 256 if (ret < 0) {
bd0514a5 257 ERR("Failed to add quit pipe read fd to poll set");
64d9b072
JG
258 goto error;
259 }
260
28ab034a
JG
261 ret = lttng_poll_add(
262 poll_set, lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe), LPOLLIN);
d086f507 263 if (ret < 0) {
bd0514a5 264 ERR("Failed to add rotate_pending fd to poll set");
d086f507
JD
265 goto error;
266 }
db66e574 267
db66e574
JD
268 return ret;
269error:
270 lttng_poll_clean(poll_set);
271 return ret;
272}
273
28ab034a 274static void fini_thread_state(struct rotation_thread *state)
db66e574
JD
275{
276 lttng_poll_clean(&state->events);
90936dcf
JD
277 if (rotate_notification_channel) {
278 lttng_notification_channel_destroy(rotate_notification_channel);
279 }
db66e574
JD
280}
281
28ab034a 282static int init_thread_state(struct rotation_thread_handle *handle, struct rotation_thread *state)
db66e574
JD
283{
284 int ret;
285
286 memset(state, 0, sizeof(*state));
287 lttng_poll_init(&state->events);
288
289 ret = init_poll_set(&state->events, handle);
290 if (ret) {
bd0514a5 291 ERR("Failed to initialize rotation thread poll set");
db66e574
JD
292 goto end;
293 }
294
28ab034a
JG
295 rotate_notification_channel =
296 lttng_notification_channel_create(lttng_session_daemon_notification_endpoint);
90936dcf 297 if (!rotate_notification_channel) {
bd0514a5 298 ERR("Could not create notification channel");
90936dcf
JD
299 ret = -1;
300 goto end;
301 }
28ab034a 302 ret = lttng_poll_add(&state->events, rotate_notification_channel->socket, LPOLLIN);
90936dcf 303 if (ret < 0) {
bd0514a5 304 ERR("Failed to add notification fd to pollset");
90936dcf
JD
305 goto end;
306 }
307
db66e574
JD
308end:
309 return ret;
310}
311
28ab034a
JG
312static void check_session_rotation_pending_on_consumers(struct ltt_session *session,
313 bool *_rotation_completed)
92816cc3 314{
db582e11 315 int ret = 0;
92816cc3
JG
316 struct consumer_socket *socket;
317 struct cds_lfht_iter iter;
d2956687
JG
318 enum consumer_trace_chunk_exists_status exists_status;
319 uint64_t relayd_id;
320 bool chunk_exists_on_peer = false;
321 enum lttng_trace_chunk_status chunk_status;
56047f5a 322 lttng::urcu::read_lock_guard read_lock;
d2956687 323
a0377dfe 324 LTTNG_ASSERT(session->chunk_being_archived);
92816cc3
JG
325
326 /*
327 * Check for a local pending rotation on all consumers (32-bit
328 * user space, 64-bit user space, and kernel).
329 */
92816cc3
JG
330 if (!session->ust_session) {
331 goto skip_ust;
332 }
56047f5a 333
28ab034a
JG
334 cds_lfht_for_each_entry (
335 session->ust_session->consumer->socks->ht, &iter, socket, node.node) {
d2956687 336 relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ?
28ab034a
JG
337 -1ULL :
338 session->ust_session->consumer->net_seq_index;
d2956687
JG
339
340 pthread_mutex_lock(socket->lock);
341 ret = consumer_trace_chunk_exists(socket,
28ab034a
JG
342 relayd_id,
343 session->id,
344 session->chunk_being_archived,
345 &exists_status);
d2956687
JG
346 if (ret) {
347 pthread_mutex_unlock(socket->lock);
83ed9e90 348 ERR("Error occurred while checking rotation status on consumer daemon");
92816cc3 349 goto end;
db66e574 350 }
d2956687 351
16100d7a 352 if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
d2956687
JG
353 pthread_mutex_unlock(socket->lock);
354 chunk_exists_on_peer = true;
355 goto end;
16100d7a 356 }
d2956687 357 pthread_mutex_unlock(socket->lock);
16100d7a 358 }
db66e574 359
92816cc3
JG
360skip_ust:
361 if (!session->kernel_session) {
362 goto skip_kernel;
db66e574 363 }
28ab034a
JG
364 cds_lfht_for_each_entry (
365 session->kernel_session->consumer->socks->ht, &iter, socket, node.node) {
d2956687
JG
366 pthread_mutex_lock(socket->lock);
367 relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
28ab034a
JG
368 -1ULL :
369 session->kernel_session->consumer->net_seq_index;
d2956687
JG
370
371 ret = consumer_trace_chunk_exists(socket,
28ab034a
JG
372 relayd_id,
373 session->id,
374 session->chunk_being_archived,
375 &exists_status);
d2956687
JG
376 if (ret) {
377 pthread_mutex_unlock(socket->lock);
83ed9e90 378 ERR("Error occurred while checking rotation status on consumer daemon");
92816cc3
JG
379 goto end;
380 }
d2956687 381
16100d7a 382 if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
d2956687
JG
383 pthread_mutex_unlock(socket->lock);
384 chunk_exists_on_peer = true;
385 goto end;
16100d7a 386 }
d2956687 387 pthread_mutex_unlock(socket->lock);
92816cc3
JG
388 }
389skip_kernel:
390end:
db66e574 391
d2956687
JG
392 if (!chunk_exists_on_peer) {
393 uint64_t chunk_being_archived_id;
394
28ab034a
JG
395 chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
396 &chunk_being_archived_id);
a0377dfe 397 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
28ab034a
JG
398 DBG("Rotation of trace archive %" PRIu64
399 " of session \"%s\" is complete on all consumers",
400 chunk_being_archived_id,
401 session->name);
db66e574 402 }
d2956687 403 *_rotation_completed = !chunk_exists_on_peer;
92816cc3 404 if (ret) {
28ab034a 405 ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR);
2961f09e 406 if (ret) {
28ab034a 407 ERR("Failed to reset rotation state of session \"%s\"", session->name);
2961f09e 408 }
db66e574 409 }
db66e574
JD
410}
411
d88744a4 412/*
92816cc3 413 * Check if the last rotation was completed, called with session lock held.
d2956687
JG
414 * Should only return non-zero in the event of a fatal error. Doing so will
415 * shutdown the thread.
d88744a4 416 */
28ab034a
JG
417static int
418check_session_rotation_pending(struct ltt_session *session,
419 struct notification_thread_handle *notification_thread_handle)
d88744a4
JD
420{
421 int ret;
92816cc3 422 struct lttng_trace_archive_location *location;
d2956687
JG
423 enum lttng_trace_chunk_status chunk_status;
424 bool rotation_completed = false;
425 const char *archived_chunk_name;
426 uint64_t chunk_being_archived_id;
427
dc1d5967
FD
428 if (!session->chunk_being_archived) {
429 ret = 0;
430 goto end;
431 }
432
28ab034a
JG
433 chunk_status =
434 lttng_trace_chunk_get_id(session->chunk_being_archived, &chunk_being_archived_id);
a0377dfe 435 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d88744a4 436
bd0514a5 437 DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
28ab034a
JG
438 session->name,
439 chunk_being_archived_id);
d2956687 440
faf1bdcf
JG
441 /*
442 * The rotation-pending check timer of a session is launched in
443 * one-shot mode. If the rotation is incomplete, the rotation
444 * thread will re-enable the pending-check timer.
445 *
446 * The timer thread can't stop the timer itself since it is involved
447 * in the check for the timer's quiescence.
448 */
449 ret = timer_session_rotation_pending_check_stop(session);
450 if (ret) {
6ae1bf46 451 goto check_ongoing_rotation;
faf1bdcf
JG
452 }
453
28ab034a
JG
454 check_session_rotation_pending_on_consumers(session, &rotation_completed);
455 if (!rotation_completed || session->rotation_state == LTTNG_ROTATION_STATE_ERROR) {
6ae1bf46 456 goto check_ongoing_rotation;
92816cc3
JG
457 }
458
92816cc3
JG
459 /*
460 * Now we can clear the "ONGOING" state in the session. New
461 * rotations can start now.
462 */
28ab034a 463 chunk_status = lttng_trace_chunk_get_name(
cd9adb8b 464 session->chunk_being_archived, &archived_chunk_name, nullptr);
a0377dfe 465 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d2956687
JG
466 free(session->last_archived_chunk_name);
467 session->last_archived_chunk_name = strdup(archived_chunk_name);
468 if (!session->last_archived_chunk_name) {
469 PERROR("Failed to duplicate archived chunk name");
470 }
471 session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);
92816cc3 472
7fdbed1c
JG
473 if (!session->quiet_rotation) {
474 location = session_get_trace_archive_location(session);
7fdbed1c 475 ret = notification_thread_command_session_rotation_completed(
28ab034a
JG
476 notification_thread_handle,
477 session->id,
478 session->last_archived_chunk_id.value,
479 location);
d3740619 480 lttng_trace_archive_location_put(location);
7fdbed1c 481 if (ret != LTTNG_OK) {
bd0514a5 482 ERR("Failed to notify notification thread of completed rotation for session %s",
28ab034a 483 session->name);
7fdbed1c 484 }
92816cc3
JG
485 }
486
487 ret = 0;
6ae1bf46 488check_ongoing_rotation:
92816cc3 489 if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
28ab034a
JG
490 chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
491 &chunk_being_archived_id);
a0377dfe 492 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d2956687 493
bd0514a5 494 DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s",
28ab034a
JG
495 chunk_being_archived_id,
496 session->name);
92816cc3 497 ret = timer_session_rotation_pending_check_start(session,
28ab034a 498 DEFAULT_ROTATE_PENDING_TIMER);
92816cc3 499 if (ret) {
d2956687 500 ERR("Failed to re-enable rotation pending timer");
92816cc3
JG
501 ret = -1;
502 goto end;
503 }
504 }
505
6ae1bf46 506end:
d88744a4
JD
507 return ret;
508}
509
ed1e52a3 510/* Call with the session and session_list locks held. */
28ab034a 511static int launch_session_rotation(struct ltt_session *session)
259c2674
JD
512{
513 int ret;
92816cc3 514 struct lttng_rotate_session_return rotation_return;
259c2674 515
28ab034a 516 DBG("Launching scheduled time-based rotation on session \"%s\"", session->name);
259c2674 517
28ab034a
JG
518 ret = cmd_rotate_session(
519 session, &rotation_return, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
92816cc3 520 if (ret == LTTNG_OK) {
bd0514a5 521 DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
28ab034a 522 session->name);
92816cc3
JG
523 } else {
524 /* Don't consider errors as fatal. */
bd0514a5 525 DBG("Scheduled time-based rotation aborted for session %s: %s",
28ab034a
JG
526 session->name,
527 lttng_strerror(ret));
259c2674 528 }
92816cc3
JG
529 return 0;
530}
259c2674 531
28ab034a
JG
532static int run_job(struct rotation_thread_job *job,
533 struct ltt_session *session,
534 struct notification_thread_handle *notification_thread_handle)
92816cc3
JG
535{
536 int ret;
259c2674 537
92816cc3
JG
538 switch (job->type) {
539 case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
16100d7a 540 ret = launch_session_rotation(session);
92816cc3
JG
541 break;
542 case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
28ab034a 543 ret = check_session_rotation_pending(session, notification_thread_handle);
92816cc3
JG
544 break;
545 default:
546 abort();
259c2674 547 }
259c2674
JD
548 return ret;
549}
550
28ab034a
JG
551static int handle_job_queue(struct rotation_thread_handle *handle,
552 struct rotation_thread *state __attribute__((unused)),
553 struct rotation_thread_timer_queue *queue)
d88744a4
JD
554{
555 int ret = 0;
d88744a4
JD
556
557 for (;;) {
e32d7f27 558 struct ltt_session *session;
92816cc3 559 struct rotation_thread_job *job;
d88744a4 560
92816cc3 561 /* Take the queue lock only to pop an element from the list. */
d88744a4
JD
562 pthread_mutex_lock(&queue->lock);
563 if (cds_list_empty(&queue->list)) {
564 pthread_mutex_unlock(&queue->lock);
565 break;
566 }
28ab034a 567 job = cds_list_first_entry(&queue->list, typeof(*job), head);
92816cc3 568 cds_list_del(&job->head);
d88744a4
JD
569 pthread_mutex_unlock(&queue->lock);
570
d88744a4 571 session_lock_list();
c7031a2c 572 session = job->session;
d88744a4 573 if (!session) {
28ab034a 574 DBG("Session \"%s\" not found", session->name != NULL ? session->name : "");
d88744a4 575 /*
92816cc3
JG
576 * This is a non-fatal error, and we cannot report it to
577 * the user (timer), so just print the error and
578 * continue the processing.
579 *
580 * While the timer thread will purge pending signals for
581 * a session on the session's destruction, it is
582 * possible for a job targeting that session to have
583 * already been queued before it was destroyed.
d88744a4 584 */
92816cc3 585 free(job);
e32d7f27 586 session_put(session);
5b8139c6 587 session_unlock_list();
d88744a4
JD
588 continue;
589 }
590
d88744a4 591 session_lock(session);
16100d7a 592 ret = run_job(job, session, handle->notification_thread_handle);
d88744a4 593 session_unlock(session);
faf1bdcf 594 /* Release reference held by the job. */
e32d7f27 595 session_put(session);
ed1e52a3 596 session_unlock_list();
92816cc3 597 free(job);
d88744a4 598 if (ret) {
d88744a4
JD
599 goto end;
600 }
601 }
602
603 ret = 0;
604
605end:
606 return ret;
607}
608
28ab034a
JG
609static int handle_condition(const struct lttng_notification *notification,
610 struct notification_thread_handle *notification_thread_handle)
90936dcf
JD
611{
612 int ret = 0;
cd9adb8b 613 const char *condition_session_name = nullptr;
90936dcf
JD
614 enum lttng_condition_type condition_type;
615 enum lttng_condition_status condition_status;
616 enum lttng_evaluation_status evaluation_status;
617 uint64_t consumed;
618 struct ltt_session *session;
c08136a3 619 const struct lttng_condition *condition =
28ab034a 620 lttng_notification_get_const_condition(notification);
c08136a3 621 const struct lttng_evaluation *evaluation =
28ab034a 622 lttng_notification_get_const_evaluation(notification);
90936dcf
JD
623
624 condition_type = lttng_condition_get_type(condition);
625
626 if (condition_type != LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE) {
627 ret = -1;
bd0514a5 628 ERR("Condition type and session usage type are not the same");
90936dcf
JD
629 goto end;
630 }
631
632 /* Fetch info to test */
633 condition_status = lttng_condition_session_consumed_size_get_session_name(
28ab034a 634 condition, &condition_session_name);
90936dcf 635 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
bd0514a5 636 ERR("Session name could not be fetched");
90936dcf
JD
637 ret = -1;
638 goto end;
639 }
28ab034a
JG
640 evaluation_status =
641 lttng_evaluation_session_consumed_size_get_consumed_size(evaluation, &consumed);
90936dcf 642 if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) {
bd0514a5 643 ERR("Failed to get evaluation");
90936dcf
JD
644 ret = -1;
645 goto end;
646 }
647
648 session_lock_list();
649 session = session_find_by_name(condition_session_name);
650 if (!session) {
eb2827a4 651 DBG("Failed to find session while handling notification: notification type = %s, session name = `%s`",
28ab034a
JG
652 lttng_condition_type_str(condition_type),
653 condition_session_name);
eb2827a4
JG
654 /*
655 * Not a fatal error: a session can be destroyed before we get
656 * the chance to handle the notification.
657 */
658 ret = 0;
659 session_unlock_list();
90936dcf
JD
660 goto end;
661 }
662 session_lock(session);
90936dcf 663
c08136a3 664 if (!lttng_trigger_is_equal(session->rotate_trigger,
28ab034a 665 lttng_notification_get_const_trigger(notification))) {
c08136a3
JG
666 /* Notification does not originate from our rotation trigger. */
667 ret = 0;
668 goto end_unlock;
669 }
670
28ab034a 671 ret = unsubscribe_session_consumed_size_rotation(session, notification_thread_handle);
90936dcf 672 if (ret) {
490b3229 673 goto end_unlock;
90936dcf
JD
674 }
675
2545db87 676 ret = cmd_rotate_session(
cd9adb8b 677 session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
2545db87
JG
678 switch (ret) {
679 case LTTNG_OK:
680 break;
681 case -LTTNG_ERR_ROTATION_PENDING:
90936dcf 682 DBG("Rotate already pending, subscribe to the next threshold value");
2545db87
JG
683 break;
684 case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
685 DBG("Rotation already happened since last stop, subscribe to the next threshold value");
686 break;
687 case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
688 DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
689 break;
690 default:
691 ERR("Failed to rotate on size notification with error: %s", lttng_strerror(ret));
90936dcf
JD
692 ret = -1;
693 goto end_unlock;
694 }
2545db87
JG
695
696 ret = subscribe_session_consumed_size_rotation(
28ab034a 697 session, consumed + session->rotate_size, notification_thread_handle);
90936dcf 698 if (ret) {
bd0514a5 699 ERR("Failed to subscribe to session consumed size condition");
90936dcf
JD
700 goto end_unlock;
701 }
702 ret = 0;
703
704end_unlock:
705 session_unlock(session);
e32d7f27 706 session_put(session);
5b8139c6 707 session_unlock_list();
90936dcf
JD
708end:
709 return ret;
710}
711
28ab034a
JG
712static int handle_notification_channel(int fd __attribute__((unused)),
713 struct rotation_thread_handle *handle,
714 struct rotation_thread *state __attribute__((unused)))
90936dcf
JD
715{
716 int ret;
d73ee93f 717 bool notification_pending;
cd9adb8b 718 struct lttng_notification *notification = nullptr;
90936dcf 719 enum lttng_notification_channel_status status;
90936dcf 720
28ab034a
JG
721 status = lttng_notification_channel_has_pending_notification(rotate_notification_channel,
722 &notification_pending);
d73ee93f 723 if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
bd0514a5 724 ERR("Error occurred while checking for pending notification");
d73ee93f
JG
725 ret = -1;
726 goto end;
727 }
728
729 if (!notification_pending) {
730 ret = 0;
731 goto end;
732 }
733
90936dcf 734 /* Receive the next notification. */
28ab034a
JG
735 status = lttng_notification_channel_get_next_notification(rotate_notification_channel,
736 &notification);
90936dcf
JD
737
738 switch (status) {
739 case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
740 break;
741 case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
742 /* Not an error, we will wait for the next one */
743 ret = 0;
28ab034a
JG
744 goto end;
745 ;
90936dcf
JD
746 case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
747 ERR("Notification channel was closed");
748 ret = -1;
749 goto end;
750 default:
751 /* Unhandled conditions / errors. */
752 ERR("Unknown notification channel status");
753 ret = -1;
754 goto end;
755 }
756
28ab034a 757 ret = handle_condition(notification, handle->notification_thread_handle);
90936dcf
JD
758
759end:
760 lttng_notification_destroy(notification);
90936dcf
JD
761 return ret;
762}
763
28ab034a 764static void *thread_rotation(void *data)
db66e574
JD
765{
766 int ret;
7966af57 767 struct rotation_thread_handle *handle = (rotation_thread_handle *) data;
92816cc3 768 struct rotation_thread thread;
87380d40 769 int queue_pipe_fd;
db66e574 770
bd0514a5 771 DBG("Started rotation thread");
f620cc28
JG
772 rcu_register_thread();
773 rcu_thread_online();
412d7227 774 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
f620cc28 775 health_code_update();
db66e574
JD
776
777 if (!handle) {
bd0514a5 778 ERR("Invalid thread context provided");
db66e574
JD
779 goto end;
780 }
781
28ab034a 782 queue_pipe_fd = lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe);
db66e574 783
92816cc3 784 ret = init_thread_state(handle, &thread);
db66e574 785 if (ret) {
f5f8e5cd 786 goto error;
db66e574
JD
787 }
788
db66e574
JD
789 while (true) {
790 int fd_count, i;
791
792 health_poll_entry();
bd0514a5 793 DBG("Entering poll wait");
92816cc3 794 ret = lttng_poll_wait(&thread.events, -1);
bd0514a5 795 DBG("Poll wait returned (%i)", ret);
db66e574
JD
796 health_poll_exit();
797 if (ret < 0) {
798 /*
799 * Restart interrupted system call.
800 */
801 if (errno == EINTR) {
802 continue;
803 }
bd0514a5 804 ERR("Error encountered during lttng_poll_wait (%i)", ret);
db66e574
JD
805 goto error;
806 }
807
808 fd_count = ret;
809 for (i = 0; i < fd_count; i++) {
92816cc3
JG
810 int fd = LTTNG_POLL_GETFD(&thread.events, i);
811 uint32_t revents = LTTNG_POLL_GETEV(&thread.events, i);
db66e574 812
28ab034a 813 DBG("Handling fd (%i) activity (%u)", fd, revents);
db66e574 814
92816cc3 815 if (revents & LPOLLERR) {
bd0514a5 816 ERR("Polling returned an error on fd %i", fd);
92816cc3
JG
817 goto error;
818 }
819
85e17b27 820 if (fd == rotate_notification_channel->socket) {
28ab034a 821 ret = handle_notification_channel(fd, handle, &thread);
85e17b27 822 if (ret) {
bd0514a5 823 ERR("Error occurred while handling activity on notification channel socket");
85e17b27
JG
824 goto error;
825 }
826 } else {
827 /* Job queue or quit pipe activity. */
85e17b27
JG
828
829 /*
830 * The job queue is serviced if there is
831 * activity on the quit pipe to ensure it is
832 * flushed and all references held in the queue
833 * are released.
834 */
28ab034a
JG
835 ret = handle_job_queue(
836 handle, &thread, handle->rotation_timer_queue);
d88744a4 837 if (ret) {
bd0514a5 838 ERR("Failed to handle rotation timer pipe event");
d88744a4
JD
839 goto error;
840 }
85e17b27 841
64d9b072
JG
842 if (fd == queue_pipe_fd) {
843 char buf;
844
845 ret = lttng_read(fd, &buf, 1);
846 if (ret != 1) {
28ab034a
JG
847 ERR("Failed to read from wakeup pipe (fd = %i)",
848 fd);
64d9b072
JG
849 goto error;
850 }
851 } else {
bd0514a5 852 DBG("Quit pipe activity");
85e17b27 853 goto exit;
90936dcf 854 }
db66e574
JD
855 }
856 }
857 }
858exit:
859error:
bd0514a5 860 DBG("Thread exit");
92816cc3 861 fini_thread_state(&thread);
f620cc28 862end:
412d7227 863 health_unregister(the_health_sessiond);
03732c32
JG
864 rcu_thread_offline();
865 rcu_unregister_thread();
cd9adb8b 866 return nullptr;
db66e574 867}
64d9b072 868
28ab034a 869static bool shutdown_rotation_thread(void *thread_data)
64d9b072 870{
7966af57 871 struct rotation_thread_handle *handle = (rotation_thread_handle *) thread_data;
64d9b072
JG
872 const int write_fd = lttng_pipe_get_writefd(handle->quit_pipe);
873
874 return notify_thread_pipe(write_fd) == 1;
875}
876
877bool launch_rotation_thread(struct rotation_thread_handle *handle)
878{
879 struct lttng_thread *thread;
880
28ab034a 881 thread = lttng_thread_create(
cd9adb8b 882 "Rotation", thread_rotation, shutdown_rotation_thread, nullptr, handle);
64d9b072
JG
883 if (!thread) {
884 goto error;
885 }
886 lttng_thread_put(thread);
887 return true;
888error:
889 return false;
890}
This page took 0.108405 seconds and 4 git commands to generate.