From: Jonathan Rajotte Date: Sat, 28 Apr 2018 00:06:08 +0000 (-0400) Subject: Fix: relayd streams can be leaked on connection error X-Git-Tag: v2.11.0-rc1~254 X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=commitdiff_plain;h=fd0f1e3ef290b9d70e3856676298df5afb78d8dc Fix: relayd streams can be leaked on connection error There are cases where a connection error can cause streams to be leaked. For instance, the control connection could receive an index and close. Since a packet is in-flight, the stream corresponding to that index will not close. However, nothing guarantees that the data connection will be able to receive the packet's data. If the protocol is respected, this is not a problem. However, a buggy consumerd or network errors can cause the streams to remain in the "data in-flight" state and never close. To mitigate a case observed in the field where a consumerd would be forcibly closed (network interface brought down) and cause leaks on the relay daemon, the session is aborted whenever the control or data connection encounters an error. Aborting a session causes the streams to be closed regardless of the fact that data is in-flight. Currently, only the control connection holds an ownership of the session object. This can cause the following scenario to leak streams: 1) Control connection receives an index - Stream is put in "in-flight data" mode 2) Control connection is closed/shutdown cleanly - try_stream_close refuses to close the stream as data is in-flight, but it puts the stream in "closed" mode. When the data is received, the stream will be closed as soon as possible. 3) Data connection closes cleanly or due to an error - The stream "closing" condition will never be re-evaluated. Since the data connection has no ownership of the session, it can never clean-up the streams that are waiting for "in-flight" data to arrive before closing. This patch lazily associates the data connection to its session so that the session can be aborted whenever an error happens on either the data or control connection. Note that this leaves the relayd vulnerable to a case which will still leak. If the control connection receives an index and closes cleanly, the data connection could have never been established with the consumer daemon and result in a leak. Signed-off-by: Jonathan Rajotte Signed-off-by: Jérémie Galarneau --- diff --git a/src/bin/lttng-relayd/connection.c b/src/bin/lttng-relayd/connection.c index dab794147..eeaa0a559 100644 --- a/src/bin/lttng-relayd/connection.c +++ b/src/bin/lttng-relayd/connection.c @@ -176,3 +176,27 @@ void connection_ht_add(struct lttng_ht *relay_connections_ht, conn->in_socket_ht = 1; conn->socket_ht = relay_connections_ht; } + +int connection_set_session(struct relay_connection *conn, + struct relay_session *session) +{ + int ret = 0; + + assert(conn); + assert(session); + assert(!conn->session); + + if (connection_get(conn)) { + if (session_get(session)) { + conn->session = session; + } else { + ERR("Failed to get session reference in connection_set_session()"); + ret = -1; + } + connection_put(conn); + } else { + ERR("Failed to get connection reference in connection_set_session()"); + ret = -1; + } + return ret; +} diff --git a/src/bin/lttng-relayd/connection.h b/src/bin/lttng-relayd/connection.h index 444195dff..1b6744b6c 100644 --- a/src/bin/lttng-relayd/connection.h +++ b/src/bin/lttng-relayd/connection.h @@ -148,5 +148,7 @@ bool connection_get(struct relay_connection *connection); void connection_put(struct relay_connection *connection); void connection_ht_add(struct lttng_ht *relay_connections_ht, struct relay_connection *conn); +int connection_set_session(struct relay_connection *conn, + struct relay_session *session); #endif /* _CONNECTION_H */ diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 569a3a470..cb4643b76 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -3344,10 +3344,14 @@ static enum relay_connection_status relay_process_data_receive_payload( uint64_t left_to_receive = state->left_to_receive; struct relay_session *session; + DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive", + state->header.stream_id, state->header.net_seq_num, + state->received, left_to_receive); + stream = stream_get_by_id(state->header.stream_id); if (!stream) { /* Protocol error. */ - DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64, + ERR("relay_process_data_receive_payload: cannot find stream %" PRIu64, state->header.stream_id); status = RELAY_CONNECTION_STATUS_ERROR; goto end; @@ -3355,10 +3359,13 @@ static enum relay_connection_status relay_process_data_receive_payload( pthread_mutex_lock(&stream->lock); session = stream->trace->session; - - DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive", - state->header.stream_id, state->header.net_seq_num, - state->received, left_to_receive); + if (!conn->session) { + ret = connection_set_session(conn, session); + if (ret) { + status = RELAY_CONNECTION_STATUS_ERROR; + goto end_stream_unlock; + } + } /* * The size of the "chunk" received on any iteration is bounded by: @@ -3686,6 +3693,26 @@ restart: status = relay_process_control(ctrl_conn); if (status != RELAY_CONNECTION_STATUS_OK) { + /* + * On socket error flag the session as aborted to force + * the cleanup of its stream otherwise it can leak + * during the lifetime of the relayd. + * + * This prevents situations in which streams can be + * left opened because an index was received, the + * control connection is closed, and the data + * connection is closed (uncleanly) before the packet's + * data provided. + * + * Since the control connection encountered an error, + * it is okay to be conservative and close the + * session right now as we can't rely on the protocol + * being respected anymore. + */ + if (status == RELAY_CONNECTION_STATUS_ERROR) { + session_abort(ctrl_conn->session); + } + /* Clear the connection on error or close. */ relay_thread_close_connection(&events, pollfd, @@ -3765,6 +3792,25 @@ restart: status = relay_process_data(data_conn); /* Connection closed or error. */ if (status != RELAY_CONNECTION_STATUS_OK) { + /* + * On socket error flag the session as aborted to force + * the cleanup of its stream otherwise it can leak + * during the lifetime of the relayd. + * + * This prevents situations in which streams can be + * left opened because an index was received, the + * control connection is closed, and the data + * connection is closed (uncleanly) before the packet's + * data provided. + * + * Since the data connection encountered an error, + * it is okay to be conservative and close the + * session right now as we can't rely on the protocol + * being respected anymore. + */ + if (status == RELAY_CONNECTION_STATUS_ERROR) { + session_abort(data_conn->session); + } relay_thread_close_connection(&events, pollfd, data_conn); /* @@ -3803,9 +3849,7 @@ error: sock_n.node) { health_code_update(); - if (session_abort(destroy_conn->session)) { - assert(0); - } + session_abort(destroy_conn->session); /* * No need to grab another ref, because we own diff --git a/src/bin/lttng-relayd/session.c b/src/bin/lttng-relayd/session.c index 3ea8e50d6..42c29aeb0 100644 --- a/src/bin/lttng-relayd/session.c +++ b/src/bin/lttng-relayd/session.c @@ -182,16 +182,8 @@ int session_close(struct relay_session *session) pthread_mutex_lock(&session->lock); DBG("closing session %" PRIu64 ": is conn already closed %d", session->id, session->connection_closed); - if (session->connection_closed) { - ret = -1; - goto unlock; - } session->connection_closed = true; -unlock: pthread_mutex_unlock(&session->lock); - if (ret) { - return ret; - } rcu_read_lock(); cds_lfht_for_each_entry(session->ctf_traces_ht->ht, @@ -226,13 +218,7 @@ int session_abort(struct relay_session *session) pthread_mutex_lock(&session->lock); DBG("aborting session %" PRIu64, session->id); - if (session->aborted) { - ERR("session %" PRIu64 " is already aborted", session->id); - ret = -1; - goto unlock; - } session->aborted = true; -unlock: pthread_mutex_unlock(&session->lock); return ret; }