X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=fd570e0fc4bd801ebe065e5fd32b2f1c7780323f;hp=c60f7e4c9c4db0f3d00dff773b2c537b0cc4bd43;hb=b4aacfdce9fe9fddc01b738a8cc807d764245cef;hpb=53efb85a242809ed5ed21e9ab40effa696ecbc6f diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index c60f7e4c9..fd570e0fc 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -17,6 +17,7 @@ */ #define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include @@ -43,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -93,7 +95,7 @@ static uint64_t last_relay_viewer_session_id; * Cleanup the daemon */ static -void cleanup(void) +void cleanup_relayd_live(void) { DBG("Cleaning up"); @@ -324,40 +326,12 @@ error_unlock: return ret; } -/* - * Write to writable pipe used to notify a thread. - */ -static -int notify_thread_pipe(int wpipe) +int relayd_live_stop(void) { - ssize_t ret; - - ret = lttng_write(wpipe, "!", 1); - if (ret < 1) { - PERROR("write poll pipe"); - } - - return (int) ret; -} - -/* - * Stop all threads by closing the thread quit pipe. - */ -static -void stop_threads(void) -{ - int ret; - - /* Stopping all threads */ - DBG("Terminating all live threads"); - ret = notify_thread_pipe(thread_quit_pipe[1]); - if (ret < 0) { - ERR("write error on thread quit pipe"); - } - - /* Dispatch thread */ + /* Stop dispatch thread */ CMM_STORE_SHARED(live_dispatch_thread_exit, 1); futex_nto1_wake(&viewer_conn_queue.futex); + return 0; } /* @@ -557,11 +531,12 @@ restart: new_conn->sock = newsock; /* Enqueue request for the dispatcher thread. */ - cds_wfq_enqueue(&viewer_conn_queue.queue, &new_conn->qnode); + cds_wfcq_enqueue(&viewer_conn_queue.head, &viewer_conn_queue.tail, + &new_conn->qnode); /* * Wake the dispatch queue futex. Implicit memory barrier with - * the exchange in cds_wfq_enqueue. + * the exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&viewer_conn_queue.futex); } @@ -588,7 +563,9 @@ error_sock_control: } health_unregister(health_relayd); DBG("Live viewer listener thread cleanup complete"); - stop_threads(); + if (lttng_relay_stop_threads()) { + ERR("Error stopping threads"); + } return NULL; } @@ -600,7 +577,7 @@ void *thread_dispatcher(void *data) { int err = -1; ssize_t ret; - struct cds_wfq_node *node; + struct cds_wfcq_node *node; struct relay_connection *conn = NULL; DBG("[thread] Live viewer relay dispatcher started"); @@ -623,7 +600,8 @@ void *thread_dispatcher(void *data) health_code_update(); /* Dequeue commands */ - node = cds_wfq_dequeue_blocking(&viewer_conn_queue.queue); + node = cds_wfcq_dequeue_blocking(&viewer_conn_queue.head, + &viewer_conn_queue.tail); if (node == NULL) { DBG("Woken up but nothing in the live-viewer " "relay command queue"); @@ -664,7 +642,9 @@ error_testpoint: } health_unregister(health_relayd); DBG("Live viewer dispatch thread dying"); - stop_threads(); + if (lttng_relay_stop_threads()) { + ERR("Error stopping threads"); + } return NULL; } @@ -800,14 +780,9 @@ int viewer_list_sessions(struct relay_connection *conn) } health_code_update(); - rcu_read_unlock(); ret = 0; - goto end; - end_unlock: rcu_read_unlock(); - -end: return ret; } @@ -1205,6 +1180,7 @@ static int check_index_status(struct relay_viewer_stream *vstream, */ index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE); index->timestamp_end = htobe64(rstream->beacon_ts_end); + index->stream_id = htobe64(rstream->ctf_stream_id); goto index_ready; } else if (rstream->total_index_received <= vstream->last_sent_index && !vstream->close_write_flag) { @@ -1318,7 +1294,7 @@ int viewer_get_next_index(struct relay_connection *conn) ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index); pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); if (ret < 0) { - goto end; + goto end_unlock; } else if (ret == 1) { /* * This means the viewer index data structure has been populated by the @@ -2032,7 +2008,9 @@ error_testpoint: ERR("Health error occurred in %s", __func__); } health_unregister(health_relayd); - stop_threads(); + if (lttng_relay_stop_threads()) { + ERR("Error stopping threads"); + } rcu_unregister_thread(); return NULL; } @@ -2043,55 +2021,54 @@ error_testpoint: */ static int create_conn_pipe(void) { - int ret; - - ret = utils_create_pipe_cloexec(live_conn_pipe); - - return ret; + return utils_create_pipe_cloexec(live_conn_pipe); } -void live_stop_threads(void) +int relayd_live_join(void) { - int ret; + int ret, retval = 0; void *status; - stop_threads(); - ret = pthread_join(live_listener_thread, &status); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_join live listener"); - goto error; /* join error, exit without cleanup */ + retval = -1; } ret = pthread_join(live_worker_thread, &status); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_join live worker"); - goto error; /* join error, exit without cleanup */ + retval = -1; } ret = pthread_join(live_dispatcher_thread, &status); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_join live dispatcher"); - goto error; /* join error, exit without cleanup */ + retval = -1; } - cleanup(); + cleanup_relayd_live(); -error: - return; + return retval; } /* * main */ -int live_start_threads(struct lttng_uri *uri, +int relayd_live_create(struct lttng_uri *uri, struct relay_local_data *relay_ctx) { - int ret = 0; + int ret = 0, retval = 0; void *status; int is_root; - assert(uri); + if (!uri) { + retval = -1; + goto exit_init_data; + } live_uri = uri; /* Check if daemon is UID = 0 */ @@ -2100,18 +2077,19 @@ int live_start_threads(struct lttng_uri *uri, if (!is_root) { if (live_uri->port < 1024) { ERR("Need to be root to use ports < 1024"); - ret = -1; - goto exit; + retval = -1; + goto exit_init_data; } } /* Setup the thread apps communication pipe. */ - if ((ret = create_conn_pipe()) < 0) { - goto exit; + if (create_conn_pipe()) { + retval = -1; + goto exit_init_data; } /* Init relay command queue. */ - cds_wfq_init(&viewer_conn_queue.queue); + cds_wfcq_init(&viewer_conn_queue.head, &viewer_conn_queue.tail); /* Set up max poll set size */ lttng_poll_set_max_size(); @@ -2119,55 +2097,65 @@ int live_start_threads(struct lttng_uri *uri, /* Setup the dispatcher thread */ ret = pthread_create(&live_dispatcher_thread, NULL, thread_dispatcher, (void *) NULL); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_create viewer dispatcher"); - goto exit_dispatcher; + retval = -1; + goto exit_dispatcher_thread; } /* Setup the worker thread */ ret = pthread_create(&live_worker_thread, NULL, thread_worker, relay_ctx); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_create viewer worker"); - goto exit_worker; + retval = -1; + goto exit_worker_thread; } /* Setup the listener thread */ ret = pthread_create(&live_listener_thread, NULL, thread_listener, (void *) NULL); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_create viewer listener"); - goto exit_listener; + retval = -1; + goto exit_listener_thread; } - ret = 0; - goto end; + /* + * All OK, started all threads. + */ + return retval; + -exit_listener: ret = pthread_join(live_listener_thread, &status); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_join live listener"); - goto error; /* join error, exit without cleanup */ + retval = -1; } +exit_listener_thread: -exit_worker: ret = pthread_join(live_worker_thread, &status); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_join live worker"); - goto error; /* join error, exit without cleanup */ + retval = -1; } +exit_worker_thread: -exit_dispatcher: ret = pthread_join(live_dispatcher_thread, &status); - if (ret != 0) { + if (ret) { + errno = ret; PERROR("pthread_join live dispatcher"); - goto error; /* join error, exit without cleanup */ + retval = -1; } +exit_dispatcher_thread: -exit: - cleanup(); +exit_init_data: + cleanup_relayd_live(); -end: -error: - return ret; + return retval; }