X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fconnection.c;h=d657efb4f97eb5ec8fc75260505b6f40ae8f2eec;hp=4dc41e0e4fe8fe7e693257cb5b00c8748ee61ab3;hb=3a5328d20432d9782b24ddeaaa093ac6219d6e29;hpb=58eb9381cd933f0644e6a5b89750dbd6d9b14570 diff --git a/src/bin/lttng-relayd/connection.c b/src/bin/lttng-relayd/connection.c index 4dc41e0e4..d657efb4f 100644 --- a/src/bin/lttng-relayd/connection.c +++ b/src/bin/lttng-relayd/connection.c @@ -1,111 +1,196 @@ /* - * Copyright (C) 2013 - Julien Desfossez - * David Goulet + * Copyright (C) 2013 Julien Desfossez + * Copyright (C) 2013 David Goulet + * Copyright (C) 2015 Mathieu Desnoyers * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License, version 2 only, as - * published by the Free Software Foundation. + * SPDX-License-Identifier: GPL-2.0-only * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along with - * this program; if not, write to the Free Software Foundation, Inc., 51 - * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE +#define _LGPL_SOURCE #include +#include #include "connection.h" #include "stream.h" +#include "viewer-session.h" -static void rcu_free_connection(struct rcu_head *head) +bool connection_get(struct relay_connection *conn) { - struct relay_connection *conn = - caa_container_of(head, struct relay_connection, rcu_node); - - lttcomm_destroy_sock(conn->sock); - free(conn); + return urcu_ref_get_unless_zero(&conn->ref); } -struct relay_connection *connection_find_by_sock(struct lttng_ht *ht, int sock) +struct relay_connection *connection_get_by_sock(struct lttng_ht *relay_connections_ht, + int sock) { struct lttng_ht_node_ulong *node; struct lttng_ht_iter iter; struct relay_connection *conn = NULL; - assert(ht); assert(sock >= 0); - lttng_ht_lookup(ht, (void *)((unsigned long) sock), &iter); + rcu_read_lock(); + lttng_ht_lookup(relay_connections_ht, (void *)((unsigned long) sock), + &iter); node = lttng_ht_iter_get_node_ulong(&iter); if (!node) { DBG2("Relay connection by sock %d not found", sock); goto end; } conn = caa_container_of(node, struct relay_connection, sock_n); - + if (!connection_get(conn)) { + conn = NULL; + } end: + rcu_read_unlock(); return conn; } -void connection_delete(struct lttng_ht *ht, struct relay_connection *conn) +int connection_reset_protocol_state(struct relay_connection *connection) { - int ret; - struct lttng_ht_iter iter; + int ret = 0; + + switch (connection->type) { + case RELAY_DATA: + connection->protocol.data.state_id = + DATA_CONNECTION_STATE_RECEIVE_HEADER; + memset(&connection->protocol.data.state.receive_header, + 0, + sizeof(connection->protocol.data.state.receive_header)); + connection->protocol.data.state.receive_header.left_to_receive = + sizeof(struct lttcomm_relayd_data_hdr); + break; + case RELAY_CONTROL: + connection->protocol.ctrl.state_id = + CTRL_CONNECTION_STATE_RECEIVE_HEADER; + memset(&connection->protocol.ctrl.state.receive_header, + 0, + sizeof(connection->protocol.ctrl.state.receive_header)); + connection->protocol.data.state.receive_header.left_to_receive = + sizeof(struct lttcomm_relayd_hdr); + ret = lttng_dynamic_buffer_set_size( + &connection->protocol.ctrl.reception_buffer, + sizeof(struct lttcomm_relayd_hdr)); + if (ret) { + ERR("Failed to reinitialize control connection reception buffer size to %zu bytes.", sizeof(struct lttcomm_relayd_hdr)); + goto end; + } + break; + default: + goto end; + } + DBG("Reset communication state of relay connection (fd = %i)", + connection->sock->fd); +end: + return ret; +} - assert(ht); - assert(conn); +struct relay_connection *connection_create(struct lttcomm_sock *sock, + enum connection_type type) +{ + struct relay_connection *conn; - iter.iter.node = &conn->sock_n.node; - ret = lttng_ht_del(ht, &iter); - assert(!ret); + conn = zmalloc(sizeof(*conn)); + if (!conn) { + PERROR("zmalloc relay connection"); + goto end; + } + urcu_ref_init(&conn->ref); + conn->type = type; + conn->sock = sock; + lttng_ht_node_init_ulong(&conn->sock_n, (unsigned long) conn->sock->fd); + if (conn->type == RELAY_CONTROL) { + lttng_dynamic_buffer_init(&conn->protocol.ctrl.reception_buffer); + } + connection_reset_protocol_state(conn); +end: + return conn; } -void connection_destroy(struct relay_connection *conn) +static void rcu_free_connection(struct rcu_head *head) { - struct relay_stream *stream, *tmp_stream; - - assert(conn); + struct relay_connection *conn = + caa_container_of(head, struct relay_connection, rcu_node); - /* Clean up recv list of this connection if any. */ - cds_list_for_each_entry_safe(stream, tmp_stream, &conn->recv_head, - recv_list) { - cds_list_del(&stream->recv_list); + lttcomm_destroy_sock(conn->sock); + if (conn->viewer_session) { + viewer_session_destroy(conn->viewer_session); + conn->viewer_session = NULL; + } + if (conn->type == RELAY_CONTROL) { + lttng_dynamic_buffer_reset( + &conn->protocol.ctrl.reception_buffer); } + free(conn); +} +static void destroy_connection(struct relay_connection *conn) +{ call_rcu(&conn->rcu_node, rcu_free_connection); } -struct relay_connection *connection_create(void) +static void connection_release(struct urcu_ref *ref) { - struct relay_connection *conn; + struct relay_connection *conn = + caa_container_of(ref, struct relay_connection, ref); - conn = zmalloc(sizeof(*conn)); - if (!conn) { - PERROR("zmalloc relay connection"); - goto error; + if (conn->in_socket_ht) { + struct lttng_ht_iter iter; + int ret; + + iter.iter.node = &conn->sock_n.node; + ret = lttng_ht_del(conn->socket_ht, &iter); + assert(!ret); } -error: - return conn; + if (conn->session) { + if (session_close(conn->session)) { + ERR("session_close"); + } + conn->session = NULL; + } + if (conn->viewer_session) { + viewer_session_close(conn->viewer_session); + } + destroy_connection(conn); } -void connection_init(struct relay_connection *conn) +void connection_put(struct relay_connection *conn) { - assert(conn); - assert(conn->sock); + rcu_read_lock(); + urcu_ref_put(&conn->ref, connection_release); + rcu_read_unlock(); +} - CDS_INIT_LIST_HEAD(&conn->recv_head); - lttng_ht_node_init_ulong(&conn->sock_n, (unsigned long) conn->sock->fd); +void connection_ht_add(struct lttng_ht *relay_connections_ht, + struct relay_connection *conn) +{ + assert(!conn->in_socket_ht); + lttng_ht_add_unique_ulong(relay_connections_ht, &conn->sock_n); + conn->in_socket_ht = 1; + conn->socket_ht = relay_connections_ht; } -void connection_free(struct relay_connection *conn) +int connection_set_session(struct relay_connection *conn, + struct relay_session *session) { - assert(conn); + int ret = 0; - free(conn); + assert(conn); + assert(session); + assert(!conn->session); + + if (connection_get(conn)) { + if (session_get(session)) { + conn->session = session; + } else { + ERR("Failed to get session reference in connection_set_session()"); + ret = -1; + } + connection_put(conn); + } else { + ERR("Failed to get connection reference in connection_set_session()"); + ret = -1; + } + return ret; }