From 6763619c3f295a1502e77ca9563d7a21d68b25c7 Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Thu, 30 Jan 2014 18:36:28 -0500 Subject: [PATCH] Fix: allow attach command to multiple sessions The attach session command can now attach to multiple session using an ID list. Signed-off-by: Julien Desfossez --- src/bin/lttng-relayd/connection.c | 1 + src/bin/lttng-relayd/live.c | 95 ++++++++++++++++++++++++------- 2 files changed, 74 insertions(+), 22 deletions(-) diff --git a/src/bin/lttng-relayd/connection.c b/src/bin/lttng-relayd/connection.c index 4dc41e0e4..28d0cb311 100644 --- a/src/bin/lttng-relayd/connection.c +++ b/src/bin/lttng-relayd/connection.c @@ -28,6 +28,7 @@ static void rcu_free_connection(struct rcu_head *head) caa_container_of(head, struct relay_connection, rcu_node); lttcomm_destroy_sock(conn->sock); + free(conn->viewer_session); free(conn); } diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 91d870493..5b9e4248c 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -809,6 +809,32 @@ end: return ret; } +/* + * Check if a connection is attached to a session. + * Return 1 if attached, 0 if not attached, a negative value on error. + */ +static +int session_attached(struct relay_connection *conn, uint64_t session_id) +{ + struct relay_session *session; + int found = 0; + + if (!conn->viewer_session) { + goto end; + } + cds_list_for_each_entry(session, + &conn->viewer_session->sessions_head, + viewer_session_list) { + if (session->id == session_id) { + found = 1; + goto end; + } + } + +end: + return found; +} + /* * Send the viewer the list of current sessions. */ @@ -820,6 +846,7 @@ int viewer_get_new_streams(struct relay_connection *conn) struct lttng_viewer_new_streams_request request; struct lttng_viewer_new_streams_response response; struct relay_session *session; + uint64_t session_id; assert(conn); @@ -832,29 +859,27 @@ int viewer_get_new_streams(struct relay_connection *conn) if (ret < 0) { goto error; } + session_id = be64toh(request.session_id); health_code_update(); rcu_read_lock(); - session = session_find_by_id(conn->sessions_ht, - be64toh(request.session_id)); + session = session_find_by_id(conn->sessions_ht, session_id); if (!session) { - DBG("Relay session %" PRIu64 " not found", - be64toh(request.session_id)); + DBG("Relay session %" PRIu64 " not found", session_id); response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR); goto send_reply; } - if (conn->session_id == session->id) { - /* We confirmed the viewer is asking for the same session. */ - send_streams = 1; - response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); - } else { + if (!session_attached(conn, session_id)) { send_streams = 0; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR); goto send_reply; } + send_streams = 1; + response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); + if (!send_streams) { goto send_reply; } @@ -876,6 +901,12 @@ int viewer_get_new_streams(struct relay_connection *conn) if (nb_streams == 0 && session->close_flag) { send_streams = 0; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP); + /* + * Remove the session from the attached list of the connection + * and try to destroy it. + */ + cds_list_del(&session->viewer_session_list); + session_viewer_try_destroy(conn->sessions_ht, session); goto send_reply; } @@ -967,8 +998,8 @@ int viewer_attach_session(struct relay_connection *conn) } else { send_streams = 1; response.status = htobe32(LTTNG_VIEWER_ATTACH_OK); - conn->session_id = session->id; - conn->session = session; + cds_list_add(&session->viewer_session_list, + &conn->viewer_session->sessions_head); } switch (be32toh(request.seek)) { @@ -1052,14 +1083,14 @@ int viewer_get_next_index(struct relay_connection *conn) health_code_update(); rcu_read_lock(); - session = session_find_by_id(conn->sessions_ht, conn->session_id); - if (!session) { + vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id)); + if (!vstream) { ret = -1; goto end_unlock; } - vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id)); - if (!vstream) { + session = session_find_by_id(conn->sessions_ht, vstream->session_id); + if (!session) { ret = -1; goto end_unlock; } @@ -1253,6 +1284,7 @@ int viewer_get_packet(struct relay_connection *conn) struct lttng_viewer_get_packet get_packet_info; struct lttng_viewer_trace_packet reply; struct relay_viewer_stream *stream; + struct relay_session *session; struct ctf_trace *ctf_trace; assert(conn); @@ -1276,7 +1308,13 @@ int viewer_get_packet(struct relay_connection *conn) goto error; } - ctf_trace = ctf_trace_find_by_path(conn->session->ctf_traces_ht, + session = session_find_by_id(conn->sessions_ht, stream->session_id); + if (!session) { + ret = -1; + goto error; + } + + ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, stream->path_name); assert(ctf_trace); @@ -1414,6 +1452,7 @@ int viewer_get_metadata(struct relay_connection *conn) struct lttng_viewer_metadata_packet reply; struct relay_viewer_stream *stream; struct ctf_trace *ctf_trace; + struct relay_session *session; assert(conn); @@ -1434,7 +1473,13 @@ int viewer_get_metadata(struct relay_connection *conn) goto error; } - ctf_trace = ctf_trace_find_by_path(conn->session->ctf_traces_ht, + session = session_find_by_id(conn->sessions_ht, stream->session_id); + if (!session) { + ret = -1; + goto error; + } + + ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, stream->path_name); assert(ctf_trace); assert(ctf_trace->metadata_sent <= ctf_trace->metadata_received); @@ -1690,28 +1735,34 @@ static void try_destroy_streams(struct relay_session *session) static void destroy_connection(struct lttng_ht *relay_connections_ht, struct relay_connection *conn) { - struct relay_session *session; + struct relay_session *session, *tmp_session; assert(relay_connections_ht); assert(conn); - DBG("Cleaning connection of session ID %" PRIu64, conn->session_id); - connection_delete(relay_connections_ht, conn); + if (!conn->viewer_session) { + goto end; + } + rcu_read_lock(); - session = session_find_by_id(conn->sessions_ht, conn->session_id); - if (session) { + cds_list_for_each_entry_safe(session, tmp_session, + &conn->viewer_session->sessions_head, + viewer_session_list) { + DBG("Cleaning connection of session ID %" PRIu64, session->id); /* * Very important that this is done before destroying the session so we * can put back every viewer stream reference from the ctf_trace. */ destroy_viewer_streams_by_session(session); try_destroy_streams(session); + cds_list_del(&session->viewer_session_list); session_viewer_try_destroy(conn->sessions_ht, session); } rcu_read_unlock(); +end: connection_destroy(conn); } -- 2.34.1