shm-path: remove directory hierarchy on destroy
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #define _LGPL_SOURCE
21 #include <assert.h>
22 #include <lttng/ust-ctl.h>
23 #include <poll.h>
24 #include <pthread.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <sys/mman.h>
28 #include <sys/socket.h>
29 #include <sys/stat.h>
30 #include <sys/types.h>
31 #include <inttypes.h>
32 #include <unistd.h>
33 #include <urcu/list.h>
34 #include <signal.h>
35
36 #include <bin/lttng-consumerd/health-consumerd.h>
37 #include <common/common.h>
38 #include <common/sessiond-comm/sessiond-comm.h>
39 #include <common/relayd/relayd.h>
40 #include <common/compat/fcntl.h>
41 #include <common/compat/endian.h>
42 #include <common/consumer-metadata-cache.h>
43 #include <common/consumer-stream.h>
44 #include <common/consumer-timer.h>
45 #include <common/utils.h>
46 #include <common/index/index.h>
47
48 #include "ust-consumer.h"
49
50 extern struct lttng_consumer_global_data consumer_data;
51 extern int consumer_poll_timeout;
52 extern volatile int consumer_quit;
53
54 /*
55 * Free channel object and all streams associated with it. This MUST be used
56 * only and only if the channel has _NEVER_ been added to the global channel
57 * hash table.
58 */
59 static void destroy_channel(struct lttng_consumer_channel *channel)
60 {
61 struct lttng_consumer_stream *stream, *stmp;
62
63 assert(channel);
64
65 DBG("UST consumer cleaning stream list");
66
67 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
68 send_node) {
69
70 health_code_update();
71
72 cds_list_del(&stream->send_node);
73 ustctl_destroy_stream(stream->ustream);
74 free(stream);
75 }
76
77 /*
78 * If a channel is available meaning that was created before the streams
79 * were, delete it.
80 */
81 if (channel->uchan) {
82 lttng_ustconsumer_del_channel(channel);
83 }
84 /* Try to rmdir all directories under shm_path root. */
85 if (channel->root_shm_path[0]) {
86 (void) utils_recursive_rmdir(channel->root_shm_path);
87 }
88 free(channel);
89 }
90
91 /*
92 * Add channel to internal consumer state.
93 *
94 * Returns 0 on success or else a negative value.
95 */
96 static int add_channel(struct lttng_consumer_channel *channel,
97 struct lttng_consumer_local_data *ctx)
98 {
99 int ret = 0;
100
101 assert(channel);
102 assert(ctx);
103
104 if (ctx->on_recv_channel != NULL) {
105 ret = ctx->on_recv_channel(channel);
106 if (ret == 0) {
107 ret = consumer_add_channel(channel, ctx);
108 } else if (ret < 0) {
109 /* Most likely an ENOMEM. */
110 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
111 goto error;
112 }
113 } else {
114 ret = consumer_add_channel(channel, ctx);
115 }
116
117 DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);
118
119 error:
120 return ret;
121 }
122
123 /*
124 * Allocate and return a consumer channel object.
125 */
126 static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
127 const char *pathname, const char *name, uid_t uid, gid_t gid,
128 uint64_t relayd_id, uint64_t key, enum lttng_event_output output,
129 uint64_t tracefile_size, uint64_t tracefile_count,
130 uint64_t session_id_per_pid, unsigned int monitor,
131 unsigned int live_timer_interval,
132 const char *root_shm_path, const char *shm_path)
133 {
134 assert(pathname);
135 assert(name);
136
137 return consumer_allocate_channel(key, session_id, pathname, name, uid,
138 gid, relayd_id, output, tracefile_size,
139 tracefile_count, session_id_per_pid, monitor,
140 live_timer_interval, root_shm_path, shm_path);
141 }
142
/*
 * Allocate and return a consumer stream object. If _alloc_ret is not NULL, the
 * error value if applicable is set in it else it is kept untouched.
 *
 * Return NULL on error else the newly allocated stream object.
 */
static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
		struct lttng_consumer_channel *channel,
		struct lttng_consumer_local_data *ctx, int *_alloc_ret)
{
	int alloc_ret;
	struct lttng_consumer_stream *stream = NULL;

	assert(channel);
	assert(ctx);

	/*
	 * Most stream attributes are inherited from the owning channel;
	 * consumer_allocate_stream() reports its status through alloc_ret.
	 */
	stream = consumer_allocate_stream(channel->key,
			key,
			LTTNG_CONSUMER_ACTIVE_STREAM,
			channel->name,
			channel->uid,
			channel->gid,
			channel->relayd_id,
			channel->session_id,
			cpu,
			&alloc_ret,
			channel->type,
			channel->monitor);
	if (stream == NULL) {
		switch (alloc_ret) {
		case -ENOENT:
			/*
			 * We could not find the channel. Can happen if cpu hotplug
			 * happens while tearing down.
			 */
			DBG3("Could not find channel");
			break;
		case -ENOMEM:
		case -EINVAL:
		default:
			/* Fatal failures are signalled to the session daemon. */
			lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
			break;
		}
		goto error;
	}

	/* Back-reference to the owning channel. */
	stream->chan = channel;

error:
	/* Propagated on both the success and error paths. */
	if (_alloc_ret) {
		*_alloc_ret = alloc_ret;
	}
	return stream;
}
197
/*
 * Send the given stream pointer to the corresponding thread.
 *
 * The stream is first registered in the global consumer state (metadata or
 * data variant), then its pointer is written on the matching thread pipe.
 * From the registration on, ownership of the stream belongs to the global
 * state, not to the channel's local send list anymore.
 *
 * Returns 0 on success else a negative value.
 */
static int send_stream_to_thread(struct lttng_consumer_stream *stream,
		struct lttng_consumer_local_data *ctx)
{
	int ret;
	struct lttng_pipe *stream_pipe;

	/* Get the right pipe where the stream will be sent. */
	if (stream->metadata_flag) {
		ret = consumer_add_metadata_stream(stream);
		if (ret) {
			ERR("Consumer add metadata stream %" PRIu64 " failed.",
					stream->key);
			goto error;
		}
		stream_pipe = ctx->consumer_metadata_pipe;
	} else {
		ret = consumer_add_data_stream(stream);
		if (ret) {
			ERR("Consumer add stream %" PRIu64 " failed.",
					stream->key);
			goto error;
		}
		stream_pipe = ctx->consumer_data_pipe;
	}

	/*
	 * From this point on, the stream's ownership has been moved away from
	 * the channel and becomes globally visible.
	 */
	stream->globally_visible = 1;

	ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
	if (ret < 0) {
		ERR("Consumer write %s stream to pipe %d",
				stream->metadata_flag ? "metadata" : "data",
				lttng_pipe_get_writefd(stream_pipe));
		/*
		 * Writing to the pipe failed: undo the registration performed
		 * above so the stream is not left dangling in the global state.
		 */
		if (stream->metadata_flag) {
			consumer_del_stream_for_metadata(stream);
		} else {
			consumer_del_stream_for_data(stream);
		}
	}
error:
	return ret;
}
248
249 /*
250 * Create streams for the given channel using liblttng-ust-ctl.
251 *
252 * Return 0 on success else a negative value.
253 */
254 static int create_ust_streams(struct lttng_consumer_channel *channel,
255 struct lttng_consumer_local_data *ctx)
256 {
257 int ret, cpu = 0;
258 struct ustctl_consumer_stream *ustream;
259 struct lttng_consumer_stream *stream;
260
261 assert(channel);
262 assert(ctx);
263
264 /*
265 * While a stream is available from ustctl. When NULL is returned, we've
266 * reached the end of the possible stream for the channel.
267 */
268 while ((ustream = ustctl_create_stream(channel->uchan, cpu))) {
269 int wait_fd;
270 int ust_metadata_pipe[2];
271
272 health_code_update();
273
274 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && channel->monitor) {
275 ret = utils_create_pipe_cloexec_nonblock(ust_metadata_pipe);
276 if (ret < 0) {
277 ERR("Create ust metadata poll pipe");
278 goto error;
279 }
280 wait_fd = ust_metadata_pipe[0];
281 } else {
282 wait_fd = ustctl_stream_get_wait_fd(ustream);
283 }
284
285 /* Allocate consumer stream object. */
286 stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
287 if (!stream) {
288 goto error_alloc;
289 }
290 stream->ustream = ustream;
291 /*
292 * Store it so we can save multiple function calls afterwards since
293 * this value is used heavily in the stream threads. This is UST
294 * specific so this is why it's done after allocation.
295 */
296 stream->wait_fd = wait_fd;
297
298 /*
299 * Increment channel refcount since the channel reference has now been
300 * assigned in the allocation process above.
301 */
302 if (stream->chan->monitor) {
303 uatomic_inc(&stream->chan->refcount);
304 }
305
306 /*
307 * Order is important this is why a list is used. On error, the caller
308 * should clean this list.
309 */
310 cds_list_add_tail(&stream->send_node, &channel->streams.head);
311
312 ret = ustctl_get_max_subbuf_size(stream->ustream,
313 &stream->max_sb_size);
314 if (ret < 0) {
315 ERR("ustctl_get_max_subbuf_size failed for stream %s",
316 stream->name);
317 goto error;
318 }
319
320 /* Do actions once stream has been received. */
321 if (ctx->on_recv_stream) {
322 ret = ctx->on_recv_stream(stream);
323 if (ret < 0) {
324 goto error;
325 }
326 }
327
328 DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
329 stream->name, stream->key, stream->relayd_stream_id);
330
331 /* Set next CPU stream. */
332 channel->streams.count = ++cpu;
333
334 /* Keep stream reference when creating metadata. */
335 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
336 channel->metadata_stream = stream;
337 stream->ust_metadata_poll_pipe[0] = ust_metadata_pipe[0];
338 stream->ust_metadata_poll_pipe[1] = ust_metadata_pipe[1];
339 }
340 }
341
342 return 0;
343
344 error:
345 error_alloc:
346 return ret;
347 }
348
349 /*
350 * Create an UST channel with the given attributes and send it to the session
351 * daemon using the ust ctl API.
352 *
353 * Return 0 on success or else a negative value.
354 */
355 static int create_ust_channel(struct ustctl_consumer_channel_attr *attr,
356 struct ustctl_consumer_channel **chanp)
357 {
358 int ret;
359 struct ustctl_consumer_channel *channel;
360
361 assert(attr);
362 assert(chanp);
363
364 DBG3("Creating channel to ustctl with attr: [overwrite: %d, "
365 "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
366 "switch_timer_interval: %u, read_timer_interval: %u, "
367 "output: %d, type: %d", attr->overwrite, attr->subbuf_size,
368 attr->num_subbuf, attr->switch_timer_interval,
369 attr->read_timer_interval, attr->output, attr->type);
370
371 channel = ustctl_create_channel(attr);
372 if (!channel) {
373 ret = -1;
374 goto error_create;
375 }
376
377 *chanp = channel;
378
379 return 0;
380
381 error_create:
382 return ret;
383 }
384
385 /*
386 * Send a single given stream to the session daemon using the sock.
387 *
388 * Return 0 on success else a negative value.
389 */
390 static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
391 {
392 int ret;
393
394 assert(stream);
395 assert(sock >= 0);
396
397 DBG("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);
398
399 /* Send stream to session daemon. */
400 ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
401 if (ret < 0) {
402 goto error;
403 }
404
405 error:
406 return ret;
407 }
408
/*
 * Send channel to sessiond.
 *
 * If the channel is bound to a relayd, each stream is first announced to the
 * relayd; a relayd failure is recorded in *relayd_error and in ret_code but
 * the status message is still sent to the session daemon so it learns about
 * the failure. On success, the channel, then every stream, then a NULL
 * stream terminator are sent on the sessiond socket.
 *
 * Return 0 on success or else a negative value.
 */
static int send_sessiond_channel(int sock,
		struct lttng_consumer_channel *channel,
		struct lttng_consumer_local_data *ctx, int *relayd_error)
{
	int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_consumer_stream *stream;
	/*
	 * NOTE(review): net_seq_idx is captured from the first stream below but
	 * never read afterwards in this function — looks vestigial; confirm.
	 */
	uint64_t net_seq_idx = -1ULL;

	assert(channel);
	assert(ctx);
	assert(sock >= 0);

	DBG("UST consumer sending channel %s to sessiond", channel->name);

	if (channel->relayd_id != (uint64_t) -1ULL) {
		cds_list_for_each_entry(stream, &channel->streams.head, send_node) {

			health_code_update();

			/* Try to send the stream to the relayd if one is available. */
			ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
			if (ret < 0) {
				/*
				 * Flag that the relayd was the problem here probably due to a
				 * communicaton error on the socket.
				 */
				if (relayd_error) {
					*relayd_error = 1;
				}
				ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
			}
			if (net_seq_idx == -1ULL) {
				net_seq_idx = stream->net_seq_idx;
			}
		}
	}

	/* Inform sessiond that we are about to send channel and streams. */
	ret = consumer_send_status_msg(sock, ret_code);
	if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
		/*
		 * Either the session daemon is not responding or the relayd died so we
		 * stop now.
		 */
		goto error;
	}

	/* Send channel to sessiond. */
	ret = ustctl_send_channel_to_sessiond(sock, channel->uchan);
	if (ret < 0) {
		goto error;
	}

	/* The wakeup fd is not needed anymore once the channel is sent. */
	ret = ustctl_channel_close_wakeup_fd(channel->uchan);
	if (ret < 0) {
		goto error;
	}

	/* The channel was sent successfully to the sessiond at this point. */
	cds_list_for_each_entry(stream, &channel->streams.head, send_node) {

		health_code_update();

		/* Send stream to session daemon. */
		ret = send_sessiond_stream(sock, stream);
		if (ret < 0) {
			goto error;
		}
	}

	/* Tell sessiond there is no more stream. */
	ret = ustctl_send_stream_to_sessiond(sock, NULL);
	if (ret < 0) {
		goto error;
	}

	DBG("UST consumer NULL stream sent to sessiond");

	return 0;

error:
	/* Map a recorded relayd/sessiond failure to a generic negative return. */
	if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
		ret = -1;
	}
	return ret;
}
500
/*
 * Creates a channel and streams and add the channel it to the channel internal
 * state. The created stream must ONLY be sent once the GET_CHANNEL command is
 * received.
 *
 * Return 0 on success or else, a negative value is returned and the channel
 * MUST be destroyed by consumer_del_channel().
 */
static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
		struct lttng_consumer_channel *channel,
		struct ustctl_consumer_channel_attr *attr)
{
	int ret;

	assert(ctx);
	assert(channel);
	assert(attr);

	/*
	 * This value is still used by the kernel consumer since for the kernel,
	 * the stream ownership is not IN the consumer so we need to have the
	 * number of left stream that needs to be initialized so we can know when
	 * to delete the channel (see consumer.c).
	 *
	 * As for the user space tracer now, the consumer creates and sends the
	 * stream to the session daemon which only sends them to the application
	 * once every stream of a channel is received making this value useless
	 * because we they will be added to the poll thread before the application
	 * receives them. This ensures that a stream can not hang up during
	 * initilization of a channel.
	 */
	channel->nb_init_stream_left = 0;

	/* The reply msg status is handled in the following call. */
	ret = create_ust_channel(attr, &channel->uchan);
	if (ret < 0) {
		goto end;
	}

	/* Cache the ustctl wait fd on the channel object. */
	channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);

	/*
	 * For the snapshots (no monitor), we create the metadata streams
	 * on demand, not during the channel creation.
	 */
	if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && !channel->monitor) {
		ret = 0;
		goto end;
	}

	/* Open all streams for this channel. */
	ret = create_ust_streams(channel, ctx);
	if (ret < 0) {
		goto end;
	}

end:
	return ret;
}
560
561 /*
562 * Send all stream of a channel to the right thread handling it.
563 *
564 * On error, return a negative value else 0 on success.
565 */
566 static int send_streams_to_thread(struct lttng_consumer_channel *channel,
567 struct lttng_consumer_local_data *ctx)
568 {
569 int ret = 0;
570 struct lttng_consumer_stream *stream, *stmp;
571
572 assert(channel);
573 assert(ctx);
574
575 /* Send streams to the corresponding thread. */
576 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
577 send_node) {
578
579 health_code_update();
580
581 /* Sending the stream to the thread. */
582 ret = send_stream_to_thread(stream, ctx);
583 if (ret < 0) {
584 /*
585 * If we are unable to send the stream to the thread, there is
586 * a big problem so just stop everything.
587 */
588 /* Remove node from the channel stream list. */
589 cds_list_del(&stream->send_node);
590 goto error;
591 }
592
593 /* Remove node from the channel stream list. */
594 cds_list_del(&stream->send_node);
595
596 }
597
598 error:
599 return ret;
600 }
601
/*
 * Flush channel's streams using the given key to retrieve the channel.
 *
 * Looks up the channel under the RCU read-side lock, then iterates over all
 * streams sharing that channel id in the per-channel-id hash table and
 * flushes each stream's ring buffer.
 *
 * Return 0 on success else an LTTng error code.
 */
static int flush_channel(uint64_t chan_key)
{
	int ret = 0;
	struct lttng_consumer_channel *channel;
	struct lttng_consumer_stream *stream;
	struct lttng_ht *ht;
	struct lttng_ht_iter iter;

	DBG("UST consumer flush channel key %" PRIu64, chan_key);

	rcu_read_lock();
	channel = consumer_find_channel(chan_key);
	if (!channel) {
		ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
		ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
		goto error;
	}

	ht = consumer_data.stream_per_chan_id_ht;

	/* For each stream of the channel id, flush it. */
	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
			&channel->key, &iter.iter, stream, node_channel_id.node) {

		health_code_update();

		/* 1 == produce an active flush of the current sub-buffer. */
		ustctl_flush_buffer(stream->ustream, 1);
	}
error:
	rcu_read_unlock();
	return ret;
}
640
/*
 * Close metadata stream wakeup_fd using the given key to retrieve the channel.
 * RCU read side lock MUST be acquired before calling this function.
 *
 * Return 0 on success else an LTTng error code.
 */
static int close_metadata(uint64_t chan_key)
{
	int ret = 0;
	struct lttng_consumer_channel *channel;

	DBG("UST consumer close metadata key %" PRIu64, chan_key);

	channel = consumer_find_channel(chan_key);
	if (!channel) {
		/*
		 * This is possible if the metadata thread has issue a delete because
		 * the endpoint point of the stream hung up. There is no way the
		 * session daemon can know about it thus use a DBG instead of an actual
		 * error.
		 */
		DBG("UST consumer close metadata %" PRIu64 " not found", chan_key);
		ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
		goto error;
	}

	/*
	 * Lock ordering: global consumer data lock first, then the channel
	 * lock, mirroring the order used elsewhere to avoid deadlocks.
	 */
	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&channel->lock);

	/* Channel concurrently removed from the hash table: nothing to close. */
	if (cds_lfht_is_node_deleted(&channel->node.node)) {
		goto error_unlock;
	}

	lttng_ustconsumer_close_metadata(channel);

error_unlock:
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);
error:
	return ret;
}
682
/*
 * Set up the metadata channel identified by key: announce its metadata
 * stream to the relayd if one is configured, then hand the stream over to
 * the metadata thread.
 *
 * RCU read side lock MUST be acquired before calling this function.
 *
 * Return 0 on success else an LTTng error code.
 */
static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
{
	int ret;
	struct lttng_consumer_channel *metadata;

	DBG("UST consumer setup metadata key %" PRIu64, key);

	metadata = consumer_find_channel(key);
	if (!metadata) {
		ERR("UST consumer push metadata %" PRIu64 " not found", key);
		ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
		goto end;
	}

	/*
	 * In no monitor mode, the metadata channel has no stream(s) so skip the
	 * ownership transfer to the metadata thread.
	 */
	if (!metadata->monitor) {
		DBG("Metadata channel in no monitor");
		ret = 0;
		goto end;
	}

	/*
	 * Send metadata stream to relayd if one available. Availability is
	 * known if the stream is still in the list of the channel.
	 */
	if (cds_list_empty(&metadata->streams.head)) {
		ERR("Metadata channel key %" PRIu64 ", no stream available.", key);
		ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
		goto error_no_stream;
	}

	/* Send metadata stream to relayd if needed. */
	if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) {
		ret = consumer_send_relayd_stream(metadata->metadata_stream,
				metadata->pathname);
		if (ret < 0) {
			ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
			goto error;
		}
		/* Let the relayd know every stream of this session was announced. */
		ret = consumer_send_relayd_streams_sent(
				metadata->metadata_stream->net_seq_idx);
		if (ret < 0) {
			ret = LTTCOMM_CONSUMERD_RELAYD_FAIL;
			goto error;
		}
	}

	ret = send_streams_to_thread(metadata, ctx);
	if (ret < 0) {
		/*
		 * If we are unable to send the stream to the thread, there is
		 * a big problem so just stop everything.
		 */
		ret = LTTCOMM_CONSUMERD_FATAL;
		goto error;
	}
	/* List MUST be empty after or else it could be reused. */
	assert(cds_list_empty(&metadata->streams.head));

	ret = 0;
	goto end;

error:
	/*
	 * Delete metadata channel on error. At this point, the metadata stream can
	 * NOT be monitored by the metadata thread thus having the guarantee that
	 * the stream is still in the local stream list of the channel. This call
	 * will make sure to clean that list.
	 */
	consumer_stream_destroy(metadata->metadata_stream, NULL);
	cds_list_del(&metadata->metadata_stream->send_node);
	metadata->metadata_stream = NULL;
error_no_stream:
end:
	return ret;
}
767
/*
 * Snapshot the whole metadata.
 *
 * Requests the latest metadata from the session daemon, creates the metadata
 * stream on demand (it does not exist in no-monitor mode), writes the whole
 * cache either to the relayd or to a local file at 'path', then destroys the
 * stream so the next snapshot starts fresh.
 *
 * Returns 0 on success, < 0 on error
 */
static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
		struct lttng_consumer_local_data *ctx)
{
	int ret = 0;
	struct lttng_consumer_channel *metadata_channel;
	struct lttng_consumer_stream *metadata_stream;

	assert(path);
	assert(ctx);

	DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s",
			key, path);

	rcu_read_lock();

	metadata_channel = consumer_find_channel(key);
	if (!metadata_channel) {
		ERR("UST snapshot metadata channel not found for key %" PRIu64,
				key);
		ret = -1;
		goto error;
	}
	/* Snapshots only operate on no-monitor metadata channels. */
	assert(!metadata_channel->monitor);

	health_code_update();

	/*
	 * Ask the sessiond if we have new metadata waiting and update the
	 * consumer metadata cache.
	 */
	ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
	if (ret < 0) {
		goto error;
	}

	health_code_update();

	/*
	 * The metadata stream is NOT created in no monitor mode when the channel
	 * is created on a sessiond ask channel command.
	 */
	ret = create_ust_streams(metadata_channel, ctx);
	if (ret < 0) {
		goto error;
	}

	metadata_stream = metadata_channel->metadata_stream;
	assert(metadata_stream);

	if (relayd_id != (uint64_t) -1ULL) {
		/* Stream the snapshot through the relayd. */
		metadata_stream->net_seq_idx = relayd_id;
		ret = consumer_send_relayd_stream(metadata_stream, path);
		if (ret < 0) {
			goto error_stream;
		}
	} else {
		/* Local output: open the on-disk stream file. */
		ret = utils_create_stream_file(path, metadata_stream->name,
				metadata_stream->chan->tracefile_size,
				metadata_stream->tracefile_count_current,
				metadata_stream->uid, metadata_stream->gid, NULL);
		if (ret < 0) {
			goto error_stream;
		}
		metadata_stream->out_fd = ret;
		metadata_stream->tracefile_size_current = 0;
	}

	/* Drain the metadata until nothing is left to read. */
	do {
		health_code_update();

		ret = lttng_consumer_read_subbuffer(metadata_stream, ctx);
		if (ret < 0) {
			goto error_stream;
		}
	} while (ret > 0);

error_stream:
	/*
	 * Clean up the stream completly because the next snapshot will use a new
	 * metadata stream.
	 */
	consumer_stream_destroy(metadata_stream, NULL);
	cds_list_del(&metadata_stream->send_node);
	metadata_channel->metadata_stream = NULL;

error:
	rcu_read_unlock();
	return ret;
}
862
/*
 * Take a snapshot of all the stream of a channel.
 *
 * For every stream of the (no-monitor) channel: announce it to the relayd or
 * create its local output file, flush and snapshot the ring buffer, then copy
 * each sub-buffer between the consumed and produced positions to the output.
 * Streams are closed afterwards so they can be reused by the next snapshot.
 *
 * Returns 0 on success, < 0 on error
 */
static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
		uint64_t nb_packets_per_stream, struct lttng_consumer_local_data *ctx)
{
	int ret;
	unsigned use_relayd = 0;
	unsigned long consumed_pos, produced_pos;
	struct lttng_consumer_channel *channel;
	struct lttng_consumer_stream *stream;

	assert(path);
	assert(ctx);

	rcu_read_lock();

	if (relayd_id != (uint64_t) -1ULL) {
		use_relayd = 1;
	}

	channel = consumer_find_channel(key);
	if (!channel) {
		ERR("UST snapshot channel not found for key %" PRIu64, key);
		ret = -1;
		goto error;
	}
	/* Snapshots only operate on no-monitor channels. */
	assert(!channel->monitor);
	DBG("UST consumer snapshot channel %" PRIu64, key);

	cds_list_for_each_entry(stream, &channel->streams.head, send_node) {

		health_code_update();

		/* Lock stream because we are about to change its state. */
		pthread_mutex_lock(&stream->lock);
		stream->net_seq_idx = relayd_id;

		if (use_relayd) {
			ret = consumer_send_relayd_stream(stream, path);
			if (ret < 0) {
				goto error_unlock;
			}
		} else {
			ret = utils_create_stream_file(path, stream->name,
					stream->chan->tracefile_size,
					stream->tracefile_count_current,
					stream->uid, stream->gid, NULL);
			if (ret < 0) {
				goto error_unlock;
			}
			stream->out_fd = ret;
			stream->tracefile_size_current = 0;

			DBG("UST consumer snapshot stream %s/%s (%" PRIu64 ")", path,
					stream->name, stream->key);
		}
		/*
		 * NOTE(review): this "streams sent" notification is issued once
		 * per stream iteration rather than once after the loop — verify
		 * the relayd tolerates repeated streams_sent messages.
		 */
		if (relayd_id != -1ULL) {
			ret = consumer_send_relayd_streams_sent(relayd_id);
			if (ret < 0) {
				goto error_unlock;
			}
		}

		/* Active flush (arg 1) so the current sub-buffer is readable. */
		ustctl_flush_buffer(stream->ustream, 1);

		ret = lttng_ustconsumer_take_snapshot(stream);
		if (ret < 0) {
			ERR("Taking UST snapshot");
			goto error_unlock;
		}

		ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
		if (ret < 0) {
			ERR("Produced UST snapshot position");
			goto error_unlock;
		}

		ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
		if (ret < 0) {
			ERR("Consumerd UST snapshot position");
			goto error_unlock;
		}

		/*
		 * The original value is sent back if max stream size is larger than
		 * the possible size of the snapshot. Also, we assume that the session
		 * daemon should never send a maximum stream size that is lower than
		 * subbuffer size.
		 */
		consumed_pos = consumer_get_consume_start_pos(consumed_pos,
				produced_pos, nb_packets_per_stream,
				stream->max_sb_size);

		/* Copy every sub-buffer between the snapshot positions. */
		while (consumed_pos < produced_pos) {
			ssize_t read_len;
			unsigned long len, padded_len;

			health_code_update();

			DBG("UST consumer taking snapshot at pos %lu", consumed_pos);

			ret = ustctl_get_subbuf(stream->ustream, &consumed_pos);
			if (ret < 0) {
				if (ret != -EAGAIN) {
					PERROR("ustctl_get_subbuf snapshot");
					goto error_close_stream;
				}
				/* -EAGAIN: sub-buffer not available, move past it. */
				DBG("UST consumer get subbuf failed. Skipping it.");
				consumed_pos += stream->max_sb_size;
				continue;
			}

			ret = ustctl_get_subbuf_size(stream->ustream, &len);
			if (ret < 0) {
				ERR("Snapshot ustctl_get_subbuf_size");
				goto error_put_subbuf;
			}

			ret = ustctl_get_padded_subbuf_size(stream->ustream, &padded_len);
			if (ret < 0) {
				ERR("Snapshot ustctl_get_padded_subbuf_size");
				goto error_put_subbuf;
			}

			/*
			 * The relayd expects exactly the payload (padding sent
			 * separately as the second argument); local files get the
			 * fully padded sub-buffer.
			 */
			read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
					padded_len - len, NULL);
			if (use_relayd) {
				if (read_len != len) {
					ret = -EPERM;
					goto error_put_subbuf;
				}
			} else {
				if (read_len != padded_len) {
					ret = -EPERM;
					goto error_put_subbuf;
				}
			}

			ret = ustctl_put_subbuf(stream->ustream);
			if (ret < 0) {
				ERR("Snapshot ustctl_put_subbuf");
				goto error_close_stream;
			}
			consumed_pos += stream->max_sb_size;
		}

		/* Simply close the stream so we can use it on the next snapshot. */
		consumer_stream_close(stream);
		pthread_mutex_unlock(&stream->lock);
	}

	rcu_read_unlock();
	return 0;

error_put_subbuf:
	if (ustctl_put_subbuf(stream->ustream) < 0) {
		ERR("Snapshot ustctl_put_subbuf");
	}
error_close_stream:
	consumer_stream_close(stream);
error_unlock:
	pthread_mutex_unlock(&stream->lock);
error:
	rcu_read_unlock();
	return ret;
}
1032
1033 /*
1034 * Receive the metadata updates from the sessiond.
1035 */
1036 int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
1037 uint64_t len, struct lttng_consumer_channel *channel,
1038 int timer, int wait)
1039 {
1040 int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1041 char *metadata_str;
1042
1043 DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
1044
1045 metadata_str = zmalloc(len * sizeof(char));
1046 if (!metadata_str) {
1047 PERROR("zmalloc metadata string");
1048 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
1049 goto end;
1050 }
1051
1052 health_code_update();
1053
1054 /* Receive metadata string. */
1055 ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
1056 if (ret < 0) {
1057 /* Session daemon is dead so return gracefully. */
1058 ret_code = ret;
1059 goto end_free;
1060 }
1061
1062 health_code_update();
1063
1064 pthread_mutex_lock(&channel->metadata_cache->lock);
1065 ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
1066 if (ret < 0) {
1067 /* Unable to handle metadata. Notify session daemon. */
1068 ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
1069 /*
1070 * Skip metadata flush on write error since the offset and len might
1071 * not have been updated which could create an infinite loop below when
1072 * waiting for the metadata cache to be flushed.
1073 */
1074 pthread_mutex_unlock(&channel->metadata_cache->lock);
1075 goto end_free;
1076 }
1077 pthread_mutex_unlock(&channel->metadata_cache->lock);
1078
1079 if (!wait) {
1080 goto end_free;
1081 }
1082 while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
1083 DBG("Waiting for metadata to be flushed");
1084
1085 health_code_update();
1086
1087 usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
1088 }
1089
1090 end_free:
1091 free(metadata_str);
1092 end:
1093 return ret_code;
1094 }
1095
1096 /*
1097 * Receive command from session daemon and process it.
1098 *
1099 * Return 1 on success else a negative value or 0.
1100 */
1101 int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
1102 int sock, struct pollfd *consumer_sockpoll)
1103 {
1104 ssize_t ret;
1105 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1106 struct lttcomm_consumer_msg msg;
1107 struct lttng_consumer_channel *channel = NULL;
1108
1109 health_code_update();
1110
1111 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
1112 if (ret != sizeof(msg)) {
1113 DBG("Consumer received unexpected message size %zd (expects %zu)",
1114 ret, sizeof(msg));
1115 /*
1116 * The ret value might 0 meaning an orderly shutdown but this is ok
1117 * since the caller handles this.
1118 */
1119 if (ret > 0) {
1120 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
1121 ret = -1;
1122 }
1123 return ret;
1124 }
1125
1126 health_code_update();
1127
1128 /* deprecated */
1129 assert(msg.cmd_type != LTTNG_CONSUMER_STOP);
1130
1131 health_code_update();
1132
1133 /* relayd needs RCU read-side lock */
1134 rcu_read_lock();
1135
1136 switch (msg.cmd_type) {
1137 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
1138 {
1139 /* Session daemon status message are handled in the following call. */
1140 ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
1141 msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
1142 &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
1143 msg.u.relayd_sock.relayd_session_id);
1144 goto end_nosignal;
1145 }
1146 case LTTNG_CONSUMER_DESTROY_RELAYD:
1147 {
1148 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
1149 struct consumer_relayd_sock_pair *relayd;
1150
1151 DBG("UST consumer destroying relayd %" PRIu64, index);
1152
1153 /* Get relayd reference if exists. */
1154 relayd = consumer_find_relayd(index);
1155 if (relayd == NULL) {
1156 DBG("Unable to find relayd %" PRIu64, index);
1157 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
1158 }
1159
1160 /*
1161 * Each relayd socket pair has a refcount of stream attached to it
1162 * which tells if the relayd is still active or not depending on the
1163 * refcount value.
1164 *
1165 * This will set the destroy flag of the relayd object and destroy it
1166 * if the refcount reaches zero when called.
1167 *
1168 * The destroy can happen either here or when a stream fd hangs up.
1169 */
1170 if (relayd) {
1171 consumer_flag_relayd_for_destroy(relayd);
1172 }
1173
1174 goto end_msg_sessiond;
1175 }
1176 case LTTNG_CONSUMER_UPDATE_STREAM:
1177 {
1178 rcu_read_unlock();
1179 return -ENOSYS;
1180 }
1181 case LTTNG_CONSUMER_DATA_PENDING:
1182 {
1183 int ret, is_data_pending;
1184 uint64_t id = msg.u.data_pending.session_id;
1185
1186 DBG("UST consumer data pending command for id %" PRIu64, id);
1187
1188 is_data_pending = consumer_data_pending(id);
1189
1190 /* Send back returned value to session daemon */
1191 ret = lttcomm_send_unix_sock(sock, &is_data_pending,
1192 sizeof(is_data_pending));
1193 if (ret < 0) {
1194 DBG("Error when sending the data pending ret code: %d", ret);
1195 goto error_fatal;
1196 }
1197
1198 /*
1199 * No need to send back a status message since the data pending
1200 * returned value is the response.
1201 */
1202 break;
1203 }
1204 case LTTNG_CONSUMER_ASK_CHANNEL_CREATION:
1205 {
1206 int ret;
1207 struct ustctl_consumer_channel_attr attr;
1208
1209 /* Create a plain object and reserve a channel key. */
1210 channel = allocate_channel(msg.u.ask_channel.session_id,
1211 msg.u.ask_channel.pathname, msg.u.ask_channel.name,
1212 msg.u.ask_channel.uid, msg.u.ask_channel.gid,
1213 msg.u.ask_channel.relayd_id, msg.u.ask_channel.key,
1214 (enum lttng_event_output) msg.u.ask_channel.output,
1215 msg.u.ask_channel.tracefile_size,
1216 msg.u.ask_channel.tracefile_count,
1217 msg.u.ask_channel.session_id_per_pid,
1218 msg.u.ask_channel.monitor,
1219 msg.u.ask_channel.live_timer_interval,
1220 msg.u.ask_channel.root_shm_path,
1221 msg.u.ask_channel.shm_path);
1222 if (!channel) {
1223 goto end_channel_error;
1224 }
1225
1226 /*
1227 * Assign UST application UID to the channel. This value is ignored for
1228 * per PID buffers. This is specific to UST thus setting this after the
1229 * allocation.
1230 */
1231 channel->ust_app_uid = msg.u.ask_channel.ust_app_uid;
1232
1233 /* Build channel attributes from received message. */
1234 attr.subbuf_size = msg.u.ask_channel.subbuf_size;
1235 attr.num_subbuf = msg.u.ask_channel.num_subbuf;
1236 attr.overwrite = msg.u.ask_channel.overwrite;
1237 attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
1238 attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
1239 attr.chan_id = msg.u.ask_channel.chan_id;
1240 memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
1241 strncpy(attr.shm_path, channel->shm_path,
1242 sizeof(attr.shm_path));
1243 attr.shm_path[sizeof(attr.shm_path) - 1] = '\0';
1244
1245 /* Match channel buffer type to the UST abi. */
1246 switch (msg.u.ask_channel.output) {
1247 case LTTNG_EVENT_MMAP:
1248 default:
1249 attr.output = LTTNG_UST_MMAP;
1250 break;
1251 }
1252
1253 /* Translate and save channel type. */
1254 switch (msg.u.ask_channel.type) {
1255 case LTTNG_UST_CHAN_PER_CPU:
1256 channel->type = CONSUMER_CHANNEL_TYPE_DATA;
1257 attr.type = LTTNG_UST_CHAN_PER_CPU;
1258 /*
1259 * Set refcount to 1 for owner. Below, we will
1260 * pass ownership to the
1261 * consumer_thread_channel_poll() thread.
1262 */
1263 channel->refcount = 1;
1264 break;
1265 case LTTNG_UST_CHAN_METADATA:
1266 channel->type = CONSUMER_CHANNEL_TYPE_METADATA;
1267 attr.type = LTTNG_UST_CHAN_METADATA;
1268 break;
1269 default:
1270 assert(0);
1271 goto error_fatal;
1272 };
1273
1274 health_code_update();
1275
1276 ret = ask_channel(ctx, sock, channel, &attr);
1277 if (ret < 0) {
1278 goto end_channel_error;
1279 }
1280
1281 if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
1282 ret = consumer_metadata_cache_allocate(channel);
1283 if (ret < 0) {
1284 ERR("Allocating metadata cache");
1285 goto end_channel_error;
1286 }
1287 consumer_timer_switch_start(channel, attr.switch_timer_interval);
1288 attr.switch_timer_interval = 0;
1289 } else {
1290 consumer_timer_live_start(channel,
1291 msg.u.ask_channel.live_timer_interval);
1292 }
1293
1294 health_code_update();
1295
1296 /*
1297 * Add the channel to the internal state AFTER all streams were created
1298 * and successfully sent to session daemon. This way, all streams must
1299 * be ready before this channel is visible to the threads.
1300 * If add_channel succeeds, ownership of the channel is
1301 * passed to consumer_thread_channel_poll().
1302 */
1303 ret = add_channel(channel, ctx);
1304 if (ret < 0) {
1305 if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
1306 if (channel->switch_timer_enabled == 1) {
1307 consumer_timer_switch_stop(channel);
1308 }
1309 consumer_metadata_cache_destroy(channel);
1310 }
1311 if (channel->live_timer_enabled == 1) {
1312 consumer_timer_live_stop(channel);
1313 }
1314 goto end_channel_error;
1315 }
1316
1317 health_code_update();
1318
1319 /*
1320 * Channel and streams are now created. Inform the session daemon that
1321 * everything went well and should wait to receive the channel and
1322 * streams with ustctl API.
1323 */
1324 ret = consumer_send_status_channel(sock, channel);
1325 if (ret < 0) {
1326 /*
1327 * There is probably a problem on the socket.
1328 */
1329 goto error_fatal;
1330 }
1331
1332 break;
1333 }
1334 case LTTNG_CONSUMER_GET_CHANNEL:
1335 {
1336 int ret, relayd_err = 0;
1337 uint64_t key = msg.u.get_channel.key;
1338 struct lttng_consumer_channel *channel;
1339
1340 channel = consumer_find_channel(key);
1341 if (!channel) {
1342 ERR("UST consumer get channel key %" PRIu64 " not found", key);
1343 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1344 goto end_msg_sessiond;
1345 }
1346
1347 health_code_update();
1348
1349 /* Send everything to sessiond. */
1350 ret = send_sessiond_channel(sock, channel, ctx, &relayd_err);
1351 if (ret < 0) {
1352 if (relayd_err) {
1353 /*
1354 * We were unable to send to the relayd the stream so avoid
1355 * sending back a fatal error to the thread since this is OK
1356 * and the consumer can continue its work. The above call
1357 * has sent the error status message to the sessiond.
1358 */
1359 goto end_nosignal;
1360 }
1361 /*
1362 * The communicaton was broken hence there is a bad state between
1363 * the consumer and sessiond so stop everything.
1364 */
1365 goto error_fatal;
1366 }
1367
1368 health_code_update();
1369
1370 /*
1371 * In no monitor mode, the streams ownership is kept inside the channel
1372 * so don't send them to the data thread.
1373 */
1374 if (!channel->monitor) {
1375 goto end_msg_sessiond;
1376 }
1377
1378 ret = send_streams_to_thread(channel, ctx);
1379 if (ret < 0) {
1380 /*
1381 * If we are unable to send the stream to the thread, there is
1382 * a big problem so just stop everything.
1383 */
1384 goto error_fatal;
1385 }
1386 /* List MUST be empty after or else it could be reused. */
1387 assert(cds_list_empty(&channel->streams.head));
1388 goto end_msg_sessiond;
1389 }
1390 case LTTNG_CONSUMER_DESTROY_CHANNEL:
1391 {
1392 uint64_t key = msg.u.destroy_channel.key;
1393
1394 /*
1395 * Only called if streams have not been sent to stream
1396 * manager thread. However, channel has been sent to
1397 * channel manager thread.
1398 */
1399 notify_thread_del_channel(ctx, key);
1400 goto end_msg_sessiond;
1401 }
1402 case LTTNG_CONSUMER_CLOSE_METADATA:
1403 {
1404 int ret;
1405
1406 ret = close_metadata(msg.u.close_metadata.key);
1407 if (ret != 0) {
1408 ret_code = ret;
1409 }
1410
1411 goto end_msg_sessiond;
1412 }
1413 case LTTNG_CONSUMER_FLUSH_CHANNEL:
1414 {
1415 int ret;
1416
1417 ret = flush_channel(msg.u.flush_channel.key);
1418 if (ret != 0) {
1419 ret_code = ret;
1420 }
1421
1422 goto end_msg_sessiond;
1423 }
1424 case LTTNG_CONSUMER_PUSH_METADATA:
1425 {
1426 int ret;
1427 uint64_t len = msg.u.push_metadata.len;
1428 uint64_t key = msg.u.push_metadata.key;
1429 uint64_t offset = msg.u.push_metadata.target_offset;
1430 struct lttng_consumer_channel *channel;
1431
1432 DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key,
1433 len);
1434
1435 channel = consumer_find_channel(key);
1436 if (!channel) {
1437 /*
1438 * This is possible if the metadata creation on the consumer side
1439 * is in flight vis-a-vis a concurrent push metadata from the
1440 * session daemon. Simply return that the channel failed and the
1441 * session daemon will handle that message correctly considering
1442 * that this race is acceptable thus the DBG() statement here.
1443 */
1444 DBG("UST consumer push metadata %" PRIu64 " not found", key);
1445 ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
1446 goto end_msg_sessiond;
1447 }
1448
1449 health_code_update();
1450
1451 /* Tell session daemon we are ready to receive the metadata. */
1452 ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
1453 if (ret < 0) {
1454 /* Somehow, the session daemon is not responding anymore. */
1455 goto error_fatal;
1456 }
1457
1458 health_code_update();
1459
1460 /* Wait for more data. */
1461 health_poll_entry();
1462 ret = lttng_consumer_poll_socket(consumer_sockpoll);
1463 health_poll_exit();
1464 if (ret) {
1465 goto error_fatal;
1466 }
1467
1468 health_code_update();
1469
1470 ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
1471 len, channel, 0, 1);
1472 if (ret < 0) {
1473 /* error receiving from sessiond */
1474 goto error_fatal;
1475 } else {
1476 ret_code = ret;
1477 goto end_msg_sessiond;
1478 }
1479 }
1480 case LTTNG_CONSUMER_SETUP_METADATA:
1481 {
1482 int ret;
1483
1484 ret = setup_metadata(ctx, msg.u.setup_metadata.key);
1485 if (ret) {
1486 ret_code = ret;
1487 }
1488 goto end_msg_sessiond;
1489 }
1490 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
1491 {
1492 if (msg.u.snapshot_channel.metadata) {
1493 ret = snapshot_metadata(msg.u.snapshot_channel.key,
1494 msg.u.snapshot_channel.pathname,
1495 msg.u.snapshot_channel.relayd_id,
1496 ctx);
1497 if (ret < 0) {
1498 ERR("Snapshot metadata failed");
1499 ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
1500 }
1501 } else {
1502 ret = snapshot_channel(msg.u.snapshot_channel.key,
1503 msg.u.snapshot_channel.pathname,
1504 msg.u.snapshot_channel.relayd_id,
1505 msg.u.snapshot_channel.nb_packets_per_stream,
1506 ctx);
1507 if (ret < 0) {
1508 ERR("Snapshot channel failed");
1509 ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
1510 }
1511 }
1512
1513 health_code_update();
1514 ret = consumer_send_status_msg(sock, ret_code);
1515 if (ret < 0) {
1516 /* Somehow, the session daemon is not responding anymore. */
1517 goto end_nosignal;
1518 }
1519 health_code_update();
1520 break;
1521 }
1522 default:
1523 break;
1524 }
1525
1526 end_nosignal:
1527 rcu_read_unlock();
1528
1529 health_code_update();
1530
1531 /*
1532 * Return 1 to indicate success since the 0 value can be a socket
1533 * shutdown during the recv() or send() call.
1534 */
1535 return 1;
1536
1537 end_msg_sessiond:
1538 /*
1539 * The returned value here is not useful since either way we'll return 1 to
1540 * the caller because the session daemon socket management is done
1541 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1542 */
1543 ret = consumer_send_status_msg(sock, ret_code);
1544 if (ret < 0) {
1545 goto error_fatal;
1546 }
1547 rcu_read_unlock();
1548
1549 health_code_update();
1550
1551 return 1;
1552 end_channel_error:
1553 if (channel) {
1554 /*
1555 * Free channel here since no one has a reference to it. We don't
1556 * free after that because a stream can store this pointer.
1557 */
1558 destroy_channel(channel);
1559 }
1560 /* We have to send a status channel message indicating an error. */
1561 ret = consumer_send_status_channel(sock, NULL);
1562 if (ret < 0) {
1563 /* Stop everything if session daemon can not be notified. */
1564 goto error_fatal;
1565 }
1566 rcu_read_unlock();
1567
1568 health_code_update();
1569
1570 return 1;
1571 error_fatal:
1572 rcu_read_unlock();
1573 /* This will issue a consumer stop. */
1574 return -1;
1575 }
1576
1577 /*
1578 * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
1579 * compiled out, we isolate it in this library.
1580 */
1581 int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream,
1582 unsigned long *off)
1583 {
1584 assert(stream);
1585 assert(stream->ustream);
1586
1587 return ustctl_get_mmap_read_offset(stream->ustream, off);
1588 }
1589
1590 /*
1591 * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
1592 * compiled out, we isolate it in this library.
1593 */
1594 void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream)
1595 {
1596 assert(stream);
1597 assert(stream->ustream);
1598
1599 return ustctl_get_mmap_base(stream->ustream);
1600 }
1601
1602 /*
1603 * Take a snapshot for a specific fd
1604 *
1605 * Returns 0 on success, < 0 on error
1606 */
1607 int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream)
1608 {
1609 assert(stream);
1610 assert(stream->ustream);
1611
1612 return ustctl_snapshot(stream->ustream);
1613 }
1614
1615 /*
1616 * Get the produced position
1617 *
1618 * Returns 0 on success, < 0 on error
1619 */
1620 int lttng_ustconsumer_get_produced_snapshot(
1621 struct lttng_consumer_stream *stream, unsigned long *pos)
1622 {
1623 assert(stream);
1624 assert(stream->ustream);
1625 assert(pos);
1626
1627 return ustctl_snapshot_get_produced(stream->ustream, pos);
1628 }
1629
1630 /*
1631 * Get the consumed position
1632 *
1633 * Returns 0 on success, < 0 on error
1634 */
1635 int lttng_ustconsumer_get_consumed_snapshot(
1636 struct lttng_consumer_stream *stream, unsigned long *pos)
1637 {
1638 assert(stream);
1639 assert(stream->ustream);
1640 assert(pos);
1641
1642 return ustctl_snapshot_get_consumed(stream->ustream, pos);
1643 }
1644
1645 void lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream,
1646 int producer)
1647 {
1648 assert(stream);
1649 assert(stream->ustream);
1650
1651 ustctl_flush_buffer(stream->ustream, producer);
1652 }
1653
1654 int lttng_ustconsumer_get_current_timestamp(
1655 struct lttng_consumer_stream *stream, uint64_t *ts)
1656 {
1657 assert(stream);
1658 assert(stream->ustream);
1659 assert(ts);
1660
1661 return ustctl_get_current_timestamp(stream->ustream, ts);
1662 }
1663
1664 /*
1665 * Called when the stream signal the consumer that it has hang up.
1666 */
1667 void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
1668 {
1669 assert(stream);
1670 assert(stream->ustream);
1671
1672 ustctl_flush_buffer(stream->ustream, 0);
1673 stream->hangup_flush_done = 1;
1674 }
1675
/*
 * Tear down the UST side of a consumer channel: stop the switch timer if
 * running, destroy the metadata cache, destroy the tracer channel object,
 * and finally attempt to remove the shm directory hierarchy on disk.
 * Teardown order matters: the timer must stop before the cache it flushes
 * into is destroyed.
 */
void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
{
	assert(chan);
	assert(chan->uchan);

	if (chan->switch_timer_enabled == 1) {
		consumer_timer_switch_stop(chan);
	}
	consumer_metadata_cache_destroy(chan);
	ustctl_destroy_channel(chan->uchan);
	/* Try to rmdir all directories under shm_path root. */
	if (chan->root_shm_path[0]) {
		/* Best effort: failure to rmdir is deliberately ignored. */
		(void) utils_recursive_rmdir(chan->root_shm_path);
	}
}
1691
/*
 * Tear down the UST side of a consumer stream: stop the owning channel's
 * switch timer if running, then destroy the tracer stream object. The
 * timer is stopped first so it cannot fire against a destroyed stream.
 */
void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
{
	assert(stream);
	assert(stream->ustream);

	if (stream->chan->switch_timer_enabled == 1) {
		consumer_timer_switch_stop(stream->chan);
	}
	ustctl_destroy_stream(stream->ustream);
}
1702
1703 int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream)
1704 {
1705 assert(stream);
1706 assert(stream->ustream);
1707
1708 return ustctl_stream_get_wakeup_fd(stream->ustream);
1709 }
1710
1711 int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
1712 {
1713 assert(stream);
1714 assert(stream->ustream);
1715
1716 return ustctl_stream_close_wakeup_fd(stream->ustream);
1717 }
1718
1719 /*
1720 * Populate index values of a UST stream. Values are set in big endian order.
1721 *
1722 * Return 0 on success or else a negative value.
1723 */
1724 static int get_index_values(struct ctf_packet_index *index,
1725 struct ustctl_consumer_stream *ustream)
1726 {
1727 int ret;
1728
1729 ret = ustctl_get_timestamp_begin(ustream, &index->timestamp_begin);
1730 if (ret < 0) {
1731 PERROR("ustctl_get_timestamp_begin");
1732 goto error;
1733 }
1734 index->timestamp_begin = htobe64(index->timestamp_begin);
1735
1736 ret = ustctl_get_timestamp_end(ustream, &index->timestamp_end);
1737 if (ret < 0) {
1738 PERROR("ustctl_get_timestamp_end");
1739 goto error;
1740 }
1741 index->timestamp_end = htobe64(index->timestamp_end);
1742
1743 ret = ustctl_get_events_discarded(ustream, &index->events_discarded);
1744 if (ret < 0) {
1745 PERROR("ustctl_get_events_discarded");
1746 goto error;
1747 }
1748 index->events_discarded = htobe64(index->events_discarded);
1749
1750 ret = ustctl_get_content_size(ustream, &index->content_size);
1751 if (ret < 0) {
1752 PERROR("ustctl_get_content_size");
1753 goto error;
1754 }
1755 index->content_size = htobe64(index->content_size);
1756
1757 ret = ustctl_get_packet_size(ustream, &index->packet_size);
1758 if (ret < 0) {
1759 PERROR("ustctl_get_packet_size");
1760 goto error;
1761 }
1762 index->packet_size = htobe64(index->packet_size);
1763
1764 ret = ustctl_get_stream_id(ustream, &index->stream_id);
1765 if (ret < 0) {
1766 PERROR("ustctl_get_stream_id");
1767 goto error;
1768 }
1769 index->stream_id = htobe64(index->stream_id);
1770
1771 error:
1772 return ret;
1773 }
1774
1775 /*
1776 * Write up to one packet from the metadata cache to the channel.
1777 *
1778 * Returns the number of bytes pushed in the cache, or a negative value
1779 * on error.
1780 */
1781 static
1782 int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
1783 {
1784 ssize_t write_len;
1785 int ret;
1786
1787 pthread_mutex_lock(&stream->chan->metadata_cache->lock);
1788 if (stream->chan->metadata_cache->contiguous
1789 == stream->ust_metadata_pushed) {
1790 ret = 0;
1791 goto end;
1792 }
1793
1794 write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
1795 &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
1796 stream->chan->metadata_cache->contiguous
1797 - stream->ust_metadata_pushed);
1798 assert(write_len != 0);
1799 if (write_len < 0) {
1800 ERR("Writing one metadata packet");
1801 ret = -1;
1802 goto end;
1803 }
1804 stream->ust_metadata_pushed += write_len;
1805
1806 assert(stream->chan->metadata_cache->contiguous >=
1807 stream->ust_metadata_pushed);
1808 ret = write_len;
1809
1810 end:
1811 pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
1812 return ret;
1813 }
1814
1815
1816 /*
1817 * Sync metadata meaning request them to the session daemon and snapshot to the
1818 * metadata thread can consumer them.
1819 *
1820 * Metadata stream lock MUST be acquired.
1821 *
1822 * Return 0 if new metadatda is available, EAGAIN if the metadata stream
1823 * is empty or a negative value on error.
1824 */
1825 int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
1826 struct lttng_consumer_stream *metadata)
1827 {
1828 int ret;
1829 int retry = 0;
1830
1831 assert(ctx);
1832 assert(metadata);
1833
1834 /*
1835 * Request metadata from the sessiond, but don't wait for the flush
1836 * because we locked the metadata thread.
1837 */
1838 ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0);
1839 if (ret < 0) {
1840 goto end;
1841 }
1842
1843 ret = commit_one_metadata_packet(metadata);
1844 if (ret <= 0) {
1845 goto end;
1846 } else if (ret > 0) {
1847 retry = 1;
1848 }
1849
1850 ustctl_flush_buffer(metadata->ustream, 1);
1851 ret = ustctl_snapshot(metadata->ustream);
1852 if (ret < 0) {
1853 if (errno != EAGAIN) {
1854 ERR("Sync metadata, taking UST snapshot");
1855 goto end;
1856 }
1857 DBG("No new metadata when syncing them.");
1858 /* No new metadata, exit. */
1859 ret = ENODATA;
1860 goto end;
1861 }
1862
1863 /*
1864 * After this flush, we still need to extract metadata.
1865 */
1866 if (retry) {
1867 ret = EAGAIN;
1868 }
1869
1870 end:
1871 return ret;
1872 }
1873
1874 /*
1875 * Return 0 on success else a negative value.
1876 */
1877 static int notify_if_more_data(struct lttng_consumer_stream *stream,
1878 struct lttng_consumer_local_data *ctx)
1879 {
1880 int ret;
1881 struct ustctl_consumer_stream *ustream;
1882
1883 assert(stream);
1884 assert(ctx);
1885
1886 ustream = stream->ustream;
1887
1888 /*
1889 * First, we are going to check if there is a new subbuffer available
1890 * before reading the stream wait_fd.
1891 */
1892 /* Get the next subbuffer */
1893 ret = ustctl_get_next_subbuf(ustream);
1894 if (ret) {
1895 /* No more data found, flag the stream. */
1896 stream->has_data = 0;
1897 ret = 0;
1898 goto end;
1899 }
1900
1901 ret = ustctl_put_subbuf(ustream);
1902 assert(!ret);
1903
1904 /* This stream still has data. Flag it and wake up the data thread. */
1905 stream->has_data = 1;
1906
1907 if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) {
1908 ssize_t writelen;
1909
1910 writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1);
1911 if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
1912 ret = writelen;
1913 goto end;
1914 }
1915
1916 /* The wake up pipe has been notified. */
1917 ctx->has_wakeup = 1;
1918 }
1919 ret = 0;
1920
1921 end:
1922 return ret;
1923 }
1924
1925 /*
1926 * Read subbuffer from the given stream.
1927 *
1928 * Stream lock MUST be acquired.
1929 *
1930 * Return 0 on success else a negative value.
1931 */
1932 int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
1933 struct lttng_consumer_local_data *ctx)
1934 {
1935 unsigned long len, subbuf_size, padding;
1936 int err, write_index = 1;
1937 long ret = 0;
1938 struct ustctl_consumer_stream *ustream;
1939 struct ctf_packet_index index;
1940
1941 assert(stream);
1942 assert(stream->ustream);
1943 assert(ctx);
1944
1945 DBG("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd,
1946 stream->name);
1947
1948 /* Ease our life for what's next. */
1949 ustream = stream->ustream;
1950
1951 /*
1952 * We can consume the 1 byte written into the wait_fd by UST. Don't trigger
1953 * error if we cannot read this one byte (read returns 0), or if the error
1954 * is EAGAIN or EWOULDBLOCK.
1955 *
1956 * This is only done when the stream is monitored by a thread, before the
1957 * flush is done after a hangup and if the stream is not flagged with data
1958 * since there might be nothing to consume in the wait fd but still have
1959 * data available flagged by the consumer wake up pipe.
1960 */
1961 if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) {
1962 char dummy;
1963 ssize_t readlen;
1964
1965 readlen = lttng_read(stream->wait_fd, &dummy, 1);
1966 if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
1967 ret = readlen;
1968 goto end;
1969 }
1970 }
1971
1972 retry:
1973 /* Get the next subbuffer */
1974 err = ustctl_get_next_subbuf(ustream);
1975 if (err != 0) {
1976 /*
1977 * Populate metadata info if the existing info has
1978 * already been read.
1979 */
1980 if (stream->metadata_flag) {
1981 ret = commit_one_metadata_packet(stream);
1982 if (ret <= 0) {
1983 goto end;
1984 }
1985 ustctl_flush_buffer(stream->ustream, 1);
1986 goto retry;
1987 }
1988
1989 ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
1990 /*
1991 * This is a debug message even for single-threaded consumer,
1992 * because poll() have more relaxed criterions than get subbuf,
1993 * so get_subbuf may fail for short race windows where poll()
1994 * would issue wakeups.
1995 */
1996 DBG("Reserving sub buffer failed (everything is normal, "
1997 "it is due to concurrency) [ret: %d]", err);
1998 goto end;
1999 }
2000 assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
2001
2002 if (!stream->metadata_flag) {
2003 index.offset = htobe64(stream->out_fd_offset);
2004 ret = get_index_values(&index, ustream);
2005 if (ret < 0) {
2006 goto end;
2007 }
2008 } else {
2009 write_index = 0;
2010 }
2011
2012 /* Get the full padded subbuffer size */
2013 err = ustctl_get_padded_subbuf_size(ustream, &len);
2014 assert(err == 0);
2015
2016 /* Get subbuffer data size (without padding) */
2017 err = ustctl_get_subbuf_size(ustream, &subbuf_size);
2018 assert(err == 0);
2019
2020 /* Make sure we don't get a subbuffer size bigger than the padded */
2021 assert(len >= subbuf_size);
2022
2023 padding = len - subbuf_size;
2024 /* write the subbuffer to the tracefile */
2025 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index);
2026 /*
2027 * The mmap operation should write subbuf_size amount of data when network
2028 * streaming or the full padding (len) size when we are _not_ streaming.
2029 */
2030 if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
2031 (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
2032 /*
2033 * Display the error but continue processing to try to release the
2034 * subbuffer. This is a DBG statement since any unexpected kill or
2035 * signal, the application gets unregistered, relayd gets closed or
2036 * anything that affects the buffer lifetime will trigger this error.
2037 * So, for the sake of the user, don't print this error since it can
2038 * happen and it is OK with the code flow.
2039 */
2040 DBG("Error writing to tracefile "
2041 "(ret: %ld != len: %lu != subbuf_size: %lu)",
2042 ret, len, subbuf_size);
2043 write_index = 0;
2044 }
2045 err = ustctl_put_next_subbuf(ustream);
2046 assert(err == 0);
2047
2048 /*
2049 * This will consumer the byte on the wait_fd if and only if there is not
2050 * next subbuffer to be acquired.
2051 */
2052 if (!stream->metadata_flag) {
2053 ret = notify_if_more_data(stream, ctx);
2054 if (ret < 0) {
2055 goto end;
2056 }
2057 }
2058
2059 /* Write index if needed. */
2060 if (!write_index) {
2061 goto end;
2062 }
2063
2064 if (stream->chan->live_timer_interval && !stream->metadata_flag) {
2065 /*
2066 * In live, block until all the metadata is sent.
2067 */
2068 err = consumer_stream_sync_metadata(ctx, stream->session_id);
2069 if (err < 0) {
2070 goto end;
2071 }
2072 }
2073
2074 assert(!stream->metadata_flag);
2075 err = consumer_stream_write_index(stream, &index);
2076 if (err < 0) {
2077 goto end;
2078 }
2079
2080 end:
2081 return ret;
2082 }
2083
2084 /*
2085 * Called when a stream is created.
2086 *
2087 * Return 0 on success or else a negative value.
2088 */
2089 int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
2090 {
2091 int ret;
2092
2093 assert(stream);
2094
2095 /* Don't create anything if this is set for streaming. */
2096 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
2097 ret = utils_create_stream_file(stream->chan->pathname, stream->name,
2098 stream->chan->tracefile_size, stream->tracefile_count_current,
2099 stream->uid, stream->gid, NULL);
2100 if (ret < 0) {
2101 goto error;
2102 }
2103 stream->out_fd = ret;
2104 stream->tracefile_size_current = 0;
2105
2106 if (!stream->metadata_flag) {
2107 ret = index_create_file(stream->chan->pathname,
2108 stream->name, stream->uid, stream->gid,
2109 stream->chan->tracefile_size,
2110 stream->tracefile_count_current);
2111 if (ret < 0) {
2112 goto error;
2113 }
2114 stream->index_fd = ret;
2115 }
2116 }
2117 ret = 0;
2118
2119 error:
2120 return ret;
2121 }
2122
2123 /*
2124 * Check if data is still being extracted from the buffers for a specific
2125 * stream. Consumer data lock MUST be acquired before calling this function
2126 * and the stream lock.
2127 *
2128 * Return 1 if the traced data are still getting read else 0 meaning that the
2129 * data is available for trace viewer reading.
2130 */
2131 int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
2132 {
2133 int ret;
2134
2135 assert(stream);
2136 assert(stream->ustream);
2137
2138 DBG("UST consumer checking data pending");
2139
2140 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
2141 ret = 0;
2142 goto end;
2143 }
2144
2145 if (stream->chan->type == CONSUMER_CHANNEL_TYPE_METADATA) {
2146 uint64_t contiguous, pushed;
2147
2148 /* Ease our life a bit. */
2149 contiguous = stream->chan->metadata_cache->contiguous;
2150 pushed = stream->ust_metadata_pushed;
2151
2152 /*
2153 * We can simply check whether all contiguously available data
2154 * has been pushed to the ring buffer, since the push operation
2155 * is performed within get_next_subbuf(), and because both
2156 * get_next_subbuf() and put_next_subbuf() are issued atomically
2157 * thanks to the stream lock within
2158 * lttng_ustconsumer_read_subbuffer(). This basically means that
2159 * whetnever ust_metadata_pushed is incremented, the associated
2160 * metadata has been consumed from the metadata stream.
2161 */
2162 DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64,
2163 contiguous, pushed);
2164 assert(((int64_t) (contiguous - pushed)) >= 0);
2165 if ((contiguous != pushed) ||
2166 (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
2167 ret = 1; /* Data is pending */
2168 goto end;
2169 }
2170 } else {
2171 ret = ustctl_get_next_subbuf(stream->ustream);
2172 if (ret == 0) {
2173 /*
2174 * There is still data so let's put back this
2175 * subbuffer.
2176 */
2177 ret = ustctl_put_subbuf(stream->ustream);
2178 assert(ret == 0);
2179 ret = 1; /* Data is pending */
2180 goto end;
2181 }
2182 }
2183
2184 /* Data is NOT pending so ready to be read. */
2185 ret = 0;
2186
2187 end:
2188 return ret;
2189 }
2190
2191 /*
2192 * Stop a given metadata channel timer if enabled and close the wait fd which
2193 * is the poll pipe of the metadata stream.
2194 *
2195 * This MUST be called with the metadata channel acquired.
2196 */
2197 void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata)
2198 {
2199 int ret;
2200
2201 assert(metadata);
2202 assert(metadata->type == CONSUMER_CHANNEL_TYPE_METADATA);
2203
2204 DBG("Closing metadata channel key %" PRIu64, metadata->key);
2205
2206 if (metadata->switch_timer_enabled == 1) {
2207 consumer_timer_switch_stop(metadata);
2208 }
2209
2210 if (!metadata->metadata_stream) {
2211 goto end;
2212 }
2213
2214 /*
2215 * Closing write side so the thread monitoring the stream wakes up if any
2216 * and clean the metadata stream.
2217 */
2218 if (metadata->metadata_stream->ust_metadata_poll_pipe[1] >= 0) {
2219 ret = close(metadata->metadata_stream->ust_metadata_poll_pipe[1]);
2220 if (ret < 0) {
2221 PERROR("closing metadata pipe write side");
2222 }
2223 metadata->metadata_stream->ust_metadata_poll_pipe[1] = -1;
2224 }
2225
2226 end:
2227 return;
2228 }
2229
2230 /*
2231 * Close every metadata stream wait fd of the metadata hash table. This
2232 * function MUST be used very carefully so not to run into a race between the
2233 * metadata thread handling streams and this function closing their wait fd.
2234 *
2235 * For UST, this is used when the session daemon hangs up. Its the metadata
2236 * producer so calling this is safe because we are assured that no state change
2237 * can occur in the metadata thread for the streams in the hash table.
2238 */
2239 void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht)
2240 {
2241 struct lttng_ht_iter iter;
2242 struct lttng_consumer_stream *stream;
2243
2244 assert(metadata_ht);
2245 assert(metadata_ht->ht);
2246
2247 DBG("UST consumer closing all metadata streams");
2248
2249 rcu_read_lock();
2250 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
2251 node.node) {
2252
2253 health_code_update();
2254
2255 pthread_mutex_lock(&stream->chan->lock);
2256 lttng_ustconsumer_close_metadata(stream->chan);
2257 pthread_mutex_unlock(&stream->chan->lock);
2258
2259 }
2260 rcu_read_unlock();
2261 }
2262
2263 void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
2264 {
2265 int ret;
2266
2267 ret = ustctl_stream_close_wakeup_fd(stream->ustream);
2268 if (ret < 0) {
2269 ERR("Unable to close wakeup fd");
2270 }
2271 }
2272
2273 /*
2274 * Please refer to consumer-timer.c before adding any lock within this
2275 * function or any of its callees. Timers have a very strict locking
2276 * semantic with respect to teardown. Failure to respect this semantic
2277 * introduces deadlocks.
2278 */
2279 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
2280 struct lttng_consumer_channel *channel, int timer, int wait)
2281 {
2282 struct lttcomm_metadata_request_msg request;
2283 struct lttcomm_consumer_msg msg;
2284 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
2285 uint64_t len, key, offset;
2286 int ret;
2287
2288 assert(channel);
2289 assert(channel->metadata_cache);
2290
2291 memset(&request, 0, sizeof(request));
2292
2293 /* send the metadata request to sessiond */
2294 switch (consumer_data.type) {
2295 case LTTNG_CONSUMER64_UST:
2296 request.bits_per_long = 64;
2297 break;
2298 case LTTNG_CONSUMER32_UST:
2299 request.bits_per_long = 32;
2300 break;
2301 default:
2302 request.bits_per_long = 0;
2303 break;
2304 }
2305
2306 request.session_id = channel->session_id;
2307 request.session_id_per_pid = channel->session_id_per_pid;
2308 /*
2309 * Request the application UID here so the metadata of that application can
2310 * be sent back. The channel UID corresponds to the user UID of the session
2311 * used for the rights on the stream file(s).
2312 */
2313 request.uid = channel->ust_app_uid;
2314 request.key = channel->key;
2315
2316 DBG("Sending metadata request to sessiond, session id %" PRIu64
2317 ", per-pid %" PRIu64 ", app UID %u and channek key %" PRIu64,
2318 request.session_id, request.session_id_per_pid, request.uid,
2319 request.key);
2320
2321 pthread_mutex_lock(&ctx->metadata_socket_lock);
2322
2323 health_code_update();
2324
2325 ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request,
2326 sizeof(request));
2327 if (ret < 0) {
2328 ERR("Asking metadata to sessiond");
2329 goto end;
2330 }
2331
2332 health_code_update();
2333
2334 /* Receive the metadata from sessiond */
2335 ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg,
2336 sizeof(msg));
2337 if (ret != sizeof(msg)) {
2338 DBG("Consumer received unexpected message size %d (expects %zu)",
2339 ret, sizeof(msg));
2340 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
2341 /*
2342 * The ret value might 0 meaning an orderly shutdown but this is ok
2343 * since the caller handles this.
2344 */
2345 goto end;
2346 }
2347
2348 health_code_update();
2349
2350 if (msg.cmd_type == LTTNG_ERR_UND) {
2351 /* No registry found */
2352 (void) consumer_send_status_msg(ctx->consumer_metadata_socket,
2353 ret_code);
2354 ret = 0;
2355 goto end;
2356 } else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) {
2357 ERR("Unexpected cmd_type received %d", msg.cmd_type);
2358 ret = -1;
2359 goto end;
2360 }
2361
2362 len = msg.u.push_metadata.len;
2363 key = msg.u.push_metadata.key;
2364 offset = msg.u.push_metadata.target_offset;
2365
2366 assert(key == channel->key);
2367 if (len == 0) {
2368 DBG("No new metadata to receive for key %" PRIu64, key);
2369 }
2370
2371 health_code_update();
2372
2373 /* Tell session daemon we are ready to receive the metadata. */
2374 ret = consumer_send_status_msg(ctx->consumer_metadata_socket,
2375 LTTCOMM_CONSUMERD_SUCCESS);
2376 if (ret < 0 || len == 0) {
2377 /*
2378 * Somehow, the session daemon is not responding anymore or there is
2379 * nothing to receive.
2380 */
2381 goto end;
2382 }
2383
2384 health_code_update();
2385
2386 ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
2387 key, offset, len, channel, timer, wait);
2388 if (ret >= 0) {
2389 /*
2390 * Only send the status msg if the sessiond is alive meaning a positive
2391 * ret code.
2392 */
2393 (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret);
2394 }
2395 ret = 0;
2396
2397 end:
2398 health_code_update();
2399
2400 pthread_mutex_unlock(&ctx->metadata_socket_lock);
2401 return ret;
2402 }
2403
2404 /*
2405 * Return the ustctl call for the get stream id.
2406 */
2407 int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream,
2408 uint64_t *stream_id)
2409 {
2410 assert(stream);
2411 assert(stream_id);
2412
2413 return ustctl_get_stream_id(stream->ustream, stream_id);
2414 }
This page took 0.076539 seconds and 5 git commands to generate.