X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=69e2eb3b286c2f54cad19fefbdbe1e5f2bb93492;hp=84057bfb4048bf4ab8f6b4c0c23ec27e55d30da1;hb=80e8027abb847655ebe43b2b5aec1a5141bb9668;hpb=fb4d42ab5ddb624b0a1059fbd88bd9f90290971f
diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c
index 84057bfb4..69e2eb3b2 100644
--- a/src/bin/lttng-relayd/live.c
+++ b/src/bin/lttng-relayd/live.c
@@ -57,15 +57,10 @@
 #include "lttng-viewer.h"
 #include "utils.h"
 #include "health-relayd.h"
+#include "testpoint.h"
 
 static struct lttng_uri *live_uri;
 
-/*
- * Quit pipe for all threads. This permits a single cancellation point
- * for all threads when receiving an event on the pipe.
- */
-static int live_thread_quit_pipe[2] = { -1, -1 };
-
 /*
  * This pipe is used to inform the worker thread that a command is queued and
  * ready to be processed.
@@ -126,7 +121,7 @@ void stop_threads(void)
 
 	/* Stopping all threads */
 	DBG("Terminating all live threads");
-	ret = notify_thread_pipe(live_thread_quit_pipe[1]);
+	ret = notify_thread_pipe(thread_quit_pipe[1]);
 	if (ret < 0) {
 		ERR("write error on thread quit pipe");
 	}
@@ -155,7 +150,7 @@ int create_thread_poll_set(struct lttng_poll_event *events, int size)
 	}
 
 	/* Add quit pipe */
-	ret = lttng_poll_add(events, live_thread_quit_pipe[0], LPOLLIN | LPOLLERR);
+	ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
 	if (ret < 0) {
 		goto error;
 	}
@@ -174,7 +169,7 @@ error:
 
 static
 int check_thread_quit_pipe(int fd, uint32_t events)
 {
-	if (fd == live_thread_quit_pipe[0] && (events & LPOLLIN)) {
+	if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
 		return 1;
 	}
 
@@ -259,6 +254,10 @@
 
 	lttng_relay_notify_ready();
 
+	if (testpoint(relayd_thread_live_listener)) {
+		goto error_testpoint;
+	}
+
 	while (1) {
 		health_code_update();
 
@@ -346,6 +345,7 @@ restart:
 exit:
 error:
 error_poll_add:
+error_testpoint:
 	lttng_poll_clean(&events);
 error_create_poll:
 	if (live_control_sock->fd >= 0) {
@@ -381,6 +381,10 @@ void *thread_dispatcher(void *data)
 
 	health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_DISPATCHER);
 
+	if (testpoint(relayd_thread_live_dispatcher)) {
+		goto error_testpoint;
+	}
+
 	health_code_update();
 
 	while (!CMM_LOAD_SHARED(live_dispatch_thread_exit)) {
@@ -429,6 +433,7 @@
 	err = 0;
 
 error:
+error_testpoint:
 	if (err) {
 		health_error();
 		ERR("Health error occurred in %s", __func__);
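The hunks above replace the module-local live_thread_quit_pipe with the relayd-wide thread_quit_pipe and add testpoint() hooks at each thread's entry. The quit-pipe idiom gives every thread a single fd to watch for shutdown: each worker adds the pipe's read end to its poll set, and stop_threads() wakes all of them with one write to the write end. The testpoint() calls are cheap no-ops in normal runs; the test harness can activate them to stall or fail a thread at a known point. Below is a minimal, self-contained sketch of the quit-pipe pattern using plain poll(2) and pthreads instead of the lttng_poll wrappers; every name in it is illustrative, not lttng-tools API:

	#include <poll.h>
	#include <pthread.h>
	#include <unistd.h>

	static int quit_pipe[2] = { -1, -1 };

	/* Worker: block until the work fd or the quit pipe becomes readable. */
	static void *worker(void *arg)
	{
		int work_fd = *(int *) arg;
		struct pollfd fds[2] = {
			{ .fd = quit_pipe[0], .events = POLLIN },
			{ .fd = work_fd, .events = POLLIN },
		};

		for (;;) {
			if (poll(fds, 2, -1) < 0) {
				break;
			}
			if (fds[0].revents & POLLIN) {
				break;	/* quit pipe fired: single shutdown point */
			}
			if (fds[1].revents & POLLIN) {
				char buf[64];

				(void) read(work_fd, buf, sizeof(buf));
				/* ... process the work item ... */
			}
		}
		return NULL;
	}

	/* Wake every thread polling quit_pipe[0] with a single write. */
	static void stop_all_threads(void)
	{
		(void) write(quit_pipe[1], "!", 1);
	}

	int main(void)
	{
		pthread_t tid;
		int work[2];

		if (pipe(quit_pipe) < 0 || pipe(work) < 0) {
			return 1;
		}
		if (pthread_create(&tid, NULL, worker, &work[0]) != 0) {
			return 1;
		}
		(void) write(work[1], "job", 3);	/* hand the worker one item */
		sleep(1);
		stop_all_threads();
		pthread_join(tid, NULL);
		return 0;
	}

Polling a pipe instead of re-checking a flag means a blocked thread is woken immediately, and the same fd multiplexes cleanly with the thread's regular work sockets.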
@@ -847,6 +852,166 @@ error:
 	return ret;
 }
 
+/*
+ * Send the viewer the new streams added to the session since its last request.
+ */
+static
+int viewer_get_new_streams(struct relay_command *cmd,
+		struct lttng_ht *sessions_ht)
+{
+	int ret, send_streams = 0;
+	uint32_t nb_streams = 0;
+	struct lttng_viewer_new_streams_request request;
+	struct lttng_viewer_new_streams_response response;
+	struct lttng_viewer_stream send_stream;
+	struct relay_stream *stream;
+	struct relay_viewer_stream *viewer_stream;
+	struct lttng_ht_node_ulong *node;
+	struct lttng_ht_iter iter;
+	struct relay_session *session;
+
+	assert(cmd);
+	assert(sessions_ht);
+
+	DBG("Get new streams received");
+
+	if (cmd->version_check_done == 0) {
+		ERR("Trying to get streams before version check");
+		ret = -1;
+		goto end_no_session;
+	}
+
+	health_code_update();
+
+	ret = cmd->sock->ops->recvmsg(cmd->sock, &request, sizeof(request), 0);
+	if (ret < 0 || ret != sizeof(request)) {
+		if (ret == 0) {
+			/* Orderly shutdown. Not necessary to print an error. */
+			DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+		} else {
+			ERR("Relay failed to receive the command parameters.");
+		}
+		ret = -1;
+		goto error;
+	}
+
+	health_code_update();
+
+	rcu_read_lock();
+	lttng_ht_lookup(sessions_ht,
+			(void *)((unsigned long) be64toh(request.session_id)), &iter);
+	node = lttng_ht_iter_get_node_ulong(&iter);
+	if (node == NULL) {
+		DBG("Relay session %" PRIu64 " not found",
+				be64toh(request.session_id));
+		response.status = htobe32(VIEWER_NEW_STREAMS_ERR);
+		goto send_reply;
+	}
+
+	session = caa_container_of(node, struct relay_session, session_n);
+	if (cmd->session_id == session->id) {
+		/* We confirmed the viewer is asking for the same session. */
+		send_streams = 1;
+		response.status = htobe32(VIEWER_NEW_STREAMS_OK);
+	} else {
+		send_streams = 0;
+		response.status = htobe32(VIEWER_NEW_STREAMS_ERR);
+		goto send_reply;
+	}
+
+	/*
+	 * Fill the viewer_streams_ht to count the number of streams ready to be
+	 * sent; this avoids concurrency issues on the relay_streams_ht and does
+	 * not rely on a total session stream count.
+	 */
+	pthread_mutex_lock(&session->viewer_ready_lock);
+	cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+			stream_n.node) {
+		struct relay_viewer_stream *vstream;
+
+		health_code_update();
+
+		/*
+		 * Skip the stream if it has no ctf_trace, belongs to another
+		 * session, or is not yet ready for the viewer.
+		 */
+		if (stream->session != cmd->session ||
+				!stream->ctf_trace || !stream->viewer_ready) {
+			continue;
+		}
+
+		vstream = live_find_viewer_stream_by_id(stream->stream_handle);
+		if (!vstream) {
+			ret = init_viewer_stream(stream, 0);
+			if (ret < 0) {
+				pthread_mutex_unlock(&session->viewer_ready_lock);
+				goto end_unlock;
+			}
+			nb_streams++;
+		} else if (!vstream->sent_flag) {
+			nb_streams++;
+		}
+	}
+	pthread_mutex_unlock(&session->viewer_ready_lock);
 
+	response.streams_count = htobe32(nb_streams);
+
+send_reply:
+	health_code_update();
+	ret = cmd->sock->ops->sendmsg(cmd->sock, &response, sizeof(response), 0);
+	if (ret < 0) {
+		ERR("Relay sending viewer new streams response");
+		goto end_unlock;
+	}
+	health_code_update();
+
+	/*
+	 * Unknown or empty session: just return gracefully, the viewer knows
+	 * what is happening.
+	 */
+	if (!send_streams || !nb_streams) {
+		ret = 0;
+		goto end_unlock;
+	}
+
+	/* We should only get here if we have a session to attach to. */
+	cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, viewer_stream,
+			stream_n.node) {
+		health_code_update();
+
+		/* Don't send back if session does not match or already sent. */
+		if (viewer_stream->session_id != cmd->session->id ||
+				viewer_stream->sent_flag) {
+			continue;
+		}
+
+		send_stream.id = htobe64(viewer_stream->stream_handle);
+		send_stream.ctf_trace_id = htobe64(viewer_stream->ctf_trace->id);
+		send_stream.metadata_flag = htobe32(viewer_stream->metadata_flag);
+		strncpy(send_stream.path_name, viewer_stream->path_name,
+				sizeof(send_stream.path_name));
+		strncpy(send_stream.channel_name, viewer_stream->channel_name,
+				sizeof(send_stream.channel_name));
+
+		ret = cmd->sock->ops->sendmsg(cmd->sock, &send_stream,
+				sizeof(send_stream), 0);
+		if (ret < 0) {
+			ERR("Relay sending stream %" PRIu64, viewer_stream->stream_handle);
+			goto end_unlock;
+		}
+		DBG("Sent stream %" PRIu64 " to viewer", viewer_stream->stream_handle);
+		viewer_stream->sent_flag = 1;
+	}
+
+	ret = 0;
+
+end_unlock:
+	rcu_read_unlock();
+end_no_session:
+error:
+	return ret;
+}
+
 /*
  * Send the viewer the list of current sessions.
  */
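On the wire, the new command is a plain request/response: the viewer sends VIEWER_GET_NEW_STREAMS carrying the session id (after the usual lttng_viewer_cmd header, elided here), and the relay answers with a status and a stream count followed by one lttng_viewer_stream record per stream not yet sent. A viewer-side sketch of the exchange; the struct layouts below are abbreviated stand-ins for the definitions in lttng-viewer.h (field sizes illustrative), and recv_all() is a hypothetical helper, not lttng-tools API:

	#include <endian.h>
	#include <stddef.h>
	#include <stdint.h>
	#include <unistd.h>

	/* Abbreviated wire structs; the real layouts live in lttng-viewer.h. */
	struct new_streams_request {
		uint64_t session_id;		/* big endian */
	} __attribute__((packed));

	struct new_streams_response {
		uint32_t status;		/* VIEWER_NEW_STREAMS_OK or _ERR */
		uint32_t streams_count;		/* big endian */
	} __attribute__((packed));

	struct viewer_stream_desc {		/* stands in for lttng_viewer_stream */
		uint64_t id;
		uint64_t ctf_trace_id;
		uint32_t metadata_flag;
		char path_name[4096];
		char channel_name[255];
	} __attribute__((packed));

	/* Read exactly len bytes (illustrative helper). */
	static int recv_all(int fd, void *buf, size_t len)
	{
		char *p = buf;

		while (len > 0) {
			ssize_t n = read(fd, p, len);

			if (n <= 0) {
				return -1;
			}
			p += n;
			len -= (size_t) n;
		}
		return 0;
	}

	/*
	 * Viewer side of GET_NEW_STREAMS, after the command header has been
	 * sent: write the request, read the response, then read streams_count
	 * stream records.
	 */
	static int fetch_new_streams(int sock, uint64_t session_id)
	{
		struct new_streams_request req = { .session_id = htobe64(session_id) };
		struct new_streams_response resp;
		uint32_t i;

		if (write(sock, &req, sizeof(req)) != (ssize_t) sizeof(req)) {
			return -1;
		}
		if (recv_all(sock, &resp, sizeof(resp)) < 0) {
			return -1;
		}
		/* Check be32toh(resp.status) against VIEWER_NEW_STREAMS_OK here. */
		for (i = 0; i < be32toh(resp.streams_count); i++) {
			struct viewer_stream_desc stream;

			if (recv_all(sock, &stream, sizeof(stream)) < 0) {
				return -1;
			}
			/* ... create local state for be64toh(stream.id) ... */
		}
		return 0;
	}

Because the relay marks each record with sent_flag once written, a viewer that issues this command repeatedly receives each stream at most once.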
@@ -855,7 +1020,7 @@ int viewer_attach_session(struct relay_command *cmd,
 		struct lttng_ht *sessions_ht)
 {
 	int ret, send_streams = 0;
-	uint32_t nb_streams = 0, nb_streams_ready = 0;
+	uint32_t nb_streams = 0;
 	struct lttng_viewer_attach_session_request request;
 	struct lttng_viewer_attach_session_response response;
 	struct lttng_viewer_stream send_stream;
@@ -949,6 +1114,7 @@ int viewer_attach_session(struct relay_command *cmd,
 	 * ready to be sent and avoid concurrency issues on the
 	 * relay_streams_ht and don't rely on a total session stream count.
 	 */
+	pthread_mutex_lock(&session->viewer_ready_lock);
 	cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) {
 		struct relay_viewer_stream *vstream;
 
@@ -962,7 +1128,6 @@ int viewer_attach_session(struct relay_command *cmd,
 		if (stream->session != cmd->session) {
 			continue;
 		}
-		nb_streams++;
 
 		/*
 		 * Don't send streams with no ctf_trace, they are not
@@ -971,21 +1136,19 @@ int viewer_attach_session(struct relay_command *cmd,
 		if (!stream->ctf_trace || !stream->viewer_ready) {
 			continue;
 		}
-		nb_streams_ready++;
 
 		vstream = live_find_viewer_stream_by_id(stream->stream_handle);
 		if (!vstream) {
 			ret = init_viewer_stream(stream, seek_last);
 			if (ret < 0) {
+				pthread_mutex_unlock(&session->viewer_ready_lock);
 				goto end_unlock;
 			}
 		}
+		nb_streams++;
 	}
+	pthread_mutex_unlock(&session->viewer_ready_lock);
 
-	/* We must have the same amount of existing stream and ready stream. */
-	if (nb_streams != nb_streams_ready) {
-		nb_streams = 0;
-	}
 	response.streams_count = htobe32(nb_streams);
 }
 
@@ -1037,6 +1200,7 @@ send_reply:
 			goto end_unlock;
 		}
 		DBG("Sent stream %" PRIu64 " to viewer", viewer_stream->stream_handle);
+		viewer_stream->sent_flag = 1;
 	}
 
 	ret = 0;
@@ -1137,6 +1301,40 @@ void destroy_viewer_stream(struct relay_viewer_stream *vstream)
 	call_rcu(&vstream->rcu_node, deferred_free_viewer_stream);
 }
 
+/*
+ * Atomically check whether new streams were added to the session since the
+ * last check, and reset the flag to 0.
+ *
+ * Returns 1 if new streams were added, 0 if nothing changed, or a negative
+ * value on error.
+ */
+static
+int check_new_streams(uint64_t session_id, struct lttng_ht *sessions_ht)
+{
+	struct lttng_ht_node_ulong *node;
+	struct lttng_ht_iter iter;
+	struct relay_session *session;
+	unsigned long current_val;
+	int ret;
+
+	lttng_ht_lookup(sessions_ht,
+			(void *)((unsigned long) session_id), &iter);
+	node = lttng_ht_iter_get_node_ulong(&iter);
+	if (node == NULL) {
+		DBG("Relay session %" PRIu64 " not found", session_id);
+		ret = -1;
+		goto error;
+	}
+
+	session = caa_container_of(node, struct relay_session, session_n);
+
+	current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
+	ret = current_val;
+
+error:
+	return ret;
+}
+
 /*
  * Send the next index for a stream.
  *
@@ -1261,6 +1459,13 @@ int viewer_get_next_index(struct relay_command *cmd,
 		viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
 	}
 
+	ret = check_new_streams(vstream->session_id, sessions_ht);
+	if (ret < 0) {
+		goto end_unlock;
+	} else if (ret == 1) {
+		viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
+	}
+
 	pthread_mutex_lock(&vstream->overwrite_lock);
 	if (vstream->abort_flag) {
 		/*
@@ -1349,7 +1554,8 @@ end:
 * Return 0 on success or else a negative value.
*/ static -int viewer_get_packet(struct relay_command *cmd) +int viewer_get_packet(struct relay_command *cmd, + struct lttng_ht *sessions_ht) { int ret, send_data = 0; char *data = NULL; @@ -1424,6 +1630,15 @@ int viewer_get_packet(struct relay_command *cmd) goto send_reply; } + ret = check_new_streams(stream->session_id, sessions_ht); + if (ret < 0) { + goto end_unlock; + } else if (ret == 1) { + reply.status = htobe32(VIEWER_GET_PACKET_ERR); + reply.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; + goto send_reply; + } + len = be32toh(get_packet_info.len); data = zmalloc(len); if (!data) { @@ -1658,11 +1873,14 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, ret = viewer_get_next_index(cmd, sessions_ht); break; case VIEWER_GET_PACKET: - ret = viewer_get_packet(cmd); + ret = viewer_get_packet(cmd, sessions_ht); break; case VIEWER_GET_METADATA: ret = viewer_get_metadata(cmd); break; + case VIEWER_GET_NEW_STREAMS: + ret = viewer_get_new_streams(cmd, sessions_ht); + break; default: ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd)); live_relay_unknown_command(cmd); @@ -1831,6 +2049,10 @@ void *thread_worker(void *data) health_register(health_relayd, HEALTH_RELAYD_TYPE_LIVE_WORKER); + if (testpoint(relayd_thread_live_worker)) { + goto error_testpoint; + } + /* table of connections indexed on socket */ relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); if (!relay_connections_ht) { @@ -1986,6 +2208,7 @@ relay_connections_ht_error: DBG("Viewer worker thread exited with error"); } DBG("Viewer worker thread cleanup complete"); +error_testpoint: if (err) { health_error(); ERR("Health error occurred in %s", __func__); @@ -2044,7 +2267,7 @@ error: * main */ int live_start_threads(struct lttng_uri *uri, - struct relay_local_data *relay_ctx, int quit_pipe[2]) + struct relay_local_data *relay_ctx) { int ret = 0; void *status; @@ -2053,9 +2276,6 @@ int live_start_threads(struct lttng_uri *uri, assert(uri); live_uri = uri; - live_thread_quit_pipe[0] = quit_pipe[0]; - live_thread_quit_pipe[1] = quit_pipe[1]; - /* Check if daemon is UID = 0 */ is_root = !getuid();
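A closing note on the signaling that ties these hunks together: session->new_streams is set when streams join a live session, and check_new_streams() consumes it with uatomic_cmpxchg(&session->new_streams, 1, 0), so exactly one in-flight reply (next-index or get-packet) observes the transition and carries LTTNG_VIEWER_FLAG_NEW_STREAM back to the viewer, which then issues VIEWER_GET_NEW_STREAMS. A standalone sketch of the test-and-clear idiom, using GCC's __sync_val_compare_and_swap in place of liburcu's uatomic_cmpxchg (flag and function names are illustrative):

	#include <stdio.h>

	/* Stand-in for the per-session new_streams flag. */
	static unsigned long new_streams_flag;

	/* Control side: mark that streams were added (idempotent 0 -> 1). */
	static void streams_added(void)
	{
		(void) __sync_val_compare_and_swap(&new_streams_flag, 0UL, 1UL);
	}

	/*
	 * Viewer side: atomic fetch-and-clear, mirroring check_new_streams().
	 * Returns the previous value: 1 if new streams were pending, else 0.
	 */
	static int check_new_streams_flag(void)
	{
		return (int) __sync_val_compare_and_swap(&new_streams_flag, 1UL, 0UL);
	}

	int main(void)
	{
		streams_added();
		printf("first check:  %d\n", check_new_streams_flag());	/* 1 */
		printf("second check: %d\n", check_new_streams_flag());	/* 0 */
		return 0;
	}

Returning the previous value is what makes the check race-free: two concurrent callers may both execute the cmpxchg, but only the one that saw 1 reports new streams, so the viewer is notified exactly once per batch of additions.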