Fix: sessiond: size-based notification occasionally not triggered
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.cpp
CommitLineData
db66e574 1/*
ab5be9fa
MJ
2 * Copyright (C) 2017 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
db66e574 4 *
ab5be9fa 5 * SPDX-License-Identifier: GPL-2.0-only
db66e574 6 *
db66e574
JD
7 */
8
9#define _LGPL_SOURCE
28ab034a
JG
10#include "cmd.hpp"
11#include "health-sessiond.hpp"
12#include "lttng-sessiond.hpp"
13#include "notification-thread-commands.hpp"
14#include "rotate.hpp"
15#include "rotation-thread.hpp"
16#include "session.hpp"
17#include "thread.hpp"
18#include "timer.hpp"
19#include "utils.hpp"
20
21#include <common/align.hpp>
c9e313bc
SM
22#include <common/config/session-config.hpp>
23#include <common/defaults.hpp>
28ab034a 24#include <common/error.hpp>
c9e313bc 25#include <common/futex.hpp>
c9e313bc 26#include <common/hashtable/utils.hpp>
c9e313bc 27#include <common/kernel-ctl/kernel-ctl.hpp>
28ab034a 28#include <common/time.hpp>
56047f5a 29#include <common/urcu.hpp>
28ab034a
JG
30#include <common/utils.hpp>
31
c9e313bc 32#include <lttng/condition/condition-internal.hpp>
28ab034a
JG
33#include <lttng/location-internal.hpp>
34#include <lttng/notification/channel-internal.hpp>
c08136a3 35#include <lttng/notification/notification-internal.hpp>
28ab034a
JG
36#include <lttng/rotate-internal.hpp>
37#include <lttng/trigger/trigger.h>
db66e574 38
28ab034a
JG
39#include <inttypes.h>
40#include <signal.h>
dc65dda3 41#include <sys/eventfd.h>
28ab034a
JG
42#include <sys/stat.h>
43#include <time.h>
db66e574
JD
44#include <urcu.h>
45#include <urcu/list.h>
db66e574 46
/*
 * Notification channel used by the rotation thread. It is created by
 * init_thread_state() and destroyed by fini_thread_state().
 */
struct lttng_notification_channel *rotate_notification_channel = nullptr;
/*
 * This eventfd is used to wake-up the rotation thread whenever a command
 * completes on the notification channel. This ensures that any notification
 * that was queued while waiting for a reply to the command is eventually
 * consumed.
 *
 * It holds -1 until init_thread_state() creates it.
 */
int rotate_notification_channel_subscription_change_eventfd = -1;
90936dcf 55
/* State private to the rotation thread. */
struct rotation_thread {
	/* Poll set the rotation thread waits on in its main loop. */
	struct lttng_poll_event events;
};
59
92816cc3
JG
/*
 * The timer thread enqueues jobs and wakes up the rotation thread.
 * When the rotation thread wakes up, it empties the queue.
 */
struct rotation_thread_timer_queue {
	/* Pipe written to after a job is enqueued in order to wake-up the rotation thread. */
	struct lttng_pipe *event_pipe;
	/* List of pending rotation_thread_job entries (linked through their `head` member). */
	struct cds_list_head list;
	/* Protects `list`. */
	pthread_mutex_t lock;
};
69
/*
 * Handle passed to the rotation thread at launch. The quit pipe is opened by
 * rotation_thread_handle_create() and destroyed by
 * rotation_thread_handle_destroy(); the two other members are borrowed
 * pointers provided by the creator of the handle.
 */
struct rotation_thread_handle {
	/* Job queue serviced by the rotation thread. */
	struct rotation_thread_timer_queue *rotation_timer_queue;
	/* Access to the notification thread cmd_queue */
	struct notification_thread_handle *notification_thread_handle;
	/* Thread-specific quit pipe. */
	struct lttng_pipe *quit_pipe;
};
77
f1494934
JG
namespace {
/* A unit of work queued for the rotation thread. */
struct rotation_thread_job {
	/* Kind of work to perform, see run_job(). */
	enum rotation_thread_job_type type;
	/* Target session; a reference is acquired on it when the job is enqueued. */
	struct ltt_session *session;
	/* List member in struct rotation_thread_timer_queue. */
	struct cds_list_head head;
};
} /* namespace */
86
28ab034a 87static const char *get_job_type_str(enum rotation_thread_job_type job_type)
db66e574 88{
92816cc3
JG
89 switch (job_type) {
90 case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
91 return "CHECK_PENDING_ROTATION";
92 case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
93 return "SCHEDULED_ROTATION";
94 default:
95 abort();
96 }
db66e574
JD
97}
98
cd9adb8b 99struct rotation_thread_timer_queue *rotation_thread_timer_queue_create()
db66e574 100{
cd9adb8b 101 struct rotation_thread_timer_queue *queue = nullptr;
db66e574 102
64803277 103 queue = zmalloc<rotation_thread_timer_queue>();
92816cc3
JG
104 if (!queue) {
105 PERROR("Failed to allocate timer rotate queue");
106 goto end;
107 }
db66e574 108
92816cc3
JG
109 queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
110 CDS_INIT_LIST_HEAD(&queue->list);
cd9adb8b 111 pthread_mutex_init(&queue->lock, nullptr);
92816cc3
JG
112end:
113 return queue;
db66e574
JD
114}
115
28ab034a 116void rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue)
db66e574 117{
92816cc3
JG
118 if (!queue) {
119 return;
db66e574
JD
120 }
121
92816cc3
JG
122 lttng_pipe_destroy(queue->event_pipe);
123
124 pthread_mutex_lock(&queue->lock);
a0377dfe 125 LTTNG_ASSERT(cds_list_empty(&queue->list));
92816cc3
JG
126 pthread_mutex_unlock(&queue->lock);
127 pthread_mutex_destroy(&queue->lock);
128 free(queue);
129}
db66e574 130
92816cc3
JG
131/*
132 * Destroy the thread data previously created by the init function.
133 */
28ab034a 134void rotation_thread_handle_destroy(struct rotation_thread_handle *handle)
92816cc3 135{
64d9b072 136 lttng_pipe_destroy(handle->quit_pipe);
db66e574
JD
137 free(handle);
138}
139
28ab034a
JG
140struct rotation_thread_handle *
141rotation_thread_handle_create(struct rotation_thread_timer_queue *rotation_timer_queue,
142 struct notification_thread_handle *notification_thread_handle)
db66e574
JD
143{
144 struct rotation_thread_handle *handle;
145
64803277 146 handle = zmalloc<rotation_thread_handle>();
db66e574
JD
147 if (!handle) {
148 goto end;
149 }
150
92816cc3
JG
151 handle->rotation_timer_queue = rotation_timer_queue;
152 handle->notification_thread_handle = notification_thread_handle;
64d9b072
JG
153 handle->quit_pipe = lttng_pipe_open(FD_CLOEXEC);
154 if (!handle->quit_pipe) {
155 goto error;
156 }
92816cc3
JG
157
158end:
159 return handle;
64d9b072
JG
160error:
161 rotation_thread_handle_destroy(handle);
cd9adb8b 162 return nullptr;
92816cc3
JG
163}
164
165/*
166 * Called with the rotation_thread_timer_queue lock held.
167 * Return true if the same timer job already exists in the queue, false if not.
168 */
28ab034a
JG
169static bool timer_job_exists(const struct rotation_thread_timer_queue *queue,
170 enum rotation_thread_job_type job_type,
171 struct ltt_session *session)
92816cc3
JG
172{
173 bool exists = false;
174 struct rotation_thread_job *job;
175
28ab034a 176 cds_list_for_each_entry (job, &queue->list, head) {
c7031a2c 177 if (job->session == session && job->type == job_type) {
92816cc3
JG
178 exists = true;
179 goto end;
db66e574 180 }
db66e574 181 }
92816cc3
JG
182end:
183 return exists;
184}
185
186void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
28ab034a
JG
187 enum rotation_thread_job_type job_type,
188 struct ltt_session *session)
92816cc3
JG
189{
190 int ret;
be2956e7 191 const char dummy = '!';
cd9adb8b 192 struct rotation_thread_job *job = nullptr;
92816cc3
JG
193 const char *job_type_str = get_job_type_str(job_type);
194
195 pthread_mutex_lock(&queue->lock);
c7031a2c 196 if (timer_job_exists(queue, job_type, session)) {
92816cc3
JG
197 /*
198 * This timer job is already pending, we don't need to add
199 * it.
200 */
201 goto end;
db66e574 202 }
92816cc3 203
64803277 204 job = zmalloc<rotation_thread_job>();
92816cc3 205 if (!job) {
c7031a2c 206 PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
28ab034a
JG
207 job_type_str,
208 session->name);
92816cc3
JG
209 goto end;
210 }
c7031a2c
JG
211 /* No reason for this to fail as the caller must hold a reference. */
212 (void) session_get(session);
213
214 job->session = session;
92816cc3 215 job->type = job_type;
92816cc3
JG
216 cds_list_add_tail(&job->head, &queue->list);
217
28ab034a 218 ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy));
92816cc3
JG
219 if (ret < 0) {
220 /*
221 * We do not want to block in the timer handler, the job has
222 * been enqueued in the list, the wakeup pipe is probably full,
223 * the job will be processed when the rotation_thread catches
224 * up.
225 */
942003e5
MJ
226 DIAGNOSTIC_PUSH
227 DIAGNOSTIC_IGNORE_LOGICAL_OP
92816cc3 228 if (errno == EAGAIN || errno == EWOULDBLOCK) {
28ab034a 229 DIAGNOSTIC_POP
92816cc3
JG
230 /*
231 * Not an error, but would be surprising and indicate
232 * that the rotation thread can't keep up with the
233 * current load.
234 */
235 DBG("Wake-up pipe of rotation thread job queue is full");
236 goto end;
db66e574 237 }
c7031a2c 238 PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
28ab034a
JG
239 job_type_str,
240 session->name);
92816cc3 241 goto end;
db66e574 242 }
db66e574
JD
243
244end:
92816cc3 245 pthread_mutex_unlock(&queue->lock);
db66e574
JD
246}
247
28ab034a 248static int init_poll_set(struct lttng_poll_event *poll_set, struct rotation_thread_handle *handle)
db66e574
JD
249{
250 int ret;
251
252 /*
64d9b072
JG
253 * Create pollset with size 3:
254 * - rotation thread quit pipe,
92816cc3 255 * - rotation thread timer queue pipe,
64d9b072 256 * - notification channel sock,
db66e574 257 */
64d9b072
JG
258 ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
259 if (ret < 0) {
db66e574
JD
260 goto error;
261 }
64d9b072 262
28ab034a 263 ret = lttng_poll_add(poll_set, lttng_pipe_get_readfd(handle->quit_pipe), LPOLLIN);
64d9b072 264 if (ret < 0) {
bd0514a5 265 ERR("Failed to add quit pipe read fd to poll set");
64d9b072
JG
266 goto error;
267 }
268
28ab034a
JG
269 ret = lttng_poll_add(
270 poll_set, lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe), LPOLLIN);
d086f507 271 if (ret < 0) {
bd0514a5 272 ERR("Failed to add rotate_pending fd to poll set");
d086f507
JD
273 goto error;
274 }
db66e574 275
db66e574
JD
276 return ret;
277error:
278 lttng_poll_clean(poll_set);
279 return ret;
280}
281
28ab034a 282static void fini_thread_state(struct rotation_thread *state)
db66e574
JD
283{
284 lttng_poll_clean(&state->events);
90936dcf
JD
285 if (rotate_notification_channel) {
286 lttng_notification_channel_destroy(rotate_notification_channel);
287 }
dc65dda3
JG
288
289 if (rotate_notification_channel_subscription_change_eventfd >= 0) {
290 const int close_ret = close(rotate_notification_channel_subscription_change_eventfd);
291
292 if (close_ret) {
293 PERROR("Failed to close rotation thread notification channel subscription change eventfd");
294 }
295 }
db66e574
JD
296}
297
28ab034a 298static int init_thread_state(struct rotation_thread_handle *handle, struct rotation_thread *state)
db66e574
JD
299{
300 int ret;
301
302 memset(state, 0, sizeof(*state));
303 lttng_poll_init(&state->events);
304
305 ret = init_poll_set(&state->events, handle);
306 if (ret) {
bd0514a5 307 ERR("Failed to initialize rotation thread poll set");
db66e574
JD
308 goto end;
309 }
310
28ab034a
JG
311 rotate_notification_channel =
312 lttng_notification_channel_create(lttng_session_daemon_notification_endpoint);
90936dcf 313 if (!rotate_notification_channel) {
bd0514a5 314 ERR("Could not create notification channel");
90936dcf
JD
315 ret = -1;
316 goto end;
317 }
28ab034a 318 ret = lttng_poll_add(&state->events, rotate_notification_channel->socket, LPOLLIN);
90936dcf 319 if (ret < 0) {
bd0514a5 320 ERR("Failed to add notification fd to pollset");
90936dcf
JD
321 goto end;
322 }
323
dc65dda3
JG
324 rotate_notification_channel_subscription_change_eventfd =
325 eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
326 if (rotate_notification_channel_subscription_change_eventfd < 0) {
327 PERROR("Failed to create rotation thread notification channel subscription change eventfd");
328 ret = -1;
329 goto end;
330 }
331 ret = lttng_poll_add(
332 &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN);
333 if (ret < 0) {
334 ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset");
335 goto end;
336 }
337
db66e574
JD
338end:
339 return ret;
340}
341
28ab034a
JG
/*
 * Ask every consumer daemon socket of the session (user space first, then
 * kernel) whether the chunk being archived still exists on its side.
 *
 * On return, *_rotation_completed is false if a consumer reported that the
 * chunk still exists, true otherwise. If querying a consumer fails, the
 * session's rotation state is reset to LTTNG_ROTATION_STATE_ERROR.
 *
 * The session must have a chunk being archived.
 */
static void check_session_rotation_pending_on_consumers(struct ltt_session *session,
							bool *_rotation_completed)
{
	int ret = 0;
	struct consumer_socket *socket;
	struct cds_lfht_iter iter;
	enum consumer_trace_chunk_exists_status exists_status;
	uint64_t relayd_id;
	bool chunk_exists_on_peer = false;
	enum lttng_trace_chunk_status chunk_status;
	/* Presumably holds the RCU read-side lock until the function returns. */
	lttng::urcu::read_lock_guard read_lock;

	LTTNG_ASSERT(session->chunk_being_archived);

	/*
	 * Check for a local pending rotation on all consumers (32-bit
	 * user space, 64-bit user space, and kernel).
	 */
	if (!session->ust_session) {
		goto skip_ust;
	}

	cds_lfht_for_each_entry (
		session->ust_session->consumer->socks->ht, &iter, socket, node.node) {
		/* -1ULL is passed as the relayd id when the consumer output is local. */
		relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ?
			-1ULL :
			session->ust_session->consumer->net_seq_index;

		pthread_mutex_lock(socket->lock);
		ret = consumer_trace_chunk_exists(socket,
						  relayd_id,
						  session->id,
						  session->chunk_being_archived,
						  &exists_status);
		if (ret) {
			pthread_mutex_unlock(socket->lock);
			ERR("Error occurred while checking rotation status on consumer daemon");
			goto end;
		}

		/* Any status other than "unknown chunk" means the rotation is still pending. */
		if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
			pthread_mutex_unlock(socket->lock);
			chunk_exists_on_peer = true;
			goto end;
		}
		pthread_mutex_unlock(socket->lock);
	}

skip_ust:
	if (!session->kernel_session) {
		goto skip_kernel;
	}
	cds_lfht_for_each_entry (
		session->kernel_session->consumer->socks->ht, &iter, socket, node.node) {
		pthread_mutex_lock(socket->lock);
		relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
			-1ULL :
			session->kernel_session->consumer->net_seq_index;

		ret = consumer_trace_chunk_exists(socket,
						  relayd_id,
						  session->id,
						  session->chunk_being_archived,
						  &exists_status);
		if (ret) {
			pthread_mutex_unlock(socket->lock);
			ERR("Error occurred while checking rotation status on consumer daemon");
			goto end;
		}

		if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
			pthread_mutex_unlock(socket->lock);
			chunk_exists_on_peer = true;
			goto end;
		}
		pthread_mutex_unlock(socket->lock);
	}
skip_kernel:
end:

	/*
	 * NOTE(review): when a consumer query failed (ret != 0),
	 * chunk_exists_on_peer is still false at this point, so the
	 * "complete" message below is logged and *_rotation_completed is set
	 * to true even though the status is unknown. The visible caller also
	 * checks the session's rotation state for LTTNG_ROTATION_STATE_ERROR.
	 */
	if (!chunk_exists_on_peer) {
		uint64_t chunk_being_archived_id;

		chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
							&chunk_being_archived_id);
		LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
		DBG("Rotation of trace archive %" PRIu64
		    " of session \"%s\" is complete on all consumers",
		    chunk_being_archived_id,
		    session->name);
	}
	*_rotation_completed = !chunk_exists_on_peer;
	if (ret) {
		ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR);
		if (ret) {
			ERR("Failed to reset rotation state of session \"%s\"", session->name);
		}
	}
}
441
/*
 * Check if the last rotation was completed, called with session lock held.
 * Should only return non-zero in the event of a fatal error. Doing so will
 * shutdown the thread.
 *
 * When the rotation is complete, the name of the archived chunk is saved in
 * the session, the rotation state is reset to COMPLETED and, unless the
 * rotation is "quiet", the notification thread is informed. When the session's
 * rotation state is still ONGOING at the end, the pending-check timer is
 * re-armed.
 */
static int
check_session_rotation_pending(struct ltt_session *session,
			       struct notification_thread_handle *notification_thread_handle)
{
	int ret;
	struct lttng_trace_archive_location *location;
	enum lttng_trace_chunk_status chunk_status;
	bool rotation_completed = false;
	const char *archived_chunk_name;
	uint64_t chunk_being_archived_id;

	/* No chunk is being archived: there is nothing to check. */
	if (!session->chunk_being_archived) {
		ret = 0;
		goto end;
	}

	chunk_status =
		lttng_trace_chunk_get_id(session->chunk_being_archived, &chunk_being_archived_id);
	LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);

	DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
	    session->name,
	    chunk_being_archived_id);

	/*
	 * The rotation-pending check timer of a session is launched in
	 * one-shot mode. If the rotation is incomplete, the rotation
	 * thread will re-enable the pending-check timer.
	 *
	 * The timer thread can't stop the timer itself since it is involved
	 * in the check for the timer's quiescence.
	 */
	ret = timer_session_rotation_pending_check_stop(session);
	if (ret) {
		/*
		 * ret is returned as-is unless the rotation is still ongoing,
		 * in which case it is overwritten below.
		 */
		goto check_ongoing_rotation;
	}

	check_session_rotation_pending_on_consumers(session, &rotation_completed);
	if (!rotation_completed || session->rotation_state == LTTNG_ROTATION_STATE_ERROR) {
		goto check_ongoing_rotation;
	}

	/*
	 * Now we can clear the "ONGOING" state in the session. New
	 * rotations can start now.
	 */
	chunk_status = lttng_trace_chunk_get_name(
		session->chunk_being_archived, &archived_chunk_name, nullptr);
	LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
	free(session->last_archived_chunk_name);
	session->last_archived_chunk_name = strdup(archived_chunk_name);
	if (!session->last_archived_chunk_name) {
		/* Only logged: the function carries on with a null name. */
		PERROR("Failed to duplicate archived chunk name");
	}
	session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);

	if (!session->quiet_rotation) {
		location = session_get_trace_archive_location(session);
		ret = notification_thread_command_session_rotation_completed(
			notification_thread_handle,
			session->id,
			session->last_archived_chunk_id.value,
			location);
		lttng_trace_archive_location_put(location);
		if (ret != LTTNG_OK) {
			/* Not fatal: ret is reset to 0 below. */
			ERR("Failed to notify notification thread of completed rotation for session %s",
			    session->name);
		}
	}

	ret = 0;
check_ongoing_rotation:
	if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
		chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
							&chunk_being_archived_id);
		LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);

		DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s",
		    chunk_being_archived_id,
		    session->name);
		ret = timer_session_rotation_pending_check_start(session,
								 DEFAULT_ROTATE_PENDING_TIMER);
		if (ret) {
			ERR("Failed to re-enable rotation pending timer");
			ret = -1;
			goto end;
		}
	}

end:
	return ret;
}
539
ed1e52a3 540/* Call with the session and session_list locks held. */
28ab034a 541static int launch_session_rotation(struct ltt_session *session)
259c2674
JD
542{
543 int ret;
92816cc3 544 struct lttng_rotate_session_return rotation_return;
259c2674 545
28ab034a 546 DBG("Launching scheduled time-based rotation on session \"%s\"", session->name);
259c2674 547
28ab034a
JG
548 ret = cmd_rotate_session(
549 session, &rotation_return, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
92816cc3 550 if (ret == LTTNG_OK) {
bd0514a5 551 DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
28ab034a 552 session->name);
92816cc3
JG
553 } else {
554 /* Don't consider errors as fatal. */
bd0514a5 555 DBG("Scheduled time-based rotation aborted for session %s: %s",
28ab034a
JG
556 session->name,
557 lttng_strerror(ret));
259c2674 558 }
92816cc3
JG
559 return 0;
560}
259c2674 561
28ab034a
JG
562static int run_job(struct rotation_thread_job *job,
563 struct ltt_session *session,
564 struct notification_thread_handle *notification_thread_handle)
92816cc3
JG
565{
566 int ret;
259c2674 567
92816cc3
JG
568 switch (job->type) {
569 case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
16100d7a 570 ret = launch_session_rotation(session);
92816cc3
JG
571 break;
572 case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
28ab034a 573 ret = check_session_rotation_pending(session, notification_thread_handle);
92816cc3
JG
574 break;
575 default:
576 abort();
259c2674 577 }
259c2674
JD
578 return ret;
579}
580
28ab034a
JG
581static int handle_job_queue(struct rotation_thread_handle *handle,
582 struct rotation_thread *state __attribute__((unused)),
583 struct rotation_thread_timer_queue *queue)
d88744a4
JD
584{
585 int ret = 0;
d88744a4
JD
586
587 for (;;) {
e32d7f27 588 struct ltt_session *session;
92816cc3 589 struct rotation_thread_job *job;
d88744a4 590
92816cc3 591 /* Take the queue lock only to pop an element from the list. */
d88744a4
JD
592 pthread_mutex_lock(&queue->lock);
593 if (cds_list_empty(&queue->list)) {
594 pthread_mutex_unlock(&queue->lock);
595 break;
596 }
28ab034a 597 job = cds_list_first_entry(&queue->list, typeof(*job), head);
92816cc3 598 cds_list_del(&job->head);
d88744a4
JD
599 pthread_mutex_unlock(&queue->lock);
600
d88744a4 601 session_lock_list();
c7031a2c 602 session = job->session;
d88744a4 603 if (!session) {
28ab034a 604 DBG("Session \"%s\" not found", session->name != NULL ? session->name : "");
d88744a4 605 /*
92816cc3
JG
606 * This is a non-fatal error, and we cannot report it to
607 * the user (timer), so just print the error and
608 * continue the processing.
609 *
610 * While the timer thread will purge pending signals for
611 * a session on the session's destruction, it is
612 * possible for a job targeting that session to have
613 * already been queued before it was destroyed.
d88744a4 614 */
92816cc3 615 free(job);
e32d7f27 616 session_put(session);
5b8139c6 617 session_unlock_list();
d88744a4
JD
618 continue;
619 }
620
d88744a4 621 session_lock(session);
16100d7a 622 ret = run_job(job, session, handle->notification_thread_handle);
d88744a4 623 session_unlock(session);
faf1bdcf 624 /* Release reference held by the job. */
e32d7f27 625 session_put(session);
ed1e52a3 626 session_unlock_list();
92816cc3 627 free(job);
d88744a4 628 if (ret) {
d88744a4
JD
629 goto end;
630 }
631 }
632
633 ret = 0;
634
635end:
636 return ret;
637}
638
28ab034a
JG
/*
 * Handle a "session consumed size" notification: if it originates from the
 * session's rotation trigger, unsubscribe from the current threshold, launch a
 * rotation and subscribe to the next threshold (consumed size + rotate size).
 *
 * Returns 0 on success, or when the session no longer exists or the
 * notification does not come from the session's rotation trigger. Returns a
 * non-zero value on error.
 */
static int handle_condition(const struct lttng_notification *notification,
			    struct notification_thread_handle *notification_thread_handle)
{
	int ret = 0;
	const char *condition_session_name = nullptr;
	enum lttng_condition_type condition_type;
	enum lttng_condition_status condition_status;
	enum lttng_evaluation_status evaluation_status;
	uint64_t consumed;
	struct ltt_session *session;
	const struct lttng_condition *condition =
		lttng_notification_get_const_condition(notification);
	const struct lttng_evaluation *evaluation =
		lttng_notification_get_const_evaluation(notification);

	condition_type = lttng_condition_get_type(condition);

	/* Only "session consumed size" conditions are handled. */
	if (condition_type != LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE) {
		ret = -1;
		ERR("Condition type and session usage type are not the same");
		goto end;
	}

	/* Fetch info to test */
	condition_status = lttng_condition_session_consumed_size_get_session_name(
		condition, &condition_session_name);
	if (condition_status != LTTNG_CONDITION_STATUS_OK) {
		ERR("Session name could not be fetched");
		ret = -1;
		goto end;
	}
	evaluation_status =
		lttng_evaluation_session_consumed_size_get_consumed_size(evaluation, &consumed);
	if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) {
		ERR("Failed to get evaluation");
		ret = -1;
		goto end;
	}

	session_lock_list();
	session = session_find_by_name(condition_session_name);
	if (!session) {
		DBG("Failed to find session while handling notification: notification type = %s, session name = `%s`",
		    lttng_condition_type_str(condition_type),
		    condition_session_name);
		/*
		 * Not a fatal error: a session can be destroyed before we get
		 * the chance to handle the notification.
		 */
		ret = 0;
		session_unlock_list();
		goto end;
	}
	session_lock(session);

	if (!lttng_trigger_is_equal(session->rotate_trigger,
				    lttng_notification_get_const_trigger(notification))) {
		/* Notification does not originate from our rotation trigger. */
		ret = 0;
		goto end_unlock;
	}

	ret = unsubscribe_session_consumed_size_rotation(session, notification_thread_handle);
	if (ret) {
		goto end_unlock;
	}

	ret = cmd_rotate_session(
		session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
	/*
	 * The "rotation could not be launched right now" results are not
	 * errors: the next threshold is still subscribed to below.
	 */
	switch (ret) {
	case LTTNG_OK:
		break;
	case -LTTNG_ERR_ROTATION_PENDING:
		DBG("Rotate already pending, subscribe to the next threshold value");
		break;
	case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
		DBG("Rotation already happened since last stop, subscribe to the next threshold value");
		break;
	case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
		DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
		break;
	default:
		ERR("Failed to rotate on size notification with error: %s", lttng_strerror(ret));
		ret = -1;
		goto end_unlock;
	}

	/* The next threshold is relative to the size reported by this notification. */
	ret = subscribe_session_consumed_size_rotation(
		session, consumed + session->rotate_size, notification_thread_handle);
	if (ret) {
		ERR("Failed to subscribe to session consumed size condition");
		goto end_unlock;
	}
	ret = 0;

end_unlock:
	session_unlock(session);
	/* Release the reference presumably acquired by session_find_by_name(). */
	session_put(session);
	session_unlock_list();
end:
	return ret;
}
741
28ab034a
JG
742static int handle_notification_channel(int fd __attribute__((unused)),
743 struct rotation_thread_handle *handle,
744 struct rotation_thread *state __attribute__((unused)))
90936dcf
JD
745{
746 int ret;
dc65dda3 747 bool notification_pending = true;
cd9adb8b 748 struct lttng_notification *notification = nullptr;
90936dcf 749 enum lttng_notification_channel_status status;
90936dcf 750
dc65dda3
JG
751 /*
752 * A notification channel may have multiple notifications queued-up internally in
753 * its buffers. This is because a notification channel multiplexes command replies
754 * and notifications. The current protocol specifies that multiple notifications can be
755 * received before the reply to a command.
756 *
757 * In such cases, the notification channel client implementation internally queues them and
758 * provides them on the next calls to lttng_notification_channel_get_next_notification().
759 * This is correct with respect to the public API, which is intended to be used in "blocking
760 * mode".
761 *
762 * However, this internal user relies on poll/epoll to wake-up when data is available
763 * on the notification channel's socket. As such, it can't assume that a wake-up means only
764 * one notification is available for consumption since many of them may have been queued in
765 * the channel's internal buffers.
766 */
767 while (notification_pending) {
768 status = lttng_notification_channel_has_pending_notification(
769 rotate_notification_channel, &notification_pending);
770 if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
771 ERR("Error occurred while checking for pending notification");
772 ret = -1;
773 goto end;
774 }
d73ee93f 775
dc65dda3
JG
776 if (!notification_pending) {
777 ret = 0;
778 goto end;
779 }
d73ee93f 780
dc65dda3
JG
781 /* Receive the next notification. */
782 status = lttng_notification_channel_get_next_notification(
783 rotate_notification_channel, &notification);
784 switch (status) {
785 case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
786 break;
787 case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
788 WARN("Dropped notification detected on notification channel used by the rotation management thread.");
789 ret = 0;
790 goto end;
791 case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
792 ERR("Notification channel was closed");
793 ret = -1;
794 goto end;
795 default:
796 /* Unhandled conditions / errors. */
797 ERR("Unknown notification channel status");
798 ret = -1;
799 goto end;
800 }
90936dcf 801
dc65dda3
JG
802 ret = handle_condition(notification, handle->notification_thread_handle);
803 lttng_notification_destroy(notification);
804 if (ret) {
805 goto end;
806 }
90936dcf 807 }
90936dcf 808end:
90936dcf
JD
809 return ret;
810}
811
/*
 * Entry point of the rotation thread.
 *
 * `data` is the thread's struct rotation_thread_handle. The thread waits on
 * its poll set and services the notification channel (socket and subscription
 * change eventfd), the job queue's wake-up pipe and the quit pipe until either
 * an error occurs or activity is seen on the quit pipe.
 *
 * Always returns nullptr.
 */
static void *thread_rotation(void *data)
{
	int ret;
	struct rotation_thread_handle *handle = (rotation_thread_handle *) data;
	struct rotation_thread thread;
	int queue_pipe_fd;

	DBG("Started rotation thread");
	rcu_register_thread();
	rcu_thread_online();
	health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
	health_code_update();

	if (!handle) {
		ERR("Invalid thread context provided");
		/* The thread state was not initialized: skip fini_thread_state(). */
		goto end;
	}

	queue_pipe_fd = lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe);

	ret = init_thread_state(handle, &thread);
	if (ret) {
		goto error;
	}

	while (true) {
		int fd_count, i;

		health_poll_entry();
		DBG("Entering poll wait");
		ret = lttng_poll_wait(&thread.events, -1);
		DBG("Poll wait returned (%i)", ret);
		health_poll_exit();
		if (ret < 0) {
			/*
			 * Restart interrupted system call.
			 */
			if (errno == EINTR) {
				continue;
			}
			ERR("Error encountered during lttng_poll_wait (%i)", ret);
			goto error;
		}

		fd_count = ret;
		for (i = 0; i < fd_count; i++) {
			int fd = LTTNG_POLL_GETFD(&thread.events, i);
			uint32_t revents = LTTNG_POLL_GETEV(&thread.events, i);

			DBG("Handling fd (%i) activity (%u)", fd, revents);

			if (revents & LPOLLERR) {
				ERR("Polling returned an error on fd %i", fd);
				goto error;
			}

			if (fd == rotate_notification_channel->socket ||
			    fd == rotate_notification_channel_subscription_change_eventfd) {
				ret = handle_notification_channel(fd, handle, &thread);
				if (ret) {
					ERR("Error occurred while handling activity on notification channel socket");
					goto error;
				}

				if (fd == rotate_notification_channel_subscription_change_eventfd) {
					uint64_t eventfd_value;
					/* Consume the eventfd's value once the notifications were handled. */
					const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value));

					if (read_ret != sizeof(eventfd_value)) {
						PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd");
						goto error;
					}
				}
			} else {
				/* Job queue or quit pipe activity. */

				/*
				 * The job queue is serviced if there is
				 * activity on the quit pipe to ensure it is
				 * flushed and all references held in the queue
				 * are released.
				 */
				ret = handle_job_queue(
					handle, &thread, handle->rotation_timer_queue);
				if (ret) {
					ERR("Failed to handle rotation timer pipe event");
					goto error;
				}

				if (fd == queue_pipe_fd) {
					char buf;

					/* Consume one wake-up byte from the job queue's pipe. */
					ret = lttng_read(fd, &buf, 1);
					if (ret != 1) {
						ERR("Failed to read from wakeup pipe (fd = %i)",
						    fd);
						goto error;
					}
				} else {
					DBG("Quit pipe activity");
					goto exit;
				}
			}
		}
	}
exit:
error:
	DBG("Thread exit");
	fini_thread_state(&thread);
end:
	health_unregister(the_health_sessiond);
	rcu_thread_offline();
	rcu_unregister_thread();
	return nullptr;
}
64d9b072 927
28ab034a 928static bool shutdown_rotation_thread(void *thread_data)
64d9b072 929{
7966af57 930 struct rotation_thread_handle *handle = (rotation_thread_handle *) thread_data;
64d9b072
JG
931 const int write_fd = lttng_pipe_get_writefd(handle->quit_pipe);
932
933 return notify_thread_pipe(write_fd) == 1;
934}
935
936bool launch_rotation_thread(struct rotation_thread_handle *handle)
937{
938 struct lttng_thread *thread;
939
28ab034a 940 thread = lttng_thread_create(
cd9adb8b 941 "Rotation", thread_rotation, shutdown_rotation_thread, nullptr, handle);
64d9b072
JG
942 if (!thread) {
943 goto error;
944 }
945 lttng_thread_put(thread);
946 return true;
947error:
948 return false;
949}
This page took 0.10995 seconds and 4 git commands to generate.