X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=5b9e4248c0e779deadc7b604ace4d928f7c9634f;hb=6763619c3f295a1502e77ca9563d7a21d68b25c7;hp=b3b7ee427268ff6d24169a037b44eab134eddef9;hpb=c4e361a4de07eef209c85d79e734cfb0f94e006d;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index b3b7ee427..5b9e4248c 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -809,6 +809,32 @@ end: return ret; } +/* + * Check if a connection is attached to a session. + * Return 1 if attached, 0 if not attached, a negative value on error. + */ +static +int session_attached(struct relay_connection *conn, uint64_t session_id) +{ + struct relay_session *session; + int found = 0; + + if (!conn->viewer_session) { + goto end; + } + cds_list_for_each_entry(session, + &conn->viewer_session->sessions_head, + viewer_session_list) { + if (session->id == session_id) { + found = 1; + goto end; + } + } + +end: + return found; +} + /* * Send the viewer the list of current sessions. */ @@ -820,6 +846,7 @@ int viewer_get_new_streams(struct relay_connection *conn) struct lttng_viewer_new_streams_request request; struct lttng_viewer_new_streams_response response; struct relay_session *session; + uint64_t session_id; assert(conn); @@ -832,29 +859,27 @@ int viewer_get_new_streams(struct relay_connection *conn) if (ret < 0) { goto error; } + session_id = be64toh(request.session_id); health_code_update(); rcu_read_lock(); - session = session_find_by_id(conn->sessions_ht, - be64toh(request.session_id)); + session = session_find_by_id(conn->sessions_ht, session_id); if (!session) { - DBG("Relay session %" PRIu64 " not found", - be64toh(request.session_id)); + DBG("Relay session %" PRIu64 " not found", session_id); response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR); goto send_reply; } - if (conn->session_id == session->id) { - /* We confirmed the viewer is asking for the same session. */ - send_streams = 1; - response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); - } else { + if (!session_attached(conn, session_id)) { send_streams = 0; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR); goto send_reply; } + send_streams = 1; + response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); + if (!send_streams) { goto send_reply; } @@ -868,6 +893,23 @@ int viewer_get_new_streams(struct relay_connection *conn) nb_streams = nb_created + nb_unsent; response.streams_count = htobe32(nb_streams); + /* + * If the session is closed and we have no new streams to send, + * it means that the viewer has already received the whole trace + * for this session and should now close it. + */ + if (nb_streams == 0 && session->close_flag) { + send_streams = 0; + response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP); + /* + * Remove the session from the attached list of the connection + * and try to destroy it. + */ + cds_list_del(&session->viewer_session_list); + session_viewer_try_destroy(conn->sessions_ht, session); + goto send_reply; + } + send_reply: health_code_update(); ret = send_response(conn->sock, &response, sizeof(response)); @@ -926,6 +968,12 @@ int viewer_attach_session(struct relay_connection *conn) health_code_update(); + if (!conn->viewer_session) { + DBG("Client trying to attach before creating a live viewer session"); + response.status = htobe32(LTTNG_VIEWER_ATTACH_NO_SESSION); + goto send_reply; + } + rcu_read_lock(); session = session_find_by_id(conn->sessions_ht, be64toh(request.session_id)); @@ -950,8 +998,8 @@ int viewer_attach_session(struct relay_connection *conn) } else { send_streams = 1; response.status = htobe32(LTTNG_VIEWER_ATTACH_OK); - conn->session_id = session->id; - conn->session = session; + cds_list_add(&session->viewer_session_list, + &conn->viewer_session->sessions_head); } switch (be32toh(request.seek)) { @@ -1035,14 +1083,14 @@ int viewer_get_next_index(struct relay_connection *conn) health_code_update(); rcu_read_lock(); - session = session_find_by_id(conn->sessions_ht, conn->session_id); - if (!session) { + vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id)); + if (!vstream) { ret = -1; goto end_unlock; } - vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id)); - if (!vstream) { + session = session_find_by_id(conn->sessions_ht, vstream->session_id); + if (!session) { ret = -1; goto end_unlock; } @@ -1236,6 +1284,7 @@ int viewer_get_packet(struct relay_connection *conn) struct lttng_viewer_get_packet get_packet_info; struct lttng_viewer_trace_packet reply; struct relay_viewer_stream *stream; + struct relay_session *session; struct ctf_trace *ctf_trace; assert(conn); @@ -1259,7 +1308,13 @@ int viewer_get_packet(struct relay_connection *conn) goto error; } - ctf_trace = ctf_trace_find_by_path(conn->session->ctf_traces_ht, + session = session_find_by_id(conn->sessions_ht, stream->session_id); + if (!session) { + ret = -1; + goto error; + } + + ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, stream->path_name); assert(ctf_trace); @@ -1397,6 +1452,7 @@ int viewer_get_metadata(struct relay_connection *conn) struct lttng_viewer_metadata_packet reply; struct relay_viewer_stream *stream; struct ctf_trace *ctf_trace; + struct relay_session *session; assert(conn); @@ -1417,7 +1473,13 @@ int viewer_get_metadata(struct relay_connection *conn) goto error; } - ctf_trace = ctf_trace_find_by_path(conn->session->ctf_traces_ht, + session = session_find_by_id(conn->sessions_ht, stream->session_id); + if (!session) { + ret = -1; + goto error; + } + + ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, stream->path_name); assert(ctf_trace); assert(ctf_trace->metadata_sent <= ctf_trace->metadata_received); @@ -1491,6 +1553,42 @@ end: return ret; } +/* + * Create a viewer session. + * + * Return 0 on success or else a negative value. + */ +static +int viewer_create_session(struct relay_connection *conn) +{ + int ret; + struct lttng_viewer_create_session_response resp; + + DBG("Viewer create session received"); + + resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK); + conn->viewer_session = zmalloc(sizeof(conn->viewer_session)); + if (!conn->viewer_session) { + ERR("Allocation viewer session"); + resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_ERR); + goto send_reply; + } + CDS_INIT_LIST_HEAD(&conn->viewer_session->sessions_head); + +send_reply: + health_code_update(); + ret = send_response(conn->sock, &resp, sizeof(resp)); + if (ret < 0) { + goto end; + } + health_code_update(); + ret = 0; + +end: + return ret; +} + + /* * live_relay_unknown_command: send -1 if received unknown command */ @@ -1550,6 +1648,9 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, case LTTNG_VIEWER_GET_NEW_STREAMS: ret = viewer_get_new_streams(conn); break; + case LTTNG_VIEWER_CREATE_SESSION: + ret = viewer_create_session(conn); + break; default: ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd)); live_relay_unknown_command(conn); @@ -1634,28 +1735,34 @@ static void try_destroy_streams(struct relay_session *session) static void destroy_connection(struct lttng_ht *relay_connections_ht, struct relay_connection *conn) { - struct relay_session *session; + struct relay_session *session, *tmp_session; assert(relay_connections_ht); assert(conn); - DBG("Cleaning connection of session ID %" PRIu64, conn->session_id); - connection_delete(relay_connections_ht, conn); + if (!conn->viewer_session) { + goto end; + } + rcu_read_lock(); - session = session_find_by_id(conn->sessions_ht, conn->session_id); - if (session) { + cds_list_for_each_entry_safe(session, tmp_session, + &conn->viewer_session->sessions_head, + viewer_session_list) { + DBG("Cleaning connection of session ID %" PRIu64, session->id); /* * Very important that this is done before destroying the session so we * can put back every viewer stream reference from the ctf_trace. */ destroy_viewer_streams_by_session(session); try_destroy_streams(session); + cds_list_del(&session->viewer_session_list); session_viewer_try_destroy(conn->sessions_ht, session); } rcu_read_unlock(); +end: connection_destroy(conn); }