Fix: kconsumer: missing wait for metadata thread in do_sync_metadata
[lttng-tools.git] / src / common / consumer / consumer-stream.c
CommitLineData
51230d70
DG
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 - David Goulet <dgoulet@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License, version 2 only, as
8 * published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
6c1c0768 20#define _LGPL_SOURCE
51230d70 21#include <assert.h>
10a50311 22#include <inttypes.h>
51230d70
DG
23#include <sys/mman.h>
24#include <unistd.h>
25
26#include <common/common.h>
1c20f0e2 27#include <common/index/index.h>
94d49140 28#include <common/kernel-consumer/kernel-consumer.h>
51230d70
DG
29#include <common/relayd/relayd.h>
30#include <common/ust-consumer/ust-consumer.h>
a2361a61 31#include <common/utils.h>
6f1177cf
JG
32#include <common/consumer/consumer.h>
33#include <common/consumer/consumer-timer.h>
8d18bcae 34#include <common/consumer/metadata-bucket.h>
51230d70
DG
35
36#include "consumer-stream.h"
37
38/*
39 * RCU call to free stream. MUST only be used with call_rcu().
40 */
41static void free_stream_rcu(struct rcu_head *head)
42{
43 struct lttng_ht_node_u64 *node =
44 caa_container_of(head, struct lttng_ht_node_u64, head);
45 struct lttng_consumer_stream *stream =
46 caa_container_of(node, struct lttng_consumer_stream, node);
47
48 pthread_mutex_destroy(&stream->lock);
49 free(stream);
50}
51
6f1177cf
JG
52static void consumer_stream_data_lock_all(struct lttng_consumer_stream *stream)
53{
54 pthread_mutex_lock(&stream->chan->lock);
55 pthread_mutex_lock(&stream->lock);
56}
57
58static void consumer_stream_data_unlock_all(struct lttng_consumer_stream *stream)
59{
60 pthread_mutex_unlock(&stream->lock);
61 pthread_mutex_unlock(&stream->chan->lock);
62}
63
64static void consumer_stream_metadata_lock_all(struct lttng_consumer_stream *stream)
65{
66 consumer_stream_data_lock_all(stream);
67 pthread_mutex_lock(&stream->metadata_rdv_lock);
68}
69
70static void consumer_stream_metadata_unlock_all(struct lttng_consumer_stream *stream)
71{
72 pthread_mutex_unlock(&stream->metadata_rdv_lock);
73 consumer_stream_data_unlock_all(stream);
74}
75
76/* Only used for data streams. */
77static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
78 const struct stream_subbuffer *subbuf)
79{
80 int ret = 0;
81 uint64_t sequence_number;
ec8b7679 82 const uint64_t discarded_events = subbuf->info.data.events_discarded;
6f1177cf
JG
83
84 if (!subbuf->info.data.sequence_number.is_set) {
85 /* Command not supported by the tracer. */
86 sequence_number = -1ULL;
87 stream->sequence_number_unavailable = true;
88 } else {
89 sequence_number = subbuf->info.data.sequence_number.value;
90 }
91
92 /*
93 * Start the sequence when we extract the first packet in case we don't
94 * start at 0 (for example if a consumer is not connected to the
95 * session immediately after the beginning).
96 */
97 if (stream->last_sequence_number == -1ULL) {
98 stream->last_sequence_number = sequence_number;
99 } else if (sequence_number > stream->last_sequence_number) {
100 stream->chan->lost_packets += sequence_number -
101 stream->last_sequence_number - 1;
102 } else {
103 /* seq <= last_sequence_number */
104 ERR("Sequence number inconsistent : prev = %" PRIu64
105 ", current = %" PRIu64,
106 stream->last_sequence_number, sequence_number);
107 ret = -1;
108 goto end;
109 }
110 stream->last_sequence_number = sequence_number;
111
112 if (discarded_events < stream->last_discarded_events) {
113 /*
114 * Overflow has occurred. We assume only one wrap-around
115 * has occurred.
116 */
117 stream->chan->discarded_events +=
118 (1ULL << (CAA_BITS_PER_LONG - 1)) -
119 stream->last_discarded_events +
120 discarded_events;
121 } else {
122 stream->chan->discarded_events += discarded_events -
123 stream->last_discarded_events;
124 }
125 stream->last_discarded_events = discarded_events;
126 ret = 0;
127
128end:
129 return ret;
130}
131
132static
133void ctf_packet_index_populate(struct ctf_packet_index *index,
134 off_t offset, const struct stream_subbuffer *subbuffer)
135{
136 *index = (typeof(*index)){
137 .offset = htobe64(offset),
138 .packet_size = htobe64(subbuffer->info.data.packet_size),
139 .content_size = htobe64(subbuffer->info.data.content_size),
140 .timestamp_begin = htobe64(
141 subbuffer->info.data.timestamp_begin),
142 .timestamp_end = htobe64(
143 subbuffer->info.data.timestamp_end),
144 .events_discarded = htobe64(
145 subbuffer->info.data.events_discarded),
146 .stream_id = htobe64(subbuffer->info.data.stream_id),
147 .stream_instance_id = htobe64(
148 subbuffer->info.data.stream_instance_id.is_set ?
149 subbuffer->info.data.stream_instance_id.value : -1ULL),
150 .packet_seq_num = htobe64(
151 subbuffer->info.data.sequence_number.is_set ?
152 subbuffer->info.data.sequence_number.value : -1ULL),
153 };
154}
155
156static ssize_t consumer_stream_consume_mmap(
157 struct lttng_consumer_local_data *ctx,
158 struct lttng_consumer_stream *stream,
159 const struct stream_subbuffer *subbuffer)
160{
161 const unsigned long padding_size =
162 subbuffer->info.data.padded_subbuf_size -
163 subbuffer->info.data.subbuf_size;
164
165 return lttng_consumer_on_read_subbuffer_mmap(
8d18bcae 166 stream, &subbuffer->buffer.buffer, padding_size);
6f1177cf
JG
167}
168
169static ssize_t consumer_stream_consume_splice(
170 struct lttng_consumer_local_data *ctx,
171 struct lttng_consumer_stream *stream,
172 const struct stream_subbuffer *subbuffer)
173{
174 return lttng_consumer_on_read_subbuffer_splice(ctx, stream,
175 subbuffer->info.data.padded_subbuf_size, 0);
176}
177
178static int consumer_stream_send_index(
179 struct lttng_consumer_stream *stream,
180 const struct stream_subbuffer *subbuffer,
181 struct lttng_consumer_local_data *ctx)
182{
183 off_t packet_offset = 0;
184 struct ctf_packet_index index = {};
185
186 /*
187 * This is called after consuming the sub-buffer; substract the
188 * effect this sub-buffer from the offset.
189 */
190 if (stream->net_seq_idx == (uint64_t) -1ULL) {
191 packet_offset = stream->out_fd_offset -
192 subbuffer->info.data.padded_subbuf_size;
193 }
194
195 ctf_packet_index_populate(&index, packet_offset, subbuffer);
196 return consumer_stream_write_index(stream, &index);
197}
198
199/*
200 * Actually do the metadata sync using the given metadata stream.
201 *
202 * Return 0 on success else a negative value. ENODATA can be returned also
203 * indicating that there is no metadata available for that stream.
204 */
205static int do_sync_metadata(struct lttng_consumer_stream *metadata,
206 struct lttng_consumer_local_data *ctx)
207{
208 int ret;
27e2fa5d 209 enum sync_metadata_status status;
6f1177cf
JG
210
211 assert(metadata);
212 assert(metadata->metadata_flag);
213 assert(ctx);
214
215 /*
216 * In UST, since we have to write the metadata from the cache packet
217 * by packet, we might need to start this procedure multiple times
218 * until all the metadata from the cache has been extracted.
219 */
220 do {
221 /*
222 * Steps :
223 * - Lock the metadata stream
224 * - Check if metadata stream node was deleted before locking.
225 * - if yes, release and return success
226 * - Check if new metadata is ready (flush + snapshot pos)
227 * - If nothing : release and return.
228 * - Lock the metadata_rdv_lock
229 * - Unlock the metadata stream
230 * - cond_wait on metadata_rdv to wait the wakeup from the
231 * metadata thread
232 * - Unlock the metadata_rdv_lock
233 */
234 pthread_mutex_lock(&metadata->lock);
235
236 /*
237 * There is a possibility that we were able to acquire a reference on the
238 * stream from the RCU hash table but between then and now, the node might
239 * have been deleted just before the lock is acquired. Thus, after locking,
240 * we make sure the metadata node has not been deleted which means that the
241 * buffers are closed.
242 *
243 * In that case, there is no need to sync the metadata hence returning a
244 * success return code.
245 */
246 ret = cds_lfht_is_node_deleted(&metadata->node.node);
247 if (ret) {
248 ret = 0;
249 goto end_unlock_mutex;
250 }
251
252 switch (ctx->type) {
253 case LTTNG_CONSUMER_KERNEL:
254 /*
255 * Empty the metadata cache and flush the current stream.
256 */
27e2fa5d 257 status = lttng_kconsumer_sync_metadata(metadata);
6f1177cf
JG
258 break;
259 case LTTNG_CONSUMER32_UST:
260 case LTTNG_CONSUMER64_UST:
261 /*
262 * Ask the sessiond if we have new metadata waiting and update the
263 * consumer metadata cache.
264 */
27e2fa5d 265 status = lttng_ustconsumer_sync_metadata(ctx, metadata);
6f1177cf
JG
266 break;
267 default:
27e2fa5d 268 abort();
6f1177cf 269 }
27e2fa5d
JG
270
271 switch (status) {
272 case SYNC_METADATA_STATUS_NEW_DATA:
273 break;
274 case SYNC_METADATA_STATUS_NO_DATA:
275 ret = 0;
6f1177cf 276 goto end_unlock_mutex;
27e2fa5d
JG
277 case SYNC_METADATA_STATUS_ERROR:
278 ret = -1;
279 goto end_unlock_mutex;
280 default:
281 abort();
6f1177cf
JG
282 }
283
284 /*
285 * At this point, new metadata have been flushed, so we wait on the
286 * rendez-vous point for the metadata thread to wake us up when it
287 * finishes consuming the metadata and continue execution.
288 */
289
290 pthread_mutex_lock(&metadata->metadata_rdv_lock);
291
292 /*
293 * Release metadata stream lock so the metadata thread can process it.
294 */
295 pthread_mutex_unlock(&metadata->lock);
296
297 /*
298 * Wait on the rendez-vous point. Once woken up, it means the metadata was
299 * consumed and thus synchronization is achieved.
300 */
301 pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock);
302 pthread_mutex_unlock(&metadata->metadata_rdv_lock);
27e2fa5d 303 } while (status == SYNC_METADATA_STATUS_NEW_DATA);
6f1177cf
JG
304
305 /* Success */
306 return 0;
307
308end_unlock_mutex:
309 pthread_mutex_unlock(&metadata->lock);
310 return ret;
311}
312
313/*
314 * Synchronize the metadata using a given session ID. A successful acquisition
315 * of a metadata stream will trigger a request to the session daemon and a
316 * snapshot so the metadata thread can consume it.
317 *
318 * This function call is a rendez-vous point between the metadata thread and
319 * the data thread.
320 *
321 * Return 0 on success or else a negative value.
322 */
323int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
324 uint64_t session_id)
325{
326 int ret;
327 struct lttng_consumer_stream *stream = NULL;
328 struct lttng_ht_iter iter;
329 struct lttng_ht *ht;
330
331 assert(ctx);
332
333 /* Ease our life a bit. */
334 ht = consumer_data.stream_list_ht;
335
336 rcu_read_lock();
337
338 /* Search the metadata associated with the session id of the given stream. */
339
340 cds_lfht_for_each_entry_duplicate(ht->ht,
341 ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct,
342 &session_id, &iter.iter, stream, node_session_id.node) {
343 if (!stream->metadata_flag) {
344 continue;
345 }
346
347 ret = do_sync_metadata(stream, ctx);
348 if (ret < 0) {
349 goto end;
350 }
351 }
352
353 /*
354 * Force return code to 0 (success) since ret might be ENODATA for instance
355 * which is not an error but rather that we should come back.
356 */
357 ret = 0;
358
359end:
360 rcu_read_unlock();
361 return ret;
362}
363
364static int consumer_stream_sync_metadata_index(
365 struct lttng_consumer_stream *stream,
366 const struct stream_subbuffer *subbuffer,
367 struct lttng_consumer_local_data *ctx)
368{
369 int ret;
370
371 /* Block until all the metadata is sent. */
372 pthread_mutex_lock(&stream->metadata_timer_lock);
373 assert(!stream->missed_metadata_flush);
374 stream->waiting_on_metadata = true;
375 pthread_mutex_unlock(&stream->metadata_timer_lock);
376
377 ret = consumer_stream_sync_metadata(ctx, stream->session_id);
378
379 pthread_mutex_lock(&stream->metadata_timer_lock);
380 stream->waiting_on_metadata = false;
381 if (stream->missed_metadata_flush) {
382 stream->missed_metadata_flush = false;
383 pthread_mutex_unlock(&stream->metadata_timer_lock);
384 (void) stream->read_subbuffer_ops.send_live_beacon(stream);
385 } else {
386 pthread_mutex_unlock(&stream->metadata_timer_lock);
387 }
388 if (ret < 0) {
389 goto end;
390 }
391
392 ret = consumer_stream_send_index(stream, subbuffer, ctx);
393end:
394 return ret;
395}
396
397/*
398 * Check if the local version of the metadata stream matches with the version
399 * of the metadata stream in the kernel. If it was updated, set the reset flag
400 * on the stream.
401 */
402static
403int metadata_stream_check_version(struct lttng_consumer_stream *stream,
404 const struct stream_subbuffer *subbuffer)
405{
406 if (stream->metadata_version == subbuffer->info.metadata.version) {
407 goto end;
408 }
409
410 DBG("New metadata version detected");
443d6e6f
JG
411 consumer_stream_metadata_set_version(stream,
412 subbuffer->info.metadata.version);
8d18bcae 413
6f1177cf
JG
414 if (stream->read_subbuffer_ops.reset_metadata) {
415 stream->read_subbuffer_ops.reset_metadata(stream);
416 }
417
418end:
419 return 0;
420}
421
422struct lttng_consumer_stream *consumer_stream_create(
423 struct lttng_consumer_channel *channel,
424 uint64_t channel_key,
425 uint64_t stream_key,
426 const char *channel_name,
427 uint64_t relayd_id,
428 uint64_t session_id,
429 struct lttng_trace_chunk *trace_chunk,
430 int cpu,
431 int *alloc_ret,
432 enum consumer_channel_type type,
433 unsigned int monitor)
434{
435 int ret;
436 struct lttng_consumer_stream *stream;
437
438 stream = zmalloc(sizeof(*stream));
439 if (stream == NULL) {
440 PERROR("malloc struct lttng_consumer_stream");
441 ret = -ENOMEM;
442 goto end;
443 }
444
445 if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
446 ERR("Failed to acquire trace chunk reference during the creation of a stream");
447 ret = -1;
448 goto error;
449 }
450
451 rcu_read_lock();
452 stream->chan = channel;
453 stream->key = stream_key;
454 stream->trace_chunk = trace_chunk;
455 stream->out_fd = -1;
456 stream->out_fd_offset = 0;
457 stream->output_written = 0;
458 stream->net_seq_idx = relayd_id;
459 stream->session_id = session_id;
460 stream->monitor = monitor;
461 stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
462 stream->index_file = NULL;
463 stream->last_sequence_number = -1ULL;
464 stream->rotate_position = -1ULL;
465 pthread_mutex_init(&stream->lock, NULL);
466 pthread_mutex_init(&stream->metadata_timer_lock, NULL);
467
468 /* If channel is the metadata, flag this stream as metadata. */
469 if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
470 stream->metadata_flag = 1;
471 /* Metadata is flat out. */
472 strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
473 /* Live rendez-vous point. */
474 pthread_cond_init(&stream->metadata_rdv, NULL);
475 pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
476 } else {
477 /* Format stream name to <channel_name>_<cpu_number> */
478 ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
479 channel_name, cpu);
480 if (ret < 0) {
481 PERROR("snprintf stream name");
482 goto error;
483 }
484 }
485
486 switch (channel->output) {
487 case CONSUMER_CHANNEL_SPLICE:
488 stream->output = LTTNG_EVENT_SPLICE;
489 ret = utils_create_pipe(stream->splice_pipe);
490 if (ret < 0) {
491 goto error;
492 }
493 break;
494 case CONSUMER_CHANNEL_MMAP:
495 stream->output = LTTNG_EVENT_MMAP;
496 break;
497 default:
498 abort();
499 }
500
501 /* Key is always the wait_fd for streams. */
502 lttng_ht_node_init_u64(&stream->node, stream->key);
503
504 /* Init node per channel id key */
505 lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
506
507 /* Init session id node with the stream session id */
508 lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
509
510 DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
511 " relayd_id %" PRIu64 ", session_id %" PRIu64,
512 stream->name, stream->key, channel_key,
513 stream->net_seq_idx, stream->session_id);
514
515 rcu_read_unlock();
516
517 if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
518 stream->read_subbuffer_ops.lock =
519 consumer_stream_metadata_lock_all;
520 stream->read_subbuffer_ops.unlock =
521 consumer_stream_metadata_unlock_all;
522 stream->read_subbuffer_ops.pre_consume_subbuffer =
523 metadata_stream_check_version;
524 } else {
525 stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all;
526 stream->read_subbuffer_ops.unlock =
527 consumer_stream_data_unlock_all;
528 stream->read_subbuffer_ops.pre_consume_subbuffer =
529 consumer_stream_update_stats;
530 if (channel->is_live) {
531 stream->read_subbuffer_ops.post_consume =
532 consumer_stream_sync_metadata_index;
533 } else {
534 stream->read_subbuffer_ops.post_consume =
535 consumer_stream_send_index;
536 }
537 }
538
539 if (channel->output == CONSUMER_CHANNEL_MMAP) {
540 stream->read_subbuffer_ops.consume_subbuffer =
541 consumer_stream_consume_mmap;
542 } else {
543 stream->read_subbuffer_ops.consume_subbuffer =
544 consumer_stream_consume_splice;
545 }
546
547 return stream;
548
549error:
550 rcu_read_unlock();
551 lttng_trace_chunk_put(stream->trace_chunk);
552 free(stream);
553end:
554 if (alloc_ret) {
555 *alloc_ret = ret;
556 }
557 return NULL;
558}
559
51230d70
DG
560/*
561 * Close stream on the relayd side. This call can destroy a relayd if the
562 * conditions are met.
563 *
564 * A RCU read side lock MUST be acquired if the relayd object was looked up in
565 * a hash table before calling this.
566 */
567void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
568 struct consumer_relayd_sock_pair *relayd)
569{
570 int ret;
571
572 assert(stream);
573 assert(relayd);
574
d01178b6
DG
575 if (stream->sent_to_relayd) {
576 uatomic_dec(&relayd->refcount);
577 assert(uatomic_read(&relayd->refcount) >= 0);
578 }
51230d70
DG
579
580 /* Closing streams requires to lock the control socket. */
581 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
582 ret = relayd_send_close_stream(&relayd->control_sock,
583 stream->relayd_stream_id,
584 stream->next_net_seq_num - 1);
585 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
586 if (ret < 0) {
11413bb9
JR
587 ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
588 lttng_consumer_cleanup_relayd(relayd);
51230d70
DG
589 }
590
591 /* Both conditions are met, we destroy the relayd. */
592 if (uatomic_read(&relayd->refcount) == 0 &&
593 uatomic_read(&relayd->destroy_flag)) {
594 consumer_destroy_relayd(relayd);
595 }
10a50311 596 stream->net_seq_idx = (uint64_t) -1ULL;
d01178b6 597 stream->sent_to_relayd = 0;
51230d70
DG
598}
599
600/*
601 * Close stream's file descriptors and, if needed, close stream also on the
602 * relayd side.
603 *
604 * The consumer data lock MUST be acquired.
605 * The stream lock MUST be acquired.
606 */
607void consumer_stream_close(struct lttng_consumer_stream *stream)
608{
609 int ret;
610 struct consumer_relayd_sock_pair *relayd;
611
612 assert(stream);
613
614 switch (consumer_data.type) {
615 case LTTNG_CONSUMER_KERNEL:
616 if (stream->mmap_base != NULL) {
617 ret = munmap(stream->mmap_base, stream->mmap_len);
618 if (ret != 0) {
619 PERROR("munmap");
620 }
621 }
622
623 if (stream->wait_fd >= 0) {
624 ret = close(stream->wait_fd);
625 if (ret) {
626 PERROR("close");
627 }
10a50311 628 stream->wait_fd = -1;
51230d70 629 }
a2361a61
JD
630 if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
631 utils_close_pipe(stream->splice_pipe);
632 }
51230d70
DG
633 break;
634 case LTTNG_CONSUMER32_UST:
635 case LTTNG_CONSUMER64_UST:
6d574024
DG
636 {
637 /*
638 * Special case for the metadata since the wait fd is an internal pipe
639 * polled in the metadata thread.
640 */
641 if (stream->metadata_flag && stream->chan->monitor) {
642 int rpipe = stream->ust_metadata_poll_pipe[0];
643
644 /*
645 * This will stop the channel timer if one and close the write side
646 * of the metadata poll pipe.
647 */
648 lttng_ustconsumer_close_metadata(stream->chan);
649 if (rpipe >= 0) {
650 ret = close(rpipe);
651 if (ret < 0) {
b4a650f3 652 PERROR("closing metadata pipe read side");
6d574024
DG
653 }
654 stream->ust_metadata_poll_pipe[0] = -1;
655 }
656 }
51230d70 657 break;
6d574024 658 }
51230d70
DG
659 default:
660 ERR("Unknown consumer_data type");
661 assert(0);
662 }
663
664 /* Close output fd. Could be a socket or local file at this point. */
665 if (stream->out_fd >= 0) {
666 ret = close(stream->out_fd);
667 if (ret) {
668 PERROR("close");
669 }
10a50311 670 stream->out_fd = -1;
51230d70
DG
671 }
672
f8f3885c
MD
673 if (stream->index_file) {
674 lttng_index_file_put(stream->index_file);
675 stream->index_file = NULL;
309167d2
JD
676 }
677
e5148e25
JG
678 lttng_trace_chunk_put(stream->trace_chunk);
679 stream->trace_chunk = NULL;
680
51230d70
DG
681 /* Check and cleanup relayd if needed. */
682 rcu_read_lock();
683 relayd = consumer_find_relayd(stream->net_seq_idx);
684 if (relayd != NULL) {
685 consumer_stream_relayd_close(stream, relayd);
686 }
687 rcu_read_unlock();
688}
689
690/*
691 * Delete the stream from all possible hash tables.
692 *
693 * The consumer data lock MUST be acquired.
694 * The stream lock MUST be acquired.
695 */
696void consumer_stream_delete(struct lttng_consumer_stream *stream,
697 struct lttng_ht *ht)
698{
699 int ret;
700 struct lttng_ht_iter iter;
701
702 assert(stream);
10a50311
JD
703 /* Should NEVER be called not in monitor mode. */
704 assert(stream->chan->monitor);
51230d70
DG
705
706 rcu_read_lock();
707
708 if (ht) {
709 iter.iter.node = &stream->node.node;
710 ret = lttng_ht_del(ht, &iter);
711 assert(!ret);
712 }
713
714 /* Delete from stream per channel ID hash table. */
715 iter.iter.node = &stream->node_channel_id.node;
716 /*
717 * The returned value is of no importance. Even if the node is NOT in the
718 * hash table, we continue since we may have been called by a code path
719 * that did not add the stream to a (all) hash table. Same goes for the
720 * next call ht del call.
721 */
722 (void) lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
723
724 /* Delete from the global stream list. */
725 iter.iter.node = &stream->node_session_id.node;
726 /* See the previous ht del on why we ignore the returned value. */
727 (void) lttng_ht_del(consumer_data.stream_list_ht, &iter);
728
729 rcu_read_unlock();
730
6d574024
DG
731 if (!stream->metadata_flag) {
732 /* Decrement the stream count of the global consumer data. */
733 assert(consumer_data.stream_count > 0);
734 consumer_data.stream_count--;
735 }
51230d70
DG
736}
737
738/*
739 * Free the given stream within a RCU call.
740 */
741void consumer_stream_free(struct lttng_consumer_stream *stream)
742{
743 assert(stream);
744
8d18bcae 745 metadata_bucket_destroy(stream->metadata_bucket);
51230d70
DG
746 call_rcu(&stream->node.head, free_stream_rcu);
747}
748
749/*
10a50311 750 * Destroy the stream's buffers of the tracer.
51230d70 751 */
10a50311 752void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream)
51230d70 753{
10a50311
JD
754 assert(stream);
755
756 switch (consumer_data.type) {
757 case LTTNG_CONSUMER_KERNEL:
758 break;
759 case LTTNG_CONSUMER32_UST:
760 case LTTNG_CONSUMER64_UST:
761 lttng_ustconsumer_del_stream(stream);
762 break;
763 default:
764 ERR("Unknown consumer_data type");
765 assert(0);
766 }
767}
51230d70 768
10a50311 769/*
4891ece8 770 * Destroy and close a already created stream.
10a50311 771 */
4891ece8 772static void destroy_close_stream(struct lttng_consumer_stream *stream)
10a50311 773{
51230d70
DG
774 assert(stream);
775
4891ece8 776 DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key);
10a50311
JD
777
778 /* Destroy tracer buffers of the stream. */
779 consumer_stream_destroy_buffers(stream);
780 /* Close down everything including the relayd if one. */
781 consumer_stream_close(stream);
782}
51230d70 783
10a50311 784/*
4891ece8
DG
785 * Decrement the stream's channel refcount and if down to 0, return the channel
786 * pointer so it can be destroyed by the caller or NULL if not.
10a50311 787 */
4891ece8
DG
788static struct lttng_consumer_channel *unref_channel(
789 struct lttng_consumer_stream *stream)
10a50311 790{
4891ece8
DG
791 struct lttng_consumer_channel *free_chan = NULL;
792
10a50311 793 assert(stream);
4891ece8 794 assert(stream->chan);
10a50311 795
4891ece8
DG
796 /* Update refcount of channel and see if we need to destroy it. */
797 if (!uatomic_sub_return(&stream->chan->refcount, 1)
798 && !uatomic_read(&stream->chan->nb_init_stream_left)) {
799 free_chan = stream->chan;
800 }
51230d70 801
4891ece8 802 return free_chan;
10a50311 803}
51230d70 804
10a50311
JD
805/*
806 * Destroy a stream completely. This will delete, close and free the stream.
807 * Once return, the stream is NO longer usable. Its channel may get destroyed
808 * if conditions are met for a monitored stream.
809 *
810 * This MUST be called WITHOUT the consumer data and stream lock acquired if
811 * the stream is in _monitor_ mode else it does not matter.
812 */
813void consumer_stream_destroy(struct lttng_consumer_stream *stream,
814 struct lttng_ht *ht)
815{
816 assert(stream);
817
818 /* Stream is in monitor mode. */
4891ece8 819 if (stream->monitor) {
10a50311 820 struct lttng_consumer_channel *free_chan = NULL;
51230d70 821
4891ece8
DG
822 /*
823 * This means that the stream was successfully removed from the streams
824 * list of the channel and sent to the right thread managing this
825 * stream thus being globally visible.
826 */
827 if (stream->globally_visible) {
828 pthread_mutex_lock(&consumer_data.lock);
a9838785 829 pthread_mutex_lock(&stream->chan->lock);
4891ece8
DG
830 pthread_mutex_lock(&stream->lock);
831 /* Remove every reference of the stream in the consumer. */
832 consumer_stream_delete(stream, ht);
833
834 destroy_close_stream(stream);
835
836 /* Update channel's refcount of the stream. */
837 free_chan = unref_channel(stream);
838
839 /* Indicates that the consumer data state MUST be updated after this. */
840 consumer_data.need_update = 1;
841
842 pthread_mutex_unlock(&stream->lock);
a9838785 843 pthread_mutex_unlock(&stream->chan->lock);
4891ece8
DG
844 pthread_mutex_unlock(&consumer_data.lock);
845 } else {
846 /*
847 * If the stream is not visible globally, this needs to be done
848 * outside of the consumer data lock section.
849 */
850 free_chan = unref_channel(stream);
10a50311
JD
851 }
852
10a50311
JD
853 if (free_chan) {
854 consumer_del_channel(free_chan);
855 }
856 } else {
4891ece8 857 destroy_close_stream(stream);
51230d70
DG
858 }
859
860 /* Free stream within a RCU call. */
e5148e25
JG
861 lttng_trace_chunk_put(stream->trace_chunk);
862 stream->trace_chunk = NULL;
51230d70
DG
863 consumer_stream_free(stream);
864}
1c20f0e2
JD
865
866/*
867 * Write index of a specific stream either on the relayd or local disk.
868 *
869 * Return 0 on success or else a negative value.
870 */
871int consumer_stream_write_index(struct lttng_consumer_stream *stream,
f8f3885c 872 struct ctf_packet_index *element)
1c20f0e2
JD
873{
874 int ret;
1c20f0e2
JD
875
876 assert(stream);
f8f3885c 877 assert(element);
1c20f0e2
JD
878
879 rcu_read_lock();
23c910e5
JR
880 if (stream->net_seq_idx != (uint64_t) -1ULL) {
881 struct consumer_relayd_sock_pair *relayd;
882 relayd = consumer_find_relayd(stream->net_seq_idx);
883 if (relayd) {
884 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
885 ret = relayd_send_index(&relayd->control_sock, element,
1c20f0e2 886 stream->relayd_stream_id, stream->next_net_seq_num - 1);
11413bb9
JR
887 if (ret < 0) {
888 /*
889 * Communication error with lttng-relayd,
890 * perform cleanup now
891 */
892 ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
893 lttng_consumer_cleanup_relayd(relayd);
894 ret = -1;
895 }
23c910e5
JR
896 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
897 } else {
898 ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.",
899 stream->key, stream->net_seq_idx);
900 ret = -1;
901 }
1c20f0e2 902 } else {
f8f3885c 903 if (lttng_index_file_write(stream->index_file, element)) {
6cd525e8
MD
904 ret = -1;
905 } else {
906 ret = 0;
907 }
1c20f0e2
JD
908 }
909 if (ret < 0) {
910 goto error;
911 }
912
913error:
914 rcu_read_unlock();
915 return ret;
916}
94d49140 917
e5148e25
JG
918int consumer_stream_create_output_files(struct lttng_consumer_stream *stream,
919 bool create_index)
920{
921 int ret;
922 enum lttng_trace_chunk_status chunk_status;
923 const int flags = O_WRONLY | O_CREAT | O_TRUNC;
924 const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
925 char stream_path[LTTNG_PATH_MAX];
926
927 ASSERT_LOCKED(stream->lock);
928 assert(stream->trace_chunk);
929
930 ret = utils_stream_file_path(stream->chan->pathname, stream->name,
931 stream->chan->tracefile_size,
13d934a5 932 stream->tracefile_count_current, NULL,
e5148e25
JG
933 stream_path, sizeof(stream_path));
934 if (ret < 0) {
935 goto end;
936 }
937
938 if (stream->out_fd >= 0) {
939 ret = close(stream->out_fd);
940 if (ret < 0) {
941 PERROR("Failed to close stream file \"%s\"",
942 stream->name);
943 goto end;
944 }
945 stream->out_fd = -1;
946 }
947
948 DBG("Opening stream output file \"%s\"", stream_path);
949 chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path,
950 flags, mode, &stream->out_fd);
951 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
952 ERR("Failed to open stream file \"%s\"", stream->name);
953 ret = -1;
954 goto end;
955 }
956
957 if (!stream->metadata_flag && (create_index || stream->index_file)) {
958 if (stream->index_file) {
959 lttng_index_file_put(stream->index_file);
960 }
961 stream->index_file = lttng_index_file_create_from_trace_chunk(
962 stream->trace_chunk,
963 stream->chan->pathname,
964 stream->name,
965 stream->chan->tracefile_size,
966 stream->tracefile_count_current,
967 CTF_INDEX_MAJOR, CTF_INDEX_MINOR,
968 false);
969 if (!stream->index_file) {
970 ret = -1;
971 goto end;
972 }
973 }
974
975 /* Reset current size because we just perform a rotation. */
976 stream->tracefile_size_current = 0;
977 stream->out_fd_offset = 0;
978end:
979 return ret;
980}
981
982int consumer_stream_rotate_output_files(struct lttng_consumer_stream *stream)
983{
984 int ret;
985
986 stream->tracefile_count_current++;
987 if (stream->chan->tracefile_count > 0) {
988 stream->tracefile_count_current %=
989 stream->chan->tracefile_count;
990 }
991
992 DBG("Rotating output files of stream \"%s\"", stream->name);
993 ret = consumer_stream_create_output_files(stream, true);
994 if (ret) {
995 goto end;
996 }
997
998end:
999 return ret;
1000}
1d18f2d9
JG
1001
1002bool consumer_stream_is_deleted(struct lttng_consumer_stream *stream)
1003{
1004 /*
1005 * This function does not take a const stream since
1006 * cds_lfht_is_node_deleted was not const before liburcu 0.12.
1007 */
1008 assert(stream);
1009 return cds_lfht_is_node_deleted(&stream->node.node);
1010}
8d18bcae
JG
1011
/*
 * Flush callback handed to the metadata bucket: 'data' is the stream the
 * bucket belongs to and 'buffer' is consumed through the mmap consumption
 * function, without any consumer context.
 *
 * Return the value of consumer_stream_consume_mmap().
 */
static ssize_t metadata_bucket_flush(
		const struct stream_subbuffer *buffer, void *data)
{
	struct lttng_consumer_stream *stream = data;

	return consumer_stream_consume_mmap(NULL, stream, buffer);
}
1025
1026static ssize_t metadata_bucket_consume(
1027 struct lttng_consumer_local_data *unused,
1028 struct lttng_consumer_stream *stream,
1029 const struct stream_subbuffer *subbuffer)
1030{
1031 ssize_t ret;
1032 enum metadata_bucket_status status;
1033
1034 status = metadata_bucket_fill(stream->metadata_bucket, subbuffer);
1035 switch (status) {
1036 case METADATA_BUCKET_STATUS_OK:
1037 /* Return consumed size. */
1038 ret = subbuffer->buffer.buffer.size;
1039 break;
1040 default:
1041 ret = -1;
1042 }
1043
1044 return ret;
1045}
1046
1047int consumer_stream_enable_metadata_bucketization(
1048 struct lttng_consumer_stream *stream)
1049{
1050 int ret = 0;
1051
1052 assert(stream->metadata_flag);
1053 assert(!stream->metadata_bucket);
1054 assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
1055
1056 stream->metadata_bucket = metadata_bucket_create(
1057 metadata_bucket_flush, stream);
1058 if (!stream->metadata_bucket) {
1059 ret = -1;
1060 goto end;
1061 }
1062
1063 stream->read_subbuffer_ops.consume_subbuffer = metadata_bucket_consume;
1064end:
1065 return ret;
1066}
443d6e6f
JG
1067
1068void consumer_stream_metadata_set_version(
1069 struct lttng_consumer_stream *stream, uint64_t new_version)
1070{
1071 assert(new_version > stream->metadata_version);
1072 stream->metadata_version = new_version;
1073 stream->reset_metadata_flag = 1;
1074
1075 if (stream->metadata_bucket) {
1076 metadata_bucket_reset(stream->metadata_bucket);
1077 }
1078}
This page took 0.087331 seconds and 4 git commands to generate.