Fix: post-clear trace chunk has a late beginning packet
[lttng-tools.git] / src / common / consumer / consumer-stream.c
CommitLineData
51230d70 1/*
ab5be9fa
MJ
2 * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
51230d70 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
51230d70 7 *
51230d70
DG
8 */
9
6c1c0768 10#define _LGPL_SOURCE
51230d70 11#include <assert.h>
10a50311 12#include <inttypes.h>
51230d70
DG
13#include <sys/mman.h>
14#include <unistd.h>
15
16#include <common/common.h>
1c20f0e2 17#include <common/index/index.h>
94d49140 18#include <common/kernel-consumer/kernel-consumer.h>
51230d70
DG
19#include <common/relayd/relayd.h>
20#include <common/ust-consumer/ust-consumer.h>
a2361a61 21#include <common/utils.h>
6f9449c2
JG
22#include <common/consumer/consumer.h>
23#include <common/consumer/consumer-timer.h>
f5ba75b4 24#include <common/consumer/metadata-bucket.h>
51230d70
DG
25
26#include "consumer-stream.h"
27
28/*
29 * RCU call to free stream. MUST only be used with call_rcu().
30 */
31static void free_stream_rcu(struct rcu_head *head)
32{
33 struct lttng_ht_node_u64 *node =
34 caa_container_of(head, struct lttng_ht_node_u64, head);
35 struct lttng_consumer_stream *stream =
36 caa_container_of(node, struct lttng_consumer_stream, node);
37
38 pthread_mutex_destroy(&stream->lock);
39 free(stream);
40}
41
6f9449c2
JG
42static void consumer_stream_data_lock_all(struct lttng_consumer_stream *stream)
43{
44 pthread_mutex_lock(&stream->chan->lock);
45 pthread_mutex_lock(&stream->lock);
46}
47
48static void consumer_stream_data_unlock_all(struct lttng_consumer_stream *stream)
49{
50 pthread_mutex_unlock(&stream->lock);
51 pthread_mutex_unlock(&stream->chan->lock);
52}
53
54static void consumer_stream_metadata_lock_all(struct lttng_consumer_stream *stream)
55{
56 consumer_stream_data_lock_all(stream);
57 pthread_mutex_lock(&stream->metadata_rdv_lock);
58}
59
60static void consumer_stream_metadata_unlock_all(struct lttng_consumer_stream *stream)
61{
62 pthread_mutex_unlock(&stream->metadata_rdv_lock);
63 consumer_stream_data_unlock_all(stream);
64}
65
66/* Only used for data streams. */
67static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
68 const struct stream_subbuffer *subbuf)
69{
70 int ret = 0;
71 uint64_t sequence_number;
d9b063d7 72 const uint64_t discarded_events = subbuf->info.data.events_discarded;
6f9449c2
JG
73
74 if (!subbuf->info.data.sequence_number.is_set) {
75 /* Command not supported by the tracer. */
76 sequence_number = -1ULL;
77 stream->sequence_number_unavailable = true;
78 } else {
79 sequence_number = subbuf->info.data.sequence_number.value;
80 }
81
82 /*
83 * Start the sequence when we extract the first packet in case we don't
84 * start at 0 (for example if a consumer is not connected to the
85 * session immediately after the beginning).
86 */
87 if (stream->last_sequence_number == -1ULL) {
88 stream->last_sequence_number = sequence_number;
89 } else if (sequence_number > stream->last_sequence_number) {
90 stream->chan->lost_packets += sequence_number -
91 stream->last_sequence_number - 1;
92 } else {
93 /* seq <= last_sequence_number */
94 ERR("Sequence number inconsistent : prev = %" PRIu64
95 ", current = %" PRIu64,
96 stream->last_sequence_number, sequence_number);
97 ret = -1;
98 goto end;
99 }
100 stream->last_sequence_number = sequence_number;
101
102 if (discarded_events < stream->last_discarded_events) {
103 /*
104 * Overflow has occurred. We assume only one wrap-around
105 * has occurred.
106 */
107 stream->chan->discarded_events +=
108 (1ULL << (CAA_BITS_PER_LONG - 1)) -
109 stream->last_discarded_events +
110 discarded_events;
111 } else {
112 stream->chan->discarded_events += discarded_events -
113 stream->last_discarded_events;
114 }
115 stream->last_discarded_events = discarded_events;
116 ret = 0;
117
118end:
119 return ret;
120}
121
122static
123void ctf_packet_index_populate(struct ctf_packet_index *index,
124 off_t offset, const struct stream_subbuffer *subbuffer)
125{
126 *index = (typeof(*index)){
127 .offset = htobe64(offset),
128 .packet_size = htobe64(subbuffer->info.data.packet_size),
129 .content_size = htobe64(subbuffer->info.data.content_size),
130 .timestamp_begin = htobe64(
131 subbuffer->info.data.timestamp_begin),
132 .timestamp_end = htobe64(
133 subbuffer->info.data.timestamp_end),
134 .events_discarded = htobe64(
135 subbuffer->info.data.events_discarded),
136 .stream_id = htobe64(subbuffer->info.data.stream_id),
137 .stream_instance_id = htobe64(
138 subbuffer->info.data.stream_instance_id.is_set ?
139 subbuffer->info.data.stream_instance_id.value : -1ULL),
140 .packet_seq_num = htobe64(
141 subbuffer->info.data.sequence_number.is_set ?
142 subbuffer->info.data.sequence_number.value : -1ULL),
143 };
144}
145
146static ssize_t consumer_stream_consume_mmap(
147 struct lttng_consumer_local_data *ctx,
148 struct lttng_consumer_stream *stream,
149 const struct stream_subbuffer *subbuffer)
150{
151 const unsigned long padding_size =
152 subbuffer->info.data.padded_subbuf_size -
153 subbuffer->info.data.subbuf_size;
154
155 return lttng_consumer_on_read_subbuffer_mmap(
f5ba75b4 156 stream, &subbuffer->buffer.buffer, padding_size);
6f9449c2
JG
157}
158
159static ssize_t consumer_stream_consume_splice(
160 struct lttng_consumer_local_data *ctx,
161 struct lttng_consumer_stream *stream,
162 const struct stream_subbuffer *subbuffer)
163{
164 return lttng_consumer_on_read_subbuffer_splice(ctx, stream,
165 subbuffer->info.data.padded_subbuf_size, 0);
166}
167
168static int consumer_stream_send_index(
169 struct lttng_consumer_stream *stream,
170 const struct stream_subbuffer *subbuffer,
171 struct lttng_consumer_local_data *ctx)
172{
173 off_t packet_offset = 0;
174 struct ctf_packet_index index = {};
175
176 /*
177 * This is called after consuming the sub-buffer; substract the
178 * effect this sub-buffer from the offset.
179 */
180 if (stream->net_seq_idx == (uint64_t) -1ULL) {
181 packet_offset = stream->out_fd_offset -
182 subbuffer->info.data.padded_subbuf_size;
183 }
184
185 ctf_packet_index_populate(&index, packet_offset, subbuffer);
186 return consumer_stream_write_index(stream, &index);
187}
188
189/*
190 * Actually do the metadata sync using the given metadata stream.
191 *
192 * Return 0 on success else a negative value. ENODATA can be returned also
193 * indicating that there is no metadata available for that stream.
194 */
195static int do_sync_metadata(struct lttng_consumer_stream *metadata,
196 struct lttng_consumer_local_data *ctx)
197{
198 int ret;
577eea73 199 enum sync_metadata_status status;
6f9449c2
JG
200
201 assert(metadata);
202 assert(metadata->metadata_flag);
203 assert(ctx);
204
205 /*
206 * In UST, since we have to write the metadata from the cache packet
207 * by packet, we might need to start this procedure multiple times
208 * until all the metadata from the cache has been extracted.
209 */
210 do {
211 /*
212 * Steps :
213 * - Lock the metadata stream
214 * - Check if metadata stream node was deleted before locking.
215 * - if yes, release and return success
216 * - Check if new metadata is ready (flush + snapshot pos)
217 * - If nothing : release and return.
218 * - Lock the metadata_rdv_lock
219 * - Unlock the metadata stream
220 * - cond_wait on metadata_rdv to wait the wakeup from the
221 * metadata thread
222 * - Unlock the metadata_rdv_lock
223 */
224 pthread_mutex_lock(&metadata->lock);
225
226 /*
227 * There is a possibility that we were able to acquire a reference on the
228 * stream from the RCU hash table but between then and now, the node might
229 * have been deleted just before the lock is acquired. Thus, after locking,
230 * we make sure the metadata node has not been deleted which means that the
231 * buffers are closed.
232 *
233 * In that case, there is no need to sync the metadata hence returning a
234 * success return code.
235 */
236 ret = cds_lfht_is_node_deleted(&metadata->node.node);
237 if (ret) {
238 ret = 0;
239 goto end_unlock_mutex;
240 }
241
242 switch (ctx->type) {
243 case LTTNG_CONSUMER_KERNEL:
244 /*
245 * Empty the metadata cache and flush the current stream.
246 */
577eea73 247 status = lttng_kconsumer_sync_metadata(metadata);
6f9449c2
JG
248 break;
249 case LTTNG_CONSUMER32_UST:
250 case LTTNG_CONSUMER64_UST:
251 /*
252 * Ask the sessiond if we have new metadata waiting and update the
253 * consumer metadata cache.
254 */
577eea73 255 status = lttng_ustconsumer_sync_metadata(ctx, metadata);
6f9449c2
JG
256 break;
257 default:
577eea73 258 abort();
6f9449c2 259 }
577eea73
JG
260
261 switch (status) {
262 case SYNC_METADATA_STATUS_NEW_DATA:
263 break;
264 case SYNC_METADATA_STATUS_NO_DATA:
265 ret = 0;
6f9449c2 266 goto end_unlock_mutex;
577eea73
JG
267 case SYNC_METADATA_STATUS_ERROR:
268 ret = -1;
269 goto end_unlock_mutex;
270 default:
271 abort();
6f9449c2
JG
272 }
273
274 /*
275 * At this point, new metadata have been flushed, so we wait on the
276 * rendez-vous point for the metadata thread to wake us up when it
277 * finishes consuming the metadata and continue execution.
278 */
279
280 pthread_mutex_lock(&metadata->metadata_rdv_lock);
281
282 /*
283 * Release metadata stream lock so the metadata thread can process it.
284 */
285 pthread_mutex_unlock(&metadata->lock);
286
287 /*
288 * Wait on the rendez-vous point. Once woken up, it means the metadata was
289 * consumed and thus synchronization is achieved.
290 */
291 pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock);
292 pthread_mutex_unlock(&metadata->metadata_rdv_lock);
577eea73 293 } while (status == SYNC_METADATA_STATUS_NEW_DATA);
6f9449c2
JG
294
295 /* Success */
296 return 0;
297
298end_unlock_mutex:
299 pthread_mutex_unlock(&metadata->lock);
300 return ret;
301}
302
303/*
304 * Synchronize the metadata using a given session ID. A successful acquisition
305 * of a metadata stream will trigger a request to the session daemon and a
306 * snapshot so the metadata thread can consume it.
307 *
308 * This function call is a rendez-vous point between the metadata thread and
309 * the data thread.
310 *
311 * Return 0 on success or else a negative value.
312 */
313int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
314 uint64_t session_id)
315{
316 int ret;
317 struct lttng_consumer_stream *stream = NULL;
318 struct lttng_ht_iter iter;
319 struct lttng_ht *ht;
320
321 assert(ctx);
322
323 /* Ease our life a bit. */
324 ht = consumer_data.stream_list_ht;
325
326 rcu_read_lock();
327
328 /* Search the metadata associated with the session id of the given stream. */
329
330 cds_lfht_for_each_entry_duplicate(ht->ht,
331 ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct,
332 &session_id, &iter.iter, stream, node_session_id.node) {
333 if (!stream->metadata_flag) {
334 continue;
335 }
336
337 ret = do_sync_metadata(stream, ctx);
338 if (ret < 0) {
339 goto end;
340 }
341 }
342
343 /*
344 * Force return code to 0 (success) since ret might be ENODATA for instance
345 * which is not an error but rather that we should come back.
346 */
347 ret = 0;
348
349end:
350 rcu_read_unlock();
351 return ret;
352}
353
354static int consumer_stream_sync_metadata_index(
355 struct lttng_consumer_stream *stream,
356 const struct stream_subbuffer *subbuffer,
357 struct lttng_consumer_local_data *ctx)
358{
359 int ret;
360
361 /* Block until all the metadata is sent. */
362 pthread_mutex_lock(&stream->metadata_timer_lock);
363 assert(!stream->missed_metadata_flush);
364 stream->waiting_on_metadata = true;
365 pthread_mutex_unlock(&stream->metadata_timer_lock);
366
367 ret = consumer_stream_sync_metadata(ctx, stream->session_id);
368
369 pthread_mutex_lock(&stream->metadata_timer_lock);
370 stream->waiting_on_metadata = false;
371 if (stream->missed_metadata_flush) {
372 stream->missed_metadata_flush = false;
373 pthread_mutex_unlock(&stream->metadata_timer_lock);
374 (void) stream->read_subbuffer_ops.send_live_beacon(stream);
375 } else {
376 pthread_mutex_unlock(&stream->metadata_timer_lock);
377 }
378 if (ret < 0) {
379 goto end;
380 }
381
382 ret = consumer_stream_send_index(stream, subbuffer, ctx);
383end:
384 return ret;
385}
386
387/*
388 * Check if the local version of the metadata stream matches with the version
389 * of the metadata stream in the kernel. If it was updated, set the reset flag
390 * on the stream.
391 */
392static
393int metadata_stream_check_version(struct lttng_consumer_stream *stream,
394 const struct stream_subbuffer *subbuffer)
395{
396 if (stream->metadata_version == subbuffer->info.metadata.version) {
397 goto end;
398 }
399
400 DBG("New metadata version detected");
55954e07
JG
401 consumer_stream_metadata_set_version(stream,
402 subbuffer->info.metadata.version);
f5ba75b4 403
6f9449c2
JG
404 if (stream->read_subbuffer_ops.reset_metadata) {
405 stream->read_subbuffer_ops.reset_metadata(stream);
406 }
407
408end:
409 return 0;
410}
411
412struct lttng_consumer_stream *consumer_stream_create(
413 struct lttng_consumer_channel *channel,
414 uint64_t channel_key,
415 uint64_t stream_key,
416 const char *channel_name,
417 uint64_t relayd_id,
418 uint64_t session_id,
419 struct lttng_trace_chunk *trace_chunk,
420 int cpu,
421 int *alloc_ret,
422 enum consumer_channel_type type,
423 unsigned int monitor)
424{
425 int ret;
426 struct lttng_consumer_stream *stream;
427
428 stream = zmalloc(sizeof(*stream));
429 if (stream == NULL) {
430 PERROR("malloc struct lttng_consumer_stream");
431 ret = -ENOMEM;
432 goto end;
433 }
434
435 if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
436 ERR("Failed to acquire trace chunk reference during the creation of a stream");
437 ret = -1;
438 goto error;
439 }
440
441 rcu_read_lock();
442 stream->chan = channel;
443 stream->key = stream_key;
444 stream->trace_chunk = trace_chunk;
445 stream->out_fd = -1;
446 stream->out_fd_offset = 0;
447 stream->output_written = 0;
448 stream->net_seq_idx = relayd_id;
449 stream->session_id = session_id;
450 stream->monitor = monitor;
451 stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
452 stream->index_file = NULL;
453 stream->last_sequence_number = -1ULL;
454 stream->rotate_position = -1ULL;
f96af312
JG
455 /* Buffer is created with an open packet. */
456 stream->opened_packet_in_current_trace_chunk = true;
6f9449c2
JG
457 pthread_mutex_init(&stream->lock, NULL);
458 pthread_mutex_init(&stream->metadata_timer_lock, NULL);
459
460 /* If channel is the metadata, flag this stream as metadata. */
461 if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
462 stream->metadata_flag = 1;
463 /* Metadata is flat out. */
464 strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
465 /* Live rendez-vous point. */
466 pthread_cond_init(&stream->metadata_rdv, NULL);
467 pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
468 } else {
469 /* Format stream name to <channel_name>_<cpu_number> */
470 ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
471 channel_name, cpu);
472 if (ret < 0) {
473 PERROR("snprintf stream name");
474 goto error;
475 }
476 }
477
478 switch (channel->output) {
479 case CONSUMER_CHANNEL_SPLICE:
480 stream->output = LTTNG_EVENT_SPLICE;
481 ret = utils_create_pipe(stream->splice_pipe);
482 if (ret < 0) {
483 goto error;
484 }
485 break;
486 case CONSUMER_CHANNEL_MMAP:
487 stream->output = LTTNG_EVENT_MMAP;
488 break;
489 default:
490 abort();
491 }
492
493 /* Key is always the wait_fd for streams. */
494 lttng_ht_node_init_u64(&stream->node, stream->key);
495
496 /* Init node per channel id key */
497 lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
498
499 /* Init session id node with the stream session id */
500 lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
501
502 DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
503 " relayd_id %" PRIu64 ", session_id %" PRIu64,
504 stream->name, stream->key, channel_key,
505 stream->net_seq_idx, stream->session_id);
506
507 rcu_read_unlock();
508
509 if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
510 stream->read_subbuffer_ops.lock =
511 consumer_stream_metadata_lock_all;
512 stream->read_subbuffer_ops.unlock =
513 consumer_stream_metadata_unlock_all;
514 stream->read_subbuffer_ops.pre_consume_subbuffer =
515 metadata_stream_check_version;
516 } else {
517 stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all;
518 stream->read_subbuffer_ops.unlock =
519 consumer_stream_data_unlock_all;
520 stream->read_subbuffer_ops.pre_consume_subbuffer =
521 consumer_stream_update_stats;
522 if (channel->is_live) {
523 stream->read_subbuffer_ops.post_consume =
524 consumer_stream_sync_metadata_index;
525 } else {
526 stream->read_subbuffer_ops.post_consume =
527 consumer_stream_send_index;
528 }
529 }
530
531 if (channel->output == CONSUMER_CHANNEL_MMAP) {
532 stream->read_subbuffer_ops.consume_subbuffer =
533 consumer_stream_consume_mmap;
534 } else {
535 stream->read_subbuffer_ops.consume_subbuffer =
536 consumer_stream_consume_splice;
537 }
538
539 return stream;
540
541error:
542 rcu_read_unlock();
543 lttng_trace_chunk_put(stream->trace_chunk);
544 free(stream);
545end:
546 if (alloc_ret) {
547 *alloc_ret = ret;
548 }
549 return NULL;
550}
551
51230d70
DG
552/*
553 * Close stream on the relayd side. This call can destroy a relayd if the
554 * conditions are met.
555 *
556 * A RCU read side lock MUST be acquired if the relayd object was looked up in
557 * a hash table before calling this.
558 */
559void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
560 struct consumer_relayd_sock_pair *relayd)
561{
562 int ret;
563
564 assert(stream);
565 assert(relayd);
566
d01178b6
DG
567 if (stream->sent_to_relayd) {
568 uatomic_dec(&relayd->refcount);
569 assert(uatomic_read(&relayd->refcount) >= 0);
570 }
51230d70
DG
571
572 /* Closing streams requires to lock the control socket. */
573 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
574 ret = relayd_send_close_stream(&relayd->control_sock,
575 stream->relayd_stream_id,
576 stream->next_net_seq_num - 1);
577 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
578 if (ret < 0) {
9276e5c8
JR
579 ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
580 lttng_consumer_cleanup_relayd(relayd);
51230d70
DG
581 }
582
583 /* Both conditions are met, we destroy the relayd. */
584 if (uatomic_read(&relayd->refcount) == 0 &&
585 uatomic_read(&relayd->destroy_flag)) {
586 consumer_destroy_relayd(relayd);
587 }
10a50311 588 stream->net_seq_idx = (uint64_t) -1ULL;
d01178b6 589 stream->sent_to_relayd = 0;
51230d70
DG
590}
591
592/*
593 * Close stream's file descriptors and, if needed, close stream also on the
594 * relayd side.
595 *
596 * The consumer data lock MUST be acquired.
597 * The stream lock MUST be acquired.
598 */
599void consumer_stream_close(struct lttng_consumer_stream *stream)
600{
601 int ret;
602 struct consumer_relayd_sock_pair *relayd;
603
604 assert(stream);
605
606 switch (consumer_data.type) {
607 case LTTNG_CONSUMER_KERNEL:
608 if (stream->mmap_base != NULL) {
609 ret = munmap(stream->mmap_base, stream->mmap_len);
610 if (ret != 0) {
611 PERROR("munmap");
612 }
613 }
614
615 if (stream->wait_fd >= 0) {
616 ret = close(stream->wait_fd);
617 if (ret) {
618 PERROR("close");
619 }
10a50311 620 stream->wait_fd = -1;
51230d70 621 }
a2361a61
JD
622 if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
623 utils_close_pipe(stream->splice_pipe);
624 }
51230d70
DG
625 break;
626 case LTTNG_CONSUMER32_UST:
627 case LTTNG_CONSUMER64_UST:
6d574024
DG
628 {
629 /*
630 * Special case for the metadata since the wait fd is an internal pipe
631 * polled in the metadata thread.
632 */
633 if (stream->metadata_flag && stream->chan->monitor) {
634 int rpipe = stream->ust_metadata_poll_pipe[0];
635
636 /*
637 * This will stop the channel timer if one and close the write side
638 * of the metadata poll pipe.
639 */
640 lttng_ustconsumer_close_metadata(stream->chan);
641 if (rpipe >= 0) {
642 ret = close(rpipe);
643 if (ret < 0) {
b4a650f3 644 PERROR("closing metadata pipe read side");
6d574024
DG
645 }
646 stream->ust_metadata_poll_pipe[0] = -1;
647 }
648 }
51230d70 649 break;
6d574024 650 }
51230d70
DG
651 default:
652 ERR("Unknown consumer_data type");
653 assert(0);
654 }
655
656 /* Close output fd. Could be a socket or local file at this point. */
657 if (stream->out_fd >= 0) {
658 ret = close(stream->out_fd);
659 if (ret) {
660 PERROR("close");
661 }
10a50311 662 stream->out_fd = -1;
51230d70
DG
663 }
664
f8f3885c
MD
665 if (stream->index_file) {
666 lttng_index_file_put(stream->index_file);
667 stream->index_file = NULL;
309167d2
JD
668 }
669
d2956687
JG
670 lttng_trace_chunk_put(stream->trace_chunk);
671 stream->trace_chunk = NULL;
672
51230d70
DG
673 /* Check and cleanup relayd if needed. */
674 rcu_read_lock();
675 relayd = consumer_find_relayd(stream->net_seq_idx);
676 if (relayd != NULL) {
677 consumer_stream_relayd_close(stream, relayd);
678 }
679 rcu_read_unlock();
680}
681
682/*
683 * Delete the stream from all possible hash tables.
684 *
685 * The consumer data lock MUST be acquired.
686 * The stream lock MUST be acquired.
687 */
688void consumer_stream_delete(struct lttng_consumer_stream *stream,
689 struct lttng_ht *ht)
690{
691 int ret;
692 struct lttng_ht_iter iter;
693
694 assert(stream);
10a50311
JD
695 /* Should NEVER be called not in monitor mode. */
696 assert(stream->chan->monitor);
51230d70
DG
697
698 rcu_read_lock();
699
700 if (ht) {
701 iter.iter.node = &stream->node.node;
702 ret = lttng_ht_del(ht, &iter);
703 assert(!ret);
704 }
705
706 /* Delete from stream per channel ID hash table. */
707 iter.iter.node = &stream->node_channel_id.node;
708 /*
709 * The returned value is of no importance. Even if the node is NOT in the
710 * hash table, we continue since we may have been called by a code path
711 * that did not add the stream to a (all) hash table. Same goes for the
712 * next call ht del call.
713 */
714 (void) lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
715
716 /* Delete from the global stream list. */
717 iter.iter.node = &stream->node_session_id.node;
718 /* See the previous ht del on why we ignore the returned value. */
719 (void) lttng_ht_del(consumer_data.stream_list_ht, &iter);
720
721 rcu_read_unlock();
722
6d574024
DG
723 if (!stream->metadata_flag) {
724 /* Decrement the stream count of the global consumer data. */
725 assert(consumer_data.stream_count > 0);
726 consumer_data.stream_count--;
727 }
51230d70
DG
728}
729
730/*
731 * Free the given stream within a RCU call.
732 */
733void consumer_stream_free(struct lttng_consumer_stream *stream)
734{
735 assert(stream);
736
f5ba75b4 737 metadata_bucket_destroy(stream->metadata_bucket);
51230d70
DG
738 call_rcu(&stream->node.head, free_stream_rcu);
739}
740
741/*
10a50311 742 * Destroy the stream's buffers of the tracer.
51230d70 743 */
10a50311 744void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream)
51230d70 745{
10a50311
JD
746 assert(stream);
747
748 switch (consumer_data.type) {
749 case LTTNG_CONSUMER_KERNEL:
750 break;
751 case LTTNG_CONSUMER32_UST:
752 case LTTNG_CONSUMER64_UST:
753 lttng_ustconsumer_del_stream(stream);
754 break;
755 default:
756 ERR("Unknown consumer_data type");
757 assert(0);
758 }
759}
51230d70 760
10a50311 761/*
4891ece8 762 * Destroy and close a already created stream.
10a50311 763 */
4891ece8 764static void destroy_close_stream(struct lttng_consumer_stream *stream)
10a50311 765{
51230d70
DG
766 assert(stream);
767
4891ece8 768 DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key);
10a50311
JD
769
770 /* Destroy tracer buffers of the stream. */
771 consumer_stream_destroy_buffers(stream);
772 /* Close down everything including the relayd if one. */
773 consumer_stream_close(stream);
774}
51230d70 775
10a50311 776/*
4891ece8
DG
777 * Decrement the stream's channel refcount and if down to 0, return the channel
778 * pointer so it can be destroyed by the caller or NULL if not.
10a50311 779 */
4891ece8
DG
780static struct lttng_consumer_channel *unref_channel(
781 struct lttng_consumer_stream *stream)
10a50311 782{
4891ece8
DG
783 struct lttng_consumer_channel *free_chan = NULL;
784
10a50311 785 assert(stream);
4891ece8 786 assert(stream->chan);
10a50311 787
4891ece8
DG
788 /* Update refcount of channel and see if we need to destroy it. */
789 if (!uatomic_sub_return(&stream->chan->refcount, 1)
790 && !uatomic_read(&stream->chan->nb_init_stream_left)) {
791 free_chan = stream->chan;
792 }
51230d70 793
4891ece8 794 return free_chan;
10a50311 795}
51230d70 796
10a50311
JD
797/*
798 * Destroy a stream completely. This will delete, close and free the stream.
799 * Once return, the stream is NO longer usable. Its channel may get destroyed
800 * if conditions are met for a monitored stream.
801 *
802 * This MUST be called WITHOUT the consumer data and stream lock acquired if
803 * the stream is in _monitor_ mode else it does not matter.
804 */
805void consumer_stream_destroy(struct lttng_consumer_stream *stream,
806 struct lttng_ht *ht)
807{
808 assert(stream);
809
810 /* Stream is in monitor mode. */
4891ece8 811 if (stream->monitor) {
10a50311 812 struct lttng_consumer_channel *free_chan = NULL;
51230d70 813
4891ece8
DG
814 /*
815 * This means that the stream was successfully removed from the streams
816 * list of the channel and sent to the right thread managing this
817 * stream thus being globally visible.
818 */
819 if (stream->globally_visible) {
820 pthread_mutex_lock(&consumer_data.lock);
a9838785 821 pthread_mutex_lock(&stream->chan->lock);
4891ece8
DG
822 pthread_mutex_lock(&stream->lock);
823 /* Remove every reference of the stream in the consumer. */
824 consumer_stream_delete(stream, ht);
825
826 destroy_close_stream(stream);
827
828 /* Update channel's refcount of the stream. */
829 free_chan = unref_channel(stream);
830
831 /* Indicates that the consumer data state MUST be updated after this. */
832 consumer_data.need_update = 1;
833
834 pthread_mutex_unlock(&stream->lock);
a9838785 835 pthread_mutex_unlock(&stream->chan->lock);
4891ece8
DG
836 pthread_mutex_unlock(&consumer_data.lock);
837 } else {
838 /*
839 * If the stream is not visible globally, this needs to be done
840 * outside of the consumer data lock section.
841 */
842 free_chan = unref_channel(stream);
10a50311
JD
843 }
844
10a50311
JD
845 if (free_chan) {
846 consumer_del_channel(free_chan);
847 }
848 } else {
4891ece8 849 destroy_close_stream(stream);
51230d70
DG
850 }
851
852 /* Free stream within a RCU call. */
d2956687
JG
853 lttng_trace_chunk_put(stream->trace_chunk);
854 stream->trace_chunk = NULL;
51230d70
DG
855 consumer_stream_free(stream);
856}
1c20f0e2
JD
857
858/*
859 * Write index of a specific stream either on the relayd or local disk.
860 *
861 * Return 0 on success or else a negative value.
862 */
863int consumer_stream_write_index(struct lttng_consumer_stream *stream,
f8f3885c 864 struct ctf_packet_index *element)
1c20f0e2
JD
865{
866 int ret;
1c20f0e2
JD
867
868 assert(stream);
f8f3885c 869 assert(element);
1c20f0e2
JD
870
871 rcu_read_lock();
23c910e5
JR
872 if (stream->net_seq_idx != (uint64_t) -1ULL) {
873 struct consumer_relayd_sock_pair *relayd;
874 relayd = consumer_find_relayd(stream->net_seq_idx);
875 if (relayd) {
876 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
877 ret = relayd_send_index(&relayd->control_sock, element,
1c20f0e2 878 stream->relayd_stream_id, stream->next_net_seq_num - 1);
9276e5c8
JR
879 if (ret < 0) {
880 /*
881 * Communication error with lttng-relayd,
882 * perform cleanup now
883 */
884 ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
885 lttng_consumer_cleanup_relayd(relayd);
886 ret = -1;
887 }
23c910e5
JR
888 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
889 } else {
890 ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.",
891 stream->key, stream->net_seq_idx);
892 ret = -1;
893 }
1c20f0e2 894 } else {
f8f3885c 895 if (lttng_index_file_write(stream->index_file, element)) {
6cd525e8
MD
896 ret = -1;
897 } else {
898 ret = 0;
899 }
1c20f0e2
JD
900 }
901 if (ret < 0) {
902 goto error;
903 }
904
905error:
906 rcu_read_unlock();
907 return ret;
908}
94d49140 909
d2956687
JG
910int consumer_stream_create_output_files(struct lttng_consumer_stream *stream,
911 bool create_index)
912{
913 int ret;
914 enum lttng_trace_chunk_status chunk_status;
915 const int flags = O_WRONLY | O_CREAT | O_TRUNC;
916 const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
917 char stream_path[LTTNG_PATH_MAX];
918
919 ASSERT_LOCKED(stream->lock);
920 assert(stream->trace_chunk);
921
922 ret = utils_stream_file_path(stream->chan->pathname, stream->name,
923 stream->chan->tracefile_size,
3b16476a 924 stream->tracefile_count_current, NULL,
d2956687
JG
925 stream_path, sizeof(stream_path));
926 if (ret < 0) {
927 goto end;
928 }
929
930 if (stream->out_fd >= 0) {
931 ret = close(stream->out_fd);
932 if (ret < 0) {
933 PERROR("Failed to close stream file \"%s\"",
934 stream->name);
935 goto end;
936 }
937 stream->out_fd = -1;
938 }
939
940 DBG("Opening stream output file \"%s\"", stream_path);
941 chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path,
3ff5c5db 942 flags, mode, &stream->out_fd, false);
d2956687
JG
943 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
944 ERR("Failed to open stream file \"%s\"", stream->name);
945 ret = -1;
946 goto end;
947 }
948
949 if (!stream->metadata_flag && (create_index || stream->index_file)) {
950 if (stream->index_file) {
951 lttng_index_file_put(stream->index_file);
952 }
3ff5c5db 953 chunk_status = lttng_index_file_create_from_trace_chunk(
d2956687
JG
954 stream->trace_chunk,
955 stream->chan->pathname,
956 stream->name,
957 stream->chan->tracefile_size,
958 stream->tracefile_count_current,
959 CTF_INDEX_MAJOR, CTF_INDEX_MINOR,
3ff5c5db
MD
960 false, &stream->index_file);
961 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
d2956687
JG
962 ret = -1;
963 goto end;
964 }
965 }
966
967 /* Reset current size because we just perform a rotation. */
968 stream->tracefile_size_current = 0;
969 stream->out_fd_offset = 0;
970end:
971 return ret;
972}
973
974int consumer_stream_rotate_output_files(struct lttng_consumer_stream *stream)
975{
976 int ret;
977
978 stream->tracefile_count_current++;
979 if (stream->chan->tracefile_count > 0) {
980 stream->tracefile_count_current %=
981 stream->chan->tracefile_count;
982 }
983
984 DBG("Rotating output files of stream \"%s\"", stream->name);
985 ret = consumer_stream_create_output_files(stream, true);
986 if (ret) {
987 goto end;
988 }
989
990end:
991 return ret;
992}
cdb72e4e
JG
993
994bool consumer_stream_is_deleted(struct lttng_consumer_stream *stream)
995{
996 /*
997 * This function does not take a const stream since
998 * cds_lfht_is_node_deleted was not const before liburcu 0.12.
999 */
1000 assert(stream);
1001 return cds_lfht_is_node_deleted(&stream->node.node);
1002}
f5ba75b4
JG
1003
/*
 * Flush callback handed to the metadata bucket: consume `buffer` through the
 * mmap path on behalf of the stream passed as `data`.
 *
 * Returns the value of consumer_stream_consume_mmap().
 */
static ssize_t metadata_bucket_flush(
		const struct stream_subbuffer *buffer, void *data)
{
	struct lttng_consumer_stream *stream = data;

	/* No consumer context is provided on this path. */
	return consumer_stream_consume_mmap(NULL, stream, buffer);
}
1017
1018static ssize_t metadata_bucket_consume(
1019 struct lttng_consumer_local_data *unused,
1020 struct lttng_consumer_stream *stream,
1021 const struct stream_subbuffer *subbuffer)
1022{
1023 ssize_t ret;
1024 enum metadata_bucket_status status;
1025
1026 status = metadata_bucket_fill(stream->metadata_bucket, subbuffer);
1027 switch (status) {
1028 case METADATA_BUCKET_STATUS_OK:
1029 /* Return consumed size. */
1030 ret = subbuffer->buffer.buffer.size;
1031 break;
1032 default:
1033 ret = -1;
1034 }
1035
1036 return ret;
1037}
1038
1039int consumer_stream_enable_metadata_bucketization(
1040 struct lttng_consumer_stream *stream)
1041{
1042 int ret = 0;
1043
1044 assert(stream->metadata_flag);
1045 assert(!stream->metadata_bucket);
1046 assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
1047
1048 stream->metadata_bucket = metadata_bucket_create(
1049 metadata_bucket_flush, stream);
1050 if (!stream->metadata_bucket) {
1051 ret = -1;
1052 goto end;
1053 }
1054
1055 stream->read_subbuffer_ops.consume_subbuffer = metadata_bucket_consume;
1056end:
1057 return ret;
1058}
55954e07
JG
1059
1060void consumer_stream_metadata_set_version(
1061 struct lttng_consumer_stream *stream, uint64_t new_version)
1062{
1063 assert(new_version > stream->metadata_version);
1064 stream->metadata_version = new_version;
1065 stream->reset_metadata_flag = 1;
1066
1067 if (stream->metadata_bucket) {
1068 metadata_bucket_reset(stream->metadata_bucket);
1069 }
1070}
This page took 0.089125 seconds and 4 git commands to generate.