#include "lttng-viewer.h"
#include "utils.h"
#include "health-relayd.h"
+#include "testpoint.h"
static struct lttng_uri *live_uri;
-/*
- * Quit pipe for all threads. This permits a single cancellation point
- * for all threads when receiving an event on the pipe.
- */
-static int live_thread_quit_pipe[2] = { -1, -1 };
-
/*
* This pipe is used to inform the worker thread that a command is queued and
* ready to be processed.
/* Stopping all threads */
DBG("Terminating all live threads");
- ret = notify_thread_pipe(live_thread_quit_pipe[1]);
+ ret = notify_thread_pipe(thread_quit_pipe[1]);
if (ret < 0) {
ERR("write error on thread quit pipe");
}
}
/* Add quit pipe */
- ret = lttng_poll_add(events, live_thread_quit_pipe[0], LPOLLIN);
+ ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
if (ret < 0) {
goto error;
}
static
int check_thread_quit_pipe(int fd, uint32_t events)
{
- if (fd == live_thread_quit_pipe[0] && (events & LPOLLIN)) {
+ if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
return 1;
}
goto error_sock_control;
}
- /*
- * Pass 3 as size here for the thread quit pipe, control and data socket.
- */
+ /* Pass 2 as size here for the thread quit pipe and the control socket. */
ret = create_thread_poll_set(&events, 2);
if (ret < 0) {
goto error_create_poll;
goto error_poll_add;
}
+ lttng_relay_notify_ready();
+
+ if (testpoint(relayd_thread_live_listener)) {
+ goto error_testpoint;
+ }
+
while (1) {
health_code_update();
exit:
error:
error_poll_add:
+error_testpoint:
lttng_poll_clean(&events);
error_create_poll:
if (live_control_sock->fd >= 0) {
health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_DISPATCHER);
+ if (testpoint(relayd_thread_live_dispatcher)) {
+ goto error_testpoint;
+ }
+
health_code_update();
while (!CMM_LOAD_SHARED(live_dispatch_thread_exit)) {
err = 0;
error:
+error_testpoint:
if (err) {
health_error();
ERR("Health error occurred in %s", __func__);
* stream.
*/
if (ret_ref == 1 && vstream->ctf_trace->viewer_metadata_stream) {
+ delete_viewer_stream(vstream->ctf_trace->viewer_metadata_stream);
destroy_viewer_stream(vstream->ctf_trace->viewer_metadata_stream);
vstream->ctf_trace->metadata_stream = NULL;
DBG("Freeing ctf_trace %" PRIu64, vstream->ctf_trace->id);
call_rcu(&vstream->rcu_node, deferred_free_viewer_stream);
}
+/*
+ * Atomically check if new streams got added in the session since the last
+ * check and reset the flag to 0.
+ *
+ * Returns 1 if new streams got added, 0 if nothing changed, a negative value
+ * on error.
+ */
+static
+int check_new_streams(uint64_t session_id, struct lttng_ht *sessions_ht)
+{
+ struct lttng_ht_node_ulong *node;
+ struct lttng_ht_iter iter;
+ struct relay_session *session;
+ unsigned long current_val;
+ int ret;
+
+ /*
+ * Sessions are keyed by unsigned long in this hash table; the 64-bit
+ * session_id is narrowed for the lookup. NOTE(review): on 32-bit
+ * platforms this truncates — confirm session ids stay within range.
+ */
+ lttng_ht_lookup(sessions_ht,
+ (void *)((unsigned long) session_id), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node == NULL) {
+ DBG("Relay session %" PRIu64 " not found", session_id);
+ ret = -1;
+ goto error;
+ }
+
+ session = caa_container_of(node, struct relay_session, session_n);
+
+ /*
+ * Atomically swap new_streams from 1 to 0. The returned old value is
+ * 1 exactly when streams were added since the previous check, so the
+ * flag is consumed by at most one caller (no double notification).
+ */
+ current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
+ ret = current_val;
+
+error:
+ return ret;
+}
+
/*
* Send the next index for a stream.
*
viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
}
+ ret = check_new_streams(vstream->session_id, sessions_ht);
+ if (ret < 0) {
+ goto end_unlock;
+ } else if (ret == 1) {
+ viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
+ }
+
pthread_mutex_lock(&vstream->overwrite_lock);
if (vstream->abort_flag) {
/*
* Return 0 on success or else a negative value.
*/
static
-int viewer_get_packet(struct relay_command *cmd)
+int viewer_get_packet(struct relay_command *cmd,
+ struct lttng_ht *sessions_ht)
{
int ret, send_data = 0;
char *data = NULL;
goto send_reply;
}
+ ret = check_new_streams(stream->session_id, sessions_ht);
+ if (ret < 0) {
+ goto end_unlock;
+ } else if (ret == 1) {
+ reply.status = htobe32(VIEWER_GET_PACKET_ERR);
+ reply.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
+ goto send_reply;
+ }
+
len = be32toh(get_packet_info.len);
data = zmalloc(len);
if (!data) {
ret = viewer_get_next_index(cmd, sessions_ht);
break;
case VIEWER_GET_PACKET:
- ret = viewer_get_packet(cmd);
+ ret = viewer_get_packet(cmd, sessions_ht);
break;
case VIEWER_GET_METADATA:
ret = viewer_get_metadata(cmd);
health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_WORKER);
+ if (testpoint(relayd_thread_live_worker)) {
+ goto error_testpoint;
+ }
+
/* table of connections indexed on socket */
relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
if (!relay_connections_ht) {
DBG("Viewer worker thread exited with error");
}
DBG("Viewer worker thread cleanup complete");
+error_testpoint:
if (err) {
health_error();
ERR("Health error occurred in %s", __func__);
* main
*/
int live_start_threads(struct lttng_uri *uri,
- struct relay_local_data *relay_ctx, int quit_pipe[2])
+ struct relay_local_data *relay_ctx)
{
int ret = 0;
void *status;
assert(uri);
live_uri = uri;
- live_thread_quit_pipe[0] = quit_pipe[0];
- live_thread_quit_pipe[1] = quit_pipe[1];
-
/* Check if daemon is UID = 0 */
is_root = !getuid();