* Cleanup the daemon
*/
static
-void cleanup(void)
+void cleanup_relayd_live(void)
{
DBG("Cleaning up");
return ret;
}
-/*
- * Write to writable pipe used to notify a thread.
- */
-static
-int notify_thread_pipe(int wpipe)
-{
- ssize_t ret;
-
- ret = lttng_write(wpipe, "!", 1);
- if (ret < 1) {
- PERROR("write poll pipe");
- }
-
- return (int) ret;
-}
-
-/*
- * Stop all threads by closing the thread quit pipe.
- */
-static
-void stop_threads(void)
+int relayd_live_stop(void)
{
- int ret;
-
- /* Stopping all threads */
- DBG("Terminating all live threads");
- ret = notify_thread_pipe(thread_quit_pipe[1]);
- if (ret < 0) {
- ERR("write error on thread quit pipe");
- }
-
- /* Dispatch thread */
+ /* Stop dispatch thread */
CMM_STORE_SHARED(live_dispatch_thread_exit, 1);
futex_nto1_wake(&viewer_conn_queue.futex);
+ return 0;
}
/*
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
+ if (!revents) {
+ /* No activity for this FD (poll implementation). */
+ continue;
+ }
+
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
}
health_unregister(health_relayd);
DBG("Live viewer listener thread cleanup complete");
- stop_threads();
+ if (lttng_relay_stop_threads()) {
+ ERR("Error stopping threads");
+ }
return NULL;
}
}
health_unregister(health_relayd);
DBG("Live viewer dispatch thread dying");
- stop_threads();
+ if (lttng_relay_stop_threads()) {
+ ERR("Error stopping threads");
+ }
return NULL;
}
reply.major = htobe32(reply.major);
reply.minor = htobe32(reply.minor);
if (conn->type == RELAY_VIEWER_COMMAND) {
- reply.viewer_session_id = htobe64(++last_relay_viewer_session_id);
+ /*
+ * Increment outside of htobe64 macro, because can be used more than once
+ * within the macro, and thus the operation may be undefined.
+ */
+ last_relay_viewer_session_id++;
+ reply.viewer_session_id = htobe64(last_relay_viewer_session_id);
}
health_code_update();
{
int ret, err = -1;
uint32_t nb_fd;
- struct relay_connection *conn;
struct lttng_poll_event events;
struct lttng_ht *relay_connections_ht;
struct lttng_ht_iter iter;
struct lttng_viewer_cmd recv_hdr;
struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
+ struct relay_connection *destroy_conn;
DBG("[thread] Live viewer relay worker started");
health_code_update();
+ if (!revents) {
+ /* No activity for this FD (poll implementation). */
+ continue;
+ }
+
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
ERR("Relay live pipe error");
goto error;
} else if (revents & LPOLLIN) {
+ struct relay_connection *conn;
+
ret = lttng_read(live_conn_pipe[0], &conn, sizeof(conn));
if (ret < 0) {
goto error;
DBG("Connection socket %d added", conn->sock->fd);
}
} else {
+ struct relay_connection *conn;
+
rcu_read_lock();
conn = connection_find_by_sock(relay_connections_ht, pollfd);
/* If not found, there is a synchronization issue. */
/* Cleanup reamaining connection object. */
rcu_read_lock();
- cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, conn,
+ cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
+ destroy_conn,
sock_n.node) {
health_code_update();
- destroy_connection(relay_connections_ht, conn);
+ destroy_connection(relay_connections_ht, destroy_conn);
}
rcu_read_unlock();
error_poll_create:
ERR("Health error occurred in %s", __func__);
}
health_unregister(health_relayd);
- stop_threads();
+ if (lttng_relay_stop_threads()) {
+ ERR("Error stopping threads");
+ }
rcu_unregister_thread();
return NULL;
}
*/
static int create_conn_pipe(void)
{
- int ret;
-
- ret = utils_create_pipe_cloexec(live_conn_pipe);
-
- return ret;
+ return utils_create_pipe_cloexec(live_conn_pipe);
}
-void live_stop_threads(void)
+int relayd_live_join(void)
{
- int ret;
+ int ret, retval = 0;
void *status;
- stop_threads();
-
ret = pthread_join(live_listener_thread, &status);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live listener");
- goto error; /* join error, exit without cleanup */
+ retval = -1;
}
ret = pthread_join(live_worker_thread, &status);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live worker");
- goto error; /* join error, exit without cleanup */
+ retval = -1;
}
ret = pthread_join(live_dispatcher_thread, &status);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live dispatcher");
- goto error; /* join error, exit without cleanup */
+ retval = -1;
}
- cleanup();
+ cleanup_relayd_live();
-error:
- return;
+ return retval;
}
/*
* main
*/
-int live_start_threads(struct lttng_uri *uri,
+int relayd_live_create(struct lttng_uri *uri,
struct relay_local_data *relay_ctx)
{
- int ret = 0;
+ int ret = 0, retval = 0;
void *status;
int is_root;
- assert(uri);
+ if (!uri) {
+ retval = -1;
+ goto exit_init_data;
+ }
live_uri = uri;
/* Check if daemon is UID = 0 */
if (!is_root) {
if (live_uri->port < 1024) {
ERR("Need to be root to use ports < 1024");
- ret = -1;
- goto exit;
+ retval = -1;
+ goto exit_init_data;
}
}
/* Setup the thread apps communication pipe. */
- if ((ret = create_conn_pipe()) < 0) {
- goto exit;
+ if (create_conn_pipe()) {
+ retval = -1;
+ goto exit_init_data;
}
/* Init relay command queue. */
cds_wfcq_init(&viewer_conn_queue.head, &viewer_conn_queue.tail);
/* Set up max poll set size */
- lttng_poll_set_max_size();
+ if (lttng_poll_set_max_size()) {
+ retval = -1;
+ goto exit_init_data;
+ }
/* Setup the dispatcher thread */
ret = pthread_create(&live_dispatcher_thread, NULL,
thread_dispatcher, (void *) NULL);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_create viewer dispatcher");
- goto exit_dispatcher;
+ retval = -1;
+ goto exit_dispatcher_thread;
}
/* Setup the worker thread */
ret = pthread_create(&live_worker_thread, NULL,
thread_worker, relay_ctx);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_create viewer worker");
- goto exit_worker;
+ retval = -1;
+ goto exit_worker_thread;
}
/* Setup the listener thread */
ret = pthread_create(&live_listener_thread, NULL,
thread_listener, (void *) NULL);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_create viewer listener");
- goto exit_listener;
+ retval = -1;
+ goto exit_listener_thread;
}
- ret = 0;
- goto end;
+ /*
+ * All OK, started all threads.
+ */
+ return retval;
-exit_listener:
- ret = pthread_join(live_listener_thread, &status);
- if (ret != 0) {
- PERROR("pthread_join live listener");
- goto error; /* join error, exit without cleanup */
- }
+ /*
+ * Join on the live_listener_thread should anything be added after
+ * the live_listener thread's creation.
+ */
+
+exit_listener_thread:
-exit_worker:
ret = pthread_join(live_worker_thread, &status);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live worker");
- goto error; /* join error, exit without cleanup */
+ retval = -1;
}
+exit_worker_thread:
-exit_dispatcher:
ret = pthread_join(live_dispatcher_thread, &status);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live dispatcher");
- goto error; /* join error, exit without cleanup */
+ retval = -1;
}
+exit_dispatcher_thread:
-exit:
- cleanup();
+exit_init_data:
+ cleanup_relayd_live();
-end:
-error:
- return ret;
+ return retval;
}