-/*
- * Create and add connection to the given hash table.
- *
- * Return poll add value or else -1 on error.
- */
-static
-int add_connection(int fd, struct lttng_poll_event *events,
- struct lttng_ht *relay_connections_ht)
-{
- int ret;
- struct relay_command *relay_connection;
-
- assert(events);
- assert(relay_connections_ht);
-
- relay_connection = zmalloc(sizeof(struct relay_command));
- if (relay_connection == NULL) {
- PERROR("Relay command zmalloc");
- goto error;
- }
-
- ret = lttng_read(fd, relay_connection, sizeof(*relay_connection));
- if (ret < sizeof(*relay_connection)) {
- PERROR("read relay cmd pipe");
- goto error_read;
- }
-
- lttng_ht_node_init_ulong(&relay_connection->sock_n,
- (unsigned long) relay_connection->sock->fd);
- rcu_read_lock();
- lttng_ht_add_unique_ulong(relay_connections_ht,
- &relay_connection->sock_n);
- rcu_read_unlock();
-
- return lttng_poll_add(events, relay_connection->sock->fd,
- LPOLLIN | LPOLLRDHUP);
-
-error_read:
- free(relay_connection);
-error:
- return -1;
-}
-
-static
-void deferred_free_connection(struct rcu_head *head)
-{
- struct relay_command *relay_connection =
- caa_container_of(head, struct relay_command, rcu_node);
-
- if (relay_connection->session &&
- relay_connection->session->viewer_attached > 0) {
- relay_connection->session->viewer_attached--;
- }
- lttcomm_destroy_sock(relay_connection->sock);
- free(relay_connection);
-}
-
-/*
- * Delete all streams for a specific session ID.
- */
-static
-void viewer_del_streams(uint64_t session_id)
-{
- struct relay_viewer_stream *stream;
- struct lttng_ht_iter iter;
-
- rcu_read_lock();
- cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, stream,
- stream_n.node) {
- health_code_update();
-
- if (stream->session_id != session_id) {
- continue;
- }
-
- viewer_stream_delete(stream);
- assert(stream->ctf_trace);
-
- if (stream->metadata_flag) {
- /*
- * The metadata viewer stream is destroyed once the refcount on the
- * ctf trace goes to 0 in the destroy stream function thus there is
- * no explicit call to that function here.
- */
- stream->ctf_trace->metadata_sent = 0;
- stream->ctf_trace->viewer_metadata_stream = NULL;
- } else {
- viewer_stream_destroy(stream);
- }
- }
- rcu_read_unlock();
-}
-
-/*
- * Delete and free a connection.
- *
- * RCU read side lock MUST be acquired.
- */
-static
-void del_connection(struct lttng_ht *relay_connections_ht,
- struct lttng_ht_iter *iter, struct relay_command *relay_connection)
-{
- int ret;
-
- assert(relay_connections_ht);
- assert(iter);
- assert(relay_connection);
-
- DBG("Cleaning connection of session ID %" PRIu64,
- relay_connection->session_id);
-
- ret = lttng_ht_del(relay_connections_ht, iter);
- assert(!ret);
-
- viewer_del_streams(relay_connection->session_id);
-
- call_rcu(&relay_connection->rcu_node, deferred_free_connection);
-}
-