Fix: kconsumer: missing wait for metadata thread in do_sync_metadata
[lttng-tools.git] / src / common / consumer / consumer-stream.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 - David Goulet <dgoulet@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License, version 2 only, as
8 * published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #define _LGPL_SOURCE
21 #include <assert.h>
22 #include <inttypes.h>
23 #include <sys/mman.h>
24 #include <unistd.h>
25
26 #include <common/common.h>
27 #include <common/index/index.h>
28 #include <common/kernel-consumer/kernel-consumer.h>
29 #include <common/relayd/relayd.h>
30 #include <common/ust-consumer/ust-consumer.h>
31 #include <common/utils.h>
32 #include <common/consumer/consumer.h>
33 #include <common/consumer/consumer-timer.h>
34 #include <common/consumer/metadata-bucket.h>
35
36 #include "consumer-stream.h"
37
38 /*
39 * RCU call to free stream. MUST only be used with call_rcu().
40 */
41 static void free_stream_rcu(struct rcu_head *head)
42 {
43 struct lttng_ht_node_u64 *node =
44 caa_container_of(head, struct lttng_ht_node_u64, head);
45 struct lttng_consumer_stream *stream =
46 caa_container_of(node, struct lttng_consumer_stream, node);
47
48 pthread_mutex_destroy(&stream->lock);
49 free(stream);
50 }
51
52 static void consumer_stream_data_lock_all(struct lttng_consumer_stream *stream)
53 {
54 pthread_mutex_lock(&stream->chan->lock);
55 pthread_mutex_lock(&stream->lock);
56 }
57
58 static void consumer_stream_data_unlock_all(struct lttng_consumer_stream *stream)
59 {
60 pthread_mutex_unlock(&stream->lock);
61 pthread_mutex_unlock(&stream->chan->lock);
62 }
63
64 static void consumer_stream_metadata_lock_all(struct lttng_consumer_stream *stream)
65 {
66 consumer_stream_data_lock_all(stream);
67 pthread_mutex_lock(&stream->metadata_rdv_lock);
68 }
69
70 static void consumer_stream_metadata_unlock_all(struct lttng_consumer_stream *stream)
71 {
72 pthread_mutex_unlock(&stream->metadata_rdv_lock);
73 consumer_stream_data_unlock_all(stream);
74 }
75
76 /* Only used for data streams. */
77 static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
78 const struct stream_subbuffer *subbuf)
79 {
80 int ret = 0;
81 uint64_t sequence_number;
82 const uint64_t discarded_events = subbuf->info.data.events_discarded;
83
84 if (!subbuf->info.data.sequence_number.is_set) {
85 /* Command not supported by the tracer. */
86 sequence_number = -1ULL;
87 stream->sequence_number_unavailable = true;
88 } else {
89 sequence_number = subbuf->info.data.sequence_number.value;
90 }
91
92 /*
93 * Start the sequence when we extract the first packet in case we don't
94 * start at 0 (for example if a consumer is not connected to the
95 * session immediately after the beginning).
96 */
97 if (stream->last_sequence_number == -1ULL) {
98 stream->last_sequence_number = sequence_number;
99 } else if (sequence_number > stream->last_sequence_number) {
100 stream->chan->lost_packets += sequence_number -
101 stream->last_sequence_number - 1;
102 } else {
103 /* seq <= last_sequence_number */
104 ERR("Sequence number inconsistent : prev = %" PRIu64
105 ", current = %" PRIu64,
106 stream->last_sequence_number, sequence_number);
107 ret = -1;
108 goto end;
109 }
110 stream->last_sequence_number = sequence_number;
111
112 if (discarded_events < stream->last_discarded_events) {
113 /*
114 * Overflow has occurred. We assume only one wrap-around
115 * has occurred.
116 */
117 stream->chan->discarded_events +=
118 (1ULL << (CAA_BITS_PER_LONG - 1)) -
119 stream->last_discarded_events +
120 discarded_events;
121 } else {
122 stream->chan->discarded_events += discarded_events -
123 stream->last_discarded_events;
124 }
125 stream->last_discarded_events = discarded_events;
126 ret = 0;
127
128 end:
129 return ret;
130 }
131
132 static
133 void ctf_packet_index_populate(struct ctf_packet_index *index,
134 off_t offset, const struct stream_subbuffer *subbuffer)
135 {
136 *index = (typeof(*index)){
137 .offset = htobe64(offset),
138 .packet_size = htobe64(subbuffer->info.data.packet_size),
139 .content_size = htobe64(subbuffer->info.data.content_size),
140 .timestamp_begin = htobe64(
141 subbuffer->info.data.timestamp_begin),
142 .timestamp_end = htobe64(
143 subbuffer->info.data.timestamp_end),
144 .events_discarded = htobe64(
145 subbuffer->info.data.events_discarded),
146 .stream_id = htobe64(subbuffer->info.data.stream_id),
147 .stream_instance_id = htobe64(
148 subbuffer->info.data.stream_instance_id.is_set ?
149 subbuffer->info.data.stream_instance_id.value : -1ULL),
150 .packet_seq_num = htobe64(
151 subbuffer->info.data.sequence_number.is_set ?
152 subbuffer->info.data.sequence_number.value : -1ULL),
153 };
154 }
155
156 static ssize_t consumer_stream_consume_mmap(
157 struct lttng_consumer_local_data *ctx,
158 struct lttng_consumer_stream *stream,
159 const struct stream_subbuffer *subbuffer)
160 {
161 const unsigned long padding_size =
162 subbuffer->info.data.padded_subbuf_size -
163 subbuffer->info.data.subbuf_size;
164
165 return lttng_consumer_on_read_subbuffer_mmap(
166 stream, &subbuffer->buffer.buffer, padding_size);
167 }
168
169 static ssize_t consumer_stream_consume_splice(
170 struct lttng_consumer_local_data *ctx,
171 struct lttng_consumer_stream *stream,
172 const struct stream_subbuffer *subbuffer)
173 {
174 return lttng_consumer_on_read_subbuffer_splice(ctx, stream,
175 subbuffer->info.data.padded_subbuf_size, 0);
176 }
177
178 static int consumer_stream_send_index(
179 struct lttng_consumer_stream *stream,
180 const struct stream_subbuffer *subbuffer,
181 struct lttng_consumer_local_data *ctx)
182 {
183 off_t packet_offset = 0;
184 struct ctf_packet_index index = {};
185
186 /*
187 * This is called after consuming the sub-buffer; substract the
188 * effect this sub-buffer from the offset.
189 */
190 if (stream->net_seq_idx == (uint64_t) -1ULL) {
191 packet_offset = stream->out_fd_offset -
192 subbuffer->info.data.padded_subbuf_size;
193 }
194
195 ctf_packet_index_populate(&index, packet_offset, subbuffer);
196 return consumer_stream_write_index(stream, &index);
197 }
198
199 /*
200 * Actually do the metadata sync using the given metadata stream.
201 *
202 * Return 0 on success else a negative value. ENODATA can be returned also
203 * indicating that there is no metadata available for that stream.
204 */
205 static int do_sync_metadata(struct lttng_consumer_stream *metadata,
206 struct lttng_consumer_local_data *ctx)
207 {
208 int ret;
209 enum sync_metadata_status status;
210
211 assert(metadata);
212 assert(metadata->metadata_flag);
213 assert(ctx);
214
215 /*
216 * In UST, since we have to write the metadata from the cache packet
217 * by packet, we might need to start this procedure multiple times
218 * until all the metadata from the cache has been extracted.
219 */
220 do {
221 /*
222 * Steps :
223 * - Lock the metadata stream
224 * - Check if metadata stream node was deleted before locking.
225 * - if yes, release and return success
226 * - Check if new metadata is ready (flush + snapshot pos)
227 * - If nothing : release and return.
228 * - Lock the metadata_rdv_lock
229 * - Unlock the metadata stream
230 * - cond_wait on metadata_rdv to wait the wakeup from the
231 * metadata thread
232 * - Unlock the metadata_rdv_lock
233 */
234 pthread_mutex_lock(&metadata->lock);
235
236 /*
237 * There is a possibility that we were able to acquire a reference on the
238 * stream from the RCU hash table but between then and now, the node might
239 * have been deleted just before the lock is acquired. Thus, after locking,
240 * we make sure the metadata node has not been deleted which means that the
241 * buffers are closed.
242 *
243 * In that case, there is no need to sync the metadata hence returning a
244 * success return code.
245 */
246 ret = cds_lfht_is_node_deleted(&metadata->node.node);
247 if (ret) {
248 ret = 0;
249 goto end_unlock_mutex;
250 }
251
252 switch (ctx->type) {
253 case LTTNG_CONSUMER_KERNEL:
254 /*
255 * Empty the metadata cache and flush the current stream.
256 */
257 status = lttng_kconsumer_sync_metadata(metadata);
258 break;
259 case LTTNG_CONSUMER32_UST:
260 case LTTNG_CONSUMER64_UST:
261 /*
262 * Ask the sessiond if we have new metadata waiting and update the
263 * consumer metadata cache.
264 */
265 status = lttng_ustconsumer_sync_metadata(ctx, metadata);
266 break;
267 default:
268 abort();
269 }
270
271 switch (status) {
272 case SYNC_METADATA_STATUS_NEW_DATA:
273 break;
274 case SYNC_METADATA_STATUS_NO_DATA:
275 ret = 0;
276 goto end_unlock_mutex;
277 case SYNC_METADATA_STATUS_ERROR:
278 ret = -1;
279 goto end_unlock_mutex;
280 default:
281 abort();
282 }
283
284 /*
285 * At this point, new metadata have been flushed, so we wait on the
286 * rendez-vous point for the metadata thread to wake us up when it
287 * finishes consuming the metadata and continue execution.
288 */
289
290 pthread_mutex_lock(&metadata->metadata_rdv_lock);
291
292 /*
293 * Release metadata stream lock so the metadata thread can process it.
294 */
295 pthread_mutex_unlock(&metadata->lock);
296
297 /*
298 * Wait on the rendez-vous point. Once woken up, it means the metadata was
299 * consumed and thus synchronization is achieved.
300 */
301 pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock);
302 pthread_mutex_unlock(&metadata->metadata_rdv_lock);
303 } while (status == SYNC_METADATA_STATUS_NEW_DATA);
304
305 /* Success */
306 return 0;
307
308 end_unlock_mutex:
309 pthread_mutex_unlock(&metadata->lock);
310 return ret;
311 }
312
313 /*
314 * Synchronize the metadata using a given session ID. A successful acquisition
315 * of a metadata stream will trigger a request to the session daemon and a
316 * snapshot so the metadata thread can consume it.
317 *
318 * This function call is a rendez-vous point between the metadata thread and
319 * the data thread.
320 *
321 * Return 0 on success or else a negative value.
322 */
323 int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
324 uint64_t session_id)
325 {
326 int ret;
327 struct lttng_consumer_stream *stream = NULL;
328 struct lttng_ht_iter iter;
329 struct lttng_ht *ht;
330
331 assert(ctx);
332
333 /* Ease our life a bit. */
334 ht = consumer_data.stream_list_ht;
335
336 rcu_read_lock();
337
338 /* Search the metadata associated with the session id of the given stream. */
339
340 cds_lfht_for_each_entry_duplicate(ht->ht,
341 ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct,
342 &session_id, &iter.iter, stream, node_session_id.node) {
343 if (!stream->metadata_flag) {
344 continue;
345 }
346
347 ret = do_sync_metadata(stream, ctx);
348 if (ret < 0) {
349 goto end;
350 }
351 }
352
353 /*
354 * Force return code to 0 (success) since ret might be ENODATA for instance
355 * which is not an error but rather that we should come back.
356 */
357 ret = 0;
358
359 end:
360 rcu_read_unlock();
361 return ret;
362 }
363
364 static int consumer_stream_sync_metadata_index(
365 struct lttng_consumer_stream *stream,
366 const struct stream_subbuffer *subbuffer,
367 struct lttng_consumer_local_data *ctx)
368 {
369 int ret;
370
371 /* Block until all the metadata is sent. */
372 pthread_mutex_lock(&stream->metadata_timer_lock);
373 assert(!stream->missed_metadata_flush);
374 stream->waiting_on_metadata = true;
375 pthread_mutex_unlock(&stream->metadata_timer_lock);
376
377 ret = consumer_stream_sync_metadata(ctx, stream->session_id);
378
379 pthread_mutex_lock(&stream->metadata_timer_lock);
380 stream->waiting_on_metadata = false;
381 if (stream->missed_metadata_flush) {
382 stream->missed_metadata_flush = false;
383 pthread_mutex_unlock(&stream->metadata_timer_lock);
384 (void) stream->read_subbuffer_ops.send_live_beacon(stream);
385 } else {
386 pthread_mutex_unlock(&stream->metadata_timer_lock);
387 }
388 if (ret < 0) {
389 goto end;
390 }
391
392 ret = consumer_stream_send_index(stream, subbuffer, ctx);
393 end:
394 return ret;
395 }
396
397 /*
398 * Check if the local version of the metadata stream matches with the version
399 * of the metadata stream in the kernel. If it was updated, set the reset flag
400 * on the stream.
401 */
402 static
403 int metadata_stream_check_version(struct lttng_consumer_stream *stream,
404 const struct stream_subbuffer *subbuffer)
405 {
406 if (stream->metadata_version == subbuffer->info.metadata.version) {
407 goto end;
408 }
409
410 DBG("New metadata version detected");
411 consumer_stream_metadata_set_version(stream,
412 subbuffer->info.metadata.version);
413
414 if (stream->read_subbuffer_ops.reset_metadata) {
415 stream->read_subbuffer_ops.reset_metadata(stream);
416 }
417
418 end:
419 return 0;
420 }
421
422 struct lttng_consumer_stream *consumer_stream_create(
423 struct lttng_consumer_channel *channel,
424 uint64_t channel_key,
425 uint64_t stream_key,
426 const char *channel_name,
427 uint64_t relayd_id,
428 uint64_t session_id,
429 struct lttng_trace_chunk *trace_chunk,
430 int cpu,
431 int *alloc_ret,
432 enum consumer_channel_type type,
433 unsigned int monitor)
434 {
435 int ret;
436 struct lttng_consumer_stream *stream;
437
438 stream = zmalloc(sizeof(*stream));
439 if (stream == NULL) {
440 PERROR("malloc struct lttng_consumer_stream");
441 ret = -ENOMEM;
442 goto end;
443 }
444
445 if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
446 ERR("Failed to acquire trace chunk reference during the creation of a stream");
447 ret = -1;
448 goto error;
449 }
450
451 rcu_read_lock();
452 stream->chan = channel;
453 stream->key = stream_key;
454 stream->trace_chunk = trace_chunk;
455 stream->out_fd = -1;
456 stream->out_fd_offset = 0;
457 stream->output_written = 0;
458 stream->net_seq_idx = relayd_id;
459 stream->session_id = session_id;
460 stream->monitor = monitor;
461 stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
462 stream->index_file = NULL;
463 stream->last_sequence_number = -1ULL;
464 stream->rotate_position = -1ULL;
465 pthread_mutex_init(&stream->lock, NULL);
466 pthread_mutex_init(&stream->metadata_timer_lock, NULL);
467
468 /* If channel is the metadata, flag this stream as metadata. */
469 if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
470 stream->metadata_flag = 1;
471 /* Metadata is flat out. */
472 strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
473 /* Live rendez-vous point. */
474 pthread_cond_init(&stream->metadata_rdv, NULL);
475 pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
476 } else {
477 /* Format stream name to <channel_name>_<cpu_number> */
478 ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
479 channel_name, cpu);
480 if (ret < 0) {
481 PERROR("snprintf stream name");
482 goto error;
483 }
484 }
485
486 switch (channel->output) {
487 case CONSUMER_CHANNEL_SPLICE:
488 stream->output = LTTNG_EVENT_SPLICE;
489 ret = utils_create_pipe(stream->splice_pipe);
490 if (ret < 0) {
491 goto error;
492 }
493 break;
494 case CONSUMER_CHANNEL_MMAP:
495 stream->output = LTTNG_EVENT_MMAP;
496 break;
497 default:
498 abort();
499 }
500
501 /* Key is always the wait_fd for streams. */
502 lttng_ht_node_init_u64(&stream->node, stream->key);
503
504 /* Init node per channel id key */
505 lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
506
507 /* Init session id node with the stream session id */
508 lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
509
510 DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
511 " relayd_id %" PRIu64 ", session_id %" PRIu64,
512 stream->name, stream->key, channel_key,
513 stream->net_seq_idx, stream->session_id);
514
515 rcu_read_unlock();
516
517 if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
518 stream->read_subbuffer_ops.lock =
519 consumer_stream_metadata_lock_all;
520 stream->read_subbuffer_ops.unlock =
521 consumer_stream_metadata_unlock_all;
522 stream->read_subbuffer_ops.pre_consume_subbuffer =
523 metadata_stream_check_version;
524 } else {
525 stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all;
526 stream->read_subbuffer_ops.unlock =
527 consumer_stream_data_unlock_all;
528 stream->read_subbuffer_ops.pre_consume_subbuffer =
529 consumer_stream_update_stats;
530 if (channel->is_live) {
531 stream->read_subbuffer_ops.post_consume =
532 consumer_stream_sync_metadata_index;
533 } else {
534 stream->read_subbuffer_ops.post_consume =
535 consumer_stream_send_index;
536 }
537 }
538
539 if (channel->output == CONSUMER_CHANNEL_MMAP) {
540 stream->read_subbuffer_ops.consume_subbuffer =
541 consumer_stream_consume_mmap;
542 } else {
543 stream->read_subbuffer_ops.consume_subbuffer =
544 consumer_stream_consume_splice;
545 }
546
547 return stream;
548
549 error:
550 rcu_read_unlock();
551 lttng_trace_chunk_put(stream->trace_chunk);
552 free(stream);
553 end:
554 if (alloc_ret) {
555 *alloc_ret = ret;
556 }
557 return NULL;
558 }
559
560 /*
561 * Close stream on the relayd side. This call can destroy a relayd if the
562 * conditions are met.
563 *
564 * A RCU read side lock MUST be acquired if the relayd object was looked up in
565 * a hash table before calling this.
566 */
567 void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
568 struct consumer_relayd_sock_pair *relayd)
569 {
570 int ret;
571
572 assert(stream);
573 assert(relayd);
574
575 if (stream->sent_to_relayd) {
576 uatomic_dec(&relayd->refcount);
577 assert(uatomic_read(&relayd->refcount) >= 0);
578 }
579
580 /* Closing streams requires to lock the control socket. */
581 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
582 ret = relayd_send_close_stream(&relayd->control_sock,
583 stream->relayd_stream_id,
584 stream->next_net_seq_num - 1);
585 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
586 if (ret < 0) {
587 ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
588 lttng_consumer_cleanup_relayd(relayd);
589 }
590
591 /* Both conditions are met, we destroy the relayd. */
592 if (uatomic_read(&relayd->refcount) == 0 &&
593 uatomic_read(&relayd->destroy_flag)) {
594 consumer_destroy_relayd(relayd);
595 }
596 stream->net_seq_idx = (uint64_t) -1ULL;
597 stream->sent_to_relayd = 0;
598 }
599
600 /*
601 * Close stream's file descriptors and, if needed, close stream also on the
602 * relayd side.
603 *
604 * The consumer data lock MUST be acquired.
605 * The stream lock MUST be acquired.
606 */
607 void consumer_stream_close(struct lttng_consumer_stream *stream)
608 {
609 int ret;
610 struct consumer_relayd_sock_pair *relayd;
611
612 assert(stream);
613
614 switch (consumer_data.type) {
615 case LTTNG_CONSUMER_KERNEL:
616 if (stream->mmap_base != NULL) {
617 ret = munmap(stream->mmap_base, stream->mmap_len);
618 if (ret != 0) {
619 PERROR("munmap");
620 }
621 }
622
623 if (stream->wait_fd >= 0) {
624 ret = close(stream->wait_fd);
625 if (ret) {
626 PERROR("close");
627 }
628 stream->wait_fd = -1;
629 }
630 if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
631 utils_close_pipe(stream->splice_pipe);
632 }
633 break;
634 case LTTNG_CONSUMER32_UST:
635 case LTTNG_CONSUMER64_UST:
636 {
637 /*
638 * Special case for the metadata since the wait fd is an internal pipe
639 * polled in the metadata thread.
640 */
641 if (stream->metadata_flag && stream->chan->monitor) {
642 int rpipe = stream->ust_metadata_poll_pipe[0];
643
644 /*
645 * This will stop the channel timer if one and close the write side
646 * of the metadata poll pipe.
647 */
648 lttng_ustconsumer_close_metadata(stream->chan);
649 if (rpipe >= 0) {
650 ret = close(rpipe);
651 if (ret < 0) {
652 PERROR("closing metadata pipe read side");
653 }
654 stream->ust_metadata_poll_pipe[0] = -1;
655 }
656 }
657 break;
658 }
659 default:
660 ERR("Unknown consumer_data type");
661 assert(0);
662 }
663
664 /* Close output fd. Could be a socket or local file at this point. */
665 if (stream->out_fd >= 0) {
666 ret = close(stream->out_fd);
667 if (ret) {
668 PERROR("close");
669 }
670 stream->out_fd = -1;
671 }
672
673 if (stream->index_file) {
674 lttng_index_file_put(stream->index_file);
675 stream->index_file = NULL;
676 }
677
678 lttng_trace_chunk_put(stream->trace_chunk);
679 stream->trace_chunk = NULL;
680
681 /* Check and cleanup relayd if needed. */
682 rcu_read_lock();
683 relayd = consumer_find_relayd(stream->net_seq_idx);
684 if (relayd != NULL) {
685 consumer_stream_relayd_close(stream, relayd);
686 }
687 rcu_read_unlock();
688 }
689
690 /*
691 * Delete the stream from all possible hash tables.
692 *
693 * The consumer data lock MUST be acquired.
694 * The stream lock MUST be acquired.
695 */
696 void consumer_stream_delete(struct lttng_consumer_stream *stream,
697 struct lttng_ht *ht)
698 {
699 int ret;
700 struct lttng_ht_iter iter;
701
702 assert(stream);
703 /* Should NEVER be called not in monitor mode. */
704 assert(stream->chan->monitor);
705
706 rcu_read_lock();
707
708 if (ht) {
709 iter.iter.node = &stream->node.node;
710 ret = lttng_ht_del(ht, &iter);
711 assert(!ret);
712 }
713
714 /* Delete from stream per channel ID hash table. */
715 iter.iter.node = &stream->node_channel_id.node;
716 /*
717 * The returned value is of no importance. Even if the node is NOT in the
718 * hash table, we continue since we may have been called by a code path
719 * that did not add the stream to a (all) hash table. Same goes for the
720 * next call ht del call.
721 */
722 (void) lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
723
724 /* Delete from the global stream list. */
725 iter.iter.node = &stream->node_session_id.node;
726 /* See the previous ht del on why we ignore the returned value. */
727 (void) lttng_ht_del(consumer_data.stream_list_ht, &iter);
728
729 rcu_read_unlock();
730
731 if (!stream->metadata_flag) {
732 /* Decrement the stream count of the global consumer data. */
733 assert(consumer_data.stream_count > 0);
734 consumer_data.stream_count--;
735 }
736 }
737
738 /*
739 * Free the given stream within a RCU call.
740 */
741 void consumer_stream_free(struct lttng_consumer_stream *stream)
742 {
743 assert(stream);
744
745 metadata_bucket_destroy(stream->metadata_bucket);
746 call_rcu(&stream->node.head, free_stream_rcu);
747 }
748
749 /*
750 * Destroy the stream's buffers of the tracer.
751 */
752 void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream)
753 {
754 assert(stream);
755
756 switch (consumer_data.type) {
757 case LTTNG_CONSUMER_KERNEL:
758 break;
759 case LTTNG_CONSUMER32_UST:
760 case LTTNG_CONSUMER64_UST:
761 lttng_ustconsumer_del_stream(stream);
762 break;
763 default:
764 ERR("Unknown consumer_data type");
765 assert(0);
766 }
767 }
768
769 /*
770 * Destroy and close a already created stream.
771 */
772 static void destroy_close_stream(struct lttng_consumer_stream *stream)
773 {
774 assert(stream);
775
776 DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key);
777
778 /* Destroy tracer buffers of the stream. */
779 consumer_stream_destroy_buffers(stream);
780 /* Close down everything including the relayd if one. */
781 consumer_stream_close(stream);
782 }
783
784 /*
785 * Decrement the stream's channel refcount and if down to 0, return the channel
786 * pointer so it can be destroyed by the caller or NULL if not.
787 */
788 static struct lttng_consumer_channel *unref_channel(
789 struct lttng_consumer_stream *stream)
790 {
791 struct lttng_consumer_channel *free_chan = NULL;
792
793 assert(stream);
794 assert(stream->chan);
795
796 /* Update refcount of channel and see if we need to destroy it. */
797 if (!uatomic_sub_return(&stream->chan->refcount, 1)
798 && !uatomic_read(&stream->chan->nb_init_stream_left)) {
799 free_chan = stream->chan;
800 }
801
802 return free_chan;
803 }
804
805 /*
806 * Destroy a stream completely. This will delete, close and free the stream.
807 * Once return, the stream is NO longer usable. Its channel may get destroyed
808 * if conditions are met for a monitored stream.
809 *
810 * This MUST be called WITHOUT the consumer data and stream lock acquired if
811 * the stream is in _monitor_ mode else it does not matter.
812 */
813 void consumer_stream_destroy(struct lttng_consumer_stream *stream,
814 struct lttng_ht *ht)
815 {
816 assert(stream);
817
818 /* Stream is in monitor mode. */
819 if (stream->monitor) {
820 struct lttng_consumer_channel *free_chan = NULL;
821
822 /*
823 * This means that the stream was successfully removed from the streams
824 * list of the channel and sent to the right thread managing this
825 * stream thus being globally visible.
826 */
827 if (stream->globally_visible) {
828 pthread_mutex_lock(&consumer_data.lock);
829 pthread_mutex_lock(&stream->chan->lock);
830 pthread_mutex_lock(&stream->lock);
831 /* Remove every reference of the stream in the consumer. */
832 consumer_stream_delete(stream, ht);
833
834 destroy_close_stream(stream);
835
836 /* Update channel's refcount of the stream. */
837 free_chan = unref_channel(stream);
838
839 /* Indicates that the consumer data state MUST be updated after this. */
840 consumer_data.need_update = 1;
841
842 pthread_mutex_unlock(&stream->lock);
843 pthread_mutex_unlock(&stream->chan->lock);
844 pthread_mutex_unlock(&consumer_data.lock);
845 } else {
846 /*
847 * If the stream is not visible globally, this needs to be done
848 * outside of the consumer data lock section.
849 */
850 free_chan = unref_channel(stream);
851 }
852
853 if (free_chan) {
854 consumer_del_channel(free_chan);
855 }
856 } else {
857 destroy_close_stream(stream);
858 }
859
860 /* Free stream within a RCU call. */
861 lttng_trace_chunk_put(stream->trace_chunk);
862 stream->trace_chunk = NULL;
863 consumer_stream_free(stream);
864 }
865
866 /*
867 * Write index of a specific stream either on the relayd or local disk.
868 *
869 * Return 0 on success or else a negative value.
870 */
871 int consumer_stream_write_index(struct lttng_consumer_stream *stream,
872 struct ctf_packet_index *element)
873 {
874 int ret;
875
876 assert(stream);
877 assert(element);
878
879 rcu_read_lock();
880 if (stream->net_seq_idx != (uint64_t) -1ULL) {
881 struct consumer_relayd_sock_pair *relayd;
882 relayd = consumer_find_relayd(stream->net_seq_idx);
883 if (relayd) {
884 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
885 ret = relayd_send_index(&relayd->control_sock, element,
886 stream->relayd_stream_id, stream->next_net_seq_num - 1);
887 if (ret < 0) {
888 /*
889 * Communication error with lttng-relayd,
890 * perform cleanup now
891 */
892 ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
893 lttng_consumer_cleanup_relayd(relayd);
894 ret = -1;
895 }
896 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
897 } else {
898 ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.",
899 stream->key, stream->net_seq_idx);
900 ret = -1;
901 }
902 } else {
903 if (lttng_index_file_write(stream->index_file, element)) {
904 ret = -1;
905 } else {
906 ret = 0;
907 }
908 }
909 if (ret < 0) {
910 goto error;
911 }
912
913 error:
914 rcu_read_unlock();
915 return ret;
916 }
917
918 int consumer_stream_create_output_files(struct lttng_consumer_stream *stream,
919 bool create_index)
920 {
921 int ret;
922 enum lttng_trace_chunk_status chunk_status;
923 const int flags = O_WRONLY | O_CREAT | O_TRUNC;
924 const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
925 char stream_path[LTTNG_PATH_MAX];
926
927 ASSERT_LOCKED(stream->lock);
928 assert(stream->trace_chunk);
929
930 ret = utils_stream_file_path(stream->chan->pathname, stream->name,
931 stream->chan->tracefile_size,
932 stream->tracefile_count_current, NULL,
933 stream_path, sizeof(stream_path));
934 if (ret < 0) {
935 goto end;
936 }
937
938 if (stream->out_fd >= 0) {
939 ret = close(stream->out_fd);
940 if (ret < 0) {
941 PERROR("Failed to close stream file \"%s\"",
942 stream->name);
943 goto end;
944 }
945 stream->out_fd = -1;
946 }
947
948 DBG("Opening stream output file \"%s\"", stream_path);
949 chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path,
950 flags, mode, &stream->out_fd);
951 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
952 ERR("Failed to open stream file \"%s\"", stream->name);
953 ret = -1;
954 goto end;
955 }
956
957 if (!stream->metadata_flag && (create_index || stream->index_file)) {
958 if (stream->index_file) {
959 lttng_index_file_put(stream->index_file);
960 }
961 stream->index_file = lttng_index_file_create_from_trace_chunk(
962 stream->trace_chunk,
963 stream->chan->pathname,
964 stream->name,
965 stream->chan->tracefile_size,
966 stream->tracefile_count_current,
967 CTF_INDEX_MAJOR, CTF_INDEX_MINOR,
968 false);
969 if (!stream->index_file) {
970 ret = -1;
971 goto end;
972 }
973 }
974
975 /* Reset current size because we just perform a rotation. */
976 stream->tracefile_size_current = 0;
977 stream->out_fd_offset = 0;
978 end:
979 return ret;
980 }
981
982 int consumer_stream_rotate_output_files(struct lttng_consumer_stream *stream)
983 {
984 int ret;
985
986 stream->tracefile_count_current++;
987 if (stream->chan->tracefile_count > 0) {
988 stream->tracefile_count_current %=
989 stream->chan->tracefile_count;
990 }
991
992 DBG("Rotating output files of stream \"%s\"", stream->name);
993 ret = consumer_stream_create_output_files(stream, true);
994 if (ret) {
995 goto end;
996 }
997
998 end:
999 return ret;
1000 }
1001
1002 bool consumer_stream_is_deleted(struct lttng_consumer_stream *stream)
1003 {
1004 /*
1005 * This function does not take a const stream since
1006 * cds_lfht_is_node_deleted was not const before liburcu 0.12.
1007 */
1008 assert(stream);
1009 return cds_lfht_is_node_deleted(&stream->node.node);
1010 }
1011
/*
 * Flush callback handed to the metadata bucket: consume the buffer through
 * the mmap path of the stream passed as `data`.
 *
 * Returns the value of consumer_stream_consume_mmap().
 */
static ssize_t metadata_bucket_flush(
		const struct stream_subbuffer *buffer, void *data)
{
	struct lttng_consumer_stream *stream = data;

	return consumer_stream_consume_mmap(NULL, stream, buffer);
}
1025
1026 static ssize_t metadata_bucket_consume(
1027 struct lttng_consumer_local_data *unused,
1028 struct lttng_consumer_stream *stream,
1029 const struct stream_subbuffer *subbuffer)
1030 {
1031 ssize_t ret;
1032 enum metadata_bucket_status status;
1033
1034 status = metadata_bucket_fill(stream->metadata_bucket, subbuffer);
1035 switch (status) {
1036 case METADATA_BUCKET_STATUS_OK:
1037 /* Return consumed size. */
1038 ret = subbuffer->buffer.buffer.size;
1039 break;
1040 default:
1041 ret = -1;
1042 }
1043
1044 return ret;
1045 }
1046
1047 int consumer_stream_enable_metadata_bucketization(
1048 struct lttng_consumer_stream *stream)
1049 {
1050 int ret = 0;
1051
1052 assert(stream->metadata_flag);
1053 assert(!stream->metadata_bucket);
1054 assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
1055
1056 stream->metadata_bucket = metadata_bucket_create(
1057 metadata_bucket_flush, stream);
1058 if (!stream->metadata_bucket) {
1059 ret = -1;
1060 goto end;
1061 }
1062
1063 stream->read_subbuffer_ops.consume_subbuffer = metadata_bucket_consume;
1064 end:
1065 return ret;
1066 }
1067
1068 void consumer_stream_metadata_set_version(
1069 struct lttng_consumer_stream *stream, uint64_t new_version)
1070 {
1071 assert(new_version > stream->metadata_version);
1072 stream->metadata_version = new_version;
1073 stream->reset_metadata_flag = 1;
1074
1075 if (stream->metadata_bucket) {
1076 metadata_bucket_reset(stream->metadata_bucket);
1077 }
1078 }
This page took 0.080265 seconds and 4 git commands to generate.