Fix: kconsumer: missing wait for metadata thread in do_sync_metadata
[lttng-tools.git] / src / common / consumer / consumer-stream.c
CommitLineData
51230d70 1/*
ab5be9fa
MJ
2 * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
51230d70 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
51230d70 7 *
51230d70
DG
8 */
9
6c1c0768 10#define _LGPL_SOURCE
51230d70 11#include <assert.h>
10a50311 12#include <inttypes.h>
51230d70
DG
13#include <sys/mman.h>
14#include <unistd.h>
15
16#include <common/common.h>
1c20f0e2 17#include <common/index/index.h>
94d49140 18#include <common/kernel-consumer/kernel-consumer.h>
51230d70
DG
19#include <common/relayd/relayd.h>
20#include <common/ust-consumer/ust-consumer.h>
a2361a61 21#include <common/utils.h>
6f9449c2
JG
22#include <common/consumer/consumer.h>
23#include <common/consumer/consumer-timer.h>
f5ba75b4 24#include <common/consumer/metadata-bucket.h>
51230d70
DG
25
26#include "consumer-stream.h"
27
28/*
29 * RCU call to free stream. MUST only be used with call_rcu().
30 */
31static void free_stream_rcu(struct rcu_head *head)
32{
33 struct lttng_ht_node_u64 *node =
34 caa_container_of(head, struct lttng_ht_node_u64, head);
35 struct lttng_consumer_stream *stream =
36 caa_container_of(node, struct lttng_consumer_stream, node);
37
38 pthread_mutex_destroy(&stream->lock);
39 free(stream);
40}
41
6f9449c2
JG
42static void consumer_stream_data_lock_all(struct lttng_consumer_stream *stream)
43{
44 pthread_mutex_lock(&stream->chan->lock);
45 pthread_mutex_lock(&stream->lock);
46}
47
48static void consumer_stream_data_unlock_all(struct lttng_consumer_stream *stream)
49{
50 pthread_mutex_unlock(&stream->lock);
51 pthread_mutex_unlock(&stream->chan->lock);
52}
53
54static void consumer_stream_metadata_lock_all(struct lttng_consumer_stream *stream)
55{
56 consumer_stream_data_lock_all(stream);
57 pthread_mutex_lock(&stream->metadata_rdv_lock);
58}
59
60static void consumer_stream_metadata_unlock_all(struct lttng_consumer_stream *stream)
61{
62 pthread_mutex_unlock(&stream->metadata_rdv_lock);
63 consumer_stream_data_unlock_all(stream);
64}
65
66/* Only used for data streams. */
67static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
68 const struct stream_subbuffer *subbuf)
69{
70 int ret = 0;
71 uint64_t sequence_number;
d9b063d7 72 const uint64_t discarded_events = subbuf->info.data.events_discarded;
6f9449c2
JG
73
74 if (!subbuf->info.data.sequence_number.is_set) {
75 /* Command not supported by the tracer. */
76 sequence_number = -1ULL;
77 stream->sequence_number_unavailable = true;
78 } else {
79 sequence_number = subbuf->info.data.sequence_number.value;
80 }
81
82 /*
83 * Start the sequence when we extract the first packet in case we don't
84 * start at 0 (for example if a consumer is not connected to the
85 * session immediately after the beginning).
86 */
87 if (stream->last_sequence_number == -1ULL) {
88 stream->last_sequence_number = sequence_number;
89 } else if (sequence_number > stream->last_sequence_number) {
90 stream->chan->lost_packets += sequence_number -
91 stream->last_sequence_number - 1;
92 } else {
93 /* seq <= last_sequence_number */
94 ERR("Sequence number inconsistent : prev = %" PRIu64
95 ", current = %" PRIu64,
96 stream->last_sequence_number, sequence_number);
97 ret = -1;
98 goto end;
99 }
100 stream->last_sequence_number = sequence_number;
101
102 if (discarded_events < stream->last_discarded_events) {
103 /*
104 * Overflow has occurred. We assume only one wrap-around
105 * has occurred.
106 */
107 stream->chan->discarded_events +=
108 (1ULL << (CAA_BITS_PER_LONG - 1)) -
109 stream->last_discarded_events +
110 discarded_events;
111 } else {
112 stream->chan->discarded_events += discarded_events -
113 stream->last_discarded_events;
114 }
115 stream->last_discarded_events = discarded_events;
116 ret = 0;
117
118end:
119 return ret;
120}
121
122static
123void ctf_packet_index_populate(struct ctf_packet_index *index,
124 off_t offset, const struct stream_subbuffer *subbuffer)
125{
126 *index = (typeof(*index)){
127 .offset = htobe64(offset),
128 .packet_size = htobe64(subbuffer->info.data.packet_size),
129 .content_size = htobe64(subbuffer->info.data.content_size),
130 .timestamp_begin = htobe64(
131 subbuffer->info.data.timestamp_begin),
132 .timestamp_end = htobe64(
133 subbuffer->info.data.timestamp_end),
134 .events_discarded = htobe64(
135 subbuffer->info.data.events_discarded),
136 .stream_id = htobe64(subbuffer->info.data.stream_id),
137 .stream_instance_id = htobe64(
138 subbuffer->info.data.stream_instance_id.is_set ?
139 subbuffer->info.data.stream_instance_id.value : -1ULL),
140 .packet_seq_num = htobe64(
141 subbuffer->info.data.sequence_number.is_set ?
142 subbuffer->info.data.sequence_number.value : -1ULL),
143 };
144}
145
146static ssize_t consumer_stream_consume_mmap(
147 struct lttng_consumer_local_data *ctx,
148 struct lttng_consumer_stream *stream,
149 const struct stream_subbuffer *subbuffer)
150{
151 const unsigned long padding_size =
152 subbuffer->info.data.padded_subbuf_size -
153 subbuffer->info.data.subbuf_size;
154
155 return lttng_consumer_on_read_subbuffer_mmap(
f5ba75b4 156 stream, &subbuffer->buffer.buffer, padding_size);
6f9449c2
JG
157}
158
159static ssize_t consumer_stream_consume_splice(
160 struct lttng_consumer_local_data *ctx,
161 struct lttng_consumer_stream *stream,
162 const struct stream_subbuffer *subbuffer)
163{
164 return lttng_consumer_on_read_subbuffer_splice(ctx, stream,
165 subbuffer->info.data.padded_subbuf_size, 0);
166}
167
168static int consumer_stream_send_index(
169 struct lttng_consumer_stream *stream,
170 const struct stream_subbuffer *subbuffer,
171 struct lttng_consumer_local_data *ctx)
172{
173 off_t packet_offset = 0;
174 struct ctf_packet_index index = {};
175
176 /*
177 * This is called after consuming the sub-buffer; substract the
178 * effect this sub-buffer from the offset.
179 */
180 if (stream->net_seq_idx == (uint64_t) -1ULL) {
181 packet_offset = stream->out_fd_offset -
182 subbuffer->info.data.padded_subbuf_size;
183 }
184
185 ctf_packet_index_populate(&index, packet_offset, subbuffer);
186 return consumer_stream_write_index(stream, &index);
187}
188
189/*
190 * Actually do the metadata sync using the given metadata stream.
191 *
192 * Return 0 on success else a negative value. ENODATA can be returned also
193 * indicating that there is no metadata available for that stream.
194 */
195static int do_sync_metadata(struct lttng_consumer_stream *metadata,
196 struct lttng_consumer_local_data *ctx)
197{
198 int ret;
577eea73 199 enum sync_metadata_status status;
6f9449c2
JG
200
201 assert(metadata);
202 assert(metadata->metadata_flag);
203 assert(ctx);
204
205 /*
206 * In UST, since we have to write the metadata from the cache packet
207 * by packet, we might need to start this procedure multiple times
208 * until all the metadata from the cache has been extracted.
209 */
210 do {
211 /*
212 * Steps :
213 * - Lock the metadata stream
214 * - Check if metadata stream node was deleted before locking.
215 * - if yes, release and return success
216 * - Check if new metadata is ready (flush + snapshot pos)
217 * - If nothing : release and return.
218 * - Lock the metadata_rdv_lock
219 * - Unlock the metadata stream
220 * - cond_wait on metadata_rdv to wait the wakeup from the
221 * metadata thread
222 * - Unlock the metadata_rdv_lock
223 */
224 pthread_mutex_lock(&metadata->lock);
225
226 /*
227 * There is a possibility that we were able to acquire a reference on the
228 * stream from the RCU hash table but between then and now, the node might
229 * have been deleted just before the lock is acquired. Thus, after locking,
230 * we make sure the metadata node has not been deleted which means that the
231 * buffers are closed.
232 *
233 * In that case, there is no need to sync the metadata hence returning a
234 * success return code.
235 */
236 ret = cds_lfht_is_node_deleted(&metadata->node.node);
237 if (ret) {
238 ret = 0;
239 goto end_unlock_mutex;
240 }
241
242 switch (ctx->type) {
243 case LTTNG_CONSUMER_KERNEL:
244 /*
245 * Empty the metadata cache and flush the current stream.
246 */
577eea73 247 status = lttng_kconsumer_sync_metadata(metadata);
6f9449c2
JG
248 break;
249 case LTTNG_CONSUMER32_UST:
250 case LTTNG_CONSUMER64_UST:
251 /*
252 * Ask the sessiond if we have new metadata waiting and update the
253 * consumer metadata cache.
254 */
577eea73 255 status = lttng_ustconsumer_sync_metadata(ctx, metadata);
6f9449c2
JG
256 break;
257 default:
577eea73 258 abort();
6f9449c2 259 }
577eea73
JG
260
261 switch (status) {
262 case SYNC_METADATA_STATUS_NEW_DATA:
263 break;
264 case SYNC_METADATA_STATUS_NO_DATA:
265 ret = 0;
6f9449c2 266 goto end_unlock_mutex;
577eea73
JG
267 case SYNC_METADATA_STATUS_ERROR:
268 ret = -1;
269 goto end_unlock_mutex;
270 default:
271 abort();
6f9449c2
JG
272 }
273
274 /*
275 * At this point, new metadata have been flushed, so we wait on the
276 * rendez-vous point for the metadata thread to wake us up when it
277 * finishes consuming the metadata and continue execution.
278 */
279
280 pthread_mutex_lock(&metadata->metadata_rdv_lock);
281
282 /*
283 * Release metadata stream lock so the metadata thread can process it.
284 */
285 pthread_mutex_unlock(&metadata->lock);
286
287 /*
288 * Wait on the rendez-vous point. Once woken up, it means the metadata was
289 * consumed and thus synchronization is achieved.
290 */
291 pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock);
292 pthread_mutex_unlock(&metadata->metadata_rdv_lock);
577eea73 293 } while (status == SYNC_METADATA_STATUS_NEW_DATA);
6f9449c2
JG
294
295 /* Success */
296 return 0;
297
298end_unlock_mutex:
299 pthread_mutex_unlock(&metadata->lock);
300 return ret;
301}
302
303/*
304 * Synchronize the metadata using a given session ID. A successful acquisition
305 * of a metadata stream will trigger a request to the session daemon and a
306 * snapshot so the metadata thread can consume it.
307 *
308 * This function call is a rendez-vous point between the metadata thread and
309 * the data thread.
310 *
311 * Return 0 on success or else a negative value.
312 */
313int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
314 uint64_t session_id)
315{
316 int ret;
317 struct lttng_consumer_stream *stream = NULL;
318 struct lttng_ht_iter iter;
319 struct lttng_ht *ht;
320
321 assert(ctx);
322
323 /* Ease our life a bit. */
324 ht = consumer_data.stream_list_ht;
325
326 rcu_read_lock();
327
328 /* Search the metadata associated with the session id of the given stream. */
329
330 cds_lfht_for_each_entry_duplicate(ht->ht,
331 ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct,
332 &session_id, &iter.iter, stream, node_session_id.node) {
333 if (!stream->metadata_flag) {
334 continue;
335 }
336
337 ret = do_sync_metadata(stream, ctx);
338 if (ret < 0) {
339 goto end;
340 }
341 }
342
343 /*
344 * Force return code to 0 (success) since ret might be ENODATA for instance
345 * which is not an error but rather that we should come back.
346 */
347 ret = 0;
348
349end:
350 rcu_read_unlock();
351 return ret;
352}
353
354static int consumer_stream_sync_metadata_index(
355 struct lttng_consumer_stream *stream,
356 const struct stream_subbuffer *subbuffer,
357 struct lttng_consumer_local_data *ctx)
358{
359 int ret;
360
361 /* Block until all the metadata is sent. */
362 pthread_mutex_lock(&stream->metadata_timer_lock);
363 assert(!stream->missed_metadata_flush);
364 stream->waiting_on_metadata = true;
365 pthread_mutex_unlock(&stream->metadata_timer_lock);
366
367 ret = consumer_stream_sync_metadata(ctx, stream->session_id);
368
369 pthread_mutex_lock(&stream->metadata_timer_lock);
370 stream->waiting_on_metadata = false;
371 if (stream->missed_metadata_flush) {
372 stream->missed_metadata_flush = false;
373 pthread_mutex_unlock(&stream->metadata_timer_lock);
374 (void) stream->read_subbuffer_ops.send_live_beacon(stream);
375 } else {
376 pthread_mutex_unlock(&stream->metadata_timer_lock);
377 }
378 if (ret < 0) {
379 goto end;
380 }
381
382 ret = consumer_stream_send_index(stream, subbuffer, ctx);
383end:
384 return ret;
385}
386
387/*
388 * Check if the local version of the metadata stream matches with the version
389 * of the metadata stream in the kernel. If it was updated, set the reset flag
390 * on the stream.
391 */
392static
393int metadata_stream_check_version(struct lttng_consumer_stream *stream,
394 const struct stream_subbuffer *subbuffer)
395{
396 if (stream->metadata_version == subbuffer->info.metadata.version) {
397 goto end;
398 }
399
400 DBG("New metadata version detected");
55954e07
JG
401 consumer_stream_metadata_set_version(stream,
402 subbuffer->info.metadata.version);
f5ba75b4 403
6f9449c2
JG
404 if (stream->read_subbuffer_ops.reset_metadata) {
405 stream->read_subbuffer_ops.reset_metadata(stream);
406 }
407
408end:
409 return 0;
410}
411
412struct lttng_consumer_stream *consumer_stream_create(
413 struct lttng_consumer_channel *channel,
414 uint64_t channel_key,
415 uint64_t stream_key,
416 const char *channel_name,
417 uint64_t relayd_id,
418 uint64_t session_id,
419 struct lttng_trace_chunk *trace_chunk,
420 int cpu,
421 int *alloc_ret,
422 enum consumer_channel_type type,
423 unsigned int monitor)
424{
425 int ret;
426 struct lttng_consumer_stream *stream;
427
428 stream = zmalloc(sizeof(*stream));
429 if (stream == NULL) {
430 PERROR("malloc struct lttng_consumer_stream");
431 ret = -ENOMEM;
432 goto end;
433 }
434
435 if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
436 ERR("Failed to acquire trace chunk reference during the creation of a stream");
437 ret = -1;
438 goto error;
439 }
440
441 rcu_read_lock();
442 stream->chan = channel;
443 stream->key = stream_key;
444 stream->trace_chunk = trace_chunk;
445 stream->out_fd = -1;
446 stream->out_fd_offset = 0;
447 stream->output_written = 0;
448 stream->net_seq_idx = relayd_id;
449 stream->session_id = session_id;
450 stream->monitor = monitor;
451 stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
452 stream->index_file = NULL;
453 stream->last_sequence_number = -1ULL;
454 stream->rotate_position = -1ULL;
455 pthread_mutex_init(&stream->lock, NULL);
456 pthread_mutex_init(&stream->metadata_timer_lock, NULL);
457
458 /* If channel is the metadata, flag this stream as metadata. */
459 if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
460 stream->metadata_flag = 1;
461 /* Metadata is flat out. */
462 strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
463 /* Live rendez-vous point. */
464 pthread_cond_init(&stream->metadata_rdv, NULL);
465 pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
466 } else {
467 /* Format stream name to <channel_name>_<cpu_number> */
468 ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
469 channel_name, cpu);
470 if (ret < 0) {
471 PERROR("snprintf stream name");
472 goto error;
473 }
474 }
475
476 switch (channel->output) {
477 case CONSUMER_CHANNEL_SPLICE:
478 stream->output = LTTNG_EVENT_SPLICE;
479 ret = utils_create_pipe(stream->splice_pipe);
480 if (ret < 0) {
481 goto error;
482 }
483 break;
484 case CONSUMER_CHANNEL_MMAP:
485 stream->output = LTTNG_EVENT_MMAP;
486 break;
487 default:
488 abort();
489 }
490
491 /* Key is always the wait_fd for streams. */
492 lttng_ht_node_init_u64(&stream->node, stream->key);
493
494 /* Init node per channel id key */
495 lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
496
497 /* Init session id node with the stream session id */
498 lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
499
500 DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
501 " relayd_id %" PRIu64 ", session_id %" PRIu64,
502 stream->name, stream->key, channel_key,
503 stream->net_seq_idx, stream->session_id);
504
505 rcu_read_unlock();
506
507 if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
508 stream->read_subbuffer_ops.lock =
509 consumer_stream_metadata_lock_all;
510 stream->read_subbuffer_ops.unlock =
511 consumer_stream_metadata_unlock_all;
512 stream->read_subbuffer_ops.pre_consume_subbuffer =
513 metadata_stream_check_version;
514 } else {
515 stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all;
516 stream->read_subbuffer_ops.unlock =
517 consumer_stream_data_unlock_all;
518 stream->read_subbuffer_ops.pre_consume_subbuffer =
519 consumer_stream_update_stats;
520 if (channel->is_live) {
521 stream->read_subbuffer_ops.post_consume =
522 consumer_stream_sync_metadata_index;
523 } else {
524 stream->read_subbuffer_ops.post_consume =
525 consumer_stream_send_index;
526 }
527 }
528
529 if (channel->output == CONSUMER_CHANNEL_MMAP) {
530 stream->read_subbuffer_ops.consume_subbuffer =
531 consumer_stream_consume_mmap;
532 } else {
533 stream->read_subbuffer_ops.consume_subbuffer =
534 consumer_stream_consume_splice;
535 }
536
537 return stream;
538
539error:
540 rcu_read_unlock();
541 lttng_trace_chunk_put(stream->trace_chunk);
542 free(stream);
543end:
544 if (alloc_ret) {
545 *alloc_ret = ret;
546 }
547 return NULL;
548}
549
51230d70
DG
550/*
551 * Close stream on the relayd side. This call can destroy a relayd if the
552 * conditions are met.
553 *
554 * A RCU read side lock MUST be acquired if the relayd object was looked up in
555 * a hash table before calling this.
556 */
557void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
558 struct consumer_relayd_sock_pair *relayd)
559{
560 int ret;
561
562 assert(stream);
563 assert(relayd);
564
d01178b6
DG
565 if (stream->sent_to_relayd) {
566 uatomic_dec(&relayd->refcount);
567 assert(uatomic_read(&relayd->refcount) >= 0);
568 }
51230d70
DG
569
570 /* Closing streams requires to lock the control socket. */
571 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
572 ret = relayd_send_close_stream(&relayd->control_sock,
573 stream->relayd_stream_id,
574 stream->next_net_seq_num - 1);
575 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
576 if (ret < 0) {
9276e5c8
JR
577 ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
578 lttng_consumer_cleanup_relayd(relayd);
51230d70
DG
579 }
580
581 /* Both conditions are met, we destroy the relayd. */
582 if (uatomic_read(&relayd->refcount) == 0 &&
583 uatomic_read(&relayd->destroy_flag)) {
584 consumer_destroy_relayd(relayd);
585 }
10a50311 586 stream->net_seq_idx = (uint64_t) -1ULL;
d01178b6 587 stream->sent_to_relayd = 0;
51230d70
DG
588}
589
590/*
591 * Close stream's file descriptors and, if needed, close stream also on the
592 * relayd side.
593 *
594 * The consumer data lock MUST be acquired.
595 * The stream lock MUST be acquired.
596 */
597void consumer_stream_close(struct lttng_consumer_stream *stream)
598{
599 int ret;
600 struct consumer_relayd_sock_pair *relayd;
601
602 assert(stream);
603
604 switch (consumer_data.type) {
605 case LTTNG_CONSUMER_KERNEL:
606 if (stream->mmap_base != NULL) {
607 ret = munmap(stream->mmap_base, stream->mmap_len);
608 if (ret != 0) {
609 PERROR("munmap");
610 }
611 }
612
613 if (stream->wait_fd >= 0) {
614 ret = close(stream->wait_fd);
615 if (ret) {
616 PERROR("close");
617 }
10a50311 618 stream->wait_fd = -1;
51230d70 619 }
a2361a61
JD
620 if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
621 utils_close_pipe(stream->splice_pipe);
622 }
51230d70
DG
623 break;
624 case LTTNG_CONSUMER32_UST:
625 case LTTNG_CONSUMER64_UST:
6d574024
DG
626 {
627 /*
628 * Special case for the metadata since the wait fd is an internal pipe
629 * polled in the metadata thread.
630 */
631 if (stream->metadata_flag && stream->chan->monitor) {
632 int rpipe = stream->ust_metadata_poll_pipe[0];
633
634 /*
635 * This will stop the channel timer if one and close the write side
636 * of the metadata poll pipe.
637 */
638 lttng_ustconsumer_close_metadata(stream->chan);
639 if (rpipe >= 0) {
640 ret = close(rpipe);
641 if (ret < 0) {
b4a650f3 642 PERROR("closing metadata pipe read side");
6d574024
DG
643 }
644 stream->ust_metadata_poll_pipe[0] = -1;
645 }
646 }
51230d70 647 break;
6d574024 648 }
51230d70
DG
649 default:
650 ERR("Unknown consumer_data type");
651 assert(0);
652 }
653
654 /* Close output fd. Could be a socket or local file at this point. */
655 if (stream->out_fd >= 0) {
656 ret = close(stream->out_fd);
657 if (ret) {
658 PERROR("close");
659 }
10a50311 660 stream->out_fd = -1;
51230d70
DG
661 }
662
f8f3885c
MD
663 if (stream->index_file) {
664 lttng_index_file_put(stream->index_file);
665 stream->index_file = NULL;
309167d2
JD
666 }
667
d2956687
JG
668 lttng_trace_chunk_put(stream->trace_chunk);
669 stream->trace_chunk = NULL;
670
51230d70
DG
671 /* Check and cleanup relayd if needed. */
672 rcu_read_lock();
673 relayd = consumer_find_relayd(stream->net_seq_idx);
674 if (relayd != NULL) {
675 consumer_stream_relayd_close(stream, relayd);
676 }
677 rcu_read_unlock();
678}
679
680/*
681 * Delete the stream from all possible hash tables.
682 *
683 * The consumer data lock MUST be acquired.
684 * The stream lock MUST be acquired.
685 */
686void consumer_stream_delete(struct lttng_consumer_stream *stream,
687 struct lttng_ht *ht)
688{
689 int ret;
690 struct lttng_ht_iter iter;
691
692 assert(stream);
10a50311
JD
693 /* Should NEVER be called not in monitor mode. */
694 assert(stream->chan->monitor);
51230d70
DG
695
696 rcu_read_lock();
697
698 if (ht) {
699 iter.iter.node = &stream->node.node;
700 ret = lttng_ht_del(ht, &iter);
701 assert(!ret);
702 }
703
704 /* Delete from stream per channel ID hash table. */
705 iter.iter.node = &stream->node_channel_id.node;
706 /*
707 * The returned value is of no importance. Even if the node is NOT in the
708 * hash table, we continue since we may have been called by a code path
709 * that did not add the stream to a (all) hash table. Same goes for the
710 * next call ht del call.
711 */
712 (void) lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
713
714 /* Delete from the global stream list. */
715 iter.iter.node = &stream->node_session_id.node;
716 /* See the previous ht del on why we ignore the returned value. */
717 (void) lttng_ht_del(consumer_data.stream_list_ht, &iter);
718
719 rcu_read_unlock();
720
6d574024
DG
721 if (!stream->metadata_flag) {
722 /* Decrement the stream count of the global consumer data. */
723 assert(consumer_data.stream_count > 0);
724 consumer_data.stream_count--;
725 }
51230d70
DG
726}
727
728/*
729 * Free the given stream within a RCU call.
730 */
731void consumer_stream_free(struct lttng_consumer_stream *stream)
732{
733 assert(stream);
734
f5ba75b4 735 metadata_bucket_destroy(stream->metadata_bucket);
51230d70
DG
736 call_rcu(&stream->node.head, free_stream_rcu);
737}
738
739/*
10a50311 740 * Destroy the stream's buffers of the tracer.
51230d70 741 */
10a50311 742void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream)
51230d70 743{
10a50311
JD
744 assert(stream);
745
746 switch (consumer_data.type) {
747 case LTTNG_CONSUMER_KERNEL:
748 break;
749 case LTTNG_CONSUMER32_UST:
750 case LTTNG_CONSUMER64_UST:
751 lttng_ustconsumer_del_stream(stream);
752 break;
753 default:
754 ERR("Unknown consumer_data type");
755 assert(0);
756 }
757}
51230d70 758
10a50311 759/*
4891ece8 760 * Destroy and close a already created stream.
10a50311 761 */
4891ece8 762static void destroy_close_stream(struct lttng_consumer_stream *stream)
10a50311 763{
51230d70
DG
764 assert(stream);
765
4891ece8 766 DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key);
10a50311
JD
767
768 /* Destroy tracer buffers of the stream. */
769 consumer_stream_destroy_buffers(stream);
770 /* Close down everything including the relayd if one. */
771 consumer_stream_close(stream);
772}
51230d70 773
10a50311 774/*
4891ece8
DG
775 * Decrement the stream's channel refcount and if down to 0, return the channel
776 * pointer so it can be destroyed by the caller or NULL if not.
10a50311 777 */
4891ece8
DG
778static struct lttng_consumer_channel *unref_channel(
779 struct lttng_consumer_stream *stream)
10a50311 780{
4891ece8
DG
781 struct lttng_consumer_channel *free_chan = NULL;
782
10a50311 783 assert(stream);
4891ece8 784 assert(stream->chan);
10a50311 785
4891ece8
DG
786 /* Update refcount of channel and see if we need to destroy it. */
787 if (!uatomic_sub_return(&stream->chan->refcount, 1)
788 && !uatomic_read(&stream->chan->nb_init_stream_left)) {
789 free_chan = stream->chan;
790 }
51230d70 791
4891ece8 792 return free_chan;
10a50311 793}
51230d70 794
10a50311
JD
795/*
796 * Destroy a stream completely. This will delete, close and free the stream.
797 * Once return, the stream is NO longer usable. Its channel may get destroyed
798 * if conditions are met for a monitored stream.
799 *
800 * This MUST be called WITHOUT the consumer data and stream lock acquired if
801 * the stream is in _monitor_ mode else it does not matter.
802 */
803void consumer_stream_destroy(struct lttng_consumer_stream *stream,
804 struct lttng_ht *ht)
805{
806 assert(stream);
807
808 /* Stream is in monitor mode. */
4891ece8 809 if (stream->monitor) {
10a50311 810 struct lttng_consumer_channel *free_chan = NULL;
51230d70 811
4891ece8
DG
812 /*
813 * This means that the stream was successfully removed from the streams
814 * list of the channel and sent to the right thread managing this
815 * stream thus being globally visible.
816 */
817 if (stream->globally_visible) {
818 pthread_mutex_lock(&consumer_data.lock);
a9838785 819 pthread_mutex_lock(&stream->chan->lock);
4891ece8
DG
820 pthread_mutex_lock(&stream->lock);
821 /* Remove every reference of the stream in the consumer. */
822 consumer_stream_delete(stream, ht);
823
824 destroy_close_stream(stream);
825
826 /* Update channel's refcount of the stream. */
827 free_chan = unref_channel(stream);
828
829 /* Indicates that the consumer data state MUST be updated after this. */
830 consumer_data.need_update = 1;
831
832 pthread_mutex_unlock(&stream->lock);
a9838785 833 pthread_mutex_unlock(&stream->chan->lock);
4891ece8
DG
834 pthread_mutex_unlock(&consumer_data.lock);
835 } else {
836 /*
837 * If the stream is not visible globally, this needs to be done
838 * outside of the consumer data lock section.
839 */
840 free_chan = unref_channel(stream);
10a50311
JD
841 }
842
10a50311
JD
843 if (free_chan) {
844 consumer_del_channel(free_chan);
845 }
846 } else {
4891ece8 847 destroy_close_stream(stream);
51230d70
DG
848 }
849
850 /* Free stream within a RCU call. */
d2956687
JG
851 lttng_trace_chunk_put(stream->trace_chunk);
852 stream->trace_chunk = NULL;
51230d70
DG
853 consumer_stream_free(stream);
854}
1c20f0e2
JD
855
856/*
857 * Write index of a specific stream either on the relayd or local disk.
858 *
859 * Return 0 on success or else a negative value.
860 */
861int consumer_stream_write_index(struct lttng_consumer_stream *stream,
f8f3885c 862 struct ctf_packet_index *element)
1c20f0e2
JD
863{
864 int ret;
1c20f0e2
JD
865
866 assert(stream);
f8f3885c 867 assert(element);
1c20f0e2
JD
868
869 rcu_read_lock();
23c910e5
JR
870 if (stream->net_seq_idx != (uint64_t) -1ULL) {
871 struct consumer_relayd_sock_pair *relayd;
872 relayd = consumer_find_relayd(stream->net_seq_idx);
873 if (relayd) {
874 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
875 ret = relayd_send_index(&relayd->control_sock, element,
1c20f0e2 876 stream->relayd_stream_id, stream->next_net_seq_num - 1);
9276e5c8
JR
877 if (ret < 0) {
878 /*
879 * Communication error with lttng-relayd,
880 * perform cleanup now
881 */
882 ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
883 lttng_consumer_cleanup_relayd(relayd);
884 ret = -1;
885 }
23c910e5
JR
886 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
887 } else {
888 ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.",
889 stream->key, stream->net_seq_idx);
890 ret = -1;
891 }
1c20f0e2 892 } else {
f8f3885c 893 if (lttng_index_file_write(stream->index_file, element)) {
6cd525e8
MD
894 ret = -1;
895 } else {
896 ret = 0;
897 }
1c20f0e2
JD
898 }
899 if (ret < 0) {
900 goto error;
901 }
902
903error:
904 rcu_read_unlock();
905 return ret;
906}
94d49140 907
d2956687
JG
908int consumer_stream_create_output_files(struct lttng_consumer_stream *stream,
909 bool create_index)
910{
911 int ret;
912 enum lttng_trace_chunk_status chunk_status;
913 const int flags = O_WRONLY | O_CREAT | O_TRUNC;
914 const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
915 char stream_path[LTTNG_PATH_MAX];
916
917 ASSERT_LOCKED(stream->lock);
918 assert(stream->trace_chunk);
919
920 ret = utils_stream_file_path(stream->chan->pathname, stream->name,
921 stream->chan->tracefile_size,
3b16476a 922 stream->tracefile_count_current, NULL,
d2956687
JG
923 stream_path, sizeof(stream_path));
924 if (ret < 0) {
925 goto end;
926 }
927
928 if (stream->out_fd >= 0) {
929 ret = close(stream->out_fd);
930 if (ret < 0) {
931 PERROR("Failed to close stream file \"%s\"",
932 stream->name);
933 goto end;
934 }
935 stream->out_fd = -1;
936 }
937
938 DBG("Opening stream output file \"%s\"", stream_path);
939 chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path,
3ff5c5db 940 flags, mode, &stream->out_fd, false);
d2956687
JG
941 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
942 ERR("Failed to open stream file \"%s\"", stream->name);
943 ret = -1;
944 goto end;
945 }
946
947 if (!stream->metadata_flag && (create_index || stream->index_file)) {
948 if (stream->index_file) {
949 lttng_index_file_put(stream->index_file);
950 }
3ff5c5db 951 chunk_status = lttng_index_file_create_from_trace_chunk(
d2956687
JG
952 stream->trace_chunk,
953 stream->chan->pathname,
954 stream->name,
955 stream->chan->tracefile_size,
956 stream->tracefile_count_current,
957 CTF_INDEX_MAJOR, CTF_INDEX_MINOR,
3ff5c5db
MD
958 false, &stream->index_file);
959 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
d2956687
JG
960 ret = -1;
961 goto end;
962 }
963 }
964
965 /* Reset current size because we just perform a rotation. */
966 stream->tracefile_size_current = 0;
967 stream->out_fd_offset = 0;
968end:
969 return ret;
970}
971
972int consumer_stream_rotate_output_files(struct lttng_consumer_stream *stream)
973{
974 int ret;
975
976 stream->tracefile_count_current++;
977 if (stream->chan->tracefile_count > 0) {
978 stream->tracefile_count_current %=
979 stream->chan->tracefile_count;
980 }
981
982 DBG("Rotating output files of stream \"%s\"", stream->name);
983 ret = consumer_stream_create_output_files(stream, true);
984 if (ret) {
985 goto end;
986 }
987
988end:
989 return ret;
990}
cdb72e4e
JG
991
992bool consumer_stream_is_deleted(struct lttng_consumer_stream *stream)
993{
994 /*
995 * This function does not take a const stream since
996 * cds_lfht_is_node_deleted was not const before liburcu 0.12.
997 */
998 assert(stream);
999 return cds_lfht_is_node_deleted(&stream->node.node);
1000}
f5ba75b4
JG
1001
/*
 * Flush callback registered with the metadata bucket: consume `buffer` as an
 * mmap sub-buffer of the stream passed through `data`.
 *
 * Returns the value of consumer_stream_consume_mmap().
 */
static ssize_t metadata_bucket_flush(
		const struct stream_subbuffer *buffer, void *data)
{
	struct lttng_consumer_stream *const stream = data;

	/* No consumer context is provided to the consume function. */
	return consumer_stream_consume_mmap(NULL, stream, buffer);
}
1015
1016static ssize_t metadata_bucket_consume(
1017 struct lttng_consumer_local_data *unused,
1018 struct lttng_consumer_stream *stream,
1019 const struct stream_subbuffer *subbuffer)
1020{
1021 ssize_t ret;
1022 enum metadata_bucket_status status;
1023
1024 status = metadata_bucket_fill(stream->metadata_bucket, subbuffer);
1025 switch (status) {
1026 case METADATA_BUCKET_STATUS_OK:
1027 /* Return consumed size. */
1028 ret = subbuffer->buffer.buffer.size;
1029 break;
1030 default:
1031 ret = -1;
1032 }
1033
1034 return ret;
1035}
1036
1037int consumer_stream_enable_metadata_bucketization(
1038 struct lttng_consumer_stream *stream)
1039{
1040 int ret = 0;
1041
1042 assert(stream->metadata_flag);
1043 assert(!stream->metadata_bucket);
1044 assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
1045
1046 stream->metadata_bucket = metadata_bucket_create(
1047 metadata_bucket_flush, stream);
1048 if (!stream->metadata_bucket) {
1049 ret = -1;
1050 goto end;
1051 }
1052
1053 stream->read_subbuffer_ops.consume_subbuffer = metadata_bucket_consume;
1054end:
1055 return ret;
1056}
55954e07
JG
1057
1058void consumer_stream_metadata_set_version(
1059 struct lttng_consumer_stream *stream, uint64_t new_version)
1060{
1061 assert(new_version > stream->metadata_version);
1062 stream->metadata_version = new_version;
1063 stream->reset_metadata_flag = 1;
1064
1065 if (stream->metadata_bucket) {
1066 metadata_bucket_reset(stream->metadata_bucket);
1067 }
1068}
This page took 0.088162 seconds and 4 git commands to generate.