projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Launch the notification thread using lttng_thread
[lttng-tools.git]
/
src
/
bin
/
lttng-sessiond
/
rotation-thread.c
diff --git
a/src/bin/lttng-sessiond/rotation-thread.c
b/src/bin/lttng-sessiond/rotation-thread.c
index 2e9796cc2892ad7f5b687d709ef63bd1b73d35f6..e8bd478e8db032934690979c86469b8de444d9f4 100644
(file)
--- a/
src/bin/lttng-sessiond/rotation-thread.c
+++ b/
src/bin/lttng-sessiond/rotation-thread.c
@@
-56,7
+56,7
@@
struct rotation_thread {
struct rotation_thread_job {
enum rotation_thread_job_type type;
struct rotation_thread_job {
enum rotation_thread_job_type type;
-
uint64_t session_id
;
+
struct ltt_session *session
;
/* List member in struct rotation_thread_timer_queue. */
struct cds_list_head head;
};
/* List member in struct rotation_thread_timer_queue. */
struct cds_list_head head;
};
@@
-72,11
+72,9
@@
struct rotation_thread_timer_queue {
};
struct rotation_thread_handle {
};
struct rotation_thread_handle {
- int quit_pipe;
struct rotation_thread_timer_queue *rotation_timer_queue;
/* Access to the notification thread cmd_queue */
struct notification_thread_handle *notification_thread_handle;
struct rotation_thread_timer_queue *rotation_timer_queue;
/* Access to the notification thread cmd_queue */
struct notification_thread_handle *notification_thread_handle;
- sem_t *notification_thread_ready;
};
static
};
static
@@
-132,8
+130,8
@@
void log_job_destruction(const struct rotation_thread_job *job)
abort();
}
abort();
}
- LOG(log_level, "Rotation thread timer queue still contains job of type %s targeting session
%" PRIu64
" on destruction",
- job_type_str, job->session
_id
);
+ LOG(log_level, "Rotation thread timer queue still contains job of type %s targeting session
\"%s\
" on destruction",
+ job_type_str, job->session
->name
);
}
void rotation_thread_timer_queue_destroy(
}
void rotation_thread_timer_queue_destroy(
@@
-169,10
+167,8
@@
void rotation_thread_handle_destroy(
}
struct rotation_thread_handle *rotation_thread_handle_create(
}
struct rotation_thread_handle *rotation_thread_handle_create(
- int quit_pipe,
struct rotation_thread_timer_queue *rotation_timer_queue,
struct rotation_thread_timer_queue *rotation_timer_queue,
- struct notification_thread_handle *notification_thread_handle,
- sem_t *notification_thread_ready)
+ struct notification_thread_handle *notification_thread_handle)
{
struct rotation_thread_handle *handle;
{
struct rotation_thread_handle *handle;
@@
-181,10
+177,8
@@
struct rotation_thread_handle *rotation_thread_handle_create(
goto end;
}
goto end;
}
- handle->quit_pipe = quit_pipe;
handle->rotation_timer_queue = rotation_timer_queue;
handle->notification_thread_handle = notification_thread_handle;
handle->rotation_timer_queue = rotation_timer_queue;
handle->notification_thread_handle = notification_thread_handle;
- handle->notification_thread_ready = notification_thread_ready;
end:
return handle;
end:
return handle;
@@
-196,13
+190,14
@@
end:
*/
static
bool timer_job_exists(const struct rotation_thread_timer_queue *queue,
*/
static
bool timer_job_exists(const struct rotation_thread_timer_queue *queue,
- enum rotation_thread_job_type job_type, uint64_t session_id)
+ enum rotation_thread_job_type job_type,
+ struct ltt_session *session)
{
bool exists = false;
struct rotation_thread_job *job;
cds_list_for_each_entry(job, &queue->list, head) {
{
bool exists = false;
struct rotation_thread_job *job;
cds_list_for_each_entry(job, &queue->list, head) {
- if (job->session
_id == session_id
&& job->type == job_type) {
+ if (job->session
== session
&& job->type == job_type) {
exists = true;
goto end;
}
exists = true;
goto end;
}
@@
-212,7
+207,8
@@
end:
}
void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
}
void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
- enum rotation_thread_job_type job_type, uint64_t session_id)
+ enum rotation_thread_job_type job_type,
+ struct ltt_session *session)
{
int ret;
const char * const dummy = "!";
{
int ret;
const char * const dummy = "!";
@@
-220,7
+216,7
@@
void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
const char *job_type_str = get_job_type_str(job_type);
pthread_mutex_lock(&queue->lock);
const char *job_type_str = get_job_type_str(job_type);
pthread_mutex_lock(&queue->lock);
- if (timer_job_exists(queue,
session_id, job_type
)) {
+ if (timer_job_exists(queue,
job_type, session
)) {
/*
* This timer job is already pending, we don't need to add
* it.
/*
* This timer job is already pending, we don't need to add
* it.
@@
-230,12
+226,15
@@
void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
job = zmalloc(sizeof(struct rotation_thread_job));
if (!job) {
job = zmalloc(sizeof(struct rotation_thread_job));
if (!job) {
- PERROR("Failed to allocate rotation thread job of type \"%s\" for session
id %" PRIu64
,
- job_type_str, session
_id
);
+ PERROR("Failed to allocate rotation thread job of type \"%s\" for session
\"%s\""
,
+ job_type_str, session
->name
);
goto end;
}
goto end;
}
+ /* No reason for this to fail as the caller must hold a reference. */
+ (void) session_get(session);
+
+ job->session = session;
job->type = job_type;
job->type = job_type;
- job->session_id = session_id;
cds_list_add_tail(&job->head, &queue->list);
ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), dummy,
cds_list_add_tail(&job->head, &queue->list);
ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), dummy,
@@
-256,8
+255,8
@@
void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
DBG("Wake-up pipe of rotation thread job queue is full");
goto end;
}
DBG("Wake-up pipe of rotation thread job queue is full");
goto end;
}
- PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session
id %" PRIu64
,
- job_type_str, session
_id
);
+ PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session
\"%s\""
,
+ job_type_str, session
->name
);
goto end;
}
goto end;
}
@@
-276,15
+275,8
@@
int init_poll_set(struct lttng_poll_event *poll_set,
* - quit pipe,
* - rotation thread timer queue pipe,
*/
* - quit pipe,
* - rotation thread timer queue pipe,
*/
- ret = lttng_poll_create(poll_set, 2, LTTNG_CLOEXEC);
- if (ret < 0) {
- goto end;
- }
-
- ret = lttng_poll_add(poll_set, handle->quit_pipe,
- LPOLLIN | LPOLLERR);
- if (ret < 0) {
- ERR("[rotation-thread] Failed to add quit_pipe fd to pollset");
+ ret = sessiond_set_thread_pollset(poll_set, 2);
+ if (ret) {
goto error;
}
ret = lttng_poll_add(poll_set,
goto error;
}
ret = lttng_poll_add(poll_set,
@@
-295,7
+287,6
@@
int init_poll_set(struct lttng_poll_event *poll_set,
goto error;
}
goto error;
}
-end:
return ret;
error:
lttng_poll_clean(poll_set);
return ret;
error:
lttng_poll_clean(poll_set);
@@
-326,11
+317,6
@@
int init_thread_state(struct rotation_thread_handle *handle,
goto end;
}
goto end;
}
- /*
- * We wait until the notification thread is ready to create the
- * notification channel and add it to the poll_set.
- */
- sem_wait(handle->notification_thread_ready);
rotate_notification_channel = lttng_notification_channel_create(
lttng_session_daemon_notification_endpoint);
if (!rotate_notification_channel) {
rotate_notification_channel = lttng_notification_channel_create(
lttng_session_daemon_notification_endpoint);
if (!rotate_notification_channel) {
@@
-442,7
+428,12
@@
end:
session->rotation_pending_local = false;
}
if (ret) {
session->rotation_pending_local = false;
}
if (ret) {
- session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
+ ret = session_reset_rotation_state(session,
+ LTTNG_ROTATION_STATE_ERROR);
+ if (ret) {
+ ERR("Failed to reset rotation state of session \"%s\"",
+ session->name);
+ }
}
return 0;
}
}
return 0;
}
@@
-502,14
+493,19
@@
int check_session_rotation_pending_relay(struct ltt_session *session)
ERR("[rotation-thread] Encountered an error when checking if rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the relay",
session->current_archive_id - 1,
session->name);
ERR("[rotation-thread] Encountered an error when checking if rotation of trace archive %" PRIu64 " of session \"%s\" is pending on the relay",
session->current_archive_id - 1,
session->name);
- session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
+ ret = session_reset_rotation_state(session,
+ LTTNG_ROTATION_STATE_ERROR);
+ if (ret) {
+ ERR("Failed to reset rotation state of session \"%s\"",
+ session->name);
+ }
rotation_completed = false;
}
rcu_read_unlock();
if (rotation_completed) {
rotation_completed = false;
}
rcu_read_unlock();
if (rotation_completed) {
- DBG("[rotation-thread]
T
otation of trace archive %" PRIu64 " of session \"%s\" is complete on the relay",
+ DBG("[rotation-thread]
R
otation of trace archive %" PRIu64 " of session \"%s\" is complete on the relay",
session->current_archive_id - 1,
session->name);
session->rotation_pending_relay = false;
session->current_archive_id - 1,
session->name);
session->rotation_pending_relay = false;
@@
-566,7
+562,12
@@
int check_session_rotation_pending(struct ltt_session *session,
/* Rename the completed trace archive's location. */
now = time(NULL);
if (now == (time_t) -1) {
/* Rename the completed trace archive's location. */
now = time(NULL);
if (now == (time_t) -1) {
- session->rotation_state = LTTNG_ROTATION_STATE_ERROR;
+ ret = session_reset_rotation_state(session,
+ LTTNG_ROTATION_STATE_ERROR);
+ if (ret) {
+ ERR("Failed to reset rotation state of session \"%s\"",
+ session->name);
+ }
ret = LTTNG_ERR_UNK;
goto end;
}
ret = LTTNG_ERR_UNK;
goto end;
}
@@
-653,7
+654,7
@@
end:
return ret;
}
return ret;
}
-/* Call with the session
lock
held. */
+/* Call with the session
and session_list locks
held. */
static
int launch_session_rotation(struct ltt_session *session)
{
static
int launch_session_rotation(struct ltt_session *session)
{
@@
-702,7
+703,6
@@
int handle_job_queue(struct rotation_thread_handle *handle,
{
int ret = 0;
int fd = lttng_pipe_get_readfd(queue->event_pipe);
{
int ret = 0;
int fd = lttng_pipe_get_readfd(queue->event_pipe);
- struct ltt_session *session;
char buf;
ret = lttng_read(fd, &buf, 1);
char buf;
ret = lttng_read(fd, &buf, 1);
@@
-713,6
+713,7
@@
int handle_job_queue(struct rotation_thread_handle *handle,
}
for (;;) {
}
for (;;) {
+ struct ltt_session *session;
struct rotation_thread_job *job;
/* Take the queue lock only to pop an element from the list. */
struct rotation_thread_job *job;
/* Take the queue lock only to pop an element from the list. */
@@
-727,10
+728,10
@@
int handle_job_queue(struct rotation_thread_handle *handle,
pthread_mutex_unlock(&queue->lock);
session_lock_list();
pthread_mutex_unlock(&queue->lock);
session_lock_list();
- session =
session_find_by_id(job->session_id)
;
+ session =
job->session
;
if (!session) {
if (!session) {
- DBG("[rotation-thread] Session
%" PRIu64
" not found",
-
job->session_id
);
+ DBG("[rotation-thread] Session
\"%s\
" not found",
+
session->name
);
/*
* This is a non-fatal error, and we cannot report it to
* the user (timer), so just print the error and
/*
* This is a non-fatal error, and we cannot report it to
* the user (timer), so just print the error and
@@
-743,14
+744,15
@@
int handle_job_queue(struct rotation_thread_handle *handle,
*/
session_unlock_list();
free(job);
*/
session_unlock_list();
free(job);
+ session_put(session);
continue;
}
session_lock(session);
continue;
}
session_lock(session);
- session_unlock_list();
-
ret = run_job(job, session, handle->notification_thread_handle);
session_unlock(session);
ret = run_job(job, session, handle->notification_thread_handle);
session_unlock(session);
+ session_put(session);
+ session_unlock_list();
free(job);
if (ret) {
goto end;
free(job);
if (ret) {
goto end;
@@
-815,7
+817,7
@@
int handle_condition(const struct lttng_condition *condition,
ret = unsubscribe_session_consumed_size_rotation(session,
notification_thread_handle);
if (ret) {
ret = unsubscribe_session_consumed_size_rotation(session,
notification_thread_handle);
if (ret) {
- goto end;
+ goto end
_unlock
;
}
ret = cmd_rotate_session(session, NULL);
}
ret = cmd_rotate_session(session, NULL);
@@
-838,6
+840,7
@@
int handle_condition(const struct lttng_condition *condition,
end_unlock:
session_unlock(session);
end_unlock:
session_unlock(session);
+ session_put(session);
end:
return ret;
}
end:
return ret;
}
@@
-857,7
+860,7
@@
int handle_notification_channel(int fd,
status = lttng_notification_channel_has_pending_notification(
rotate_notification_channel, ¬ification_pending);
if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
status = lttng_notification_channel_has_pending_notification(
rotate_notification_channel, ¬ification_pending);
if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
- ERR("[rotation-thread ]Error occured while checking for pending notification");
+ ERR("[rotation-thread ]Error occur
r
ed while checking for pending notification");
ret = -1;
goto end;
}
ret = -1;
goto end;
}
@@
-960,7
+963,7
@@
void *thread_rotation(void *data)
goto error;
}
goto error;
}
- if (
fd == handle->quit_pipe
) {
+ if (
sessiond_check_thread_quit_pipe(fd, revents)
) {
DBG("[rotation-thread] Quit pipe activity");
/* TODO flush the queue. */
goto exit;
DBG("[rotation-thread] Quit pipe activity");
/* TODO flush the queue. */
goto exit;
@@
-975,7
+978,7
@@
void *thread_rotation(void *data)
ret = handle_notification_channel(fd, handle,
&thread);
if (ret) {
ret = handle_notification_channel(fd, handle,
&thread);
if (ret) {
- ERR("[rotation-thread] Error occured while handling activity on notification channel socket");
+ ERR("[rotation-thread] Error occur
r
ed while handling activity on notification channel socket");
goto error;
}
}
goto error;
}
}
This page took
0.030617 seconds
and
4
git commands to generate.