X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fconnection.c;h=fce6c84d58c922f5d26491c62160ba49ab2577ef;hp=76e48a6ab7762a11b1ead8a0fb09c854e3ceb1e5;hb=7591bab11eceedc6a0d1e02fd6f85592267a63b5;hpb=d77dded285b058e4242c8a3d2233f80e725ceefc diff --git a/src/bin/lttng-relayd/connection.c b/src/bin/lttng-relayd/connection.c index 76e48a6ab..fce6c84d5 100644 --- a/src/bin/lttng-relayd/connection.c +++ b/src/bin/lttng-relayd/connection.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -19,91 +20,129 @@ #define _GNU_SOURCE #define _LGPL_SOURCE #include +#include #include "connection.h" #include "stream.h" +#include "viewer-session.h" -static void rcu_free_connection(struct rcu_head *head) +bool connection_get(struct relay_connection *conn) { - struct relay_connection *conn = - caa_container_of(head, struct relay_connection, rcu_node); + bool has_ref = false; - lttcomm_destroy_sock(conn->sock); - connection_free(conn); + pthread_mutex_lock(&conn->reflock); + if (conn->ref.refcount != 0) { + has_ref = true; + urcu_ref_get(&conn->ref); + } + pthread_mutex_unlock(&conn->reflock); + + return has_ref; } -/* - * Must be called with a read side lock held. The read side lock must be - * kept until the returned relay_connection is no longer in use. - */ -struct relay_connection *connection_find_by_sock(struct lttng_ht *ht, int sock) +struct relay_connection *connection_get_by_sock(struct lttng_ht *relay_connections_ht, + int sock) { struct lttng_ht_node_ulong *node; struct lttng_ht_iter iter; struct relay_connection *conn = NULL; - assert(ht); assert(sock >= 0); - lttng_ht_lookup(ht, (void *)((unsigned long) sock), &iter); + rcu_read_lock(); + lttng_ht_lookup(relay_connections_ht, (void *)((unsigned long) sock), + &iter); node = lttng_ht_iter_get_node_ulong(&iter); if (!node) { DBG2("Relay connection by sock %d not found", sock); goto end; } conn = caa_container_of(node, struct relay_connection, sock_n); - + if (!connection_get(conn)) { + conn = NULL; + } end: + rcu_read_unlock(); return conn; } -void connection_delete(struct lttng_ht *ht, struct relay_connection *conn) +struct relay_connection *connection_create(struct lttcomm_sock *sock, + enum connection_type type) { - int ret; - struct lttng_ht_iter iter; - - assert(ht); - assert(conn); + struct relay_connection *conn; - iter.iter.node = &conn->sock_n.node; - ret = lttng_ht_del(ht, &iter); - assert(!ret); + conn = zmalloc(sizeof(*conn)); + if (!conn) { + PERROR("zmalloc relay connection"); + goto end; + } + pthread_mutex_init(&conn->reflock, NULL); + urcu_ref_init(&conn->ref); + conn->type = type; + conn->sock = sock; + lttng_ht_node_init_ulong(&conn->sock_n, (unsigned long) conn->sock->fd); +end: + return conn; } -void connection_destroy(struct relay_connection *conn) +static void rcu_free_connection(struct rcu_head *head) { - assert(conn); + struct relay_connection *conn = + caa_container_of(head, struct relay_connection, rcu_node); + lttcomm_destroy_sock(conn->sock); + if (conn->viewer_session) { + viewer_session_destroy(conn->viewer_session); + conn->viewer_session = NULL; + } + free(conn); +} + +static void destroy_connection(struct relay_connection *conn) +{ call_rcu(&conn->rcu_node, rcu_free_connection); } -struct relay_connection *connection_create(void) +static void connection_release(struct urcu_ref *ref) { - struct relay_connection *conn; + struct relay_connection *conn = + caa_container_of(ref, struct relay_connection, ref); - conn = zmalloc(sizeof(*conn)); - if (!conn) { - PERROR("zmalloc relay connection"); - goto error; + if (conn->in_socket_ht) { + struct lttng_ht_iter iter; + int ret; + + iter.iter.node = &conn->sock_n.node; + ret = lttng_ht_del(conn->socket_ht, &iter); + assert(!ret); } -error: - return conn; + if (conn->session) { + if (session_close(conn->session)) { + ERR("session_close"); + } + conn->session = NULL; + } + if (conn->viewer_session) { + viewer_session_close(conn->viewer_session); + } + destroy_connection(conn); } -void connection_init(struct relay_connection *conn) +void connection_put(struct relay_connection *conn) { - assert(conn); - assert(conn->sock); - - CDS_INIT_LIST_HEAD(&conn->recv_head); - lttng_ht_node_init_ulong(&conn->sock_n, (unsigned long) conn->sock->fd); + rcu_read_lock(); + pthread_mutex_lock(&conn->reflock); + urcu_ref_put(&conn->ref, connection_release); + pthread_mutex_unlock(&conn->reflock); + rcu_read_unlock(); } -void connection_free(struct relay_connection *conn) +void connection_ht_add(struct lttng_ht *relay_connections_ht, + struct relay_connection *conn) { - assert(conn); - - free(conn->viewer_session); - free(conn); + assert(!conn->in_socket_ht); + lttng_ht_add_unique_ulong(relay_connections_ht, &conn->sock_n); + conn->in_socket_ht = 1; + conn->socket_ht = relay_connections_ht; }