return ret;
}
-/*
- * Write to writable pipe used to notify a thread.
- */
-static
-int notify_thread_pipe(int wpipe)
-{
- ssize_t ret;
-
- ret = lttng_write(wpipe, "!", 1);
- if (ret < 1) {
- PERROR("write poll pipe");
- }
-
- return (int) ret;
-}
-
-/*
- * Stop all threads by closing the thread quit pipe.
- */
-static
-int stop_threads(void)
+int relayd_live_stop(void)
{
- int ret, retval = 0;
-
- /* Stopping all threads */
- DBG("Terminating all live threads");
- ret = notify_thread_pipe(thread_quit_pipe[1]);
- if (ret < 0) {
- ERR("write error on thread quit pipe");
- retval = -1;
- }
-
- /* Dispatch thread */
+ /* Stop dispatch thread */
CMM_STORE_SHARED(live_dispatch_thread_exit, 1);
futex_nto1_wake(&viewer_conn_queue.futex);
- return retval;
+ return 0;
}
/*
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
+ if (!revents) {
+ /* No activity for this FD (poll implementation). */
+ continue;
+ }
+
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
}
health_unregister(health_relayd);
DBG("Live viewer listener thread cleanup complete");
- if (stop_threads()) {
- ERR("Error stopping live threads");
+ if (lttng_relay_stop_threads()) {
+ ERR("Error stopping threads");
}
return NULL;
}
}
health_unregister(health_relayd);
DBG("Live viewer dispatch thread dying");
- if (stop_threads()) {
- ERR("Error stopping live threads");
+ if (lttng_relay_stop_threads()) {
+ ERR("Error stopping threads");
}
return NULL;
}
reply.major = htobe32(reply.major);
reply.minor = htobe32(reply.minor);
if (conn->type == RELAY_VIEWER_COMMAND) {
- reply.viewer_session_id = htobe64(++last_relay_viewer_session_id);
+ /*
+ * Increment outside of htobe64 macro, because can be used more than once
+ * within the macro, and thus the operation may be undefined.
+ */
+ last_relay_viewer_session_id++;
+ reply.viewer_session_id = htobe64(last_relay_viewer_session_id);
}
health_code_update();
{
int ret, err = -1;
uint32_t nb_fd;
- struct relay_connection *conn;
struct lttng_poll_event events;
struct lttng_ht *relay_connections_ht;
struct lttng_ht_iter iter;
struct lttng_viewer_cmd recv_hdr;
struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
+ struct relay_connection *destroy_conn;
DBG("[thread] Live viewer relay worker started");
health_code_update();
+ if (!revents) {
+ /* No activity for this FD (poll implementation). */
+ continue;
+ }
+
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
ERR("Relay live pipe error");
goto error;
} else if (revents & LPOLLIN) {
+ struct relay_connection *conn;
+
ret = lttng_read(live_conn_pipe[0], &conn, sizeof(conn));
if (ret < 0) {
goto error;
DBG("Connection socket %d added", conn->sock->fd);
}
} else {
+ struct relay_connection *conn;
+
rcu_read_lock();
conn = connection_find_by_sock(relay_connections_ht, pollfd);
/* If not found, there is a synchronization issue. */
/* Cleanup reamaining connection object. */
rcu_read_lock();
- cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, conn,
+ cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
+ destroy_conn,
sock_n.node) {
health_code_update();
- destroy_connection(relay_connections_ht, conn);
+ destroy_connection(relay_connections_ht, destroy_conn);
}
rcu_read_unlock();
error_poll_create:
ERR("Health error occurred in %s", __func__);
}
health_unregister(health_relayd);
- if (stop_threads()) {
- ERR("Error stopping live threads");
+ if (lttng_relay_stop_threads()) {
+ ERR("Error stopping threads");
}
rcu_unregister_thread();
return NULL;
return utils_create_pipe_cloexec(live_conn_pipe);
}
-int relayd_live_stop(void)
-{
- return stop_threads();
-}
-
int relayd_live_join(void)
{
int ret, retval = 0;
cds_wfcq_init(&viewer_conn_queue.head, &viewer_conn_queue.tail);
/* Set up max poll set size */
- lttng_poll_set_max_size();
+ if (lttng_poll_set_max_size()) {
+ retval = -1;
+ goto exit_init_data;
+ }
/* Setup the dispatcher thread */
ret = pthread_create(&live_dispatcher_thread, NULL,
*/
return retval;
+ /*
+ * Join on the live_listener_thread should anything be added after
+ * the live_listener thread's creation.
+ */
- ret = pthread_join(live_listener_thread, &status);
- if (ret) {
- errno = ret;
- PERROR("pthread_join live listener");
- retval = -1;
- }
exit_listener_thread:
ret = pthread_join(live_worker_thread, &status);