Fix: kconsumer: missing wait for metadata thread in do_sync_metadata
lttng-tools.git: src/common/consumer/consumer-stream.c
1 /*
2 * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10 #define _LGPL_SOURCE
11 #include <assert.h>
12 #include <inttypes.h>
13 #include <sys/mman.h>
14 #include <unistd.h>
15
16 #include <common/common.h>
17 #include <common/index/index.h>
18 #include <common/kernel-consumer/kernel-consumer.h>
19 #include <common/relayd/relayd.h>
20 #include <common/ust-consumer/ust-consumer.h>
21 #include <common/utils.h>
22 #include <common/consumer/consumer.h>
23 #include <common/consumer/consumer-timer.h>
24 #include <common/consumer/metadata-bucket.h>
25
26 #include "consumer-stream.h"
27
28 /*
29 * RCU call to free stream. MUST only be used with call_rcu().
30 */
31 static void free_stream_rcu(struct rcu_head *head)
32 {
33 struct lttng_ht_node_u64 *node =
34 caa_container_of(head, struct lttng_ht_node_u64, head);
35 struct lttng_consumer_stream *stream =
36 caa_container_of(node, struct lttng_consumer_stream, node);
37
38 pthread_mutex_destroy(&stream->lock);
39 free(stream);
40 }
41
42 static void consumer_stream_data_lock_all(struct lttng_consumer_stream *stream)
43 {
44 pthread_mutex_lock(&stream->chan->lock);
45 pthread_mutex_lock(&stream->lock);
46 }
47
48 static void consumer_stream_data_unlock_all(struct lttng_consumer_stream *stream)
49 {
50 pthread_mutex_unlock(&stream->lock);
51 pthread_mutex_unlock(&stream->chan->lock);
52 }
53
54 static void consumer_stream_metadata_lock_all(struct lttng_consumer_stream *stream)
55 {
56 consumer_stream_data_lock_all(stream);
57 pthread_mutex_lock(&stream->metadata_rdv_lock);
58 }
59
60 static void consumer_stream_metadata_unlock_all(struct lttng_consumer_stream *stream)
61 {
62 pthread_mutex_unlock(&stream->metadata_rdv_lock);
63 consumer_stream_data_unlock_all(stream);
64 }
65
66 /* Only used for data streams. */
67 static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
68 const struct stream_subbuffer *subbuf)
69 {
70 int ret = 0;
71 uint64_t sequence_number;
72 const uint64_t discarded_events = subbuf->info.data.events_discarded;
73
74 if (!subbuf->info.data.sequence_number.is_set) {
75 /* Command not supported by the tracer. */
76 sequence_number = -1ULL;
77 stream->sequence_number_unavailable = true;
78 } else {
79 sequence_number = subbuf->info.data.sequence_number.value;
80 }
81
82 /*
83 * Initialize the sequence number from the first extracted packet in case
84 * it does not start at 0 (for example, if a consumer is not connected to
85 * the session right from the beginning).
86 */
87 if (stream->last_sequence_number == -1ULL) {
88 stream->last_sequence_number = sequence_number;
89 } else if (sequence_number > stream->last_sequence_number) {
90 stream->chan->lost_packets += sequence_number -
91 stream->last_sequence_number - 1;
92 } else {
93 /* seq <= last_sequence_number */
94 ERR("Sequence number inconsistent : prev = %" PRIu64
95 ", current = %" PRIu64,
96 stream->last_sequence_number, sequence_number);
97 ret = -1;
98 goto end;
99 }
100 stream->last_sequence_number = sequence_number;
101
102 if (discarded_events < stream->last_discarded_events) {
103 /*
104 * An overflow has occurred; assume only a single wrap-around
105 * took place.
106 */
107 stream->chan->discarded_events +=
108 (1ULL << (CAA_BITS_PER_LONG - 1)) -
109 stream->last_discarded_events +
110 discarded_events;
111 } else {
112 stream->chan->discarded_events += discarded_events -
113 stream->last_discarded_events;
114 }
115 stream->last_discarded_events = discarded_events;
116 ret = 0;
117
118 end:
119 return ret;
120 }
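/*
 * Worked example of the wrap-around handling above, assuming a 64-bit long
 * (CAA_BITS_PER_LONG == 64), i.e. a counter that wraps at 2^63: if
 * last_discarded_events was (2^63 - 10) and the tracer now reports
 * discarded_events == 5, the delta credited to the channel is
 * 2^63 - (2^63 - 10) + 5 = 15 discarded events.
 */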
121
122 static
123 void ctf_packet_index_populate(struct ctf_packet_index *index,
124 off_t offset, const struct stream_subbuffer *subbuffer)
125 {
126 *index = (typeof(*index)){
127 .offset = htobe64(offset),
128 .packet_size = htobe64(subbuffer->info.data.packet_size),
129 .content_size = htobe64(subbuffer->info.data.content_size),
130 .timestamp_begin = htobe64(
131 subbuffer->info.data.timestamp_begin),
132 .timestamp_end = htobe64(
133 subbuffer->info.data.timestamp_end),
134 .events_discarded = htobe64(
135 subbuffer->info.data.events_discarded),
136 .stream_id = htobe64(subbuffer->info.data.stream_id),
137 .stream_instance_id = htobe64(
138 subbuffer->info.data.stream_instance_id.is_set ?
139 subbuffer->info.data.stream_instance_id.value : -1ULL),
140 .packet_seq_num = htobe64(
141 subbuffer->info.data.sequence_number.is_set ?
142 subbuffer->info.data.sequence_number.value : -1ULL),
143 };
144 }
145
146 static ssize_t consumer_stream_consume_mmap(
147 struct lttng_consumer_local_data *ctx,
148 struct lttng_consumer_stream *stream,
149 const struct stream_subbuffer *subbuffer)
150 {
151 const unsigned long padding_size =
152 subbuffer->info.data.padded_subbuf_size -
153 subbuffer->info.data.subbuf_size;
154
155 return lttng_consumer_on_read_subbuffer_mmap(
156 stream, &subbuffer->buffer.buffer, padding_size);
157 }
158
159 static ssize_t consumer_stream_consume_splice(
160 struct lttng_consumer_local_data *ctx,
161 struct lttng_consumer_stream *stream,
162 const struct stream_subbuffer *subbuffer)
163 {
164 return lttng_consumer_on_read_subbuffer_splice(ctx, stream,
165 subbuffer->info.data.padded_subbuf_size, 0);
166 }
167
168 static int consumer_stream_send_index(
169 struct lttng_consumer_stream *stream,
170 const struct stream_subbuffer *subbuffer,
171 struct lttng_consumer_local_data *ctx)
172 {
173 off_t packet_offset = 0;
174 struct ctf_packet_index index = {};
175
176 /*
177 * This is called after consuming the sub-buffer; subtract the
178 * effect of this sub-buffer from the offset.
179 */
180 if (stream->net_seq_idx == (uint64_t) -1ULL) {
181 packet_offset = stream->out_fd_offset -
182 subbuffer->info.data.padded_subbuf_size;
183 }
184
185 ctf_packet_index_populate(&index, packet_offset, subbuffer);
186 return consumer_stream_write_index(stream, &index);
187 }
188
189 /*
190 * Actually do the metadata sync using the given metadata stream.
191 *
192 * Return 0 on success or else a negative value. ENODATA may also be
193 * returned to indicate that no metadata is available for that stream.
194 */
195 static int do_sync_metadata(struct lttng_consumer_stream *metadata,
196 struct lttng_consumer_local_data *ctx)
197 {
198 int ret;
199 enum sync_metadata_status status;
200
201 assert(metadata);
202 assert(metadata->metadata_flag);
203 assert(ctx);
204
205 /*
206 * In UST, since we have to write the metadata from the cache packet
207 * by packet, we might need to start this procedure multiple times
208 * until all the metadata from the cache has been extracted.
209 */
210 do {
211 /*
212 * Steps :
213 * - Lock the metadata stream
214 * - Check if metadata stream node was deleted before locking.
215 * - if yes, release and return success
216 * - Check if new metadata is ready (flush + snapshot pos)
217 * - If nothing : release and return.
218 * - Lock the metadata_rdv_lock
219 * - Unlock the metadata stream
220 * - cond_wait on metadata_rdv to wait for the wakeup from the
221 * metadata thread
222 * - Unlock the metadata_rdv_lock
223 */
224 pthread_mutex_lock(&metadata->lock);
225
226 /*
227 * We may have been able to acquire a reference on the stream from the
228 * RCU hash table, but between then and now the node may have been
229 * deleted just before the lock was acquired. Thus, after locking, make
230 * sure the metadata node has not been deleted, which would mean that
231 * its buffers are closed.
232 *
233 * In that case, there is no need to sync the metadata, hence a success
234 * code is returned.
235 */
236 ret = cds_lfht_is_node_deleted(&metadata->node.node);
237 if (ret) {
238 ret = 0;
239 goto end_unlock_mutex;
240 }
241
242 switch (ctx->type) {
243 case LTTNG_CONSUMER_KERNEL:
244 /*
245 * Empty the metadata cache and flush the current stream.
246 */
247 status = lttng_kconsumer_sync_metadata(metadata);
248 break;
249 case LTTNG_CONSUMER32_UST:
250 case LTTNG_CONSUMER64_UST:
251 /*
252 * Ask the sessiond if we have new metadata waiting and update the
253 * consumer metadata cache.
254 */
255 status = lttng_ustconsumer_sync_metadata(ctx, metadata);
256 break;
257 default:
258 abort();
259 }
260
261 switch (status) {
262 case SYNC_METADATA_STATUS_NEW_DATA:
263 break;
264 case SYNC_METADATA_STATUS_NO_DATA:
265 ret = 0;
266 goto end_unlock_mutex;
267 case SYNC_METADATA_STATUS_ERROR:
268 ret = -1;
269 goto end_unlock_mutex;
270 default:
271 abort();
272 }
273
274 /*
275 * At this point, new metadata has been flushed, so we wait on the
276 * rendez-vous point for the metadata thread to wake us up once it has
277 * finished consuming the metadata, and then continue execution.
278 */
279
280 pthread_mutex_lock(&metadata->metadata_rdv_lock);
281
282 /*
283 * Release metadata stream lock so the metadata thread can process it.
284 */
285 pthread_mutex_unlock(&metadata->lock);
286
287 /*
288 * Wait on the rendez-vous point. Once woken up, it means the metadata was
289 * consumed and thus synchronization is achieved.
290 */
291 pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock);
292 pthread_mutex_unlock(&metadata->metadata_rdv_lock);
293 } while (status == SYNC_METADATA_STATUS_NEW_DATA);
294
295 /* Success */
296 return 0;
297
298 end_unlock_mutex:
299 pthread_mutex_unlock(&metadata->lock);
300 return ret;
301 }
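/*
 * For reference, a minimal sketch of the waker side of the rendez-vous used
 * above: once the metadata thread has finished consuming the metadata, it is
 * expected to broadcast on metadata_rdv while holding metadata_rdv_lock,
 * roughly as shown below. The helper name is hypothetical and the actual
 * wake-up lives in the metadata consumption path, not in this file.
 */
#if 0
static void wake_metadata_sync_waiters(struct lttng_consumer_stream *metadata)
{
	pthread_mutex_lock(&metadata->metadata_rdv_lock);
	pthread_cond_broadcast(&metadata->metadata_rdv);
	pthread_mutex_unlock(&metadata->metadata_rdv_lock);
}
#endif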
302
303 /*
304 * Synchronize the metadata using a given session ID. A successful acquisition
305 * of a metadata stream will trigger a request to the session daemon and a
306 * snapshot so the metadata thread can consume it.
307 *
308 * This function call is a rendez-vous point between the metadata thread and
309 * the data thread.
310 *
311 * Return 0 on success or else a negative value.
312 */
313 int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
314 uint64_t session_id)
315 {
316 int ret;
317 struct lttng_consumer_stream *stream = NULL;
318 struct lttng_ht_iter iter;
319 struct lttng_ht *ht;
320
321 assert(ctx);
322
323 /* Ease our life a bit. */
324 ht = consumer_data.stream_list_ht;
325
326 rcu_read_lock();
327
328 /* Search for the metadata stream associated with the given session id. */
329
330 cds_lfht_for_each_entry_duplicate(ht->ht,
331 ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct,
332 &session_id, &iter.iter, stream, node_session_id.node) {
333 if (!stream->metadata_flag) {
334 continue;
335 }
336
337 ret = do_sync_metadata(stream, ctx);
338 if (ret < 0) {
339 goto end;
340 }
341 }
342
343 /*
344 * Force the return code to 0 (success) since ret might be ENODATA, for
345 * instance, which is not an error but rather an indication to come back later.
346 */
347 ret = 0;
348
349 end:
350 rcu_read_unlock();
351 return ret;
352 }
353
354 static int consumer_stream_sync_metadata_index(
355 struct lttng_consumer_stream *stream,
356 const struct stream_subbuffer *subbuffer,
357 struct lttng_consumer_local_data *ctx)
358 {
359 int ret;
360
361 /* Block until all the metadata is sent. */
362 pthread_mutex_lock(&stream->metadata_timer_lock);
363 assert(!stream->missed_metadata_flush);
364 stream->waiting_on_metadata = true;
365 pthread_mutex_unlock(&stream->metadata_timer_lock);
366
367 ret = consumer_stream_sync_metadata(ctx, stream->session_id);
368
369 pthread_mutex_lock(&stream->metadata_timer_lock);
370 stream->waiting_on_metadata = false;
371 if (stream->missed_metadata_flush) {
372 stream->missed_metadata_flush = false;
373 pthread_mutex_unlock(&stream->metadata_timer_lock);
374 (void) stream->read_subbuffer_ops.send_live_beacon(stream);
375 } else {
376 pthread_mutex_unlock(&stream->metadata_timer_lock);
377 }
378 if (ret < 0) {
379 goto end;
380 }
381
382 ret = consumer_stream_send_index(stream, subbuffer, ctx);
383 end:
384 return ret;
385 }
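/*
 * The waiting_on_metadata/missed_metadata_flush hand-shake above exists so
 * that the live timer, which only trylocks the stream, can back off while
 * this thread blocks on the metadata rendez-vous with the stream lock held;
 * the data thread then emits the live beacon itself. A rough, illustrative
 * sketch of that timer-side check follows (hypothetical helper, not the
 * exact upstream code):
 */
#if 0
static void live_timer_check_sketch(struct lttng_consumer_stream *stream)
{
	if (pthread_mutex_trylock(&stream->lock) != 0) {
		pthread_mutex_lock(&stream->metadata_timer_lock);
		if (stream->waiting_on_metadata) {
			/* Data thread will send the live beacon once synced. */
			stream->missed_metadata_flush = true;
		}
		pthread_mutex_unlock(&stream->metadata_timer_lock);
		return;
	}
	/* ... regular timer work with the stream lock held ... */
	pthread_mutex_unlock(&stream->lock);
}
#endif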
386
387 /*
388 * Check if the local version of the metadata stream matches with the version
389 * of the metadata stream in the kernel. If it was updated, set the reset flag
390 * on the stream.
391 */
392 static
393 int metadata_stream_check_version(struct lttng_consumer_stream *stream,
394 const struct stream_subbuffer *subbuffer)
395 {
396 if (stream->metadata_version == subbuffer->info.metadata.version) {
397 goto end;
398 }
399
400 DBG("New metadata version detected");
401 consumer_stream_metadata_set_version(stream,
402 subbuffer->info.metadata.version);
403
404 if (stream->read_subbuffer_ops.reset_metadata) {
405 stream->read_subbuffer_ops.reset_metadata(stream);
406 }
407
408 end:
409 return 0;
410 }
411
412 struct lttng_consumer_stream *consumer_stream_create(
413 struct lttng_consumer_channel *channel,
414 uint64_t channel_key,
415 uint64_t stream_key,
416 const char *channel_name,
417 uint64_t relayd_id,
418 uint64_t session_id,
419 struct lttng_trace_chunk *trace_chunk,
420 int cpu,
421 int *alloc_ret,
422 enum consumer_channel_type type,
423 unsigned int monitor)
424 {
425 int ret;
426 struct lttng_consumer_stream *stream;
427
428 stream = zmalloc(sizeof(*stream));
429 if (stream == NULL) {
430 PERROR("malloc struct lttng_consumer_stream");
431 ret = -ENOMEM;
432 goto end;
433 }
434
435 if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
436 ERR("Failed to acquire trace chunk reference during the creation of a stream");
437 ret = -1;
438 goto error;
439 }
440
441 rcu_read_lock();
442 stream->chan = channel;
443 stream->key = stream_key;
444 stream->trace_chunk = trace_chunk;
445 stream->out_fd = -1;
446 stream->out_fd_offset = 0;
447 stream->output_written = 0;
448 stream->net_seq_idx = relayd_id;
449 stream->session_id = session_id;
450 stream->monitor = monitor;
451 stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
452 stream->index_file = NULL;
453 stream->last_sequence_number = -1ULL;
454 stream->rotate_position = -1ULL;
455 pthread_mutex_init(&stream->lock, NULL);
456 pthread_mutex_init(&stream->metadata_timer_lock, NULL);
457
458 /* If channel is the metadata, flag this stream as metadata. */
459 if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
460 stream->metadata_flag = 1;
461 /* Metadata is a single stream; use the default name (no CPU suffix). */
462 strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
463 /* Live rendez-vous point. */
464 pthread_cond_init(&stream->metadata_rdv, NULL);
465 pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
466 } else {
467 /* Format stream name to <channel_name>_<cpu_number> */
468 ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
469 channel_name, cpu);
470 if (ret < 0) {
471 PERROR("snprintf stream name");
472 goto error;
473 }
474 }
475
476 switch (channel->output) {
477 case CONSUMER_CHANNEL_SPLICE:
478 stream->output = LTTNG_EVENT_SPLICE;
479 ret = utils_create_pipe(stream->splice_pipe);
480 if (ret < 0) {
481 goto error;
482 }
483 break;
484 case CONSUMER_CHANNEL_MMAP:
485 stream->output = LTTNG_EVENT_MMAP;
486 break;
487 default:
488 abort();
489 }
490
491 /* Key is always the wait_fd for streams. */
492 lttng_ht_node_init_u64(&stream->node, stream->key);
493
494 /* Init node per channel id key */
495 lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
496
497 /* Init session id node with the stream session id */
498 lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
499
500 DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
501 " relayd_id %" PRIu64 ", session_id %" PRIu64,
502 stream->name, stream->key, channel_key,
503 stream->net_seq_idx, stream->session_id);
504
505 rcu_read_unlock();
506
507 if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
508 stream->read_subbuffer_ops.lock =
509 consumer_stream_metadata_lock_all;
510 stream->read_subbuffer_ops.unlock =
511 consumer_stream_metadata_unlock_all;
512 stream->read_subbuffer_ops.pre_consume_subbuffer =
513 metadata_stream_check_version;
514 } else {
515 stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all;
516 stream->read_subbuffer_ops.unlock =
517 consumer_stream_data_unlock_all;
518 stream->read_subbuffer_ops.pre_consume_subbuffer =
519 consumer_stream_update_stats;
520 if (channel->is_live) {
521 stream->read_subbuffer_ops.post_consume =
522 consumer_stream_sync_metadata_index;
523 } else {
524 stream->read_subbuffer_ops.post_consume =
525 consumer_stream_send_index;
526 }
527 }
528
529 if (channel->output == CONSUMER_CHANNEL_MMAP) {
530 stream->read_subbuffer_ops.consume_subbuffer =
531 consumer_stream_consume_mmap;
532 } else {
533 stream->read_subbuffer_ops.consume_subbuffer =
534 consumer_stream_consume_splice;
535 }
536
537 return stream;
538
539 error:
540 rcu_read_unlock();
541 lttng_trace_chunk_put(stream->trace_chunk);
542 free(stream);
543 end:
544 if (alloc_ret) {
545 *alloc_ret = ret;
546 }
547 return NULL;
548 }
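/*
 * Illustrative call sketch (hypothetical helper, not part of this file): a
 * channel-side code path would typically create one data stream per CPU and
 * report the failure reason through alloc_ret. The channel field names used
 * here are assumptions.
 */
#if 0
static struct lttng_consumer_stream *create_data_stream_sketch(
		struct lttng_consumer_channel *channel,
		uint64_t stream_key, int cpu)
{
	int alloc_ret = 0;
	struct lttng_consumer_stream *stream;

	stream = consumer_stream_create(channel, channel->key, stream_key,
			channel->name, channel->relayd_id, channel->session_id,
			channel->trace_chunk, cpu, &alloc_ret,
			CONSUMER_CHANNEL_TYPE_DATA, channel->monitor);
	if (!stream) {
		ERR("Failed to create consumer stream (alloc_ret = %d)",
				alloc_ret);
	}
	return stream;
}
#endif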
549
550 /*
551 * Close stream on the relayd side. This call can destroy a relayd if the
552 * conditions are met.
553 *
554 * An RCU read side lock MUST be acquired if the relayd object was looked up in
555 * a hash table before calling this.
556 */
557 void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
558 struct consumer_relayd_sock_pair *relayd)
559 {
560 int ret;
561
562 assert(stream);
563 assert(relayd);
564
565 if (stream->sent_to_relayd) {
566 uatomic_dec(&relayd->refcount);
567 assert(uatomic_read(&relayd->refcount) >= 0);
568 }
569
570 /* Closing streams requires locking the control socket. */
571 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
572 ret = relayd_send_close_stream(&relayd->control_sock,
573 stream->relayd_stream_id,
574 stream->next_net_seq_num - 1);
575 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
576 if (ret < 0) {
577 ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
578 lttng_consumer_cleanup_relayd(relayd);
579 }
580
581 /* If both conditions are met, destroy the relayd. */
582 if (uatomic_read(&relayd->refcount) == 0 &&
583 uatomic_read(&relayd->destroy_flag)) {
584 consumer_destroy_relayd(relayd);
585 }
586 stream->net_seq_idx = (uint64_t) -1ULL;
587 stream->sent_to_relayd = 0;
588 }
589
590 /*
591 * Close the stream's file descriptors and, if needed, also close the stream
592 * on the relayd side.
593 *
594 * The consumer data lock MUST be acquired.
595 * The stream lock MUST be acquired.
596 */
597 void consumer_stream_close(struct lttng_consumer_stream *stream)
598 {
599 int ret;
600 struct consumer_relayd_sock_pair *relayd;
601
602 assert(stream);
603
604 switch (consumer_data.type) {
605 case LTTNG_CONSUMER_KERNEL:
606 if (stream->mmap_base != NULL) {
607 ret = munmap(stream->mmap_base, stream->mmap_len);
608 if (ret != 0) {
609 PERROR("munmap");
610 }
611 }
612
613 if (stream->wait_fd >= 0) {
614 ret = close(stream->wait_fd);
615 if (ret) {
616 PERROR("close");
617 }
618 stream->wait_fd = -1;
619 }
620 if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
621 utils_close_pipe(stream->splice_pipe);
622 }
623 break;
624 case LTTNG_CONSUMER32_UST:
625 case LTTNG_CONSUMER64_UST:
626 {
627 /*
628 * Special case for the metadata since the wait fd is an internal pipe
629 * polled in the metadata thread.
630 */
631 if (stream->metadata_flag && stream->chan->monitor) {
632 int rpipe = stream->ust_metadata_poll_pipe[0];
633
634 /*
635 * This will stop the channel timer, if there is one, and close the write side
636 * of the metadata poll pipe.
637 */
638 lttng_ustconsumer_close_metadata(stream->chan);
639 if (rpipe >= 0) {
640 ret = close(rpipe);
641 if (ret < 0) {
642 PERROR("closing metadata pipe read side");
643 }
644 stream->ust_metadata_poll_pipe[0] = -1;
645 }
646 }
647 break;
648 }
649 default:
650 ERR("Unknown consumer_data type");
651 assert(0);
652 }
653
654 /* Close output fd. Could be a socket or local file at this point. */
655 if (stream->out_fd >= 0) {
656 ret = close(stream->out_fd);
657 if (ret) {
658 PERROR("close");
659 }
660 stream->out_fd = -1;
661 }
662
663 if (stream->index_file) {
664 lttng_index_file_put(stream->index_file);
665 stream->index_file = NULL;
666 }
667
668 lttng_trace_chunk_put(stream->trace_chunk);
669 stream->trace_chunk = NULL;
670
671 /* Check and cleanup relayd if needed. */
672 rcu_read_lock();
673 relayd = consumer_find_relayd(stream->net_seq_idx);
674 if (relayd != NULL) {
675 consumer_stream_relayd_close(stream, relayd);
676 }
677 rcu_read_unlock();
678 }
679
680 /*
681 * Delete the stream from all possible hash tables.
682 *
683 * The consumer data lock MUST be acquired.
684 * The stream lock MUST be acquired.
685 */
686 void consumer_stream_delete(struct lttng_consumer_stream *stream,
687 struct lttng_ht *ht)
688 {
689 int ret;
690 struct lttng_ht_iter iter;
691
692 assert(stream);
693 /* Must NEVER be called when not in monitor mode. */
694 assert(stream->chan->monitor);
695
696 rcu_read_lock();
697
698 if (ht) {
699 iter.iter.node = &stream->node.node;
700 ret = lttng_ht_del(ht, &iter);
701 assert(!ret);
702 }
703
704 /* Delete from stream per channel ID hash table. */
705 iter.iter.node = &stream->node_channel_id.node;
706 /*
707 * The returned value is of no importance. Even if the node is NOT in the
708 * hash table, we continue since we may have been called by a code path
709 * that did not add the stream to any (or all) of the hash tables. The same
710 * goes for the next ht del call.
711 */
712 (void) lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
713
714 /* Delete from the global stream list. */
715 iter.iter.node = &stream->node_session_id.node;
716 /* See the previous ht del on why we ignore the returned value. */
717 (void) lttng_ht_del(consumer_data.stream_list_ht, &iter);
718
719 rcu_read_unlock();
720
721 if (!stream->metadata_flag) {
722 /* Decrement the stream count of the global consumer data. */
723 assert(consumer_data.stream_count > 0);
724 consumer_data.stream_count--;
725 }
726 }
727
728 /*
729 * Free the given stream within an RCU call.
730 */
731 void consumer_stream_free(struct lttng_consumer_stream *stream)
732 {
733 assert(stream);
734
735 metadata_bucket_destroy(stream->metadata_bucket);
736 call_rcu(&stream->node.head, free_stream_rcu);
737 }
738
739 /*
740 * Destroy the stream's buffers on the tracer side.
741 */
742 void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream)
743 {
744 assert(stream);
745
746 switch (consumer_data.type) {
747 case LTTNG_CONSUMER_KERNEL:
748 break;
749 case LTTNG_CONSUMER32_UST:
750 case LTTNG_CONSUMER64_UST:
751 lttng_ustconsumer_del_stream(stream);
752 break;
753 default:
754 ERR("Unknown consumer_data type");
755 assert(0);
756 }
757 }
758
759 /*
760 * Destroy and close an already created stream.
761 */
762 static void destroy_close_stream(struct lttng_consumer_stream *stream)
763 {
764 assert(stream);
765
766 DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key);
767
768 /* Destroy tracer buffers of the stream. */
769 consumer_stream_destroy_buffers(stream);
770 /* Close down everything including the relayd if one. */
771 consumer_stream_close(stream);
772 }
773
774 /*
775 * Decrement the stream's channel refcount and, if it drops to 0, return the
776 * channel pointer so the caller can destroy it; otherwise, return NULL.
777 */
778 static struct lttng_consumer_channel *unref_channel(
779 struct lttng_consumer_stream *stream)
780 {
781 struct lttng_consumer_channel *free_chan = NULL;
782
783 assert(stream);
784 assert(stream->chan);
785
786 /* Update refcount of channel and see if we need to destroy it. */
787 if (!uatomic_sub_return(&stream->chan->refcount, 1)
788 && !uatomic_read(&stream->chan->nb_init_stream_left)) {
789 free_chan = stream->chan;
790 }
791
792 return free_chan;
793 }
794
795 /*
796 * Destroy a stream completely. This will delete, close and free the stream.
797 * Once it returns, the stream is NO longer usable. Its channel may get
798 * destroyed if the conditions are met for a monitored stream.
799 *
800 * This MUST be called WITHOUT the consumer data and stream locks acquired if
801 * the stream is in _monitor_ mode; otherwise it does not matter.
802 */
803 void consumer_stream_destroy(struct lttng_consumer_stream *stream,
804 struct lttng_ht *ht)
805 {
806 assert(stream);
807
808 /* Stream is in monitor mode. */
809 if (stream->monitor) {
810 struct lttng_consumer_channel *free_chan = NULL;
811
812 /*
813 * This means that the stream was successfully removed from the channel's
814 * stream list and handed off to the thread managing this kind of
815 * stream, thus being globally visible.
816 */
817 if (stream->globally_visible) {
818 pthread_mutex_lock(&consumer_data.lock);
819 pthread_mutex_lock(&stream->chan->lock);
820 pthread_mutex_lock(&stream->lock);
821 /* Remove every reference of the stream in the consumer. */
822 consumer_stream_delete(stream, ht);
823
824 destroy_close_stream(stream);
825
826 /* Update channel's refcount of the stream. */
827 free_chan = unref_channel(stream);
828
829 /* Indicates that the consumer data state MUST be updated after this. */
830 consumer_data.need_update = 1;
831
832 pthread_mutex_unlock(&stream->lock);
833 pthread_mutex_unlock(&stream->chan->lock);
834 pthread_mutex_unlock(&consumer_data.lock);
835 } else {
836 /*
837 * If the stream is not visible globally, this needs to be done
838 * outside of the consumer data lock section.
839 */
840 free_chan = unref_channel(stream);
841 }
842
843 if (free_chan) {
844 consumer_del_channel(free_chan);
845 }
846 } else {
847 destroy_close_stream(stream);
848 }
849
850 /* Free stream within a RCU call. */
851 lttng_trace_chunk_put(stream->trace_chunk);
852 stream->trace_chunk = NULL;
853 consumer_stream_free(stream);
854 }
855
856 /*
857 * Write index of a specific stream either on the relayd or local disk.
858 *
859 * Return 0 on success or else a negative value.
860 */
861 int consumer_stream_write_index(struct lttng_consumer_stream *stream,
862 struct ctf_packet_index *element)
863 {
864 int ret;
865
866 assert(stream);
867 assert(element);
868
869 rcu_read_lock();
870 if (stream->net_seq_idx != (uint64_t) -1ULL) {
871 struct consumer_relayd_sock_pair *relayd;
872 relayd = consumer_find_relayd(stream->net_seq_idx);
873 if (relayd) {
874 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
875 ret = relayd_send_index(&relayd->control_sock, element,
876 stream->relayd_stream_id, stream->next_net_seq_num - 1);
877 if (ret < 0) {
878 /*
879 * Communication error with lttng-relayd,
880 * perform cleanup now
881 */
882 ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
883 lttng_consumer_cleanup_relayd(relayd);
884 ret = -1;
885 }
886 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
887 } else {
888 ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.",
889 stream->key, stream->net_seq_idx);
890 ret = -1;
891 }
892 } else {
893 if (lttng_index_file_write(stream->index_file, element)) {
894 ret = -1;
895 } else {
896 ret = 0;
897 }
898 }
899 if (ret < 0) {
900 goto error;
901 }
902
903 error:
904 rcu_read_unlock();
905 return ret;
906 }
907
908 int consumer_stream_create_output_files(struct lttng_consumer_stream *stream,
909 bool create_index)
910 {
911 int ret;
912 enum lttng_trace_chunk_status chunk_status;
913 const int flags = O_WRONLY | O_CREAT | O_TRUNC;
914 const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
915 char stream_path[LTTNG_PATH_MAX];
916
917 ASSERT_LOCKED(stream->lock);
918 assert(stream->trace_chunk);
919
920 ret = utils_stream_file_path(stream->chan->pathname, stream->name,
921 stream->chan->tracefile_size,
922 stream->tracefile_count_current, NULL,
923 stream_path, sizeof(stream_path));
924 if (ret < 0) {
925 goto end;
926 }
927
928 if (stream->out_fd >= 0) {
929 ret = close(stream->out_fd);
930 if (ret < 0) {
931 PERROR("Failed to close stream file \"%s\"",
932 stream->name);
933 goto end;
934 }
935 stream->out_fd = -1;
936 }
937
938 DBG("Opening stream output file \"%s\"", stream_path);
939 chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path,
940 flags, mode, &stream->out_fd, false);
941 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
942 ERR("Failed to open stream file \"%s\"", stream->name);
943 ret = -1;
944 goto end;
945 }
946
947 if (!stream->metadata_flag && (create_index || stream->index_file)) {
948 if (stream->index_file) {
949 lttng_index_file_put(stream->index_file);
950 }
951 chunk_status = lttng_index_file_create_from_trace_chunk(
952 stream->trace_chunk,
953 stream->chan->pathname,
954 stream->name,
955 stream->chan->tracefile_size,
956 stream->tracefile_count_current,
957 CTF_INDEX_MAJOR, CTF_INDEX_MINOR,
958 false, &stream->index_file);
959 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
960 ret = -1;
961 goto end;
962 }
963 }
964
965 /* Reset the current size because we just performed a rotation. */
966 stream->tracefile_size_current = 0;
967 stream->out_fd_offset = 0;
968 end:
969 return ret;
970 }
971
972 int consumer_stream_rotate_output_files(struct lttng_consumer_stream *stream)
973 {
974 int ret;
975
976 stream->tracefile_count_current++;
977 if (stream->chan->tracefile_count > 0) {
978 stream->tracefile_count_current %=
979 stream->chan->tracefile_count;
980 }
981
982 DBG("Rotating output files of stream \"%s\"", stream->name);
983 ret = consumer_stream_create_output_files(stream, true);
984 if (ret) {
985 goto end;
986 }
987
988 end:
989 return ret;
990 }
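/*
 * Example of the count handling above: with chan->tracefile_count == 3,
 * tracefile_count_current cycles through 1, 2, 0, 1, ... on successive
 * rotations; with tracefile_count == 0 (no limit), it keeps incrementing.
 */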
991
992 bool consumer_stream_is_deleted(struct lttng_consumer_stream *stream)
993 {
994 /*
995 * This function does not take a const stream since
996 * cds_lfht_is_node_deleted was not const before liburcu 0.12.
997 */
998 assert(stream);
999 return cds_lfht_is_node_deleted(&stream->node.node);
1000 }
1001
1002 static ssize_t metadata_bucket_flush(
1003 const struct stream_subbuffer *buffer, void *data)
1004 {
1005 ssize_t ret;
1006 struct lttng_consumer_stream *stream = data;
1007
1008 ret = consumer_stream_consume_mmap(NULL, stream, buffer);
1009 if (ret < 0) {
1010 goto end;
1011 }
1012 end:
1013 return ret;
1014 }
1015
1016 static ssize_t metadata_bucket_consume(
1017 struct lttng_consumer_local_data *unused,
1018 struct lttng_consumer_stream *stream,
1019 const struct stream_subbuffer *subbuffer)
1020 {
1021 ssize_t ret;
1022 enum metadata_bucket_status status;
1023
1024 status = metadata_bucket_fill(stream->metadata_bucket, subbuffer);
1025 switch (status) {
1026 case METADATA_BUCKET_STATUS_OK:
1027 /* Return consumed size. */
1028 ret = subbuffer->buffer.buffer.size;
1029 break;
1030 default:
1031 ret = -1;
1032 }
1033
1034 return ret;
1035 }
1036
1037 int consumer_stream_enable_metadata_bucketization(
1038 struct lttng_consumer_stream *stream)
1039 {
1040 int ret = 0;
1041
1042 assert(stream->metadata_flag);
1043 assert(!stream->metadata_bucket);
1044 assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
1045
1046 stream->metadata_bucket = metadata_bucket_create(
1047 metadata_bucket_flush, stream);
1048 if (!stream->metadata_bucket) {
1049 ret = -1;
1050 goto end;
1051 }
1052
1053 stream->read_subbuffer_ops.consume_subbuffer = metadata_bucket_consume;
1054 end:
1055 return ret;
1056 }
1057
1058 void consumer_stream_metadata_set_version(
1059 struct lttng_consumer_stream *stream, uint64_t new_version)
1060 {
1061 assert(new_version > stream->metadata_version);
1062 stream->metadata_version = new_version;
1063 stream->reset_metadata_flag = 1;
1064
1065 if (stream->metadata_bucket) {
1066 metadata_bucket_reset(stream->metadata_bucket);
1067 }
1068 }