X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=69e2eb3b286c2f54cad19fefbdbe1e5f2bb93492;hp=1a691a849c11e972fee17059df4d5c7068846f01;hb=80e8027abb847655ebe43b2b5aec1a5141bb9668;hpb=4a9daf1745ccbd2aab029206a664f39fcbd640ce diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 1a691a849..69e2eb3b2 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -852,6 +852,166 @@ error: return ret; } +/* + * Send the viewer the list of current sessions. + */ +static +int viewer_get_new_streams(struct relay_command *cmd, + struct lttng_ht *sessions_ht) +{ + int ret, send_streams = 0; + uint32_t nb_streams = 0; + struct lttng_viewer_new_streams_request request; + struct lttng_viewer_new_streams_response response; + struct lttng_viewer_stream send_stream; + struct relay_stream *stream; + struct relay_viewer_stream *viewer_stream; + struct lttng_ht_node_ulong *node; + struct lttng_ht_iter iter; + struct relay_session *session; + + assert(cmd); + assert(sessions_ht); + + DBG("Get new streams received"); + + if (cmd->version_check_done == 0) { + ERR("Trying to get streams before version check"); + ret = -1; + goto end_no_session; + } + + health_code_update(); + + ret = cmd->sock->ops->recvmsg(cmd->sock, &request, sizeof(request), 0); + if (ret < 0 || ret != sizeof(request)) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } else { + ERR("Relay failed to receive the command parameters."); + } + ret = -1; + goto error; + } + + health_code_update(); + + rcu_read_lock(); + lttng_ht_lookup(sessions_ht, + (void *)((unsigned long) be64toh(request.session_id)), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node == NULL) { + DBG("Relay session %" PRIu64 " not found", + be64toh(request.session_id)); + response.status = htobe32(VIEWER_NEW_STREAMS_ERR); + goto send_reply; + } + + session = caa_container_of(node, struct relay_session, session_n); + if (cmd->session_id == session->id) { + /* We confirmed the viewer is asking for the same session. */ + send_streams = 1; + response.status = htobe32(VIEWER_NEW_STREAMS_OK); + } else { + send_streams = 0; + response.status = htobe32(VIEWER_NEW_STREAMS_ERR); + goto send_reply; + } + + /* + * Fill the viewer_streams_ht to count the number of streams ready to be + * sent and avoid concurrency issues on the relay_streams_ht and don't rely + * on a total session stream count. + */ + pthread_mutex_lock(&session->viewer_ready_lock); + cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, + stream_n.node) { + struct relay_viewer_stream *vstream; + + health_code_update(); + + /* + * Don't send stream if no ctf_trace, wrong session or if the stream is + * not ready for the viewer. + */ + if (stream->session != cmd->session || + !stream->ctf_trace || !stream->viewer_ready) { + continue; + } + + vstream = live_find_viewer_stream_by_id(stream->stream_handle); + if (!vstream) { + ret = init_viewer_stream(stream, 0); + if (ret < 0) { + pthread_mutex_unlock(&session->viewer_ready_lock); + goto end_unlock; + } + nb_streams++; + } else if (!vstream->sent_flag) { + nb_streams++; + } + } + pthread_mutex_unlock(&session->viewer_ready_lock); + + response.streams_count = htobe32(nb_streams); + +send_reply: + health_code_update(); + ret = cmd->sock->ops->sendmsg(cmd->sock, &response, sizeof(response), 0); + if (ret < 0) { + ERR("Relay sending viewer attach response"); + goto end_unlock; + } + health_code_update(); + + /* + * Unknown or empty session, just return gracefully, the viewer knows what + * is happening. + */ + if (!send_streams || !nb_streams) { + ret = 0; + goto end_unlock; + } + + /* We should only be there if we have a session to attach to. */ + cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, viewer_stream, + stream_n.node) { + health_code_update(); + + /* Don't send back if session does not match or already sent. */ + if (viewer_stream->session_id != cmd->session->id || + viewer_stream->sent_flag) { + continue; + } + + send_stream.id = htobe64(viewer_stream->stream_handle); + send_stream.ctf_trace_id = htobe64(viewer_stream->ctf_trace->id); + send_stream.metadata_flag = htobe32(viewer_stream->metadata_flag); + strncpy(send_stream.path_name, viewer_stream->path_name, + sizeof(send_stream.path_name)); + strncpy(send_stream.channel_name, viewer_stream->channel_name, + sizeof(send_stream.channel_name)); + + ret = cmd->sock->ops->sendmsg(cmd->sock, &send_stream, + sizeof(send_stream), 0); + if (ret < 0) { + ERR("Relay sending stream %" PRIu64, viewer_stream->stream_handle); + goto end_unlock; + } + DBG("Sent stream %" PRIu64 " to viewer", viewer_stream->stream_handle); + viewer_stream->sent_flag = 1; + } + + ret = 0; + +end_unlock: + rcu_read_unlock(); +end_no_session: +error: + return ret; +} + /* * Send the viewer the list of current sessions. */ @@ -860,7 +1020,7 @@ int viewer_attach_session(struct relay_command *cmd, struct lttng_ht *sessions_ht) { int ret, send_streams = 0; - uint32_t nb_streams = 0, nb_streams_ready = 0; + uint32_t nb_streams = 0; struct lttng_viewer_attach_session_request request; struct lttng_viewer_attach_session_response response; struct lttng_viewer_stream send_stream; @@ -954,6 +1114,7 @@ int viewer_attach_session(struct relay_command *cmd, * ready to be sent and avoid concurrency issues on the * relay_streams_ht and don't rely on a total session stream count. */ + pthread_mutex_lock(&session->viewer_ready_lock); cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) { struct relay_viewer_stream *vstream; @@ -967,7 +1128,6 @@ int viewer_attach_session(struct relay_command *cmd, if (stream->session != cmd->session) { continue; } - nb_streams++; /* * Don't send streams with no ctf_trace, they are not @@ -976,21 +1136,19 @@ int viewer_attach_session(struct relay_command *cmd, if (!stream->ctf_trace || !stream->viewer_ready) { continue; } - nb_streams_ready++; vstream = live_find_viewer_stream_by_id(stream->stream_handle); if (!vstream) { ret = init_viewer_stream(stream, seek_last); if (ret < 0) { + pthread_mutex_unlock(&session->viewer_ready_lock); goto end_unlock; } } + nb_streams++; } + pthread_mutex_unlock(&session->viewer_ready_lock); - /* We must have the same amount of existing stream and ready stream. */ - if (nb_streams != nb_streams_ready) { - nb_streams = 0; - } response.streams_count = htobe32(nb_streams); } @@ -1042,6 +1200,7 @@ send_reply: goto end_unlock; } DBG("Sent stream %" PRIu64 " to viewer", viewer_stream->stream_handle); + viewer_stream->sent_flag = 1; } ret = 0; @@ -1719,6 +1878,9 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, case VIEWER_GET_METADATA: ret = viewer_get_metadata(cmd); break; + case VIEWER_GET_NEW_STREAMS: + ret = viewer_get_new_streams(cmd, sessions_ht); + break; default: ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd)); live_relay_unknown_command(cmd);