projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Refactor relayd main/set_options/cleanup
[lttng-tools.git]
/
src
/
bin
/
lttng-relayd
/
live.c
diff --git
a/src/bin/lttng-relayd/live.c
b/src/bin/lttng-relayd/live.c
index 468a7e58710dc7fcb18424fbb4d4f063ad623a21..beb67b2a5654ed54cc5ef36a75408c2d6b667364 100644
(file)
--- a/
src/bin/lttng-relayd/live.c
+++ b/
src/bin/lttng-relayd/live.c
@@
-17,6
+17,7
@@
*/
#define _GNU_SOURCE
*/
#define _GNU_SOURCE
+#define _LGPL_SOURCE
#include <getopt.h>
#include <grp.h>
#include <limits.h>
#include <getopt.h>
#include <grp.h>
#include <limits.h>
@@
-43,6
+44,7
@@
#include <common/common.h>
#include <common/compat/poll.h>
#include <common/compat/socket.h>
#include <common/common.h>
#include <common/compat/poll.h>
#include <common/compat/socket.h>
+#include <common/compat/endian.h>
#include <common/defaults.h>
#include <common/futex.h>
#include <common/index/index.h>
#include <common/defaults.h>
#include <common/futex.h>
#include <common/index/index.h>
@@
-93,7
+95,7
@@
static uint64_t last_relay_viewer_session_id;
* Cleanup the daemon
*/
static
* Cleanup the daemon
*/
static
-void cleanup(void)
+void cleanup
_relayd_live
(void)
{
DBG("Cleaning up");
{
DBG("Cleaning up");
@@
-344,20
+346,22
@@
int notify_thread_pipe(int wpipe)
* Stop all threads by closing the thread quit pipe.
*/
static
* Stop all threads by closing the thread quit pipe.
*/
static
-
void
stop_threads(void)
+
int
stop_threads(void)
{
{
- int ret;
+ int ret
, retval = 0
;
/* Stopping all threads */
DBG("Terminating all live threads");
ret = notify_thread_pipe(thread_quit_pipe[1]);
if (ret < 0) {
ERR("write error on thread quit pipe");
/* Stopping all threads */
DBG("Terminating all live threads");
ret = notify_thread_pipe(thread_quit_pipe[1]);
if (ret < 0) {
ERR("write error on thread quit pipe");
+ retval = -1;
}
/* Dispatch thread */
CMM_STORE_SHARED(live_dispatch_thread_exit, 1);
futex_nto1_wake(&viewer_conn_queue.futex);
}
/* Dispatch thread */
CMM_STORE_SHARED(live_dispatch_thread_exit, 1);
futex_nto1_wake(&viewer_conn_queue.futex);
+ return retval;
}
/*
}
/*
@@
-557,11
+561,12
@@
restart:
new_conn->sock = newsock;
/* Enqueue request for the dispatcher thread. */
new_conn->sock = newsock;
/* Enqueue request for the dispatcher thread. */
- cds_wfq_enqueue(&viewer_conn_queue.queue, &new_conn->qnode);
+ cds_wfcq_enqueue(&viewer_conn_queue.head, &viewer_conn_queue.tail,
+ &new_conn->qnode);
/*
* Wake the dispatch queue futex. Implicit memory barrier with
/*
* Wake the dispatch queue futex. Implicit memory barrier with
- * the exchange in cds_wfq_enqueue.
+ * the exchange in cds_wf
c
q_enqueue.
*/
futex_nto1_wake(&viewer_conn_queue.futex);
}
*/
futex_nto1_wake(&viewer_conn_queue.futex);
}
@@
-588,7
+593,9
@@
error_sock_control:
}
health_unregister(health_relayd);
DBG("Live viewer listener thread cleanup complete");
}
health_unregister(health_relayd);
DBG("Live viewer listener thread cleanup complete");
- stop_threads();
+ if (stop_threads()) {
+ ERR("Error stopping live threads");
+ }
return NULL;
}
return NULL;
}
@@
-600,7
+607,7
@@
void *thread_dispatcher(void *data)
{
int err = -1;
ssize_t ret;
{
int err = -1;
ssize_t ret;
- struct cds_wfq_node *node;
+ struct cds_wf
c
q_node *node;
struct relay_connection *conn = NULL;
DBG("[thread] Live viewer relay dispatcher started");
struct relay_connection *conn = NULL;
DBG("[thread] Live viewer relay dispatcher started");
@@
-623,7
+630,8
@@
void *thread_dispatcher(void *data)
health_code_update();
/* Dequeue commands */
health_code_update();
/* Dequeue commands */
- node = cds_wfq_dequeue_blocking(&viewer_conn_queue.queue);
+ node = cds_wfcq_dequeue_blocking(&viewer_conn_queue.head,
+ &viewer_conn_queue.tail);
if (node == NULL) {
DBG("Woken up but nothing in the live-viewer "
"relay command queue");
if (node == NULL) {
DBG("Woken up but nothing in the live-viewer "
"relay command queue");
@@
-664,7
+672,9
@@
error_testpoint:
}
health_unregister(health_relayd);
DBG("Live viewer dispatch thread dying");
}
health_unregister(health_relayd);
DBG("Live viewer dispatch thread dying");
- stop_threads();
+ if (stop_threads()) {
+ ERR("Error stopping live threads");
+ }
return NULL;
}
return NULL;
}
@@
-800,14
+810,9
@@
int viewer_list_sessions(struct relay_connection *conn)
}
health_code_update();
}
health_code_update();
- rcu_read_unlock();
ret = 0;
ret = 0;
- goto end;
-
end_unlock:
rcu_read_unlock();
end_unlock:
rcu_read_unlock();
-
-end:
return ret;
}
return ret;
}
@@
-1205,8
+1210,9
@@
static int check_index_status(struct relay_viewer_stream *vstream,
*/
index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
index->timestamp_end = htobe64(rstream->beacon_ts_end);
*/
index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
index->timestamp_end = htobe64(rstream->beacon_ts_end);
+ index->stream_id = htobe64(rstream->ctf_stream_id);
goto index_ready;
goto index_ready;
- } else if (rstream->total_index_received < vstream->last_sent_index
+ } else if (rstream->total_index_received <
=
vstream->last_sent_index
&& !vstream->close_write_flag) {
/*
* Reader and writer are working in the same tracefile, so we care
&& !vstream->close_write_flag) {
/*
* Reader and writer are working in the same tracefile, so we care
@@
-1318,7
+1324,7
@@
int viewer_get_next_index(struct relay_connection *conn)
ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
if (ret < 0) {
ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
if (ret < 0) {
- goto end;
+ goto end
_unlock
;
} else if (ret == 1) {
/*
* This means the viewer index data structure has been populated by the
} else if (ret == 1) {
/*
* This means the viewer index data structure has been populated by the
@@
-2032,7
+2038,9
@@
error_testpoint:
ERR("Health error occurred in %s", __func__);
}
health_unregister(health_relayd);
ERR("Health error occurred in %s", __func__);
}
health_unregister(health_relayd);
- stop_threads();
+ if (stop_threads()) {
+ ERR("Error stopping live threads");
+ }
rcu_unregister_thread();
return NULL;
}
rcu_unregister_thread();
return NULL;
}
@@
-2043,55
+2051,59
@@
error_testpoint:
*/
static int create_conn_pipe(void)
{
*/
static int create_conn_pipe(void)
{
- int ret;
-
- ret = utils_create_pipe_cloexec(live_conn_pipe);
+ return utils_create_pipe_cloexec(live_conn_pipe);
+}
- return ret;
+int relayd_live_stop(void)
+{
+ return stop_threads();
}
}
-
void live_stop_threads
(void)
+
int relayd_live_join
(void)
{
{
- int ret;
+ int ret
, retval = 0
;
void *status;
void *status;
- stop_threads();
-
ret = pthread_join(live_listener_thread, &status);
ret = pthread_join(live_listener_thread, &status);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live listener");
PERROR("pthread_join live listener");
- goto error; /* join error, exit without cleanup */
+ retval = -1;
}
ret = pthread_join(live_worker_thread, &status);
}
ret = pthread_join(live_worker_thread, &status);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live worker");
PERROR("pthread_join live worker");
- goto error; /* join error, exit without cleanup */
+ retval = -1;
}
ret = pthread_join(live_dispatcher_thread, &status);
}
ret = pthread_join(live_dispatcher_thread, &status);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live dispatcher");
PERROR("pthread_join live dispatcher");
- goto error; /* join error, exit without cleanup */
+ retval = -1;
}
}
- cleanup();
+ cleanup
_relayd_live
();
-error:
- return;
+ return retval;
}
/*
* main
*/
}
/*
* main
*/
-int
live_start_threads
(struct lttng_uri *uri,
+int
relayd_live_create
(struct lttng_uri *uri,
struct relay_local_data *relay_ctx)
{
struct relay_local_data *relay_ctx)
{
- int ret = 0;
+ int ret = 0
, retval = 0
;
void *status;
int is_root;
void *status;
int is_root;
- assert(uri);
+ if (!uri) {
+ retval = -1;
+ goto exit_init_data;
+ }
live_uri = uri;
/* Check if daemon is UID = 0 */
live_uri = uri;
/* Check if daemon is UID = 0 */
@@
-2100,18
+2112,19
@@
int live_start_threads(struct lttng_uri *uri,
if (!is_root) {
if (live_uri->port < 1024) {
ERR("Need to be root to use ports < 1024");
if (!is_root) {
if (live_uri->port < 1024) {
ERR("Need to be root to use ports < 1024");
- ret = -1;
- goto exit;
+ ret
val
= -1;
+ goto exit
_init_data
;
}
}
/* Setup the thread apps communication pipe. */
}
}
/* Setup the thread apps communication pipe. */
- if ((ret = create_conn_pipe()) < 0) {
- goto exit;
+ if (create_conn_pipe()) {
+ retval = -1;
+ goto exit_init_data;
}
/* Init relay command queue. */
}
/* Init relay command queue. */
- cds_wf
q_init(&viewer_conn_queue.queue
);
+ cds_wf
cq_init(&viewer_conn_queue.head, &viewer_conn_queue.tail
);
/* Set up max poll set size */
lttng_poll_set_max_size();
/* Set up max poll set size */
lttng_poll_set_max_size();
@@
-2119,55
+2132,65
@@
int live_start_threads(struct lttng_uri *uri,
/* Setup the dispatcher thread */
ret = pthread_create(&live_dispatcher_thread, NULL,
thread_dispatcher, (void *) NULL);
/* Setup the dispatcher thread */
ret = pthread_create(&live_dispatcher_thread, NULL,
thread_dispatcher, (void *) NULL);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_create viewer dispatcher");
PERROR("pthread_create viewer dispatcher");
- goto exit_dispatcher;
+ retval = -1;
+ goto exit_dispatcher_thread;
}
/* Setup the worker thread */
ret = pthread_create(&live_worker_thread, NULL,
thread_worker, relay_ctx);
}
/* Setup the worker thread */
ret = pthread_create(&live_worker_thread, NULL,
thread_worker, relay_ctx);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_create viewer worker");
PERROR("pthread_create viewer worker");
- goto exit_worker;
+ retval = -1;
+ goto exit_worker_thread;
}
/* Setup the listener thread */
ret = pthread_create(&live_listener_thread, NULL,
thread_listener, (void *) NULL);
}
/* Setup the listener thread */
ret = pthread_create(&live_listener_thread, NULL,
thread_listener, (void *) NULL);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_create viewer listener");
PERROR("pthread_create viewer listener");
- goto exit_listener;
+ retval = -1;
+ goto exit_listener_thread;
}
}
- ret = 0;
- goto end;
+ /*
+ * All OK, started all threads.
+ */
+ return retval;
+
-exit_listener:
ret = pthread_join(live_listener_thread, &status);
ret = pthread_join(live_listener_thread, &status);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live listener");
PERROR("pthread_join live listener");
- goto error; /* join error, exit without cleanup */
+ retval = -1;
}
}
+exit_listener_thread:
-exit_worker:
ret = pthread_join(live_worker_thread, &status);
ret = pthread_join(live_worker_thread, &status);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live worker");
PERROR("pthread_join live worker");
- goto error; /* join error, exit without cleanup */
+ retval = -1;
}
}
+exit_worker_thread:
-exit_dispatcher:
ret = pthread_join(live_dispatcher_thread, &status);
ret = pthread_join(live_dispatcher_thread, &status);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live dispatcher");
PERROR("pthread_join live dispatcher");
- goto error; /* join error, exit without cleanup */
+ retval = -1;
}
}
+exit_dispatcher_thread:
-exit:
- cleanup();
+exit
_init_data
:
+ cleanup
_relayd_live
();
-end:
-error:
- return ret;
+ return retval;
}
}
This page took
0.028419 seconds
and
4
git commands to generate.