Generate local kernel and UST indexes
[lttng-tools.git] / src / common / consumer-stream.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 - David Goulet <dgoulet@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License, version 2 only, as
8 * published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _GNU_SOURCE
21 #include <assert.h>
22 #include <inttypes.h>
23 #include <sys/mman.h>
24 #include <unistd.h>
25
26 #include <common/common.h>
27 #include <common/relayd/relayd.h>
28 #include <common/ust-consumer/ust-consumer.h>
29
30 #include "consumer-stream.h"
31
32 /*
33 * RCU call to free stream. MUST only be used with call_rcu().
34 */
35 static void free_stream_rcu(struct rcu_head *head)
36 {
37 struct lttng_ht_node_u64 *node =
38 caa_container_of(head, struct lttng_ht_node_u64, head);
39 struct lttng_consumer_stream *stream =
40 caa_container_of(node, struct lttng_consumer_stream, node);
41
42 pthread_mutex_destroy(&stream->lock);
43 free(stream);
44 }
45
46 /*
47 * Close stream on the relayd side. This call can destroy a relayd if the
48 * conditions are met.
49 *
50 * A RCU read side lock MUST be acquired if the relayd object was looked up in
51 * a hash table before calling this.
52 */
53 void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
54 struct consumer_relayd_sock_pair *relayd)
55 {
56 int ret;
57
58 assert(stream);
59 assert(relayd);
60
61 if (stream->sent_to_relayd) {
62 uatomic_dec(&relayd->refcount);
63 assert(uatomic_read(&relayd->refcount) >= 0);
64 }
65
66 /* Closing streams requires to lock the control socket. */
67 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
68 ret = relayd_send_close_stream(&relayd->control_sock,
69 stream->relayd_stream_id,
70 stream->next_net_seq_num - 1);
71 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
72 if (ret < 0) {
73 DBG("Unable to close stream on the relayd. Continuing");
74 /*
75 * Continue here. There is nothing we can do for the relayd.
76 * Chances are that the relayd has closed the socket so we just
77 * continue cleaning up.
78 */
79 }
80
81 /* Both conditions are met, we destroy the relayd. */
82 if (uatomic_read(&relayd->refcount) == 0 &&
83 uatomic_read(&relayd->destroy_flag)) {
84 consumer_destroy_relayd(relayd);
85 }
86 stream->net_seq_idx = (uint64_t) -1ULL;
87 stream->sent_to_relayd = 0;
88 }
89
90 /*
91 * Close stream's file descriptors and, if needed, close stream also on the
92 * relayd side.
93 *
94 * The consumer data lock MUST be acquired.
95 * The stream lock MUST be acquired.
96 */
97 void consumer_stream_close(struct lttng_consumer_stream *stream)
98 {
99 int ret;
100 struct consumer_relayd_sock_pair *relayd;
101
102 assert(stream);
103
104 switch (consumer_data.type) {
105 case LTTNG_CONSUMER_KERNEL:
106 if (stream->mmap_base != NULL) {
107 ret = munmap(stream->mmap_base, stream->mmap_len);
108 if (ret != 0) {
109 PERROR("munmap");
110 }
111 }
112
113 if (stream->wait_fd >= 0) {
114 ret = close(stream->wait_fd);
115 if (ret) {
116 PERROR("close");
117 }
118 stream->wait_fd = -1;
119 }
120 break;
121 case LTTNG_CONSUMER32_UST:
122 case LTTNG_CONSUMER64_UST:
123 break;
124 default:
125 ERR("Unknown consumer_data type");
126 assert(0);
127 }
128
129 /* Close output fd. Could be a socket or local file at this point. */
130 if (stream->out_fd >= 0) {
131 ret = close(stream->out_fd);
132 if (ret) {
133 PERROR("close");
134 }
135 stream->out_fd = -1;
136 }
137
138 if (stream->index_fd >= 0) {
139 ret = close(stream->index_fd);
140 if (ret) {
141 PERROR("close stream index_fd");
142 }
143 stream->index_fd = -1;
144 }
145
146 /* Check and cleanup relayd if needed. */
147 rcu_read_lock();
148 relayd = consumer_find_relayd(stream->net_seq_idx);
149 if (relayd != NULL) {
150 consumer_stream_relayd_close(stream, relayd);
151 }
152 rcu_read_unlock();
153 }
154
155 /*
156 * Delete the stream from all possible hash tables.
157 *
158 * The consumer data lock MUST be acquired.
159 * The stream lock MUST be acquired.
160 */
161 void consumer_stream_delete(struct lttng_consumer_stream *stream,
162 struct lttng_ht *ht)
163 {
164 int ret;
165 struct lttng_ht_iter iter;
166
167 assert(stream);
168 /* Should NEVER be called not in monitor mode. */
169 assert(stream->chan->monitor);
170
171 rcu_read_lock();
172
173 if (ht) {
174 iter.iter.node = &stream->node.node;
175 ret = lttng_ht_del(ht, &iter);
176 assert(!ret);
177 }
178
179 /* Delete from stream per channel ID hash table. */
180 iter.iter.node = &stream->node_channel_id.node;
181 /*
182 * The returned value is of no importance. Even if the node is NOT in the
183 * hash table, we continue since we may have been called by a code path
184 * that did not add the stream to a (all) hash table. Same goes for the
185 * next call ht del call.
186 */
187 (void) lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
188
189 /* Delete from the global stream list. */
190 iter.iter.node = &stream->node_session_id.node;
191 /* See the previous ht del on why we ignore the returned value. */
192 (void) lttng_ht_del(consumer_data.stream_list_ht, &iter);
193
194 rcu_read_unlock();
195
196 /* Decrement the stream count of the global consumer data. */
197 assert(consumer_data.stream_count > 0);
198 consumer_data.stream_count--;
199 }
200
201 /*
202 * Free the given stream within a RCU call.
203 */
204 void consumer_stream_free(struct lttng_consumer_stream *stream)
205 {
206 assert(stream);
207
208 call_rcu(&stream->node.head, free_stream_rcu);
209 }
210
211 /*
212 * Destroy the stream's buffers of the tracer.
213 */
214 void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream)
215 {
216 assert(stream);
217
218 switch (consumer_data.type) {
219 case LTTNG_CONSUMER_KERNEL:
220 break;
221 case LTTNG_CONSUMER32_UST:
222 case LTTNG_CONSUMER64_UST:
223 lttng_ustconsumer_del_stream(stream);
224 break;
225 default:
226 ERR("Unknown consumer_data type");
227 assert(0);
228 }
229 }
230
231 /*
232 * Destroy and close a already created stream.
233 */
234 static void destroy_close_stream(struct lttng_consumer_stream *stream)
235 {
236 assert(stream);
237
238 DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key);
239
240 /* Destroy tracer buffers of the stream. */
241 consumer_stream_destroy_buffers(stream);
242 /* Close down everything including the relayd if one. */
243 consumer_stream_close(stream);
244 }
245
246 /*
247 * Decrement the stream's channel refcount and if down to 0, return the channel
248 * pointer so it can be destroyed by the caller or NULL if not.
249 */
250 static struct lttng_consumer_channel *unref_channel(
251 struct lttng_consumer_stream *stream)
252 {
253 struct lttng_consumer_channel *free_chan = NULL;
254
255 assert(stream);
256 assert(stream->chan);
257
258 /* Update refcount of channel and see if we need to destroy it. */
259 if (!uatomic_sub_return(&stream->chan->refcount, 1)
260 && !uatomic_read(&stream->chan->nb_init_stream_left)) {
261 free_chan = stream->chan;
262 }
263
264 return free_chan;
265 }
266
267 /*
268 * Destroy a stream completely. This will delete, close and free the stream.
269 * Once return, the stream is NO longer usable. Its channel may get destroyed
270 * if conditions are met for a monitored stream.
271 *
272 * This MUST be called WITHOUT the consumer data and stream lock acquired if
273 * the stream is in _monitor_ mode else it does not matter.
274 */
275 void consumer_stream_destroy(struct lttng_consumer_stream *stream,
276 struct lttng_ht *ht)
277 {
278 assert(stream);
279
280 /* Stream is in monitor mode. */
281 if (stream->monitor) {
282 struct lttng_consumer_channel *free_chan = NULL;
283
284 /*
285 * This means that the stream was successfully removed from the streams
286 * list of the channel and sent to the right thread managing this
287 * stream thus being globally visible.
288 */
289 if (stream->globally_visible) {
290 pthread_mutex_lock(&consumer_data.lock);
291 pthread_mutex_lock(&stream->chan->lock);
292 pthread_mutex_lock(&stream->lock);
293 /* Remove every reference of the stream in the consumer. */
294 consumer_stream_delete(stream, ht);
295
296 destroy_close_stream(stream);
297
298 /* Update channel's refcount of the stream. */
299 free_chan = unref_channel(stream);
300
301 /* Indicates that the consumer data state MUST be updated after this. */
302 consumer_data.need_update = 1;
303
304 pthread_mutex_unlock(&stream->lock);
305 pthread_mutex_unlock(&stream->chan->lock);
306 pthread_mutex_unlock(&consumer_data.lock);
307 } else {
308 /*
309 * If the stream is not visible globally, this needs to be done
310 * outside of the consumer data lock section.
311 */
312 free_chan = unref_channel(stream);
313 }
314
315 if (free_chan) {
316 consumer_del_channel(free_chan);
317 }
318 } else {
319 destroy_close_stream(stream);
320 }
321
322 /* Free stream within a RCU call. */
323 consumer_stream_free(stream);
324 }
This page took 0.035113 seconds and 4 git commands to generate.