#include "lttng-relayd.h"
#include "live.h"
#include "health-relayd.h"
+#include "testpoint.h"
/* command line options */
char *opt_output_path;
* Quit pipe for all threads. This permits a single cancellation point
* for all threads when receiving an event on the pipe.
*/
-static int thread_quit_pipe[2] = { -1, -1 };
+int thread_quit_pipe[2] = { -1, -1 };
/*
* This pipe is used to inform the worker thread that a command is queued and
lttng_relay_notify_ready();
+ if (testpoint(relayd_thread_listener)) {
+ goto error_testpoint;
+ }
+
while (1) {
health_code_update();
exit:
error:
error_poll_add:
+error_testpoint:
lttng_poll_clean(&events);
error_create_poll:
if (data_sock->fd >= 0) {
health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER);
+ if (testpoint(relayd_thread_dispatcher)) {
+ goto error_testpoint;
+ }
+
health_code_update();
while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
err = 0;
error:
+error_testpoint:
if (err) {
health_error();
ERR("Health error occurred in %s", __func__);
session->sock = cmd->sock;
session->minor = cmd->minor;
session->major = cmd->major;
+ pthread_mutex_init(&session->viewer_ready_lock, NULL);
cmd->session = session;
reply.session_id = htobe64(session->id);
switch (cmd->minor) {
+ case 1:
+ case 2:
+ case 3:
+ break;
case 4: /* LTTng sessiond 2.4 */
default:
ret = cmd_create_session_2_4(cmd, session);
{
struct relay_stream_recv_handle *node, *tmp_node;
+ pthread_mutex_lock(&cmd->session->viewer_ready_lock);
+
cds_list_for_each_entry_safe(node, tmp_node, &cmd->recv_head, node) {
struct relay_stream *stream;
free(node);
}
+ pthread_mutex_unlock(&cmd->session->viewer_ready_lock);
return;
}
* stream message is received, this list is emptied and streams are set
* with the viewer ready flag.
*/
- if (stream->metadata_flag) {
- stream->viewer_ready = 1;
- } else {
- queue_stream_handle(stream->stream_handle, cmd);
- }
+ queue_stream_handle(stream->stream_handle, cmd);
lttng_ht_node_init_ulong(&stream->stream_n,
(unsigned long) stream->stream_handle);
*/
set_viewer_ready_flag(cmd);
+ /*
+ * Inform the viewer that there are new streams in the session.
+ */
+ uatomic_set(&cmd->session->new_streams, 1);
+
reply.ret_code = htobe32(LTTNG_OK);
send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
if (send_ret < 0) {
health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER);
+ if (testpoint(relayd_thread_worker)) {
+ goto error_testpoint;
+ }
+
health_code_update();
/* table of connections indexed on socket */
}
DBG("Worker thread cleanup complete");
free(data_buffer);
+error_testpoint:
if (err) {
health_error();
ERR("Health error occurred in %s", __func__);
goto exit_listener;
}
- ret = live_start_threads(live_uri, relay_ctx, thread_quit_pipe);
+ ret = live_start_threads(live_uri, relay_ctx);
if (ret != 0) {
ERR("Starting live viewer threads");
goto exit_live;