Fix: big relayd cleanup and refactor
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
... / ...
CommitLineData
1/*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19#define _GNU_SOURCE
20#include <common/common.h>
21
22#include "index.h"
23#include "stream.h"
24#include "viewer-stream.h"
25
26static void rcu_destroy_stream(struct rcu_head *head)
27{
28 struct relay_stream *stream =
29 caa_container_of(head, struct relay_stream, rcu_node);
30
31 free(stream->path_name);
32 free(stream->channel_name);
33 free(stream);
34}
35
36/*
37 * Get stream from stream id from the given hash table. Return stream if found
38 * else NULL.
39 *
40 * Need to be called with RCU read-side lock held.
41 */
42struct relay_stream *stream_find_by_id(struct lttng_ht *ht,
43 uint64_t stream_id)
44{
45 struct lttng_ht_node_u64 *node;
46 struct lttng_ht_iter iter;
47 struct relay_stream *stream = NULL;
48
49 assert(ht);
50
51 lttng_ht_lookup(ht, &stream_id, &iter);
52 node = lttng_ht_iter_get_node_u64(&iter);
53 if (node == NULL) {
54 DBG("Relay stream %" PRIu64 " not found", stream_id);
55 goto end;
56 }
57 stream = caa_container_of(node, struct relay_stream, node);
58
59end:
60 return stream;
61}
62
63/*
64 * Close a given stream. If an assosiated viewer stream exists it is updated.
65 *
66 * RCU read side lock MUST be acquired.
67 *
68 * Return 0 if close was successful or 1 if already closed.
69 */
70int stream_close(struct relay_session *session, struct relay_stream *stream)
71{
72 int delret, ret;
73 struct relay_viewer_stream *vstream;
74 struct ctf_trace *ctf_trace;
75
76 assert(stream);
77
78 pthread_mutex_lock(&stream->lock);
79
80 if (stream->terminated_flag) {
81 /* This stream is already closed. Ignore. */
82 ret = 1;
83 goto end_unlock;
84 }
85
86 DBG("Closing stream id %" PRIu64, stream->stream_handle);
87
88 if (stream->fd >= 0) {
89 delret = close(stream->fd);
90 if (delret < 0) {
91 PERROR("close stream");
92 }
93 }
94
95 if (stream->index_fd >= 0) {
96 delret = close(stream->index_fd);
97 if (delret < 0) {
98 PERROR("close stream index_fd");
99 }
100 }
101
102 vstream = viewer_stream_find_by_id(stream->stream_handle);
103 if (vstream) {
104 /*
105 * Set the last good value into the viewer stream. This is done
106 * right before the stream gets deleted from the hash table. The
107 * lookup failure on the live thread side of a stream indicates
108 * that the viewer stream index received value should be used.
109 */
110 pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
111 vstream->total_index_received = stream->total_index_received;
112 vstream->tracefile_count_last = stream->tracefile_count_current;
113 vstream->close_write_flag = 1;
114 pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
115 }
116
117 /* Cleanup index of that stream. */
118 relay_index_destroy_by_stream_id(stream->stream_handle);
119
120 ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
121 stream->path_name);
122 assert(ctf_trace);
123 ctf_trace_put_ref(ctf_trace);
124
125 stream->terminated_flag = 1;
126 ret = 0;
127
128end_unlock:
129 pthread_mutex_unlock(&stream->lock);
130 return ret;
131}
132
133void stream_delete(struct lttng_ht *ht, struct relay_stream *stream)
134{
135 int ret;
136 struct lttng_ht_iter iter;
137
138 assert(ht);
139 assert(stream);
140
141 iter.iter.node = &stream->node.node;
142 ret = lttng_ht_del(ht, &iter);
143 assert(!ret);
144
145 cds_list_del(&stream->trace_list);
146}
147
148void stream_destroy(struct relay_stream *stream)
149{
150 assert(stream);
151
152 call_rcu(&stream->rcu_node, rcu_destroy_stream);
153}
This page took 0.02331 seconds and 4 git commands to generate.