There are cases where a connection error can cause streams to be
leaked.
For instance, the control connection could receive an index and
close. Since a packet is in-flight, the stream corresponding to
that index will not close. However, nothing guarantees that
the data connection will be able to receive the packet's data.
If the protocol is respected, this is not a problem. However,
a buggy consumerd or network errors can cause the streams to
remain in the "data in-flight" state and never close.
To mitigate a case observed in the field where a consumerd
would be forcibly closed (network interface brought down) and
cause leaks on the relay daemon, the session is aborted whenever
the control or data connection encounters an error. Aborting
a session causes the streams to be closed regardless of the
fact that data is in-flight.
Currently, only the control connection holds ownership of
the session object. This can cause the following scenario to leak
streams:
1) Control connection receives an index
- Stream is put in "in-flight data" mode
2) Control connection is closed/shutdown cleanly
- try_stream_close refuses to close the stream as data is in-flight,
but it puts the stream in "closed" mode. When the data is
received, the stream will be closed as soon as possible.
3) Data connection closes cleanly or due to an error
- The stream "closing" condition will never be re-evaluated.
Since the data connection has no ownership of the session, it can
never clean-up the streams that are waiting for "in-flight" data to
arrive before closing.
This patch lazily associates the data connection to its session
so that the session can be aborted whenever an error happens on
either the data or control connection.
Note that this leaves the relayd vulnerable to a case which will
still leak. If the control connection receives an index and closes
cleanly, the data connection may never have been established
with the consumer daemon, resulting in a leak.
Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
conn->in_socket_ht = 1;
conn->socket_ht = relay_connections_ht;
}
conn->in_socket_ht = 1;
conn->socket_ht = relay_connections_ht;
}
+
+/*
+ * Lazily associate a session with a connection.
+ *
+ * On success, a reference to the session is acquired and stored in
+ * conn->session; it is expected to be released when the connection is
+ * destroyed. A reference to the connection itself is only held for the
+ * duration of this call.
+ *
+ * Pre-condition: the connection must not already be associated with a
+ * session (asserted below).
+ *
+ * Returns 0 on success, -1 if a reference on either object could not
+ * be acquired.
+ */
+int connection_set_session(struct relay_connection *conn,
+ struct relay_session *session)
+{
+ int ret = 0;
+
+ assert(conn);
+ assert(session);
+ assert(!conn->session);
+
+ if (connection_get(conn)) {
+ if (session_get(session)) {
+ /* Session reference is now owned by the connection. */
+ conn->session = session;
+ } else {
+ ERR("Failed to get session reference in connection_set_session()");
+ ret = -1;
+ }
+ /* Balance the reference taken for the duration of this call. */
+ connection_put(conn);
+ } else {
+ ERR("Failed to get connection reference in connection_set_session()");
+ ret = -1;
+ }
+ return ret;
+}
void connection_put(struct relay_connection *connection);
void connection_ht_add(struct lttng_ht *relay_connections_ht,
struct relay_connection *conn);
void connection_put(struct relay_connection *connection);
void connection_ht_add(struct lttng_ht *relay_connections_ht,
struct relay_connection *conn);
+int connection_set_session(struct relay_connection *conn,
+ struct relay_session *session);
#endif /* _CONNECTION_H */
#endif /* _CONNECTION_H */
uint64_t left_to_receive = state->left_to_receive;
struct relay_session *session;
uint64_t left_to_receive = state->left_to_receive;
struct relay_session *session;
+ DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
+ state->header.stream_id, state->header.net_seq_num,
+ state->received, left_to_receive);
+
stream = stream_get_by_id(state->header.stream_id);
if (!stream) {
/* Protocol error. */
stream = stream_get_by_id(state->header.stream_id);
if (!stream) {
/* Protocol error. */
- DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
+ ERR("relay_process_data_receive_payload: cannot find stream %" PRIu64,
state->header.stream_id);
status = RELAY_CONNECTION_STATUS_ERROR;
goto end;
state->header.stream_id);
status = RELAY_CONNECTION_STATUS_ERROR;
goto end;
pthread_mutex_lock(&stream->lock);
session = stream->trace->session;
pthread_mutex_lock(&stream->lock);
session = stream->trace->session;
-
- DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
- state->header.stream_id, state->header.net_seq_num,
- state->received, left_to_receive);
+ if (!conn->session) {
+ ret = connection_set_session(conn, session);
+ if (ret) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
+ }
+ }
/*
* The size of the "chunk" received on any iteration is bounded by:
/*
* The size of the "chunk" received on any iteration is bounded by:
status = relay_process_control(ctrl_conn);
if (status != RELAY_CONNECTION_STATUS_OK) {
status = relay_process_control(ctrl_conn);
if (status != RELAY_CONNECTION_STATUS_OK) {
+ /*
+ * On socket error, flag the session as aborted to force
+ * the cleanup of its streams; otherwise they can leak
+ * during the lifetime of the relayd.
+ *
+ * This prevents situations in which streams can be
+ * left open because an index was received, the
+ * control connection is closed, and the data
+ * connection is closed (uncleanly) before the packet's
+ * data is provided.
+ *
+ * Since the control connection encountered an error,
+ * it is okay to be conservative and close the
+ * session right now as we can't rely on the protocol
+ * being respected anymore.
+ */
+ if (status == RELAY_CONNECTION_STATUS_ERROR) {
+ session_abort(ctrl_conn->session);
+ }
+
/* Clear the connection on error or close. */
relay_thread_close_connection(&events,
pollfd,
/* Clear the connection on error or close. */
relay_thread_close_connection(&events,
pollfd,
status = relay_process_data(data_conn);
/* Connection closed or error. */
if (status != RELAY_CONNECTION_STATUS_OK) {
status = relay_process_data(data_conn);
/* Connection closed or error. */
if (status != RELAY_CONNECTION_STATUS_OK) {
+ /*
+ * On socket error, flag the session as aborted to force
+ * the cleanup of its streams; otherwise they can leak
+ * during the lifetime of the relayd.
+ *
+ * This prevents situations in which streams can be
+ * left open because an index was received, the
+ * control connection is closed, and the data
+ * connection is closed (uncleanly) before the packet's
+ * data is provided.
+ *
+ * Since the data connection encountered an error,
+ * it is okay to be conservative and close the
+ * session right now as we can't rely on the protocol
+ * being respected anymore.
+ */
+ if (status == RELAY_CONNECTION_STATUS_ERROR) {
+ session_abort(data_conn->session);
+ }
relay_thread_close_connection(&events, pollfd,
data_conn);
/*
relay_thread_close_connection(&events, pollfd,
data_conn);
/*
sock_n.node) {
health_code_update();
sock_n.node) {
health_code_update();
- if (session_abort(destroy_conn->session)) {
- assert(0);
- }
+ session_abort(destroy_conn->session);
/*
* No need to grab another ref, because we own
/*
* No need to grab another ref, because we own
pthread_mutex_lock(&session->lock);
DBG("closing session %" PRIu64 ": is conn already closed %d",
session->id, session->connection_closed);
pthread_mutex_lock(&session->lock);
DBG("closing session %" PRIu64 ": is conn already closed %d",
session->id, session->connection_closed);
- if (session->connection_closed) {
- ret = -1;
- goto unlock;
- }
session->connection_closed = true;
session->connection_closed = true;
pthread_mutex_unlock(&session->lock);
pthread_mutex_unlock(&session->lock);
- if (ret) {
- return ret;
- }
rcu_read_lock();
cds_lfht_for_each_entry(session->ctf_traces_ht->ht,
rcu_read_lock();
cds_lfht_for_each_entry(session->ctf_traces_ht->ht,
pthread_mutex_lock(&session->lock);
DBG("aborting session %" PRIu64, session->id);
pthread_mutex_lock(&session->lock);
DBG("aborting session %" PRIu64, session->id);
- if (session->aborted) {
- ERR("session %" PRIu64 " is already aborted", session->id);
- ret = -1;
- goto unlock;
- }
pthread_mutex_unlock(&session->lock);
return ret;
}
pthread_mutex_unlock(&session->lock);
return ret;
}